camel_prometheus/
server.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
5use tokio::net::TcpListener;
6use tracing::info;
7
8use camel_api::HealthChecker;
9
10use crate::PrometheusMetrics;
11
/// Namespace type for the Prometheus metrics HTTP server.
///
/// Carries no state; every entry point is an associated function that takes
/// the address or listener to serve on and the metrics registry to expose.
#[derive(Debug, Clone, Copy, Default)]
pub struct MetricsServer;
13
14impl MetricsServer {
15 pub async fn run(addr: SocketAddr, metrics: Arc<PrometheusMetrics>) {
16 Self::run_with_health_checker(addr, metrics, None).await;
17 }
18
19 pub async fn run_with_health_checker(
20 addr: SocketAddr,
21 metrics: Arc<PrometheusMetrics>,
22 checker: Option<HealthChecker>,
23 ) {
24 let health = camel_health::health_router(checker);
25 let app = Router::new()
26 .route("/metrics", get(Self::metrics_handler))
27 .with_state(metrics)
28 .merge(health);
29
30 let listener = tokio::net::TcpListener::bind(addr)
31 .await
32 .unwrap_or_else(|e| panic!("Failed to bind to {}: {}", addr, e));
33
34 info!("Prometheus metrics server listening on {}", addr);
35 axum::serve(listener, app).await.unwrap();
36 }
37
38 pub async fn run_with_listener(listener: TcpListener, metrics: Arc<PrometheusMetrics>) {
39 Self::run_with_listener_and_health_checker(listener, metrics, None).await;
40 }
41
42 pub async fn run_with_listener_and_health_checker(
43 listener: TcpListener,
44 metrics: Arc<PrometheusMetrics>,
45 checker: Option<HealthChecker>,
46 ) {
47 let health = camel_health::health_router(checker);
48 let app = Router::new()
49 .route("/metrics", get(Self::metrics_handler))
50 .with_state(metrics)
51 .merge(health);
52
53 info!(
54 "Prometheus metrics server listening on {}",
55 listener.local_addr().unwrap()
56 );
57 axum::serve(listener, app).await.unwrap();
58 }
59
60 async fn metrics_handler(State(metrics): State<Arc<PrometheusMetrics>>) -> impl IntoResponse {
61 let output = metrics.gather();
62 (
63 StatusCode::OK,
64 [("content-type", "text/plain; version=0.0.4")],
65 output,
66 )
67 }
68}
69
/// Framework-independent description of a metrics HTTP response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code of the response.
    pub status: u16,
    /// Value for the `Content-Type` header.
    pub content_type: String,
    /// Response body text.
    pub body: String,
}
75
76pub async fn metrics_handler(metrics: Arc<PrometheusMetrics>) -> MetricsResponse {
77 let output = metrics.gather();
78 MetricsResponse {
79 status: 200,
80 content_type: "text/plain; version=0.0.4".to_string(),
81 body: output,
82 }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::metrics::MetricsCollector;

    /// After one exchange is recorded for a route, the handler reports
    /// status 200, the expected content type, and a body that mentions both
    /// the counter name and the route.
    #[tokio::test]
    async fn test_metrics_handler_returns_prometheus_format() {
        let registry = Arc::new(PrometheusMetrics::new());
        registry.increment_exchanges("test-route");

        let MetricsResponse {
            status,
            content_type,
            body,
        } = metrics_handler(registry).await;

        assert_eq!(status, 200);
        assert_eq!(content_type, "text/plain; version=0.0.4");
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("test-route"));
    }

    /// The content type is set even when nothing has been recorded.
    #[tokio::test]
    async fn test_metrics_handler_content_type() {
        let registry = Arc::new(PrometheusMetrics::new());

        let reply = metrics_handler(registry).await;

        assert_eq!(reply.content_type, "text/plain; version=0.0.4");
    }
}