Skip to main content

camel_prometheus/
server.rs

1//! HTTP server for exposing Prometheus metrics
2//!
3//! This module provides an HTTP server that exposes metrics via the `/metrics` endpoint
4//! in Prometheus text format.
5
6use std::net::SocketAddr;
7use std::sync::Arc;
8
9use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
10use tokio::net::TcpListener;
11use tracing::info;
12
13use camel_api::{HealthReport, HealthStatus};
14
15use crate::PrometheusMetrics;
16
17/// Type alias for the health checker function
18type HealthChecker = Arc<dyn Fn() -> HealthReport + Send + Sync>;
19
20/// Type alias for the combined state
21type ServerState = (Arc<PrometheusMetrics>, Option<HealthChecker>);
22
/// HTTP server for exposing Prometheus metrics
///
/// This is a stateless unit type: it only namespaces the associated
/// functions that start the server, so it is trivially copyable and
/// has a default value.
#[derive(Debug, Clone, Copy, Default)]
pub struct MetricsServer;
25
26impl MetricsServer {
27    /// Starts the metrics server on the given address
28    ///
29    /// This function runs indefinitely until the server is stopped.
30    ///
31    /// # Arguments
32    /// * `addr` - The socket address to bind to (e.g., "0.0.0.0:9090")
33    /// * `metrics` - The PrometheusMetrics instance to expose
34    ///
35    /// # Panics
36    /// Panics if unable to bind to the specified address or if the server fails to start.
37    pub async fn run(addr: SocketAddr, metrics: Arc<PrometheusMetrics>) {
38        Self::run_with_health_checker(addr, metrics, HealthReport::default).await;
39    }
40
41    /// Starts the metrics server with a health checker function
42    ///
43    /// This function runs indefinitely until the server is stopped.
44    ///
45    /// # Arguments
46    /// * `addr` - The socket address to bind to (e.g., "0.0.0.0:9090")
47    /// * `metrics` - The PrometheusMetrics instance to expose
48    /// * `health_checker` - Function that returns a HealthReport when called
49    ///
50    /// # Panics
51    /// Panics if unable to bind to the specified address or if the server fails to start.
52    pub async fn run_with_health_checker<F>(
53        addr: SocketAddr,
54        metrics: Arc<PrometheusMetrics>,
55        health_checker: F,
56    ) where
57        F: Fn() -> HealthReport + Send + Sync + 'static,
58    {
59        let health_checker = Arc::new(health_checker);
60
61        let app = Router::new()
62            .route("/metrics", get(Self::metrics_handler))
63            .route("/healthz", get(Self::healthz))
64            .route("/readyz", get(Self::readyz))
65            .route("/health", get(Self::health))
66            .with_state((metrics, Some(health_checker)));
67
68        let listener = tokio::net::TcpListener::bind(addr)
69            .await
70            .unwrap_or_else(|e| panic!("Failed to bind to {}: {}", addr, e));
71
72        info!("Prometheus metrics server listening on {}", addr);
73
74        axum::serve(listener, app)
75            .await
76            .expect("Failed to start metrics server");
77    }
78
79    /// Starts the metrics server with an existing listener
80    ///
81    /// This function runs indefinitely until the server is stopped.
82    /// Use this when you need to bind the listener before spawning the server task.
83    ///
84    /// # Arguments
85    /// * `listener` - A bound TCP listener
86    /// * `metrics` - The PrometheusMetrics instance to expose
87    ///
88    /// # Panics
89    /// Panics if the server fails to start.
90    pub async fn run_with_listener(listener: TcpListener, metrics: Arc<PrometheusMetrics>) {
91        let app = Router::new()
92            .route("/metrics", get(Self::metrics_handler))
93            .route("/healthz", get(Self::healthz))
94            .route("/readyz", get(Self::readyz))
95            .route("/health", get(Self::health))
96            .with_state((metrics, None));
97
98        info!(
99            "Prometheus metrics server listening on {:?}",
100            listener.local_addr()
101        );
102
103        axum::serve(listener, app)
104            .await
105            .expect("Failed to start metrics server");
106    }
107
108    /// Starts the metrics server with an existing listener and health checker
109    ///
110    /// This function runs indefinitely until the server is stopped.
111    /// Use this when you need to bind the listener before spawning the server task.
112    ///
113    /// # Arguments
114    /// * `listener` - A bound TCP listener
115    /// * `metrics` - The PrometheusMetrics instance to expose
116    /// * `health_checker` - Function that returns a HealthReport when called
117    ///
118    /// # Panics
119    /// Panics if the server fails to start.
120    pub async fn run_with_listener_and_health_checker(
121        listener: TcpListener,
122        metrics: Arc<PrometheusMetrics>,
123        health_checker: HealthChecker,
124    ) {
125        let app = Router::new()
126            .route("/metrics", get(Self::metrics_handler))
127            .route("/healthz", get(Self::healthz))
128            .route("/readyz", get(Self::readyz))
129            .route("/health", get(Self::health))
130            .with_state((metrics, Some(health_checker)));
131
132        info!(
133            "Prometheus metrics server listening on {:?}",
134            listener.local_addr()
135        );
136
137        axum::serve(listener, app)
138            .await
139            .expect("Failed to start metrics server");
140    }
141
142    /// Handler for GET /healthz - Kubernetes liveness probe
143    ///
144    /// Returns 200 OK if the service is running.
145    async fn healthz() -> StatusCode {
146        StatusCode::OK
147    }
148
149    /// Handler for GET /readyz - Kubernetes readiness probe
150    ///
151    /// Returns 200 OK if ready to serve traffic, 503 otherwise.
152    async fn readyz(State((_, checker)): State<ServerState>) -> impl IntoResponse {
153        match checker {
154            Some(checker) => {
155                let report = checker();
156                let status = if report.status == HealthStatus::Healthy {
157                    StatusCode::OK
158                } else {
159                    StatusCode::SERVICE_UNAVAILABLE
160                };
161                (status, Json(report))
162            }
163            None => (StatusCode::OK, Json(HealthReport::default())),
164        }
165    }
166
167    /// Handler for GET /health - Detailed health report
168    ///
169    /// Returns JSON with detailed service health status.
170    async fn health(State((_, checker)): State<ServerState>) -> impl IntoResponse {
171        match checker {
172            Some(checker) => {
173                let report = checker();
174                (StatusCode::OK, Json(report))
175            }
176            None => (StatusCode::OK, Json(HealthReport::default())),
177        }
178    }
179
180    /// Handler for GET /metrics - Prometheus metrics endpoint
181    async fn metrics_handler(State((metrics, _)): State<ServerState>) -> impl IntoResponse {
182        let output = metrics.gather();
183        (
184            StatusCode::OK,
185            [("content-type", "text/plain; version=0.0.4")],
186            output,
187        )
188    }
189}
190
/// Response type for metrics endpoint (used for testing)
///
/// A plain-data description of an HTTP response. It derives the common
/// comparison and formatting traits so it can be used directly in
/// `assert_eq!` and debug output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code of the response.
    pub status: u16,
    /// Value of the `content-type` header.
    pub content_type: String,
    /// Response body text.
    pub body: String,
}
197
198/// Handles GET /metrics requests
199///
200/// Returns metrics in Prometheus text format with appropriate content-type header.
201/// This is a testable version of the handler that returns a struct.
202pub async fn metrics_handler(metrics: Arc<PrometheusMetrics>) -> MetricsResponse {
203    let output = metrics.gather();
204    MetricsResponse {
205        status: 200,
206        content_type: "text/plain; version=0.0.4".to_string(),
207        body: output,
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use axum::response::Response;
    use camel_api::metrics::MetricsCollector;
    use camel_api::{ServiceHealth, ServiceStatus};
    use tower::ServiceExt;

    /// Builds handler state with fresh metrics and the given optional checker.
    fn state_with(checker: Option<HealthChecker>) -> ServerState {
        (Arc::new(PrometheusMetrics::new()), checker)
    }

    /// Sends a single body-less GET request for `uri` through `app`.
    async fn send_get(app: Router, uri: &str) -> Response {
        app.oneshot(Request::builder().uri(uri).body(Body::empty()).unwrap())
            .await
            .unwrap()
    }

    /// Reads the whole response body into a `String` (lossy UTF-8).
    async fn body_text(response: Response) -> String {
        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        String::from_utf8_lossy(&bytes).into_owned()
    }

    #[tokio::test]
    async fn test_metrics_handler_returns_prometheus_format() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("test-route");

        let response = metrics_handler(metrics).await;

        assert_eq!(response.status, 200);
        assert_eq!(response.content_type, "text/plain; version=0.0.4");
        assert!(response.body.contains("camel_exchanges_total"));
        assert!(response.body.contains("test-route"));
    }

    #[tokio::test]
    async fn test_metrics_handler_content_type() {
        let metrics = Arc::new(PrometheusMetrics::new());

        let response = metrics_handler(metrics).await;

        // Prometheus requires this specific content-type
        assert_eq!(response.content_type, "text/plain; version=0.0.4");
    }

    // Exercises the axum route itself, not just the struct-returning helper.
    #[tokio::test]
    async fn test_metrics_route_returns_prometheus_format() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("test-route");
        let state: ServerState = (metrics, None);
        let app = Router::new()
            .route("/metrics", get(MetricsServer::metrics_handler))
            .with_state(state);

        let response = send_get(app, "/metrics").await;

        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(
            response.headers().get("content-type").unwrap(),
            "text/plain; version=0.0.4"
        );
        let body = body_text(response).await;
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("test-route"));
    }

    #[tokio::test]
    async fn test_healthz_returns_200_ok() {
        let health_checker = Arc::new(HealthReport::default) as HealthChecker;
        let app = Router::new()
            .route("/healthz", get(MetricsServer::healthz))
            .with_state(state_with(Some(health_checker)));

        let response = send_get(app, "/healthz").await;

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_readyz_returns_200_when_healthy() {
        let health_checker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            ..Default::default()
        }) as HealthChecker;
        let app = Router::new()
            .route("/readyz", get(MetricsServer::readyz))
            .with_state(state_with(Some(health_checker)));

        let response = send_get(app, "/readyz").await;

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_readyz_returns_503_when_unhealthy() {
        let health_checker = Arc::new(|| HealthReport {
            status: HealthStatus::Unhealthy,
            ..Default::default()
        }) as HealthChecker;
        let app = Router::new()
            .route("/readyz", get(MetricsServer::readyz))
            .with_state(state_with(Some(health_checker)));

        let response = send_get(app, "/readyz").await;

        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    // Servers started without a health checker must still report ready.
    #[tokio::test]
    async fn test_readyz_returns_200_without_health_checker() {
        let app = Router::new()
            .route("/readyz", get(MetricsServer::readyz))
            .with_state(state_with(None));

        let response = send_get(app, "/readyz").await;

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_returns_json_health_report() {
        let health_checker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![ServiceHealth {
                name: "test-service".to_string(),
                status: ServiceStatus::Started,
            }],
            ..Default::default()
        }) as HealthChecker;
        let app = Router::new()
            .route("/health", get(MetricsServer::health))
            .with_state(state_with(Some(health_checker)));

        let response = send_get(app, "/health").await;

        assert_eq!(response.status(), StatusCode::OK);

        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();

        assert_eq!(json["status"], "Healthy");
        assert!(json["services"].is_array());
    }

    // Without a checker, /health still answers 200 with a JSON body.
    #[tokio::test]
    async fn test_health_returns_200_without_health_checker() {
        let app = Router::new()
            .route("/health", get(MetricsServer::health))
            .with_state(state_with(None));

        let response = send_get(app, "/health").await;

        assert_eq!(response.status(), StatusCode::OK);

        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        let parsed: Result<serde_json::Value, _> = serde_json::from_slice(&body);
        assert!(parsed.is_ok());
    }
}