camel_prometheus/
server.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
5use tokio::net::TcpListener;
6use tokio::sync::oneshot;
7use tracing::info;
8
9use async_trait::async_trait;
10use camel_api::{CamelError, HealthSource, HealthStatus};
11
12use crate::PrometheusMetrics;
13
14pub struct MetricsServer;
15
16impl MetricsServer {
17 pub async fn run(addr: SocketAddr, metrics: Arc<PrometheusMetrics>) -> Result<(), CamelError> {
18 Self::run_with_health_source(addr, metrics, None).await
19 }
20
21 pub async fn run_with_health_source(
22 addr: SocketAddr,
23 metrics: Arc<PrometheusMetrics>,
24 source: Option<Arc<dyn HealthSource>>,
25 ) -> Result<(), CamelError> {
26 let source =
27 source.unwrap_or_else(|| Arc::new(DefaultHealthSource) as Arc<dyn HealthSource>);
28 let health = camel_health::health_router(source);
29 let app = Router::new()
30 .route("/metrics", get(Self::metrics_handler))
31 .with_state(metrics)
32 .merge(health);
33
34 let listener = tokio::net::TcpListener::bind(addr)
35 .await
36 .map_err(|e| CamelError::Io(format!("Failed to bind to {addr}: {e}")))?;
37
38 info!("Prometheus metrics server listening on {}", addr);
39 axum::serve(listener, app)
40 .await
41 .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
42 }
43
44 pub async fn run_with_listener(
45 listener: TcpListener,
46 metrics: Arc<PrometheusMetrics>,
47 ) -> Result<(), CamelError> {
48 Self::run_with_listener_and_health_source(listener, metrics, None).await
49 }
50
51 pub async fn run_with_listener_and_health_source(
52 listener: TcpListener,
53 metrics: Arc<PrometheusMetrics>,
54 source: Option<Arc<dyn HealthSource>>,
55 ) -> Result<(), CamelError> {
56 let source =
57 source.unwrap_or_else(|| Arc::new(DefaultHealthSource) as Arc<dyn HealthSource>);
58 let health = camel_health::health_router(source);
59 let app = Router::new()
60 .route("/metrics", get(Self::metrics_handler))
61 .with_state(metrics)
62 .merge(health);
63
64 info!(
65 "Prometheus metrics server listening on {}",
66 listener.local_addr().unwrap() );
68 axum::serve(listener, app)
69 .await
70 .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
71 }
72
73 pub async fn run_with_listener_and_health_source_with_shutdown(
74 listener: TcpListener,
75 metrics: Arc<PrometheusMetrics>,
76 source: Option<Arc<dyn HealthSource>>,
77 shutdown: oneshot::Receiver<()>,
78 ) -> Result<(), CamelError> {
79 let source =
80 source.unwrap_or_else(|| Arc::new(DefaultHealthSource) as Arc<dyn HealthSource>);
81 let health = camel_health::health_router(source);
82 let app = Router::new()
83 .route("/metrics", get(Self::metrics_handler))
84 .with_state(metrics)
85 .merge(health);
86
87 info!(
88 "Prometheus metrics server listening on {}",
89 listener.local_addr().unwrap() );
91
92 #[cfg(test)]
93 let shutdown_signal_count = test_graceful_shutdown_signal_count();
94
95 axum::serve(listener, app)
96 .with_graceful_shutdown(async move {
97 let _ = shutdown.await;
98 #[cfg(test)]
99 shutdown_signal_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
100 })
101 .await
102 .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
103 }
104
105 async fn metrics_handler(State(metrics): State<Arc<PrometheusMetrics>>) -> impl IntoResponse {
106 let output = metrics.gather();
107 (
108 StatusCode::OK,
109 [("content-type", "text/plain; version=0.0.4")],
110 output,
111 )
112 }
113}
114
115struct DefaultHealthSource;
116
117#[async_trait]
118impl HealthSource for DefaultHealthSource {
119 async fn liveness(&self) -> HealthStatus {
120 HealthStatus::Healthy
121 }
122
123 async fn readiness(&self) -> HealthStatus {
124 HealthStatus::Healthy
125 }
126
127 async fn startup(&self) -> HealthStatus {
128 HealthStatus::Healthy
129 }
130}
131
/// Framework-independent description of a metrics scrape response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code of the response.
    pub status: u16,
    /// Value for the `Content-Type` header.
    pub content_type: String,
    /// Response body text.
    pub body: String,
}
137
138pub async fn metrics_handler(metrics: Arc<PrometheusMetrics>) -> MetricsResponse {
139 let output = metrics.gather();
140 MetricsResponse {
141 status: 200,
142 content_type: "text/plain; version=0.0.4".to_string(),
143 body: output,
144 }
145}
146
/// Test-only, lazily initialised counter that is bumped each time a graceful
/// shutdown future completes.
#[cfg(test)]
static GRACEFUL_SHUTDOWN_SIGNAL_COUNT: std::sync::OnceLock<
    std::sync::Arc<std::sync::atomic::AtomicUsize>,
> = std::sync::OnceLock::new();

/// Returns a handle to the shared shutdown-signal counter, creating it at
/// zero on first use. Every call yields a clone of the same `Arc`.
#[cfg(test)]
pub fn test_graceful_shutdown_signal_count() -> std::sync::Arc<std::sync::atomic::AtomicUsize> {
    use std::sync::{Arc, atomic::AtomicUsize};

    let shared = GRACEFUL_SHUTDOWN_SIGNAL_COUNT.get_or_init(|| Arc::new(AtomicUsize::new(0)));
    Arc::clone(shared)
}
159
/// Sets the shared shutdown-signal counter back to zero.
#[cfg(test)]
pub fn test_reset_graceful_shutdown_observability() {
    use std::sync::atomic::Ordering;

    let counter = test_graceful_shutdown_signal_count();
    counter.store(0, Ordering::SeqCst);
}
164
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::metrics::MetricsCollector;
    use tokio::time::{Duration, sleep};

    /// Health source with a configurable readiness answer; liveness and
    /// startup always report healthy.
    struct MockHealthSource {
        readiness: HealthStatus,
    }

    #[async_trait]
    impl HealthSource for MockHealthSource {
        async fn liveness(&self) -> HealthStatus {
            HealthStatus::Healthy
        }

        async fn readiness(&self) -> HealthStatus {
            self.readiness
        }

        async fn startup(&self) -> HealthStatus {
            HealthStatus::Healthy
        }
    }

    /// Builds the absolute URL for `path` on the server bound to `addr`.
    fn url(addr: SocketAddr, path: &str) -> String {
        format!("http://{}{}", addr, path)
    }

    #[tokio::test]
    async fn test_metrics_handler_returns_prometheus_format() {
        let registry = Arc::new(PrometheusMetrics::new());
        registry.increment_exchanges("test-route");

        let MetricsResponse {
            status,
            content_type,
            body,
        } = metrics_handler(registry).await;

        assert_eq!(status, 200);
        assert_eq!(content_type, "text/plain; version=0.0.4");
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("test-route"));
    }

    #[tokio::test]
    async fn test_metrics_handler_content_type() {
        let registry = Arc::new(PrometheusMetrics::new());

        let MetricsResponse { content_type, .. } = metrics_handler(registry).await;

        assert_eq!(content_type, "text/plain; version=0.0.4");
    }

    #[tokio::test]
    async fn test_run_with_listener_serves_metrics_and_health() {
        let registry = Arc::new(PrometheusMetrics::new());
        registry.increment_exchanges("route-http");

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let health_source = Arc::new(MockHealthSource {
            readiness: HealthStatus::Healthy,
        });

        let server = tokio::spawn(MetricsServer::run_with_listener_and_health_source(
            listener,
            Arc::clone(&registry),
            Some(health_source),
        ));

        // Give the spawned server a moment to start accepting connections.
        sleep(Duration::from_millis(50)).await;

        let client = reqwest::Client::new();

        // The scrape endpoint returns the Prometheus text format.
        let scrape = client.get(url(addr, "/metrics")).send().await.unwrap();
        assert_eq!(scrape.status().as_u16(), 200);
        let content_type = scrape
            .headers()
            .get(reqwest::header::CONTENT_TYPE)
            .unwrap()
            .to_str()
            .unwrap();
        assert_eq!(content_type, "text/plain; version=0.0.4");
        let body = scrape.text().await.unwrap();
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("route-http"));

        // The health route is merged in; unknown paths fall through to 404.
        for (path, expected) in [("/health", 200_u16), ("/does-not-exist", 404)] {
            let resp = client.get(url(addr, path)).send().await.unwrap();
            assert_eq!(resp.status().as_u16(), expected);
        }

        server.abort();
    }

    #[tokio::test]
    async fn test_graceful_shutdown_signal_observed() {
        let registry = Arc::new(PrometheusMetrics::new());
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();

        let server = tokio::spawn(
            MetricsServer::run_with_listener_and_health_source_with_shutdown(
                listener, registry, None, stop_rx,
            ),
        );

        sleep(Duration::from_millis(30)).await;
        let _ = stop_tx.send(());

        let outcome = tokio::time::timeout(Duration::from_secs(2), server)
            .await
            .expect("server did not shutdown in time")
            .expect("join should succeed");
        assert!(
            outcome.is_ok(),
            "server run should return Ok, got: {outcome:?}"
        );
    }

    #[tokio::test]
    async fn test_run_returns_err_on_bind_failure() {
        let registry = Arc::new(PrometheusMetrics::new());
        // Hold the port for the whole test so the second bind must fail.
        let occupied = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let taken_addr = occupied.local_addr().unwrap();

        let result = MetricsServer::run(taken_addr, registry).await;
        assert!(result.is_err(), "expected bind failure, got {result:?}");
    }
}
302}