1use std::net::SocketAddr;
7use std::sync::Arc;
8
9use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
10use tokio::net::TcpListener;
11use tracing::info;
12
13use camel_api::{HealthReport, HealthStatus};
14
15use crate::PrometheusMetrics;
16
/// Shared, thread-safe callback that produces a fresh [`HealthReport`] each
/// time it is invoked.
type HealthChecker = Arc<dyn Fn() -> HealthReport + Send + Sync>;
19
/// State handed to the route handlers: the shared [`PrometheusMetrics`] handle
/// and an optional [`HealthChecker`] (`None` when no checker was supplied).
type ServerState = (Arc<PrometheusMetrics>, Option<HealthChecker>);
22
/// Field-less type whose associated functions run the HTTP server exposing
/// the `/metrics`, `/healthz`, `/readyz` and `/health` routes.
pub struct MetricsServer;
25
26impl MetricsServer {
27 pub async fn run(addr: SocketAddr, metrics: Arc<PrometheusMetrics>) {
38 Self::run_with_health_checker(addr, metrics, HealthReport::default).await;
39 }
40
41 pub async fn run_with_health_checker<F>(
53 addr: SocketAddr,
54 metrics: Arc<PrometheusMetrics>,
55 health_checker: F,
56 ) where
57 F: Fn() -> HealthReport + Send + Sync + 'static,
58 {
59 let health_checker = Arc::new(health_checker);
60
61 let app = Router::new()
62 .route("/metrics", get(Self::metrics_handler))
63 .route("/healthz", get(Self::healthz))
64 .route("/readyz", get(Self::readyz))
65 .route("/health", get(Self::health))
66 .with_state((metrics, Some(health_checker)));
67
68 let listener = tokio::net::TcpListener::bind(addr)
69 .await
70 .unwrap_or_else(|e| panic!("Failed to bind to {}: {}", addr, e));
71
72 info!("Prometheus metrics server listening on {}", addr);
73
74 axum::serve(listener, app)
75 .await
76 .expect("Failed to start metrics server");
77 }
78
79 pub async fn run_with_listener(listener: TcpListener, metrics: Arc<PrometheusMetrics>) {
91 let app = Router::new()
92 .route("/metrics", get(Self::metrics_handler))
93 .route("/healthz", get(Self::healthz))
94 .route("/readyz", get(Self::readyz))
95 .route("/health", get(Self::health))
96 .with_state((metrics, None));
97
98 info!(
99 "Prometheus metrics server listening on {:?}",
100 listener.local_addr()
101 );
102
103 axum::serve(listener, app)
104 .await
105 .expect("Failed to start metrics server");
106 }
107
108 pub async fn run_with_listener_and_health_checker(
121 listener: TcpListener,
122 metrics: Arc<PrometheusMetrics>,
123 health_checker: HealthChecker,
124 ) {
125 let app = Router::new()
126 .route("/metrics", get(Self::metrics_handler))
127 .route("/healthz", get(Self::healthz))
128 .route("/readyz", get(Self::readyz))
129 .route("/health", get(Self::health))
130 .with_state((metrics, Some(health_checker)));
131
132 info!(
133 "Prometheus metrics server listening on {:?}",
134 listener.local_addr()
135 );
136
137 axum::serve(listener, app)
138 .await
139 .expect("Failed to start metrics server");
140 }
141
142 async fn healthz() -> StatusCode {
146 StatusCode::OK
147 }
148
149 async fn readyz(State((_, checker)): State<ServerState>) -> impl IntoResponse {
153 match checker {
154 Some(checker) => {
155 let report = checker();
156 let status = if report.status == HealthStatus::Healthy {
157 StatusCode::OK
158 } else {
159 StatusCode::SERVICE_UNAVAILABLE
160 };
161 (status, Json(report))
162 }
163 None => (StatusCode::OK, Json(HealthReport::default())),
164 }
165 }
166
167 async fn health(State((_, checker)): State<ServerState>) -> impl IntoResponse {
171 match checker {
172 Some(checker) => {
173 let report = checker();
174 (StatusCode::OK, Json(report))
175 }
176 None => (StatusCode::OK, Json(HealthReport::default())),
177 }
178 }
179
180 async fn metrics_handler(State((metrics, _)): State<ServerState>) -> impl IntoResponse {
182 let output = metrics.gather();
183 (
184 StatusCode::OK,
185 [("content-type", "text/plain; version=0.0.4")],
186 output,
187 )
188 }
189}
190
/// Framework-independent description of a metrics HTTP response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code of the response.
    pub status: u16,
    /// Value for the `Content-Type` header.
    pub content_type: String,
    /// Response body text.
    pub body: String,
}
197
198pub async fn metrics_handler(metrics: Arc<PrometheusMetrics>) -> MetricsResponse {
203 let output = metrics.gather();
204 MetricsResponse {
205 status: 200,
206 content_type: "text/plain; version=0.0.4".to_string(),
207 body: output,
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use camel_api::metrics::MetricsCollector;
    use camel_api::{ServiceHealth, ServiceStatus};
    use tower::ServiceExt;

    /// Builds handler state from a fresh metrics instance and `checker`.
    fn state_with(checker: HealthChecker) -> ServerState {
        (Arc::new(PrometheusMetrics::new()), Some(checker))
    }

    /// Sends a body-less request for `uri` through `app` and returns the response.
    async fn send_get(app: Router, uri: &str) -> axum::response::Response {
        let request = Request::builder().uri(uri).body(Body::empty()).unwrap();
        app.oneshot(request).await.unwrap()
    }

    #[tokio::test]
    async fn test_metrics_handler_returns_prometheus_format() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("test-route");

        let MetricsResponse {
            status,
            content_type,
            body,
        } = metrics_handler(metrics).await;

        assert_eq!(status, 200);
        assert_eq!(content_type, "text/plain; version=0.0.4");
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("test-route"));
    }

    #[tokio::test]
    async fn test_metrics_handler_content_type() {
        let response = metrics_handler(Arc::new(PrometheusMetrics::new())).await;

        assert_eq!(response.content_type, "text/plain; version=0.0.4");
    }

    #[tokio::test]
    async fn test_healthz_returns_200_ok() {
        let checker: HealthChecker = Arc::new(HealthReport::default);
        let app = Router::new()
            .route("/healthz", get(MetricsServer::healthz))
            .with_state(state_with(checker));

        let response = send_get(app, "/healthz").await;

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_readyz_returns_200_when_healthy() {
        let checker: HealthChecker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            ..Default::default()
        });
        let app = Router::new()
            .route("/readyz", get(MetricsServer::readyz))
            .with_state(state_with(checker));

        let response = send_get(app, "/readyz").await;

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_readyz_returns_503_when_unhealthy() {
        let checker: HealthChecker = Arc::new(|| HealthReport {
            status: HealthStatus::Unhealthy,
            ..Default::default()
        });
        let app = Router::new()
            .route("/readyz", get(MetricsServer::readyz))
            .with_state(state_with(checker));

        let response = send_get(app, "/readyz").await;

        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn test_health_returns_json_health_report() {
        let checker: HealthChecker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![ServiceHealth {
                name: "test-service".to_string(),
                status: ServiceStatus::Started,
            }],
            ..Default::default()
        });
        let app = Router::new()
            .route("/health", get(MetricsServer::health))
            .with_state(state_with(checker));

        let response = send_get(app, "/health").await;

        assert_eq!(response.status(), StatusCode::OK);

        // Decode the body and check the serialized report shape.
        let bytes = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        let json: serde_json::Value = serde_json::from_slice(&bytes).unwrap();

        assert_eq!(json["status"], "Healthy");
        assert!(json["services"].is_array());
    }
}