systemprompt_api/services/server/
metrics.rs1use std::sync::{Mutex, OnceLock};
2use std::time::Instant;
3
4use axum::extract::{MatchedPath, Request};
5use axum::http::header::CONTENT_TYPE;
6use axum::middleware::Next;
7use axum::response::{IntoResponse, Response};
8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
9use systemprompt_events::{
10 A2A_BROADCASTER, AGUI_BROADCASTER, ANALYTICS_BROADCASTER, Broadcaster, CONTEXT_BROADCASTER,
11};
12
13const METRICS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";
14
15const HTTP_REQUESTS_TOTAL: &str = "http_requests_total";
16const HTTP_REQUEST_DURATION_SECONDS: &str = "http_request_duration_seconds";
17const HTTP_REQUESTS_IN_FLIGHT: &str = "http_requests_in_flight";
18const SSE_CONNECTIONS: &str = "sse_active_connections";
19
20static RECORDER: OnceLock<PrometheusHandle> = OnceLock::new();
25static RECORDER_INIT: Mutex<()> = Mutex::new(());
31
32pub fn install_recorder() -> anyhow::Result<PrometheusHandle> {
33 if let Some(handle) = RECORDER.get() {
34 return Ok(handle.clone());
35 }
36 let _guard = RECORDER_INIT
37 .lock()
38 .unwrap_or_else(std::sync::PoisonError::into_inner);
39 if let Some(handle) = RECORDER.get() {
40 return Ok(handle.clone());
41 }
42 let handle = PrometheusBuilder::new()
43 .install_recorder()
44 .map_err(|e| anyhow::anyhow!("failed to install Prometheus recorder: {e}"))?;
45 drop(RECORDER.set(handle.clone()));
46 Ok(handle)
47}
48
49pub async fn handle_metrics(
50 axum::extract::State(handle): axum::extract::State<PrometheusHandle>,
51) -> Response {
52 refresh_connection_gauges().await;
53 let body = handle.render();
54 ([(CONTENT_TYPE, METRICS_CONTENT_TYPE)], body).into_response()
55}
56
57async fn refresh_connection_gauges() {
58 let context = CONTEXT_BROADCASTER.total_connections().await;
59 let agui = AGUI_BROADCASTER.total_connections().await;
60 let a2a = A2A_BROADCASTER.total_connections().await;
61 let analytics = ANALYTICS_BROADCASTER.total_connections().await;
62
63 metrics::gauge!(SSE_CONNECTIONS, "channel" => "context").set(context as f64);
64 metrics::gauge!(SSE_CONNECTIONS, "channel" => "agui").set(agui as f64);
65 metrics::gauge!(SSE_CONNECTIONS, "channel" => "a2a").set(a2a as f64);
66 metrics::gauge!(SSE_CONNECTIONS, "channel" => "analytics").set(analytics as f64);
67}
68
69pub async fn track_metrics(req: Request, next: Next) -> Response {
70 let method = req.method().clone();
71 let path = req
72 .extensions()
73 .get::<MatchedPath>()
74 .map_or_else(|| req.uri().path().to_owned(), |m| m.as_str().to_owned());
75
76 let in_flight = metrics::gauge!(HTTP_REQUESTS_IN_FLIGHT);
77 in_flight.increment(1.0);
78
79 let start = Instant::now();
80 let response = next.run(req).await;
81 let elapsed = start.elapsed().as_secs_f64();
82
83 in_flight.decrement(1.0);
84
85 let status = response.status().as_u16().to_string();
86 let method = method.to_string();
87
88 metrics::counter!(
89 HTTP_REQUESTS_TOTAL,
90 "method" => method.clone(),
91 "path" => path.clone(),
92 "status" => status.clone(),
93 )
94 .increment(1);
95 metrics::histogram!(
96 HTTP_REQUEST_DURATION_SECONDS,
97 "method" => method,
98 "path" => path,
99 "status" => status,
100 )
101 .record(elapsed);
102
103 response
104}