Skip to main content

camel_prometheus/
server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
5use tokio::net::TcpListener;
6use tracing::info;
7
8use camel_api::HealthChecker;
9
10use crate::PrometheusMetrics;
11
12pub struct MetricsServer;
13
14impl MetricsServer {
15    pub async fn run(addr: SocketAddr, metrics: Arc<PrometheusMetrics>) {
16        Self::run_with_health_checker(addr, metrics, None).await;
17    }
18
19    pub async fn run_with_health_checker(
20        addr: SocketAddr,
21        metrics: Arc<PrometheusMetrics>,
22        checker: Option<HealthChecker>,
23    ) {
24        let health = camel_health::health_router(checker, None, None);
25        let app = Router::new()
26            .route("/metrics", get(Self::metrics_handler))
27            .with_state(metrics)
28            .merge(health);
29
30        let listener = tokio::net::TcpListener::bind(addr)
31            .await
32            .unwrap_or_else(|e| panic!("Failed to bind to {}: {}", addr, e));
33
34        info!("Prometheus metrics server listening on {}", addr);
35        axum::serve(listener, app).await.unwrap();
36    }
37
38    pub async fn run_with_listener(listener: TcpListener, metrics: Arc<PrometheusMetrics>) {
39        Self::run_with_listener_and_health_checker(listener, metrics, None).await;
40    }
41
42    pub async fn run_with_listener_and_health_checker(
43        listener: TcpListener,
44        metrics: Arc<PrometheusMetrics>,
45        checker: Option<HealthChecker>,
46    ) {
47        let health = camel_health::health_router(checker, None, None);
48        let app = Router::new()
49            .route("/metrics", get(Self::metrics_handler))
50            .with_state(metrics)
51            .merge(health);
52
53        info!(
54            "Prometheus metrics server listening on {}",
55            listener.local_addr().unwrap()
56        );
57        axum::serve(listener, app).await.unwrap();
58    }
59
60    async fn metrics_handler(State(metrics): State<Arc<PrometheusMetrics>>) -> impl IntoResponse {
61        let output = metrics.gather();
62        (
63            StatusCode::OK,
64            [("content-type", "text/plain; version=0.0.4")],
65            output,
66        )
67    }
68}
69
/// Framework-independent description of a metrics HTTP response.
///
/// Produced by the free function `metrics_handler` in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code of the response.
    pub status: u16,
    /// Value for the `Content-Type` header.
    pub content_type: String,
    /// Response body text.
    pub body: String,
}
75
76pub async fn metrics_handler(metrics: Arc<PrometheusMetrics>) -> MetricsResponse {
77    let output = metrics.gather();
78    MetricsResponse {
79        status: 200,
80        content_type: "text/plain; version=0.0.4".to_string(),
81        body: output,
82    }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::metrics::MetricsCollector;
    use camel_api::{HealthReport, HealthStatus};
    use tokio::time::{Duration, sleep};

    /// The standalone handler reports status 200, the expected content type,
    /// and a body mentioning both the exchange counter and the route name.
    #[tokio::test]
    async fn test_metrics_handler_returns_prometheus_format() {
        let registry = Arc::new(PrometheusMetrics::new());
        registry.increment_exchanges("test-route");

        let MetricsResponse {
            status,
            content_type,
            body,
        } = metrics_handler(registry).await;

        assert_eq!(status, 200);
        assert_eq!(content_type, "text/plain; version=0.0.4");
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("test-route"));
    }

    /// The content type is set even when nothing has been recorded.
    #[tokio::test]
    async fn test_metrics_handler_content_type() {
        let registry = Arc::new(PrometheusMetrics::new());

        let MetricsResponse { content_type, .. } = metrics_handler(registry).await;

        assert_eq!(content_type, "text/plain; version=0.0.4");
    }

    /// End-to-end check over real HTTP: `/metrics` and `/health` answer 200,
    /// and an unknown path answers 404.
    #[tokio::test]
    async fn test_run_with_listener_serves_metrics_and_health() {
        let registry = Arc::new(PrometheusMetrics::new());
        registry.increment_exchanges("route-http");

        // Bind to port 0 and read back the address the listener actually got.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let checker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![],
            ..Default::default()
        });

        let server = tokio::spawn(MetricsServer::run_with_listener_and_health_checker(
            listener,
            Arc::clone(&registry),
            Some(checker),
        ));

        // Presumably gives the spawned server task a moment to start serving.
        sleep(Duration::from_millis(50)).await;

        let base_url = format!("http://{}", addr);
        let http = reqwest::Client::new();

        // --- /metrics ---
        let resp = http
            .get(format!("{}/metrics", base_url))
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status().as_u16(), 200);
        let content_type = resp
            .headers()
            .get(reqwest::header::CONTENT_TYPE)
            .unwrap()
            .to_str()
            .unwrap();
        assert_eq!(content_type, "text/plain; version=0.0.4");
        let body = resp.text().await.unwrap();
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("route-http"));

        // --- /health ---
        let resp = http
            .get(format!("{}/health", base_url))
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status().as_u16(), 200);

        // --- unknown path ---
        let resp = http
            .get(format!("{}/does-not-exist", base_url))
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status().as_u16(), 404);

        server.abort();
    }
}