blueprint_qos/metrics/prometheus/server.rs

#![allow(clippy::doc_markdown)]

use axum::{
    Json, Router,
    extract::State,
    http::StatusCode,
    response::{IntoResponse, Response},
    routing::get,
};
use blueprint_core::{error, info};
use blueprint_std::fmt::Write;
use blueprint_std::net::SocketAddr;
use blueprint_std::sync::Arc;
use prometheus::{Registry, TextEncoder};
use tokio::sync::oneshot;

use crate::error::{Error, Result};
use crate::metrics::provider::EnhancedMetricsProvider;

/// Prometheus metrics server state
#[derive(Clone)]
struct ServerState {
    registry: Arc<Registry>,
    enhanced_metrics_provider: Arc<EnhancedMetricsProvider>,
}

/// Prometheus metrics server
pub struct PrometheusServer {
    registry: Arc<Registry>,
    enhanced_metrics_provider: Arc<EnhancedMetricsProvider>,
    bind_address: String,
    shutdown_tx: Option<oneshot::Sender<()>>,
}

impl PrometheusServer {
    /// Create a new Prometheus metrics server
    #[must_use]
    pub fn new(
        registry: Arc<Registry>,
        enhanced_metrics_provider: Arc<EnhancedMetricsProvider>,
        bind_address: String,
    ) -> Self {
        Self {
            registry,
            enhanced_metrics_provider,
            bind_address,
            shutdown_tx: None,
        }
    }

    /// Start the Prometheus metrics server
    ///
    /// Binds a TCP listener to the configured address and serves the metrics routes on a
    /// background task until `stop` is called or the server is dropped.
    ///
    /// # Errors
    /// Returns an error if the bind address cannot be parsed or if the listener cannot be
    /// bound to that address
    pub async fn start(&mut self) -> Result<()> {
        let addr: SocketAddr = self
            .bind_address
            .parse()
            .map_err(|e| Error::Other(format!("Failed to parse bind address: {}", e)))?;

        let state = ServerState {
            registry: self.registry.clone(),
            enhanced_metrics_provider: self.enhanced_metrics_provider.clone(),
        };

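        // Besides /metrics and /health, the /api/v1/* routes are thin stubs that return
        // well-formed but empty Prometheus API responses (see the handlers further down),
        // mainly so that tools such as Grafana can pass their datasource health check.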
        let app = Router::new()
            .route("/metrics", get(metrics_handler))
            .route("/health", get(health_handler))
            .route("/api/v1/query", get(api_v1_query_handler))
            .route("/api/v1/labels", get(api_v1_labels_handler))
            .route("/api/v1/metadata", get(api_v1_metadata_handler))
            .route("/api/v1/series", get(api_v1_series_handler))
            .route("/api/v1/query_range", get(api_v1_query_range_handler))
            .with_state(state);

        let (tx, rx) = oneshot::channel();
        self.shutdown_tx = Some(tx);

        info!("Starting Prometheus metrics server on {}", addr);

        let listener = match tokio::net::TcpListener::bind(addr).await {
            Ok(l) => l,
            Err(e) => {
                error!(
                    "Failed to bind Prometheus metrics server to {}: {}",
                    addr, e
                );
                return Err(Error::Other(format!(
                    "Failed to bind Prometheus server to {}: {}",
                    addr, e
                )));
            }
        };

        tokio::spawn(async move {
            axum::serve(listener, app)
                .with_graceful_shutdown(async {
                    rx.await.ok();
                })
                .await
                .unwrap_or_else(|e| {
                    error!("Prometheus metrics server execution error: {}", e);
                });
        });

        Ok(())
    }

    /// Stop the Prometheus metrics server
    pub fn stop(&mut self) {
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
            info!("Stopped Prometheus metrics server");
        }
    }
}

impl Drop for PrometheusServer {
    fn drop(&mut self) {
        self.stop();
    }
}
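
// Usage sketch (illustrative, not part of the original file). It only uses items defined
// above or in the imports; the caller is assumed to already hold an `EnhancedMetricsProvider`,
// since that type's constructor is not shown in this file, and the bind address is arbitrary.
#[allow(dead_code)]
async fn example_start_metrics_server(
    provider: Arc<EnhancedMetricsProvider>,
) -> Result<PrometheusServer> {
    // A fresh registry; in practice this is typically shared with the rest of the QoS stack.
    let registry = Arc::new(Registry::new());

    let mut server = PrometheusServer::new(registry, provider, "127.0.0.1:9090".to_string());
    server.start().await?;

    // The server now answers /metrics, /health, and the /api/v1/* stubs until `stop()` is
    // called or the returned value is dropped.
    Ok(server)
}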

/// Handler for /metrics endpoint
///
/// Returns the current metrics in Prometheus text format
async fn metrics_handler(State(state): State<ServerState>) -> Response {
    // Attempt to force flush OpenTelemetry metrics via the EnhancedMetricsProvider
    match state.enhanced_metrics_provider.force_flush_otel_metrics() {
        Ok(()) => info!(
            "PrometheusServer: OpenTelemetry metrics force_flush successful via EnhancedMetricsProvider."
        ),
        Err(err) => error!(
            "PrometheusServer: OpenTelemetry metrics force_flush failed via EnhancedMetricsProvider: {:?}",
            err
        ),
    }

    info!(
        "Metrics handler invoked (post OTel flush via provider). Gathering metrics from registry..."
    );
    let encoder = TextEncoder::new();
    let metric_families = state.registry.gather();

    info!(
        "Gathered {} metric families. Dumping details:",
        metric_families.len()
    );
    for mf in &metric_families {
        info!("  Metric Family: {}", mf.name());
        info!("    Type: {:?}", mf.get_field_type());
        info!("    Help: {}", mf.help());
        info!("    Metrics ({}):", mf.get_metric().len());
        for m in mf.get_metric() {
            if let Some(counter) = m.counter.as_ref() {
                info!("      Counter Value: {}", counter.value.unwrap_or(0.0));
            }
            if let Some(gauge) = m.gauge.as_ref() {
                info!("      Gauge Value: {}", gauge.value.unwrap_or(0.0));
            }
            if m.histogram.is_some() {
                let hist = m.get_histogram();
                info!(
                    "      Histogram: Sum={}, Count={}",
                    hist.get_sample_sum(),
                    hist.get_sample_count()
                );
            }
            if m.summary.is_some() {
                let sum = m.get_summary();
                info!(
                    "      Summary: Sum={}, Count={}",
                    sum.sample_sum(),
                    sum.sample_count()
                );
            }
            let mut labels_str = String::new();
            for label_pair in m.get_label() {
                write!(
                    labels_str,
                    "{}='{}' ",
                    label_pair.name(),
                    label_pair.value()
                )
                .unwrap();
            }
            if !labels_str.is_empty() {
                info!("      Labels: {}", labels_str.trim());
            }
        }
    }
    info!("Finished dumping metric_families details.");

    match encoder.encode_to_string(&metric_families) {
        Ok(metrics) => (StatusCode::OK, metrics).into_response(),
        Err(e) => {
            error!("Failed to encode metrics: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to encode metrics",
            )
                .into_response()
        }
    }
}
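
// For reference, the successful response above is in the standard Prometheus text exposition
// format. A scrape might contain lines like the following (metric and label names here are
// purely illustrative, not actual output of this crate):
//
//     # HELP blueprint_jobs_executed_total Total number of jobs executed
//     # TYPE blueprint_jobs_executed_total counter
//     blueprint_jobs_executed_total{service_id="0"} 42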

/// Handler for /health endpoint
///
/// Returns a simple OK response to indicate the server is running
async fn health_handler() -> Response {
    (StatusCode::OK, "OK").into_response()
}

/// Handler for /api/v1/query endpoint (minimal, for the Grafana health check)
///
/// Returns a static successful response to mimic the Prometheus API for simple queries.
async fn api_v1_query_handler() -> (StatusCode, Json<serde_json::Value>) {
    // Scalar results in the Prometheus HTTP API are encoded as [unix_timestamp, "value"].
    let response_body = serde_json::json!({
        "status": "success",
        "data": {
            "resultType": "scalar",
            "result": [
                0,
                "1"
            ]
        }
    });
    (StatusCode::OK, Json(response_body))
}

/// Handler for /api/v1/labels endpoint
///
/// Returns an empty list of labels, conforming to the Prometheus API.
async fn api_v1_labels_handler() -> (StatusCode, Json<serde_json::Value>) {
    let response_body = serde_json::json!({
        "status": "success",
        "data": []
    });
    (StatusCode::OK, Json(response_body))
}

/// Handler for /api/v1/metadata endpoint
///
/// Returns an empty map of metadata, conforming to the Prometheus API.
async fn api_v1_metadata_handler() -> (StatusCode, Json<serde_json::Value>) {
    let response_body = serde_json::json!({
        "status": "success",
        "data": {}
    });
    (StatusCode::OK, Json(response_body))
}

/// Handler for /api/v1/series endpoint
///
/// Returns an empty list of series, conforming to the Prometheus API.
async fn api_v1_series_handler() -> (StatusCode, Json<serde_json::Value>) {
    let response_body = serde_json::json!({
        "status": "success",
        "data": []
    });
    (StatusCode::OK, Json(response_body))
}

/// Handler for /api/v1/query_range endpoint
///
/// Returns an empty matrix result, conforming to the Prometheus API.
async fn api_v1_query_range_handler() -> (StatusCode, Json<serde_json::Value>) {
    let response_body = serde_json::json!({
        "status": "success",
        "data": {
            "resultType": "matrix",
            "result": []
        }
    });
    (StatusCode::OK, Json(response_body))
}
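
#[cfg(test)]
mod tests {
    use super::*;

    // Small sanity checks added as illustrations (not part of the original file); they
    // assume `tokio` with the `macros` feature is available as a dev-dependency.
    #[tokio::test]
    async fn health_endpoint_returns_ok() {
        let response = health_handler().await;
        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn query_stub_reports_success() {
        let (status, Json(body)) = api_v1_query_handler().await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(body["status"], "success");
    }
}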