blueprint_qos/metrics/prometheus/
server.rs1#![allow(clippy::doc_markdown)]
2
use axum::{
    Json, Router,
    extract::State,
    http::StatusCode,
    response::{IntoResponse, Response},
    routing::get,
};
use blueprint_core::{debug, error, info};
use blueprint_std::fmt::Write;
use blueprint_std::net::SocketAddr;
use blueprint_std::sync::Arc;
use prometheus::{Registry, TextEncoder};
use tokio::sync::oneshot;

use crate::error::{Error, Result};
use crate::metrics::provider::EnhancedMetricsProvider;
18use crate::metrics::provider::EnhancedMetricsProvider;
19
20#[derive(Clone)]
22struct ServerState {
23 registry: Arc<Registry>,
24 enhanced_metrics_provider: Arc<EnhancedMetricsProvider>,
25}
26
27pub struct PrometheusServer {
29 registry: Arc<Registry>,
30 enhanced_metrics_provider: Arc<EnhancedMetricsProvider>,
31 bind_address: String,
32 shutdown_tx: Option<oneshot::Sender<()>>,
33}
34
35impl PrometheusServer {
36 #[must_use]
38 pub fn new(
39 registry: Arc<Registry>,
40 enhanced_metrics_provider: Arc<EnhancedMetricsProvider>,
41 bind_address: String,
42 ) -> Self {
43 Self {
44 registry,
45 enhanced_metrics_provider,
46 bind_address,
47 shutdown_tx: None,
48 }
49 }
50
51 #[allow(clippy::unused_async)]
59 pub async fn start(&mut self) -> Result<()> {
60 let addr: SocketAddr = self
61 .bind_address
62 .parse()
63 .map_err(|e| Error::Other(format!("Failed to parse bind address: {}", e)))?;
64
65 let state = ServerState {
66 registry: self.registry.clone(),
67 enhanced_metrics_provider: self.enhanced_metrics_provider.clone(),
68 };
69
70 let app = Router::new()
71 .route("/metrics", get(metrics_handler))
72 .route("/health", get(health_handler))
73 .route("/api/v1/query", get(api_v1_query_handler))
74 .route("/api/v1/labels", get(api_v1_labels_handler))
75 .route("/api/v1/metadata", get(api_v1_metadata_handler))
76 .route("/api/v1/series", get(api_v1_series_handler))
77 .route("/api/v1/query_range", get(api_v1_query_range_handler))
78 .with_state(state);
79
80 let (tx, rx) = oneshot::channel();
81 self.shutdown_tx = Some(tx);
82
83 info!("Starting Prometheus metrics server on {}", addr);
84
85 let listener = match tokio::net::TcpListener::bind(addr).await {
86 Ok(l) => l,
87 Err(e) => {
88 error!(
89 "Failed to bind Prometheus metrics server to {}: {}",
90 addr, e
91 );
92 return Err(Error::Other(format!(
93 "Failed to bind Prometheus server to {}: {}",
94 addr, e
95 )));
96 }
97 };
98
99 tokio::spawn(async move {
100 axum::serve(listener, app)
101 .with_graceful_shutdown(async {
102 rx.await.ok();
103 })
104 .await
105 .unwrap_or_else(|e| {
106 error!("Prometheus metrics server execution error: {}", e);
107 });
108 });
109
110 Ok(())
111 }
112
113 pub fn stop(&mut self) {
115 if let Some(tx) = self.shutdown_tx.take() {
116 let _ = tx.send(());
117 info!("Stopped Prometheus metrics server");
118 }
119 }
120}
121
122impl Drop for PrometheusServer {
123 fn drop(&mut self) {
124 self.stop();
125 }
126}
127
128async fn metrics_handler(State(state): State<ServerState>) -> Response {
132 match state.enhanced_metrics_provider.force_flush_otel_metrics() {
134 Ok(()) => info!(
135 "PrometheusServer: OpenTelemetry metrics force_flush successful via EnhancedMetricsProvider."
136 ),
137 Err(err) => error!(
138 "PrometheusServer: OpenTelemetry metrics force_flush failed via EnhancedMetricsProvider: {:?}",
139 err
140 ),
141 }
142
143 info!(
144 "Metrics handler invoked (post OTel flush via provider). Gathering metrics from registry..."
145 );
146 let encoder = TextEncoder::new();
147 let metric_families = state.registry.gather();
148
149 info!(
150 "Gathered {} metric families. Dumping details:",
151 metric_families.len()
152 );
153 for mf in &metric_families {
154 info!(" Metric Family: {}", mf.name());
155 info!(" Type: {:?}", mf.get_field_type());
156 info!(" Help: {}", mf.help());
157 info!(" Metrics ({}):", mf.get_metric().len());
158 for m in mf.get_metric() {
159 if let Some(counter) = m.counter.as_ref() {
160 info!(" Counter Value: {}", counter.value.unwrap_or(0.0));
161 }
162 if let Some(gauge) = m.gauge.as_ref() {
163 info!(" Gauge Value: {}", gauge.value.unwrap_or(0.0));
164 }
165 if m.histogram.is_some() {
166 let hist = m.get_histogram();
167 info!(
168 " Histogram: Sum={}, Count={}",
169 hist.get_sample_sum(),
170 hist.get_sample_count()
171 );
172 }
173 if m.summary.is_some() {
174 let sum = m.get_summary();
175 info!(
176 " Summary: Sum={}, Count={}",
177 sum.sample_sum(),
178 sum.sample_count()
179 );
180 }
181 let mut labels_str = String::new();
182 for label_pair in m.get_label() {
183 write!(
184 labels_str,
185 "{}='{}' ",
186 label_pair.name(),
187 label_pair.value()
188 )
189 .unwrap();
190 }
191 if !labels_str.is_empty() {
192 info!(" Labels: {}", labels_str.trim());
193 }
194 }
195 }
196 info!("Finished dumping metric_families details.");
197
198 match encoder.encode_to_string(&metric_families) {
199 Ok(metrics) => (StatusCode::OK, metrics).into_response(),
200 Err(e) => {
201 error!("Failed to encode metrics: {}", e);
202 (
203 StatusCode::INTERNAL_SERVER_ERROR,
204 "Failed to encode metrics",
205 )
206 .into_response()
207 }
208 }
209}
210
211async fn health_handler() -> Response {
215 (StatusCode::OK, "OK").into_response()
216}
217
218async fn api_v1_query_handler() -> (StatusCode, Json<serde_json::Value>) {
222 let response_body = serde_json::json!({
223 "status": "success",
224 "data": {
225 "resultType": "scalar",
226 "result": [
227 0,
228 "1"
229 ]
230 }
231 });
232 (StatusCode::OK, Json(response_body))
233}
234
235async fn api_v1_labels_handler() -> (StatusCode, Json<serde_json::Value>) {
239 let response_body = serde_json::json!({
240 "status": "success",
241 "data": []
242 });
243 (StatusCode::OK, Json(response_body))
244}
245
246async fn api_v1_metadata_handler() -> (StatusCode, Json<serde_json::Value>) {
250 let response_body = serde_json::json!({
251 "status": "success",
252 "data": {}
253 });
254 (StatusCode::OK, Json(response_body))
255}
256
257async fn api_v1_series_handler() -> (StatusCode, Json<serde_json::Value>) {
261 let response_body = serde_json::json!({
262 "status": "success",
263 "data": []
264 });
265 (StatusCode::OK, Json(response_body))
266}
267
268async fn api_v1_query_range_handler() -> (StatusCode, Json<serde_json::Value>) {
272 let response_body = serde_json::json!({
273 "status": "success",
274 "data": {
275 "resultType": "matrix",
276 "result": []
277 }
278 });
279 (StatusCode::OK, Json(response_body))
280}