offline_intelligence/
metrics.rs1use prometheus::{Encoder, TextEncoder, Registry, IntCounterVec, IntGauge, Histogram};
5use lazy_static::lazy_static;
6use std::sync::OnceLock;
7use axum::response::IntoResponse;
8use axum::http::StatusCode;
9
10lazy_static! {
11 static ref REGISTRY: Registry = Registry::new();
12}
13
14static REQ_COUNTER: OnceLock<IntCounterVec> = OnceLock::new();
15static ACTIVE_SESSIONS: OnceLock<IntGauge> = OnceLock::new();
16static QUEUE_DEPTH: OnceLock<IntGauge> = OnceLock::new();
17static QUEUE_WAIT_TIME: OnceLock<Histogram> = OnceLock::new();
18
19pub fn init_metrics() {
20 let req_counter = REQ_COUNTER.get_or_init(|| {
22 IntCounterVec::new(
23 prometheus::opts!("requests_total", "Total requests per route"),
24 &["route", "status"]
25 ).unwrap()
26 });
27
28 let active_sessions = ACTIVE_SESSIONS.get_or_init(|| {
29 IntGauge::new("active_sessions", "Active streaming sessions").unwrap()
30 });
31
32 let queue_depth = QUEUE_DEPTH.get_or_init(|| {
33 IntGauge::new("queue_depth", "Number of requests waiting in queue").unwrap()
34 });
35
36 let queue_wait_time = QUEUE_WAIT_TIME.get_or_init(|| {
37 Histogram::with_opts(prometheus::HistogramOpts::new(
38 "queue_wait_time_seconds",
39 "Time spent waiting in queue"
40 )).unwrap()
41 });
42
43 REGISTRY.register(Box::new(req_counter.clone())).ok();
44 REGISTRY.register(Box::new(active_sessions.clone())).ok();
45 REGISTRY.register(Box::new(queue_depth.clone())).ok();
46 REGISTRY.register(Box::new(queue_wait_time.clone())).ok();
47}
48
49pub fn inc_request(route: &str, status: &str) {
51 if let Some(counter) = REQ_COUNTER.get() {
52 counter.with_label_values(&[route, status]).inc();
53 }
54}
55
56pub fn inc_sessions() {
57 if let Some(gauge) = ACTIVE_SESSIONS.get() {
58 gauge.inc();
59 }
60}
61
62pub fn dec_sessions() {
63 if let Some(gauge) = ACTIVE_SESSIONS.get() {
64 gauge.dec();
65 }
66}
67
68pub fn inc_queue() {
69 if let Some(gauge) = QUEUE_DEPTH.get() {
70 gauge.inc();
71 }
72}
73
74pub fn dec_queue() {
75 if let Some(gauge) = QUEUE_DEPTH.get() {
76 gauge.dec();
77 }
78}
79
80pub fn observe_queue_wait(duration: f64) {
81 if let Some(histogram) = QUEUE_WAIT_TIME.get() {
82 histogram.observe(duration);
83 }
84}
85
86pub async fn get_metrics() -> impl IntoResponse {
87 let encoder = TextEncoder::new();
88 let metric_families = REGISTRY.gather();
89 let mut buffer = vec![];
90 encoder.encode(&metric_families, &mut buffer).unwrap();
91
92 (
93 StatusCode::OK,
94 [("content-type", "text/plain; version=0.0.4")],
95 buffer,
96 )
97}