Skip to main content

camel_prometheus/
server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
5use tokio::net::TcpListener;
6use tokio::sync::oneshot;
7use tracing::info;
8
9use camel_api::CamelError;
10use camel_api::HealthChecker;
11
12use crate::PrometheusMetrics;
13
14pub struct MetricsServer;
15
16impl MetricsServer {
17    pub async fn run(addr: SocketAddr, metrics: Arc<PrometheusMetrics>) -> Result<(), CamelError> {
18        Self::run_with_health_checker(addr, metrics, None).await
19    }
20
21    pub async fn run_with_health_checker(
22        addr: SocketAddr,
23        metrics: Arc<PrometheusMetrics>,
24        checker: Option<HealthChecker>,
25    ) -> Result<(), CamelError> {
26        let health = camel_health::health_router(checker, None, None);
27        let app = Router::new()
28            .route("/metrics", get(Self::metrics_handler))
29            .with_state(metrics)
30            .merge(health);
31
32        let listener = tokio::net::TcpListener::bind(addr)
33            .await
34            .map_err(|e| CamelError::Io(format!("Failed to bind to {addr}: {e}")))?;
35
36        info!("Prometheus metrics server listening on {}", addr);
37        axum::serve(listener, app)
38            .await
39            .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
40    }
41
42    pub async fn run_with_listener(
43        listener: TcpListener,
44        metrics: Arc<PrometheusMetrics>,
45    ) -> Result<(), CamelError> {
46        Self::run_with_listener_and_health_checker(listener, metrics, None).await
47    }
48
49    pub async fn run_with_listener_and_health_checker(
50        listener: TcpListener,
51        metrics: Arc<PrometheusMetrics>,
52        checker: Option<HealthChecker>,
53    ) -> Result<(), CamelError> {
54        let health = camel_health::health_router(checker, None, None);
55        let app = Router::new()
56            .route("/metrics", get(Self::metrics_handler))
57            .with_state(metrics)
58            .merge(health);
59
60        info!(
61            "Prometheus metrics server listening on {}",
62            listener.local_addr().unwrap() // allow-unwrap
63        );
64        axum::serve(listener, app)
65            .await
66            .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
67    }
68
69    pub async fn run_with_listener_and_health_checker_with_shutdown(
70        listener: TcpListener,
71        metrics: Arc<PrometheusMetrics>,
72        checker: Option<HealthChecker>,
73        shutdown: oneshot::Receiver<()>,
74    ) -> Result<(), CamelError> {
75        let health = camel_health::health_router(checker, None, None);
76        let app = Router::new()
77            .route("/metrics", get(Self::metrics_handler))
78            .with_state(metrics)
79            .merge(health);
80
81        info!(
82            "Prometheus metrics server listening on {}",
83            listener.local_addr().unwrap() // allow-unwrap
84        );
85
86        #[cfg(test)]
87        let shutdown_signal_count = test_graceful_shutdown_signal_count();
88
89        axum::serve(listener, app)
90            .with_graceful_shutdown(async move {
91                let _ = shutdown.await;
92                #[cfg(test)]
93                shutdown_signal_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
94            })
95            .await
96            .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
97    }
98
99    async fn metrics_handler(State(metrics): State<Arc<PrometheusMetrics>>) -> impl IntoResponse {
100        let output = metrics.gather();
101        (
102            StatusCode::OK,
103            [("content-type", "text/plain; version=0.0.4")],
104            output,
105        )
106    }
107}
108
/// Framework-independent representation of a `/metrics` response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code.
    pub status: u16,
    /// Value of the `Content-Type` header.
    pub content_type: String,
    /// Response body (the gathered metrics text).
    pub body: String,
}
114
115pub async fn metrics_handler(metrics: Arc<PrometheusMetrics>) -> MetricsResponse {
116    let output = metrics.gather();
117    MetricsResponse {
118        status: 200,
119        content_type: "text/plain; version=0.0.4".to_string(),
120        body: output,
121    }
122}
123
/// Test-only counter of how many times a graceful-shutdown signal was observed
/// by `run_with_listener_and_health_checker_with_shutdown`; created on first use.
#[cfg(test)]
static GRACEFUL_SHUTDOWN_SIGNAL_COUNT: std::sync::OnceLock<
    Arc<std::sync::atomic::AtomicUsize>,
> = std::sync::OnceLock::new();

/// Returns a shared handle to the test-only shutdown-signal counter,
/// initialising it to zero on first access.
#[cfg(test)]
pub fn test_graceful_shutdown_signal_count() -> Arc<std::sync::atomic::AtomicUsize> {
    let shared = GRACEFUL_SHUTDOWN_SIGNAL_COUNT.get_or_init(Default::default);
    Arc::clone(shared)
}

/// Resets the test-only shutdown-signal counter back to zero.
#[cfg(test)]
pub fn test_reset_graceful_shutdown_observability() {
    let counter = test_graceful_shutdown_signal_count();
    counter.store(0, std::sync::atomic::Ordering::SeqCst);
}
141
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::metrics::MetricsCollector;
    use camel_api::{HealthReport, HealthStatus};
    use std::sync::atomic::Ordering;
    use tokio::time::{Duration, sleep};

    #[tokio::test]
    async fn test_metrics_handler_returns_prometheus_format() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("test-route");

        let response = metrics_handler(metrics).await;

        assert_eq!(response.status, 200);
        assert_eq!(response.content_type, "text/plain; version=0.0.4");
        assert!(response.body.contains("camel_exchanges_total"));
        assert!(response.body.contains("test-route"));
    }

    #[tokio::test]
    async fn test_metrics_handler_content_type() {
        let metrics = Arc::new(PrometheusMetrics::new());

        let response = metrics_handler(metrics).await;

        assert_eq!(response.content_type, "text/plain; version=0.0.4");
    }

    #[tokio::test]
    async fn test_run_with_listener_serves_metrics_and_health() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("route-http");

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let checker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![],
            ..Default::default()
        });

        let handle = tokio::spawn(MetricsServer::run_with_listener_and_health_checker(
            listener,
            Arc::clone(&metrics),
            Some(checker),
        ));

        sleep(Duration::from_millis(50)).await;

        let client = reqwest::Client::new();
        let metrics_resp = client
            .get(format!("http://{}/metrics", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(metrics_resp.status().as_u16(), 200);
        assert_eq!(
            metrics_resp
                .headers()
                .get(reqwest::header::CONTENT_TYPE)
                .unwrap()
                .to_str()
                .unwrap(),
            "text/plain; version=0.0.4"
        );
        let body = metrics_resp.text().await.unwrap();
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("route-http"));

        let health_resp = client
            .get(format!("http://{}/health", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(health_resp.status().as_u16(), 200);

        let not_found = client
            .get(format!("http://{}/does-not-exist", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(not_found.status().as_u16(), 404);

        handle.abort();
    }

    #[tokio::test]
    async fn test_graceful_shutdown_signal_observed() {
        let metrics = Arc::new(PrometheusMetrics::new());
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        // Snapshot the shared counter so the assertion below proves that *this*
        // server's graceful-shutdown future actually ran.
        let signal_count = test_graceful_shutdown_signal_count();
        let before = signal_count.load(Ordering::SeqCst);

        let handle = tokio::spawn(
            MetricsServer::run_with_listener_and_health_checker_with_shutdown(
                listener,
                metrics,
                None,
                shutdown_rx,
            ),
        );

        sleep(Duration::from_millis(30)).await;
        let _ = shutdown_tx.send(());

        let join = tokio::time::timeout(Duration::from_secs(2), handle)
            .await
            .expect("server did not shutdown in time")
            .expect("join should succeed");
        assert!(join.is_ok(), "server run should return Ok, got: {join:?}");
        assert!(
            signal_count.load(Ordering::SeqCst) > before,
            "graceful shutdown future should have observed the shutdown signal"
        );
    }

    #[tokio::test]
    async fn test_run_returns_err_on_bind_failure() {
        let metrics = Arc::new(PrometheusMetrics::new());
        let occupied = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = occupied.local_addr().unwrap();

        let result = MetricsServer::run(addr, metrics).await;
        assert!(result.is_err(), "expected bind failure, got {result:?}");
    }
}