camel_prometheus/
server.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
5use tokio::net::TcpListener;
6use tokio::sync::oneshot;
7use tracing::info;
8
9use camel_api::CamelError;
10use camel_api::HealthChecker;
11
12use crate::PrometheusMetrics;
13
/// Entry point for serving Prometheus metrics (and health routes) over HTTP.
///
/// This is a stateless unit type; all functionality lives in its associated functions.
#[derive(Debug, Clone, Copy, Default)]
pub struct MetricsServer;
15
16impl MetricsServer {
17 pub async fn run(addr: SocketAddr, metrics: Arc<PrometheusMetrics>) -> Result<(), CamelError> {
18 Self::run_with_health_checker(addr, metrics, None).await
19 }
20
21 pub async fn run_with_health_checker(
22 addr: SocketAddr,
23 metrics: Arc<PrometheusMetrics>,
24 checker: Option<HealthChecker>,
25 ) -> Result<(), CamelError> {
26 let health = camel_health::health_router(checker, None, None);
27 let app = Router::new()
28 .route("/metrics", get(Self::metrics_handler))
29 .with_state(metrics)
30 .merge(health);
31
32 let listener = tokio::net::TcpListener::bind(addr)
33 .await
34 .map_err(|e| CamelError::Io(format!("Failed to bind to {addr}: {e}")))?;
35
36 info!("Prometheus metrics server listening on {}", addr);
37 axum::serve(listener, app)
38 .await
39 .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
40 }
41
42 pub async fn run_with_listener(
43 listener: TcpListener,
44 metrics: Arc<PrometheusMetrics>,
45 ) -> Result<(), CamelError> {
46 Self::run_with_listener_and_health_checker(listener, metrics, None).await
47 }
48
49 pub async fn run_with_listener_and_health_checker(
50 listener: TcpListener,
51 metrics: Arc<PrometheusMetrics>,
52 checker: Option<HealthChecker>,
53 ) -> Result<(), CamelError> {
54 let health = camel_health::health_router(checker, None, None);
55 let app = Router::new()
56 .route("/metrics", get(Self::metrics_handler))
57 .with_state(metrics)
58 .merge(health);
59
60 info!(
61 "Prometheus metrics server listening on {}",
62 listener.local_addr().unwrap() );
64 axum::serve(listener, app)
65 .await
66 .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
67 }
68
69 pub async fn run_with_listener_and_health_checker_with_shutdown(
70 listener: TcpListener,
71 metrics: Arc<PrometheusMetrics>,
72 checker: Option<HealthChecker>,
73 shutdown: oneshot::Receiver<()>,
74 ) -> Result<(), CamelError> {
75 let health = camel_health::health_router(checker, None, None);
76 let app = Router::new()
77 .route("/metrics", get(Self::metrics_handler))
78 .with_state(metrics)
79 .merge(health);
80
81 info!(
82 "Prometheus metrics server listening on {}",
83 listener.local_addr().unwrap() );
85
86 #[cfg(test)]
87 let shutdown_signal_count = test_graceful_shutdown_signal_count();
88
89 axum::serve(listener, app)
90 .with_graceful_shutdown(async move {
91 let _ = shutdown.await;
92 #[cfg(test)]
93 shutdown_signal_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
94 })
95 .await
96 .map_err(|e| CamelError::Io(format!("Prometheus server failed: {e}")))
97 }
98
99 async fn metrics_handler(State(metrics): State<Arc<PrometheusMetrics>>) -> impl IntoResponse {
100 let output = metrics.gather();
101 (
102 StatusCode::OK,
103 [("content-type", "text/plain; version=0.0.4")],
104 output,
105 )
106 }
107}
108
/// Framework-agnostic view of a metrics HTTP response.
///
/// Produced by the free function `metrics_handler`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code.
    pub status: u16,
    /// Value for the `Content-Type` header.
    pub content_type: String,
    /// Response body (the gathered metrics text).
    pub body: String,
}
114
115pub async fn metrics_handler(metrics: Arc<PrometheusMetrics>) -> MetricsResponse {
116 let output = metrics.gather();
117 MetricsResponse {
118 status: 200,
119 content_type: "text/plain; version=0.0.4".to_string(),
120 body: output,
121 }
122}
123
/// Test-only counter shared by the graceful-shutdown future.
///
/// It is created lazily by `test_graceful_shutdown_signal_count`.
#[cfg(test)]
static GRACEFUL_SHUTDOWN_SIGNAL_COUNT: std::sync::OnceLock<Arc<std::sync::atomic::AtomicUsize>> =
    std::sync::OnceLock::new();
128
/// Returns a handle to the test-only graceful-shutdown counter.
///
/// The counter is created at zero on first use. Every call returns a clone of the same
/// shared `Arc`.
#[cfg(test)]
pub fn test_graceful_shutdown_signal_count() -> std::sync::Arc<std::sync::atomic::AtomicUsize> {
    let shared = GRACEFUL_SHUTDOWN_SIGNAL_COUNT.get_or_init(Default::default);
    Arc::clone(shared)
}
136
/// Resets the test-only graceful-shutdown counter to zero.
#[cfg(test)]
pub fn test_reset_graceful_shutdown_observability() {
    let counter = test_graceful_shutdown_signal_count();
    counter.store(0, std::sync::atomic::Ordering::SeqCst);
}
141
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::metrics::MetricsCollector;
    use camel_api::{HealthReport, HealthStatus};
    use tokio::time::{Duration, sleep};

    #[tokio::test]
    async fn test_metrics_handler_returns_prometheus_format() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("test-route");

        let response = metrics_handler(metrics).await;

        assert_eq!(response.status, 200);
        assert_eq!(response.content_type, "text/plain; version=0.0.4");
        assert!(response.body.contains("camel_exchanges_total"));
        assert!(response.body.contains("test-route"));
    }

    #[tokio::test]
    async fn test_metrics_handler_content_type() {
        let metrics = Arc::new(PrometheusMetrics::new());

        let response = metrics_handler(metrics).await;

        assert_eq!(response.content_type, "text/plain; version=0.0.4");
    }

    #[tokio::test]
    async fn test_run_with_listener_serves_metrics_and_health() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("route-http");

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let checker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![],
            ..Default::default()
        });

        let handle = tokio::spawn(MetricsServer::run_with_listener_and_health_checker(
            listener,
            Arc::clone(&metrics),
            Some(checker),
        ));

        sleep(Duration::from_millis(50)).await;

        let client = reqwest::Client::new();
        let metrics_resp = client
            .get(format!("http://{}/metrics", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(metrics_resp.status().as_u16(), 200);
        assert_eq!(
            metrics_resp
                .headers()
                .get(reqwest::header::CONTENT_TYPE)
                .unwrap()
                .to_str()
                .unwrap(),
            "text/plain; version=0.0.4"
        );
        let body = metrics_resp.text().await.unwrap();
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("route-http"));

        let health_resp = client
            .get(format!("http://{}/health", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(health_resp.status().as_u16(), 200);

        let not_found = client
            .get(format!("http://{}/does-not-exist", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(not_found.status().as_u16(), 404);

        handle.abort();
    }

    #[tokio::test]
    async fn test_graceful_shutdown_signal_observed() {
        // Start from a known counter value so the assertion below reflects this test's signal.
        test_reset_graceful_shutdown_observability();

        let metrics = Arc::new(PrometheusMetrics::new());
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

        let handle = tokio::spawn(
            MetricsServer::run_with_listener_and_health_checker_with_shutdown(
                listener,
                metrics,
                None,
                shutdown_rx,
            ),
        );

        sleep(Duration::from_millis(30)).await;
        let _ = shutdown_tx.send(());

        let join = tokio::time::timeout(Duration::from_secs(2), handle)
            .await
            .expect("server did not shutdown in time")
            .expect("join should succeed");
        assert!(join.is_ok(), "server run should return Ok, got: {join:?}");

        // The shutdown future increments the counter after receiving the signal, and
        // serve() only returns once that future has completed.
        assert!(
            test_graceful_shutdown_signal_count().load(std::sync::atomic::Ordering::SeqCst) >= 1,
            "graceful shutdown future should have observed the signal"
        );
    }

    #[tokio::test]
    async fn test_dropped_shutdown_sender_stops_server() {
        let metrics = Arc::new(PrometheusMetrics::new());
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();

        let handle = tokio::spawn(
            MetricsServer::run_with_listener_and_health_checker_with_shutdown(
                listener,
                metrics,
                None,
                shutdown_rx,
            ),
        );

        // Dropping the sender resolves the receiver with an error, which also triggers shutdown.
        drop(shutdown_tx);

        let join = tokio::time::timeout(Duration::from_secs(2), handle)
            .await
            .expect("server did not shutdown in time")
            .expect("join should succeed");
        assert!(join.is_ok(), "server run should return Ok, got: {join:?}");
    }

    #[tokio::test]
    async fn test_run_returns_err_on_bind_failure() {
        let metrics = Arc::new(PrometheusMetrics::new());
        let occupied = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = occupied.local_addr().unwrap();

        let result = MetricsServer::run(addr, metrics).await;
        assert!(result.is_err(), "expected bind failure, got {result:?}");
    }
}