1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthChecker, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio::time::{Duration, timeout};
11use tracing::{debug, info, warn};
12
13pub struct PrometheusService {
14 addr: SocketAddr,
15 metrics: Arc<PrometheusMetrics>,
16 server_handle: Option<JoinHandle<()>>,
17 shutdown_tx: Option<oneshot::Sender<()>>,
18 bound_port: Arc<AtomicU16>,
20 status: Arc<AtomicU8>,
22 health_checker: Option<HealthChecker>,
23 started: AtomicBool,
25}
26
27impl PrometheusService {
28 pub fn new(addr: SocketAddr) -> Self {
29 Self {
30 addr,
31 metrics: Arc::new(PrometheusMetrics::new()),
32 server_handle: None,
33 shutdown_tx: None,
34 bound_port: Arc::new(AtomicU16::new(0)),
35 status: Arc::new(AtomicU8::new(0)),
36 health_checker: None,
37 started: AtomicBool::new(false),
38 }
39 }
40
41 pub fn port(&self) -> u16 {
45 self.bound_port.load(Ordering::SeqCst)
46 }
47
48 pub fn port_accessor(&self) -> Arc<AtomicU16> {
53 Arc::clone(&self.bound_port)
54 }
55
56 pub fn status_arc(&self) -> Arc<AtomicU8> {
57 Arc::clone(&self.status)
58 }
59
60 pub fn set_health_checker(&mut self, checker: HealthChecker) {
64 self.health_checker = Some(checker);
65 }
66
67 pub fn health_checker(&self) -> Option<HealthChecker> {
69 self.health_checker.clone()
70 }
71}
72
73#[async_trait]
74impl Lifecycle for PrometheusService {
75 fn name(&self) -> &str {
76 "prometheus"
77 }
78
79 fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
80 Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
81 }
82
83 fn status(&self) -> ServiceStatus {
84 match self.status.load(Ordering::SeqCst) {
85 0 => ServiceStatus::Stopped,
86 1 => ServiceStatus::Started,
87 2 => ServiceStatus::Failed,
88 _ => ServiceStatus::Failed,
89 }
90 }
91
92 async fn start(&mut self) -> Result<(), CamelError> {
93 use tokio::net::TcpListener;
94
95 if self
96 .started
97 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
98 .is_err()
99 {
100 return Err(CamelError::Config(
101 "PrometheusService already started".to_string(),
102 ));
103 }
104
105 let listener = TcpListener::bind(self.addr).await.map_err(|e| {
106 self.status.store(2, Ordering::SeqCst);
107 self.started.store(false, Ordering::SeqCst);
108 CamelError::Io(e.to_string())
109 })?;
110
111 let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
112 self.status.store(2, Ordering::SeqCst);
113 self.started.store(false, Ordering::SeqCst);
114 CamelError::Io(e.to_string())
115 })?;
116
117 self.bound_port.store(actual_port, Ordering::SeqCst);
118
119 let metrics = Arc::clone(&self.metrics);
120 let health_checker = self.health_checker.clone();
121 let (shutdown_tx, shutdown_rx) = oneshot::channel();
122
123 let handle = tokio::spawn(async move {
124 if let Err(err) =
125 crate::MetricsServer::run_with_listener_and_health_checker_with_shutdown(
126 listener,
127 metrics,
128 health_checker,
129 shutdown_rx,
130 )
131 .await
132 {
133 warn!("prometheus metrics server exited with error: {err}");
134 }
135 });
136
137 self.shutdown_tx = Some(shutdown_tx);
138 self.server_handle = Some(handle);
139 self.status.store(1, Ordering::SeqCst);
140 info!(port = %actual_port, "prometheus metrics service started");
141 Ok(())
142 }
143
144 async fn stop(&mut self) -> Result<(), CamelError> {
145 if let Some(tx) = self.shutdown_tx.take() {
146 let _ = tx.send(());
147 }
148
149 if let Some(handle) = self.server_handle.take() {
150 let mut handle = handle;
151 match timeout(Duration::from_secs(5), &mut handle).await {
152 Ok(join_result) => {
153 if let Err(e) = join_result {
154 return Err(CamelError::Io(format!(
155 "prometheus server task join failed: {e}"
156 )));
157 }
158 }
159 Err(_) => {
160 debug!("prometheus server shutdown timed out; aborting task");
161 handle.abort();
162 let _ = handle.await;
163 }
164 }
165 }
166 self.status.store(0, Ordering::SeqCst);
167 self.started.store(false, Ordering::SeqCst);
168 debug!("prometheus metrics service stopped");
169 Ok(())
170 }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::HealthReport;
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::atomic::Ordering;

    /// Builds a `0.0.0.0:<port>` address.
    fn unspecified(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)
    }

    /// Builds a `127.0.0.1:<port>` address; port `0` lets the OS choose.
    fn loopback(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    // A freshly constructed service reports its fixed name.
    #[test]
    fn test_create_prometheus_service() {
        let service = PrometheusService::new(unspecified(9090));
        assert_eq!(service.name(), "prometheus");
    }

    // Status goes Stopped -> Started -> Stopped across start/stop.
    #[tokio::test]
    async fn test_prometheus_service_status_transitions() {
        let mut service = PrometheusService::new(unspecified(0));
        assert_eq!(service.status(), ServiceStatus::Stopped);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Stopped);
    }

    // Stopping must go through the graceful shutdown signal at least once.
    #[tokio::test]
    async fn test_stop_uses_graceful_shutdown_signal() {
        crate::server::test_reset_graceful_shutdown_observability();

        let mut service = PrometheusService::new(loopback(0));
        service.start().await.unwrap();
        service.stop().await.unwrap();

        let signals = crate::server::test_graceful_shutdown_signal_count();
        let observed = signals.load(Ordering::SeqCst);
        assert!(
            observed >= 1,
            "expected at least one graceful shutdown signal"
        );
    }

    // `port()` and the shared accessor agree before and after start.
    #[tokio::test]
    async fn test_port_and_port_accessor_after_start() {
        let mut service = PrometheusService::new(loopback(0));

        assert_eq!(service.port(), 0);
        let accessor = service.port_accessor();
        assert_eq!(accessor.load(Ordering::SeqCst), 0);

        service.start().await.unwrap();
        let bound = service.port();
        assert!(bound > 0);
        assert_eq!(accessor.load(Ordering::SeqCst), bound);

        service.stop().await.unwrap();
    }

    // The collector handed out writes into the service's own registry.
    #[test]
    fn test_as_metrics_collector_returns_metrics_instance() {
        let service = PrometheusService::new(loopback(9090));

        let collector = service.as_metrics_collector().unwrap();
        collector.increment_exchanges("route-a");

        let rendered = service.metrics.gather();
        assert!(rendered.contains("camel_exchanges_total"));
        assert!(rendered.contains("route-a"));
    }

    // Any status code outside the known set decodes to Failed.
    #[test]
    fn test_unknown_internal_status_maps_to_failed() {
        let service = PrometheusService::new(loopback(9090));
        let raw_status = service.status_arc();
        raw_status.store(9, Ordering::SeqCst);
        assert_eq!(service.status(), ServiceStatus::Failed);
    }

    // A health checker can be installed and invoked afterwards.
    #[test]
    fn test_health_checker_injection() {
        use camel_api::HealthStatus;

        let mut service = PrometheusService::new(unspecified(9090));
        assert!(service.health_checker().is_none());

        let checker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![],
            ..Default::default()
        });
        service.set_health_checker(checker);
        assert!(service.health_checker().is_some());

        let installed = service.health_checker().unwrap();
        let report = installed();
        assert_eq!(report.status, HealthStatus::Healthy);
    }

    // Construction works for an explicit loopback address as well.
    #[test]
    fn test_prometheus_service_with_socket_addr() {
        let service = PrometheusService::new(loopback(9091));
        assert_eq!(service.name(), "prometheus");
    }

    // Starting twice without stopping is rejected with a clear message.
    #[tokio::test]
    async fn test_double_start_returns_error() {
        let mut service = PrometheusService::new(loopback(0));

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        let result = service.start().await;
        assert!(result.is_err(), "second start() should return an error");
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("already started"),
            "error should mention 'already started', got: {err_msg}"
        );

        service.stop().await.unwrap();
    }

    // After a clean stop the service can be started again.
    #[tokio::test]
    async fn test_start_allowed_again_after_stop() {
        let mut service = PrometheusService::new(loopback(0));

        service.start().await.unwrap();
        service.stop().await.unwrap();

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
    }
}