elara_runtime/observability/metrics_server.rs
//! HTTP server for exposing Prometheus metrics.
//!
//! This module provides an HTTP server that exposes metrics in the Prometheus text
//! exposition format via a `/metrics` endpoint. The server is built on axum and runs
//! on a background Tokio task.
//!
//! # Example
//!
//! ```rust,no_run
//! use elara_runtime::observability::metrics::MetricsRegistry;
//! use elara_runtime::observability::metrics_server::{MetricsServer, MetricsServerConfig};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let registry = MetricsRegistry::new();
//!
//!     let config = MetricsServerConfig {
//!         bind_address: "0.0.0.0".to_string(),
//!         port: 9090,
//!     };
//!
//!     let mut server = MetricsServer::new(config, registry);
//!     server.start().await?;
//!
//!     // `start` returns as soon as the port is bound. The server only keeps serving
//!     // while the runtime (and therefore this process) stays alive, so run the rest
//!     // of the application here before shutting down.
//!
//!     server.shutdown().await;
//!     Ok(())
//! }
//! ```
28
29use super::metrics::MetricsRegistry;
30use axum::{
31 extract::State,
32 http::StatusCode,
33 response::{IntoResponse, Response},
34 routing::get,
35 Router,
36};
37use std::net::SocketAddr;
38use std::sync::Arc;
39use tokio::task::JoinHandle;
40
/// Configuration for the metrics HTTP server.
///
/// This is a plain value type, so it can be cloned and compared for equality
/// (e.g. to detect whether a reloaded configuration actually changed).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsServerConfig {
    /// IP address to bind to (e.g. `"0.0.0.0"` for all IPv4 interfaces,
    /// `"127.0.0.1"` for localhost only).
    pub bind_address: String,

    /// TCP port to listen on (typically 9090 for Prometheus; 0 lets the OS
    /// assign a free port).
    pub port: u16,
}
50
51impl Default for MetricsServerConfig {
52 fn default() -> Self {
53 Self {
54 bind_address: "0.0.0.0".to_string(),
55 port: 9090,
56 }
57 }
58}
59
60/// Errors that can occur during metrics server operations.
61#[derive(Debug, thiserror::Error)]
62pub enum MetricsServerError {
63 /// Failed to bind to the specified address/port
64 #[error("Failed to bind to {0}: {1}")]
65 BindError(String, std::io::Error),
66
67 /// Server encountered a runtime error
68 #[error("Server error: {0}")]
69 ServerError(String),
70}
71
72/// HTTP server for exposing Prometheus metrics.
73///
74/// The server provides a `/metrics` endpoint that returns all registered metrics
75/// in Prometheus text exposition format. The server runs asynchronously and can
76/// be gracefully shut down.
77pub struct MetricsServer {
78 config: MetricsServerConfig,
79 registry: Arc<MetricsRegistry>,
80 handle: Option<JoinHandle<()>>,
81}
82
83impl MetricsServer {
84 /// Creates a new metrics server with the given configuration and registry.
85 ///
86 /// # Arguments
87 ///
88 /// * `config` - Server configuration (bind address and port)
89 /// * `registry` - Metrics registry to export
90 ///
91 /// # Example
92 ///
93 /// ```rust
94 /// use elara_runtime::observability::metrics::MetricsRegistry;
95 /// use elara_runtime::observability::metrics_server::{MetricsServer, MetricsServerConfig};
96 ///
97 /// let registry = MetricsRegistry::new();
98 /// let config = MetricsServerConfig::default();
99 /// let server = MetricsServer::new(config, registry);
100 /// ```
101 pub fn new(config: MetricsServerConfig, registry: MetricsRegistry) -> Self {
102 Self {
103 config,
104 registry: Arc::new(registry),
105 handle: None,
106 }
107 }
108
109 /// Starts the metrics server.
110 ///
111 /// This method spawns the HTTP server on a background task and returns immediately.
112 /// The server will continue running until `shutdown()` is called or the process exits.
113 ///
114 /// # Errors
115 ///
116 /// Returns `MetricsServerError::BindError` if the server cannot bind to the
117 /// specified address/port (e.g., port already in use).
118 ///
119 /// # Example
120 ///
121 /// ```rust,no_run
122 /// # use elara_runtime::observability::metrics::MetricsRegistry;
123 /// # use elara_runtime::observability::metrics_server::{MetricsServer, MetricsServerConfig};
124 /// # #[tokio::main]
125 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
126 /// let registry = MetricsRegistry::new();
127 /// let config = MetricsServerConfig::default();
128 /// let mut server = MetricsServer::new(config, registry);
129 ///
130 /// server.start().await?;
131 /// # Ok(())
132 /// # }
133 /// ```
134 pub async fn start(&mut self) -> Result<(), MetricsServerError> {
135 let addr = format!("{}:{}", self.config.bind_address, self.config.port);
136 let socket_addr: SocketAddr = addr
137 .parse()
138 .map_err(|e| MetricsServerError::BindError(addr.clone(),
139 std::io::Error::new(std::io::ErrorKind::InvalidInput, e)))?;
140
141 // Create the router with the /metrics endpoint
142 let app = Router::new()
143 .route("/metrics", get(metrics_handler))
144 .with_state(self.registry.clone());
145
146 // Bind to the address
147 let listener = tokio::net::TcpListener::bind(&socket_addr)
148 .await
149 .map_err(|e| MetricsServerError::BindError(addr.clone(), e))?;
150
151 tracing::info!(
152 address = %socket_addr,
153 "Metrics server started"
154 );
155
156 // Spawn the server on a background task
157 let handle = tokio::spawn(async move {
158 if let Err(e) = axum::serve(listener, app).await {
159 tracing::error!(error = %e, "Metrics server error");
160 }
161 });
162
163 self.handle = Some(handle);
164 Ok(())
165 }
166
167 /// Shuts down the metrics server gracefully.
168 ///
169 /// This method aborts the server task and waits for it to complete.
170 /// After shutdown, the server can be restarted by calling `start()` again.
171 ///
172 /// # Example
173 ///
174 /// ```rust,no_run
175 /// # use elara_runtime::observability::metrics::MetricsRegistry;
176 /// # use elara_runtime::observability::metrics_server::{MetricsServer, MetricsServerConfig};
177 /// # #[tokio::main]
178 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
179 /// # let registry = MetricsRegistry::new();
180 /// # let config = MetricsServerConfig::default();
181 /// let mut server = MetricsServer::new(config, registry);
182 /// server.start().await?;
183 ///
184 /// // ... do work ...
185 ///
186 /// server.shutdown().await;
187 /// # Ok(())
188 /// # }
189 /// ```
190 pub async fn shutdown(&mut self) {
191 if let Some(handle) = self.handle.take() {
192 handle.abort();
193 let _ = handle.await;
194 tracing::info!("Metrics server shut down");
195 }
196 }
197
198 /// Returns true if the server is currently running.
199 pub fn is_running(&self) -> bool {
200 self.handle.as_ref().map_or(false, |h| !h.is_finished())
201 }
202
203 /// Returns the configured bind address.
204 pub fn bind_address(&self) -> &str {
205 &self.config.bind_address
206 }
207
208 /// Returns the configured port.
209 pub fn port(&self) -> u16 {
210 self.config.port
211 }
212}
213
214/// Handler for the /metrics endpoint.
215///
216/// This function is called by axum when a request is made to /metrics.
217/// It exports all metrics from the registry in Prometheus text format.
218async fn metrics_handler(State(registry): State<Arc<MetricsRegistry>>) -> Response {
219 let prometheus_text = registry.export_prometheus();
220
221 (
222 StatusCode::OK,
223 [("Content-Type", "text/plain; version=0.0.4; charset=utf-8")],
224 prometheus_text,
225 )
226 .into_response()
227}
228
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_metrics_server_creation() {
        let registry = MetricsRegistry::new();
        let config = MetricsServerConfig::default();
        let server = MetricsServer::new(config, registry);

        assert_eq!(server.bind_address(), "0.0.0.0");
        assert_eq!(server.port(), 9090);
        assert!(!server.is_running());
    }

    #[tokio::test]
    async fn test_metrics_server_start_stop() {
        let registry = MetricsRegistry::new();
        let config = MetricsServerConfig {
            bind_address: "127.0.0.1".to_string(),
            port: 0, // Use port 0 to let OS assign a free port
        };
        let mut server = MetricsServer::new(config, registry);

        // Start server
        let result = server.start().await;
        assert!(result.is_ok(), "Failed to start server: {:?}", result);
        assert!(server.is_running());

        // Shutdown server
        server.shutdown().await;
        assert!(!server.is_running());

        // The docs promise that a stopped server can be started again.
        let restarted = server.start().await;
        assert!(restarted.is_ok(), "Failed to restart server: {:?}", restarted);
        assert!(server.is_running());
        server.shutdown().await;
        assert!(!server.is_running());
    }

    #[tokio::test]
    async fn test_metrics_endpoint_response() {
        let registry = MetricsRegistry::new();

        // Register some test metrics
        let counter = registry.register_counter("test_counter", vec![]);
        counter.inc_by(42);

        let gauge = registry.register_gauge("test_gauge", vec![]);
        gauge.set(100);

        // Call the handler directly rather than over the network: this avoids
        // depending on which port the OS would assign and needs no sleeps.
        let response = metrics_handler(State(Arc::new(registry))).await;

        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(
            response
                .headers()
                .get("content-type")
                .and_then(|v| v.to_str().ok()),
            Some("text/plain; version=0.0.4; charset=utf-8")
        );

        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .expect("failed to read /metrics response body");
        let text = std::str::from_utf8(&body).expect("/metrics body is not UTF-8");

        assert!(text.contains("# TYPE test_counter counter"));
        assert!(text.contains("test_counter 42"));
        assert!(text.contains("# TYPE test_gauge gauge"));
        assert!(text.contains("test_gauge 100"));
    }

    #[test]
    fn test_prometheus_export_format() {
        let registry = MetricsRegistry::new();

        // Register metrics
        let counter = registry.register_counter("test_counter", vec![]);
        counter.inc_by(5);

        let gauge = registry.register_gauge("test_gauge", vec![]);
        gauge.set(-10);

        let histogram = registry.register_histogram(
            "test_histogram",
            vec![1.0, 5.0, 10.0],
            vec![],
        );
        histogram.observe(3.0);
        histogram.observe(7.0);

        let output = registry.export_prometheus();

        // The test harness only shows captured output when the test fails.
        println!("Prometheus output:\n{}", output);

        // Verify counter format
        assert!(output.contains("# TYPE test_counter counter"));
        assert!(output.contains("test_counter 5"));

        // Verify gauge format
        assert!(output.contains("# TYPE test_gauge gauge"));
        assert!(output.contains("test_gauge -10"));

        // Verify histogram format
        assert!(output.contains("# TYPE test_histogram histogram"));
        assert!(output.contains("test_histogram_bucket"));
        assert!(output.contains("le=\"1.0\""));
        assert!(output.contains("le=\"+Inf\""));
        assert!(output.contains("test_histogram_sum"));
        assert!(output.contains("test_histogram_count 2"));
    }

    #[test]
    fn test_prometheus_export_with_labels() {
        let registry = MetricsRegistry::new();

        let counter = registry.register_counter(
            "labeled_counter",
            vec![
                ("node_id".to_string(), "node-1".to_string()),
                ("region".to_string(), "us-west".to_string()),
            ],
        );
        counter.inc();

        let output = registry.export_prometheus();

        // Verify labels are formatted correctly
        assert!(output.contains("labeled_counter{"));
        assert!(output.contains("node_id=\"node-1\""));
        assert!(output.contains("region=\"us-west\""));
    }

    #[test]
    fn test_label_escaping() {
        let registry = MetricsRegistry::new();

        let counter = registry.register_counter(
            "escaped_counter",
            vec![(
                "label".to_string(),
                "value with \"quotes\" and \\backslash".to_string(),
            )],
        );
        counter.inc();

        let output = registry.export_prometheus();

        // Verify special characters are escaped
        assert!(output.contains("\\\""));
        assert!(output.contains("\\\\"));
    }
}