Skip to main content

systemprompt_api/services/server/
metrics.rs

1use std::sync::{Mutex, OnceLock};
2use std::time::Instant;
3
4use axum::extract::{MatchedPath, Request};
5use axum::http::header::CONTENT_TYPE;
6use axum::middleware::Next;
7use axum::response::{IntoResponse, Response};
8use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
9use systemprompt_events::{
10    A2A_BROADCASTER, AGUI_BROADCASTER, ANALYTICS_BROADCASTER, Broadcaster, CONTEXT_BROADCASTER,
11};
12
/// `Content-Type` header value sent with the rendered metrics body by
/// `handle_metrics`. (Presumably the Prometheus text exposition format,
/// version 0.0.4 — confirm against the scraper's expectations.)
const METRICS_CONTENT_TYPE: &str = "text/plain; version=0.0.4; charset=utf-8";

/// Counter incremented once per completed request in `track_metrics`,
/// labelled with `method`, `path` and `status`.
const HTTP_REQUESTS_TOTAL: &str = "http_requests_total";
/// Histogram of wall-clock seconds spent in the inner service, recorded by
/// `track_metrics` with the same `method` / `path` / `status` labels.
const HTTP_REQUEST_DURATION_SECONDS: &str = "http_request_duration_seconds";
/// Unlabelled gauge that `track_metrics` increments before calling the inner
/// service and decrements afterwards.
const HTTP_REQUESTS_IN_FLIGHT: &str = "http_requests_in_flight";
/// Gauge set by `refresh_connection_gauges`, one series per `channel` label
/// (`context`, `agui`, `a2a`, `analytics`).
const SSE_CONNECTIONS: &str = "sse_active_connections";
19
// The Prometheus recorder is a process global: `install_recorder` errors in
// `metrics::set_global_recorder` if called twice. Cache our handle so repeat
// callers (test binaries that boot multiple `ApiServer`s, or any future
// hot-reload path) get a clone of the original instead of a hard error.
// Written at most once, by `install_recorder`, after a successful install.
static RECORDER: OnceLock<PrometheusHandle> = OnceLock::new();
// Serialises concurrent installers so the first writer wins the global recorder
// race outright; without it a parallel test runner with two `setup_api_server`
// calls both observe an empty `RECORDER`, both call `install_recorder`, and the
// loser surfaces "attempted to set a recorder after the metrics system was
// already initialized" instead of getting the cached handle.
// The mutex guards no data (`()`); it is only held while `install_recorder`
// re-checks `RECORDER` and performs the install.
static RECORDER_INIT: Mutex<()> = Mutex::new(());
31
32pub fn install_recorder() -> anyhow::Result<PrometheusHandle> {
33    if let Some(handle) = RECORDER.get() {
34        return Ok(handle.clone());
35    }
36    let _guard = RECORDER_INIT
37        .lock()
38        .unwrap_or_else(std::sync::PoisonError::into_inner);
39    if let Some(handle) = RECORDER.get() {
40        return Ok(handle.clone());
41    }
42    let handle = PrometheusBuilder::new()
43        .install_recorder()
44        .map_err(|e| anyhow::anyhow!("failed to install Prometheus recorder: {e}"))?;
45    drop(RECORDER.set(handle.clone()));
46    Ok(handle)
47}
48
49pub async fn handle_metrics(
50    axum::extract::State(handle): axum::extract::State<PrometheusHandle>,
51) -> Response {
52    refresh_connection_gauges().await;
53    let body = handle.render();
54    ([(CONTENT_TYPE, METRICS_CONTENT_TYPE)], body).into_response()
55}
56
57async fn refresh_connection_gauges() {
58    let context = CONTEXT_BROADCASTER.total_connections().await;
59    let agui = AGUI_BROADCASTER.total_connections().await;
60    let a2a = A2A_BROADCASTER.total_connections().await;
61    let analytics = ANALYTICS_BROADCASTER.total_connections().await;
62
63    metrics::gauge!(SSE_CONNECTIONS, "channel" => "context").set(context as f64);
64    metrics::gauge!(SSE_CONNECTIONS, "channel" => "agui").set(agui as f64);
65    metrics::gauge!(SSE_CONNECTIONS, "channel" => "a2a").set(a2a as f64);
66    metrics::gauge!(SSE_CONNECTIONS, "channel" => "analytics").set(analytics as f64);
67}
68
69pub async fn track_metrics(req: Request, next: Next) -> Response {
70    let method = req.method().clone();
71    let path = req
72        .extensions()
73        .get::<MatchedPath>()
74        .map_or_else(|| req.uri().path().to_owned(), |m| m.as_str().to_owned());
75
76    let in_flight = metrics::gauge!(HTTP_REQUESTS_IN_FLIGHT);
77    in_flight.increment(1.0);
78
79    let start = Instant::now();
80    let response = next.run(req).await;
81    let elapsed = start.elapsed().as_secs_f64();
82
83    in_flight.decrement(1.0);
84
85    let status = response.status().as_u16().to_string();
86    let method = method.to_string();
87
88    metrics::counter!(
89        HTTP_REQUESTS_TOTAL,
90        "method" => method.clone(),
91        "path" => path.clone(),
92        "status" => status.clone(),
93    )
94    .increment(1);
95    metrics::histogram!(
96        HTTP_REQUEST_DURATION_SECONDS,
97        "method" => method,
98        "path" => path,
99        "status" => status,
100    )
101    .record(elapsed);
102
103    response
104}