Skip to main content

elara_runtime/observability/
metrics_server.rs

1//! HTTP server for exposing Prometheus metrics.
2//!
3//! This module provides an HTTP server that exposes metrics in Prometheus text format
4//! via a `/metrics` endpoint. The server is built on axum for production-grade async
5//! performance and reliability.
6//!
7//! # Example
8//!
9//! ```rust,no_run
10//! use elara_runtime::observability::metrics::MetricsRegistry;
11//! use elara_runtime::observability::metrics_server::{MetricsServer, MetricsServerConfig};
12//!
13//! #[tokio::main]
14//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
15//!     let registry = MetricsRegistry::new();
16//!     
17//!     let config = MetricsServerConfig {
18//!         bind_address: "0.0.0.0".to_string(),
19//!         port: 9090,
20//!     };
21//!     
22//!     let mut server = MetricsServer::new(config, registry);
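//!     server.start().await?;
//!
//!     // `start` returns as soon as the listener is bound and serves from a background
//!     // task, so keep the runtime alive for as long as metrics should be scraped.
//!     std::future::pending::<()>().await;
//!
//!     Ok(())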
26//! }
27//! ```
28
29use super::metrics::MetricsRegistry;
30use axum::{
31    extract::State,
32    http::StatusCode,
33    response::{IntoResponse, Response},
34    routing::get,
35    Router,
36};
37use std::net::SocketAddr;
38use std::sync::Arc;
39use tokio::task::JoinHandle;
40
/// Listen settings for the Prometheus metrics HTTP endpoint.
///
/// The default configuration listens on every interface (`0.0.0.0`) at port `9090`.
#[derive(Debug, Clone)]
pub struct MetricsServerConfig {
    /// Interface IP to listen on, such as `"0.0.0.0"` (all interfaces) or
    /// `"127.0.0.1"` (loopback only).
    pub bind_address: String,

    /// TCP port to listen on (typically 9090 for Prometheus). `0` lets the OS pick a
    /// free port.
    pub port: u16,
}

impl Default for MetricsServerConfig {
    fn default() -> Self {
        let bind_address = String::from("0.0.0.0");
        let port = 9090;
        Self { bind_address, port }
    }
}
59
60/// Errors that can occur during metrics server operations.
61#[derive(Debug, thiserror::Error)]
62pub enum MetricsServerError {
63    /// Failed to bind to the specified address/port
64    #[error("Failed to bind to {0}: {1}")]
65    BindError(String, std::io::Error),
66    
67    /// Server encountered a runtime error
68    #[error("Server error: {0}")]
69    ServerError(String),
70}
71
72/// HTTP server for exposing Prometheus metrics.
73///
74/// The server provides a `/metrics` endpoint that returns all registered metrics
75/// in Prometheus text exposition format. The server runs asynchronously and can
76/// be gracefully shut down.
77pub struct MetricsServer {
78    config: MetricsServerConfig,
79    registry: Arc<MetricsRegistry>,
80    handle: Option<JoinHandle<()>>,
81}
82
83impl MetricsServer {
84    /// Creates a new metrics server with the given configuration and registry.
85    ///
86    /// # Arguments
87    ///
88    /// * `config` - Server configuration (bind address and port)
89    /// * `registry` - Metrics registry to export
90    ///
91    /// # Example
92    ///
93    /// ```rust
94    /// use elara_runtime::observability::metrics::MetricsRegistry;
95    /// use elara_runtime::observability::metrics_server::{MetricsServer, MetricsServerConfig};
96    ///
97    /// let registry = MetricsRegistry::new();
98    /// let config = MetricsServerConfig::default();
99    /// let server = MetricsServer::new(config, registry);
100    /// ```
101    pub fn new(config: MetricsServerConfig, registry: MetricsRegistry) -> Self {
102        Self {
103            config,
104            registry: Arc::new(registry),
105            handle: None,
106        }
107    }
108
109    /// Starts the metrics server.
110    ///
111    /// This method spawns the HTTP server on a background task and returns immediately.
112    /// The server will continue running until `shutdown()` is called or the process exits.
113    ///
114    /// # Errors
115    ///
116    /// Returns `MetricsServerError::BindError` if the server cannot bind to the
117    /// specified address/port (e.g., port already in use).
118    ///
119    /// # Example
120    ///
121    /// ```rust,no_run
122    /// # use elara_runtime::observability::metrics::MetricsRegistry;
123    /// # use elara_runtime::observability::metrics_server::{MetricsServer, MetricsServerConfig};
124    /// # #[tokio::main]
125    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
126    /// let registry = MetricsRegistry::new();
127    /// let config = MetricsServerConfig::default();
128    /// let mut server = MetricsServer::new(config, registry);
129    /// 
130    /// server.start().await?;
131    /// # Ok(())
132    /// # }
133    /// ```
134    pub async fn start(&mut self) -> Result<(), MetricsServerError> {
135        let addr = format!("{}:{}", self.config.bind_address, self.config.port);
136        let socket_addr: SocketAddr = addr
137            .parse()
138            .map_err(|e| MetricsServerError::BindError(addr.clone(), 
139                std::io::Error::new(std::io::ErrorKind::InvalidInput, e)))?;
140
141        // Create the router with the /metrics endpoint
142        let app = Router::new()
143            .route("/metrics", get(metrics_handler))
144            .with_state(self.registry.clone());
145
146        // Bind to the address
147        let listener = tokio::net::TcpListener::bind(&socket_addr)
148            .await
149            .map_err(|e| MetricsServerError::BindError(addr.clone(), e))?;
150
151        tracing::info!(
152            address = %socket_addr,
153            "Metrics server started"
154        );
155
156        // Spawn the server on a background task
157        let handle = tokio::spawn(async move {
158            if let Err(e) = axum::serve(listener, app).await {
159                tracing::error!(error = %e, "Metrics server error");
160            }
161        });
162
163        self.handle = Some(handle);
164        Ok(())
165    }
166
167    /// Shuts down the metrics server gracefully.
168    ///
169    /// This method aborts the server task and waits for it to complete.
170    /// After shutdown, the server can be restarted by calling `start()` again.
171    ///
172    /// # Example
173    ///
174    /// ```rust,no_run
175    /// # use elara_runtime::observability::metrics::MetricsRegistry;
176    /// # use elara_runtime::observability::metrics_server::{MetricsServer, MetricsServerConfig};
177    /// # #[tokio::main]
178    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
179    /// # let registry = MetricsRegistry::new();
180    /// # let config = MetricsServerConfig::default();
181    /// let mut server = MetricsServer::new(config, registry);
182    /// server.start().await?;
183    /// 
184    /// // ... do work ...
185    /// 
186    /// server.shutdown().await;
187    /// # Ok(())
188    /// # }
189    /// ```
190    pub async fn shutdown(&mut self) {
191        if let Some(handle) = self.handle.take() {
192            handle.abort();
193            let _ = handle.await;
194            tracing::info!("Metrics server shut down");
195        }
196    }
197
198    /// Returns true if the server is currently running.
199    pub fn is_running(&self) -> bool {
200        self.handle.as_ref().map_or(false, |h| !h.is_finished())
201    }
202
203    /// Returns the configured bind address.
204    pub fn bind_address(&self) -> &str {
205        &self.config.bind_address
206    }
207
208    /// Returns the configured port.
209    pub fn port(&self) -> u16 {
210        self.config.port
211    }
212}
213
214/// Handler for the /metrics endpoint.
215///
216/// This function is called by axum when a request is made to /metrics.
217/// It exports all metrics from the registry in Prometheus text format.
218async fn metrics_handler(State(registry): State<Arc<MetricsRegistry>>) -> Response {
219    let prometheus_text = registry.export_prometheus();
220    
221    (
222        StatusCode::OK,
223        [("Content-Type", "text/plain; version=0.0.4; charset=utf-8")],
224        prometheus_text,
225    )
226        .into_response()
227}
228
229#[cfg(test)]
230mod tests {
231    use super::*;
232
233    #[tokio::test]
234    async fn test_metrics_server_creation() {
235        let registry = MetricsRegistry::new();
236        let config = MetricsServerConfig::default();
237        let server = MetricsServer::new(config, registry);
238        
239        assert_eq!(server.bind_address(), "0.0.0.0");
240        assert_eq!(server.port(), 9090);
241        assert!(!server.is_running());
242    }
243
244    #[tokio::test]
245    async fn test_metrics_server_start_stop() {
246        let registry = MetricsRegistry::new();
247        let config = MetricsServerConfig {
248            bind_address: "127.0.0.1".to_string(),
249            port: 0, // Use port 0 to let OS assign a free port
250        };
251        let mut server = MetricsServer::new(config, registry);
252        
253        // Start server
254        let result = server.start().await;
255        assert!(result.is_ok(), "Failed to start server: {:?}", result);
256        assert!(server.is_running());
257        
258        // Shutdown server
259        server.shutdown().await;
260        assert!(!server.is_running());
261    }
262
263    #[tokio::test]
264    async fn test_metrics_endpoint_response() {
265        let registry = MetricsRegistry::new();
266        
267        // Register some test metrics
268        let counter = registry.register_counter("test_counter", vec![]);
269        counter.inc_by(42);
270        
271        let gauge = registry.register_gauge("test_gauge", vec![]);
272        gauge.set(100);
273        
274        // Start server on a random port
275        let config = MetricsServerConfig {
276            bind_address: "127.0.0.1".to_string(),
277            port: 0,
278        };
279        let mut server = MetricsServer::new(config, registry);
280        server.start().await.unwrap();
281        
282        // Give server a moment to start
283        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
284        
285        // Note: We can't easily test the HTTP endpoint without knowing the assigned port
286        // In a real scenario, we'd need to expose the actual bound port from the server
287        
288        server.shutdown().await;
289    }
290
291    #[test]
292    fn test_prometheus_export_format() {
293        let registry = MetricsRegistry::new();
294        
295        // Register metrics
296        let counter = registry.register_counter("test_counter", vec![]);
297        counter.inc_by(5);
298        
299        let gauge = registry.register_gauge("test_gauge", vec![]);
300        gauge.set(-10);
301        
302        let histogram = registry.register_histogram(
303            "test_histogram",
304            vec![1.0, 5.0, 10.0],
305            vec![],
306        );
307        histogram.observe(3.0);
308        histogram.observe(7.0);
309        
310        let output = registry.export_prometheus();
311        
312        // Debug: print the output to see what we're getting
313        println!("Prometheus output:\n{}", output);
314        
315        // Verify counter format
316        assert!(output.contains("# TYPE test_counter counter"));
317        assert!(output.contains("test_counter 5"));
318        
319        // Verify gauge format
320        assert!(output.contains("# TYPE test_gauge gauge"));
321        assert!(output.contains("test_gauge -10"));
322        
323        // Verify histogram format
324        assert!(output.contains("# TYPE test_histogram histogram"));
325        assert!(output.contains("test_histogram_bucket"));
326        assert!(output.contains("le=\"1.0\""));
327        assert!(output.contains("le=\"+Inf\""));
328        assert!(output.contains("test_histogram_sum"));
329        assert!(output.contains("test_histogram_count 2"));
330    }
331
332    #[test]
333    fn test_prometheus_export_with_labels() {
334        let registry = MetricsRegistry::new();
335        
336        let counter = registry.register_counter(
337            "labeled_counter",
338            vec![
339                ("node_id".to_string(), "node-1".to_string()),
340                ("region".to_string(), "us-west".to_string()),
341            ],
342        );
343        counter.inc();
344        
345        let output = registry.export_prometheus();
346        
347        // Verify labels are formatted correctly
348        assert!(output.contains("labeled_counter{"));
349        assert!(output.contains("node_id=\"node-1\""));
350        assert!(output.contains("region=\"us-west\""));
351    }
352
353    #[test]
354    fn test_label_escaping() {
355        let registry = MetricsRegistry::new();
356        
357        let counter = registry.register_counter(
358            "escaped_counter",
359            vec![
360                ("label".to_string(), "value with \"quotes\" and \\backslash".to_string()),
361            ],
362        );
363        counter.inc();
364        
365        let output = registry.export_prometheus();
366        
367        // Verify special characters are escaped
368        assert!(output.contains("\\\""));
369        assert!(output.contains("\\\\"));
370    }
371}