camel_prometheus/
server.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3
4use axum::{Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
5use tokio::net::TcpListener;
6use tracing::info;
7
8use camel_api::HealthChecker;
9
10use crate::PrometheusMetrics;
11
12pub struct MetricsServer;
13
14impl MetricsServer {
15 pub async fn run(addr: SocketAddr, metrics: Arc<PrometheusMetrics>) {
16 Self::run_with_health_checker(addr, metrics, None).await;
17 }
18
19 pub async fn run_with_health_checker(
20 addr: SocketAddr,
21 metrics: Arc<PrometheusMetrics>,
22 checker: Option<HealthChecker>,
23 ) {
24 let health = camel_health::health_router(checker, None, None);
25 let app = Router::new()
26 .route("/metrics", get(Self::metrics_handler))
27 .with_state(metrics)
28 .merge(health);
29
30 let listener = tokio::net::TcpListener::bind(addr)
31 .await
32 .unwrap_or_else(|e| panic!("Failed to bind to {}: {}", addr, e));
33
34 info!("Prometheus metrics server listening on {}", addr);
35 axum::serve(listener, app).await.unwrap();
36 }
37
38 pub async fn run_with_listener(listener: TcpListener, metrics: Arc<PrometheusMetrics>) {
39 Self::run_with_listener_and_health_checker(listener, metrics, None).await;
40 }
41
42 pub async fn run_with_listener_and_health_checker(
43 listener: TcpListener,
44 metrics: Arc<PrometheusMetrics>,
45 checker: Option<HealthChecker>,
46 ) {
47 let health = camel_health::health_router(checker, None, None);
48 let app = Router::new()
49 .route("/metrics", get(Self::metrics_handler))
50 .with_state(metrics)
51 .merge(health);
52
53 info!(
54 "Prometheus metrics server listening on {}",
55 listener.local_addr().unwrap()
56 );
57 axum::serve(listener, app).await.unwrap();
58 }
59
60 async fn metrics_handler(State(metrics): State<Arc<PrometheusMetrics>>) -> impl IntoResponse {
61 let output = metrics.gather();
62 (
63 StatusCode::OK,
64 [("content-type", "text/plain; version=0.0.4")],
65 output,
66 )
67 }
68}
69
/// Framework-independent description of a metrics HTTP response.
///
/// Returned by the free `metrics_handler` function so callers can inspect the
/// status, content type and body without depending on an HTTP framework type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsResponse {
    /// HTTP status code of the response.
    pub status: u16,
    /// Value intended for the `Content-Type` header.
    pub content_type: String,
    /// Response body text.
    pub body: String,
}
75
76pub async fn metrics_handler(metrics: Arc<PrometheusMetrics>) -> MetricsResponse {
77 let output = metrics.gather();
78 MetricsResponse {
79 status: 200,
80 content_type: "text/plain; version=0.0.4".to_string(),
81 body: output,
82 }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::metrics::MetricsCollector;
    use camel_api::{HealthReport, HealthStatus};

    /// The framework-independent handler reports status 200, the Prometheus
    /// text content type, and a body containing the recorded route.
    #[tokio::test]
    async fn test_metrics_handler_returns_prometheus_format() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("test-route");

        let response = metrics_handler(metrics).await;

        assert_eq!(response.status, 200);
        assert_eq!(response.content_type, "text/plain; version=0.0.4");
        assert!(response.body.contains("camel_exchanges_total"));
        assert!(response.body.contains("test-route"));
    }

    /// The content type is set even when nothing has been recorded yet.
    #[tokio::test]
    async fn test_metrics_handler_content_type() {
        let metrics = Arc::new(PrometheusMetrics::new());

        let response = metrics_handler(metrics).await;

        assert_eq!(response.content_type, "text/plain; version=0.0.4");
    }

    /// End-to-end check over real HTTP: `/metrics` and `/health` answer 200
    /// and unknown paths answer 404.
    #[tokio::test]
    async fn test_run_with_listener_serves_metrics_and_health() {
        let metrics = Arc::new(PrometheusMetrics::new());
        metrics.increment_exchanges("route-http");

        // The listener is bound before the server task is spawned, so the OS
        // already queues incoming connections; no start-up sleep is required.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let checker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![],
            ..Default::default()
        });

        let handle = tokio::spawn(MetricsServer::run_with_listener_and_health_checker(
            listener,
            Arc::clone(&metrics),
            Some(checker),
        ));

        let client = reqwest::Client::new();
        let metrics_resp = client
            .get(format!("http://{}/metrics", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(metrics_resp.status().as_u16(), 200);
        assert_eq!(
            metrics_resp
                .headers()
                .get(reqwest::header::CONTENT_TYPE)
                .unwrap()
                .to_str()
                .unwrap(),
            "text/plain; version=0.0.4"
        );
        let body = metrics_resp.text().await.unwrap();
        assert!(body.contains("camel_exchanges_total"));
        assert!(body.contains("route-http"));

        let health_resp = client
            .get(format!("http://{}/health", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(health_resp.status().as_u16(), 200);

        let not_found = client
            .get(format!("http://{}/does-not-exist", addr))
            .send()
            .await
            .unwrap();
        assert_eq!(not_found.status().as_u16(), 404);

        // The server future never completes on its own; stop it explicitly.
        handle.abort();
    }
}