Skip to main content

camel_prometheus/
server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
5use tokio::net::TcpListener;
6use tokio::sync::oneshot;
7use tracing::info;
8
9use async_trait::async_trait;
10use camel_api::{CamelError, HealthSource, HealthStatus};
11
12use crate::PrometheusMetrics;
13
14pub struct MetricsServer;
15
16impl MetricsServer {
17    pub async fn run(addr: SocketAddr, metrics: Arc<PrometheusMetrics>) -> Result<(), CamelError> {
18        Self::run_with_health_source(addr, metrics, None).await
19    }
20
21    pub async fn run_with_health_source(
22        addr: SocketAddr,
23        metrics: Arc<PrometheusMetrics>,
24        source: Option<Arc<dyn HealthSource>>,
25    ) -> Result<(), CamelError> {
26        let source =
27            source.unwrap_or_else(|| Arc::new(DefaultHealthSource) as Arc<dyn HealthSource>);
28        let health = camel_health::health_router(source);
29        let app = Router::new()
30            .route("/metrics", get(Self::metrics_handler))
31            .with_state(metrics)
32            .merge(health);
33
34        let listener = tokio::net::TcpListener::bind(addr)
35            .await
36            .map_err(|e| CamelError::Io(format!("Failed to bind to {addr}: {e}")))?;
37
38        info!("Prometheus metrics server listening on {}", addr);
39        axum::serve(listener, app)
40            .await
41            .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
42    }
43
44    pub async fn run_with_listener(
45        listener: TcpListener,
46        metrics: Arc<PrometheusMetrics>,
47    ) -> Result<(), CamelError> {
48        Self::run_with_listener_and_health_source(listener, metrics, None).await
49    }
50
51    pub async fn run_with_listener_and_health_source(
52        listener: TcpListener,
53        metrics: Arc<PrometheusMetrics>,
54        source: Option<Arc<dyn HealthSource>>,
55    ) -> Result<(), CamelError> {
56        let source =
57            source.unwrap_or_else(|| Arc::new(DefaultHealthSource) as Arc<dyn HealthSource>);
58        let health = camel_health::health_router(source);
59        let app = Router::new()
60            .route("/metrics", get(Self::metrics_handler))
61            .with_state(metrics)
62            .merge(health);
63
64        info!(
65            "Prometheus metrics server listening on {}",
66            listener.local_addr().unwrap() // allow-unwrap
67        );
68        axum::serve(listener, app)
69            .await
70            .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
71    }
72
73    pub async fn run_with_listener_and_health_source_with_shutdown(
74        listener: TcpListener,
75        metrics: Arc<PrometheusMetrics>,
76        source: Option<Arc<dyn HealthSource>>,
77        shutdown: oneshot::Receiver<()>,
78    ) -> Result<(), CamelError> {
79        let source =
80            source.unwrap_or_else(|| Arc::new(DefaultHealthSource) as Arc<dyn HealthSource>);
81        let health = camel_health::health_router(source);
82        let app = Router::new()
83            .route("/metrics", get(Self::metrics_handler))
84            .with_state(metrics)
85            .merge(health);
86
87        info!(
88            "Prometheus metrics server listening on {}",
89            listener.local_addr().unwrap() // allow-unwrap
90        );
91
92        #[cfg(test)]
93        let shutdown_signal_count = test_graceful_shutdown_signal_count();
94
95        axum::serve(listener, app)
96            .with_graceful_shutdown(async move {
97                let _ = shutdown.await;
98                #[cfg(test)]
99                shutdown_signal_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
100            })
101            .await
102            .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
103    }
104
105    async fn metrics_handler(State(metrics): State<Arc<PrometheusMetrics>>) -> impl IntoResponse {
106        let output = metrics.gather();
107        (
108            StatusCode::OK,
109            [("content-type", "text/plain; version=0.0.4")],
110            output,
111        )
112    }
113}
114
115struct DefaultHealthSource;
116
117#[async_trait]
118impl HealthSource for DefaultHealthSource {
119    async fn liveness(&self) -> HealthStatus {
120        HealthStatus::Healthy
121    }
122
123    async fn readiness(&self) -> HealthStatus {
124        HealthStatus::Healthy
125    }
126
127    async fn startup(&self) -> HealthStatus {
128        HealthStatus::Healthy
129    }
130}
131
/// Framework-independent description of a metrics HTTP response.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so callers can log, copy
/// and compare responses (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code of the response.
    pub status: u16,
    /// Value for the `Content-Type` header.
    pub content_type: String,
    /// Response payload.
    pub body: String,
}
137
138pub async fn metrics_handler(metrics: Arc<PrometheusMetrics>) -> MetricsResponse {
139    let output = metrics.gather();
140    MetricsResponse {
141        status: 200,
142        content_type: "text/plain; version=0.0.4".to_string(),
143        body: output,
144    }
145}
146
/// Lazily initialised, process-wide counter of observed graceful-shutdown
/// signals. Only compiled into test builds.
#[cfg(test)]
static GRACEFUL_SHUTDOWN_SIGNAL_COUNT: std::sync::OnceLock<
    std::sync::Arc<std::sync::atomic::AtomicUsize>,
> = std::sync::OnceLock::new();

/// Returns a handle to the shared shutdown-signal counter, creating it
/// (starting at zero) on first use. Every call yields the same underlying counter.
#[cfg(test)]
pub fn test_graceful_shutdown_signal_count() -> std::sync::Arc<std::sync::atomic::AtomicUsize> {
    use std::sync::atomic::AtomicUsize;

    let shared =
        GRACEFUL_SHUTDOWN_SIGNAL_COUNT.get_or_init(|| std::sync::Arc::new(AtomicUsize::new(0)));
    std::sync::Arc::clone(shared)
}

/// Resets the shared shutdown-signal counter back to zero.
#[cfg(test)]
pub fn test_reset_graceful_shutdown_observability() {
    use std::sync::atomic::Ordering;

    let counter = test_graceful_shutdown_signal_count();
    counter.store(0, Ordering::SeqCst);
}
164
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::metrics::MetricsCollector;
    use tokio::time::{Duration, sleep};

    /// Health source whose readiness result is configurable; liveness and
    /// startup always report healthy.
    struct MockHealthSource {
        readiness: HealthStatus,
    }

    #[async_trait]
    impl HealthSource for MockHealthSource {
        async fn liveness(&self) -> HealthStatus {
            HealthStatus::Healthy
        }

        async fn readiness(&self) -> HealthStatus {
            self.readiness
        }

        async fn startup(&self) -> HealthStatus {
            HealthStatus::Healthy
        }
    }

    /// The free handler reports status 200, the Prometheus content type and the recorded metric.
    #[tokio::test]
    async fn test_metrics_handler_returns_prometheus_format() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("test-route");

        let response = metrics_handler(metrics).await;

        assert_eq!(response.status, 200);
        assert_eq!(response.content_type, "text/plain; version=0.0.4");
        assert!(response.body.contains("camel_exchanges_total"));
        assert!(response.body.contains("test-route"));
    }

    /// The content type is set even when no metric has been recorded.
    #[tokio::test]
    async fn test_metrics_handler_content_type() {
        let metrics = Arc::new(PrometheusMetrics::new());

        let response = metrics_handler(metrics).await;

        assert_eq!(response.content_type, "text/plain; version=0.0.4");
    }

    /// End-to-end check over HTTP: `/metrics` and `/health` answer 200, unknown paths 404.
    #[tokio::test]
    async fn test_run_with_listener_serves_metrics_and_health() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("route-http");

        // Port 0 lets the OS pick a free port, so parallel tests do not collide.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let source = Arc::new(MockHealthSource {
            readiness: HealthStatus::Healthy,
        });

        let handle = tokio::spawn(MetricsServer::run_with_listener_and_health_source(
            listener,
            Arc::clone(&metrics),
            Some(source),
        ));

        sleep(Duration::from_millis(50)).await;

        let client = reqwest::Client::new();
        let metrics_resp = client
            .get(format!("http://{}/metrics", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(metrics_resp.status().as_u16(), 200);
        assert_eq!(
            metrics_resp
                .headers()
                .get(reqwest::header::CONTENT_TYPE)
                .unwrap()
                .to_str()
                .unwrap(),
            "text/plain; version=0.0.4"
        );
        let body = metrics_resp.text().await.unwrap();
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("route-http"));

        let health_resp = client
            .get(format!("http://{}/health", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(health_resp.status().as_u16(), 200);

        let not_found = client
            .get(format!("http://{}/does-not-exist", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(not_found.status().as_u16(), 404);

        handle.abort();
    }

    /// Sending the shutdown signal stops the server with `Ok(())` and is
    /// recorded by the test-only shutdown counter.
    #[tokio::test]
    async fn test_graceful_shutdown_signal_observed() {
        let metrics = Arc::new(PrometheusMetrics::new());
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

        // Compare against a baseline rather than an absolute value: the counter
        // is process-wide, so other tests may have bumped it already.
        let signal_count = test_graceful_shutdown_signal_count();
        let observed_before = signal_count.load(std::sync::atomic::Ordering::SeqCst);

        let handle = tokio::spawn(
            MetricsServer::run_with_listener_and_health_source_with_shutdown(
                listener,
                metrics,
                None,
                shutdown_rx,
            ),
        );

        sleep(Duration::from_millis(30)).await;
        let _ = shutdown_tx.send(());

        let join = tokio::time::timeout(Duration::from_secs(2), handle)
            .await
            .expect("server did not shutdown in time")
            .expect("join should succeed");
        assert!(join.is_ok(), "server run should return Ok, got: {join:?}");

        let observed_after = signal_count.load(std::sync::atomic::Ordering::SeqCst);
        assert!(
            observed_after > observed_before,
            "shutdown signal was not observed (before: {observed_before}, after: {observed_after})"
        );
    }

    /// Binding an address that is already in use must surface as an error.
    #[tokio::test]
    async fn test_run_returns_err_on_bind_failure() {
        let metrics = Arc::new(PrometheusMetrics::new());
        let occupied = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = occupied.local_addr().unwrap();

        // Bounded wait: if the bind unexpectedly succeeded, `run` would serve
        // forever and hang the test suite instead of failing this test.
        let result = tokio::time::timeout(Duration::from_secs(2), MetricsServer::run(addr, metrics))
            .await
            .expect("run should fail fast on an occupied address instead of serving");
        assert!(result.is_err(), "expected bind failure, got {result:?}");
    }
}