1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthSource, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio::time::{Duration, timeout};
11use tracing::{debug, info, warn};
12
13pub struct PrometheusService {
14 addr: SocketAddr,
15 metrics: Arc<PrometheusMetrics>,
16 server_handle: Option<JoinHandle<()>>,
17 shutdown_tx: Option<oneshot::Sender<()>>,
18 bound_port: Arc<AtomicU16>,
20 status: Arc<AtomicU8>,
22 health_source: Option<Arc<dyn HealthSource>>,
23 started: AtomicBool,
25}
26
27impl PrometheusService {
28 pub fn new(addr: SocketAddr) -> Self {
29 Self {
30 addr,
31 metrics: Arc::new(PrometheusMetrics::new()),
32 server_handle: None,
33 shutdown_tx: None,
34 bound_port: Arc::new(AtomicU16::new(0)),
35 status: Arc::new(AtomicU8::new(0)),
36 health_source: None,
37 started: AtomicBool::new(false),
38 }
39 }
40
41 pub fn port(&self) -> u16 {
45 self.bound_port.load(Ordering::SeqCst)
46 }
47
48 pub fn port_accessor(&self) -> Arc<AtomicU16> {
53 Arc::clone(&self.bound_port)
54 }
55
56 pub fn status_arc(&self) -> Arc<AtomicU8> {
57 Arc::clone(&self.status)
58 }
59
60 pub fn set_health_source(&mut self, source: Arc<dyn HealthSource>) {
61 self.health_source = Some(source);
62 }
63
64 pub fn health_source(&self) -> Option<Arc<dyn HealthSource>> {
65 self.health_source.clone()
66 }
67}
68
69#[async_trait]
70impl Lifecycle for PrometheusService {
71 fn name(&self) -> &str {
72 "prometheus"
73 }
74
75 fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
76 Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
77 }
78
79 fn status(&self) -> ServiceStatus {
80 match self.status.load(Ordering::SeqCst) {
81 0 => ServiceStatus::Stopped,
82 1 => ServiceStatus::Started,
83 2 => ServiceStatus::Failed,
84 _ => ServiceStatus::Failed,
85 }
86 }
87
88 async fn start(&mut self) -> Result<(), CamelError> {
89 use tokio::net::TcpListener;
90
91 if self
92 .started
93 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
94 .is_err()
95 {
96 return Err(CamelError::Config(
97 "PrometheusService already started".to_string(),
98 ));
99 }
100
101 let listener = TcpListener::bind(self.addr).await.map_err(|e| {
102 self.status.store(2, Ordering::SeqCst);
103 self.started.store(false, Ordering::SeqCst);
104 CamelError::Io(e.to_string())
105 })?;
106
107 let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
108 self.status.store(2, Ordering::SeqCst);
109 self.started.store(false, Ordering::SeqCst);
110 CamelError::Io(e.to_string())
111 })?;
112
113 self.bound_port.store(actual_port, Ordering::SeqCst);
114
115 let metrics = Arc::clone(&self.metrics);
116 let health_source = self.health_source.clone();
117 let (shutdown_tx, shutdown_rx) = oneshot::channel();
118
119 let handle = tokio::spawn(async move {
120 if let Err(err) =
121 crate::MetricsServer::run_with_listener_and_health_source_with_shutdown(
122 listener,
123 metrics,
124 health_source,
125 shutdown_rx,
126 )
127 .await
128 {
129 warn!("prometheus metrics server exited with error: {err}");
130 }
131 });
132
133 self.shutdown_tx = Some(shutdown_tx);
134 self.server_handle = Some(handle);
135 self.status.store(1, Ordering::SeqCst);
136 info!(port = %actual_port, "prometheus metrics service started");
137 Ok(())
138 }
139
140 async fn stop(&mut self) -> Result<(), CamelError> {
141 if let Some(tx) = self.shutdown_tx.take() {
142 let _ = tx.send(());
143 }
144
145 if let Some(handle) = self.server_handle.take() {
146 let mut handle = handle;
147 match timeout(Duration::from_secs(5), &mut handle).await {
148 Ok(join_result) => {
149 if let Err(e) = join_result {
150 return Err(CamelError::Io(format!(
151 "prometheus server task join failed: {e}"
152 )));
153 }
154 }
155 Err(_) => {
156 debug!("prometheus server shutdown timed out; aborting task");
157 handle.abort();
158 let _ = handle.await;
159 }
160 }
161 }
162 self.status.store(0, Ordering::SeqCst);
163 self.started.store(false, Ordering::SeqCst);
164 debug!("prometheus metrics service stopped");
165 Ok(())
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::HealthStatus;
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::atomic::Ordering;

    /// Health source stub with a fixed readiness answer; liveness and startup are always healthy.
    struct MockHealthSource {
        readiness: HealthStatus,
    }

    #[async_trait]
    impl HealthSource for MockHealthSource {
        async fn liveness(&self) -> HealthStatus {
            HealthStatus::Healthy
        }

        async fn readiness(&self) -> HealthStatus {
            self.readiness
        }

        async fn startup(&self) -> HealthStatus {
            HealthStatus::Healthy
        }
    }

    #[test]
    fn test_create_prometheus_service() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9090);
        let service = PrometheusService::new(addr);
        assert_eq!(service.name(), "prometheus");
    }

    #[tokio::test]
    async fn test_prometheus_service_status_transitions() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
        let mut service = PrometheusService::new(addr);

        assert_eq!(service.status(), ServiceStatus::Stopped);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Stopped);
    }

    #[tokio::test]
    async fn test_stop_uses_graceful_shutdown_signal() {
        crate::server::test_reset_graceful_shutdown_observability();

        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let mut service = PrometheusService::new(addr);
        service.start().await.unwrap();

        service.stop().await.unwrap();

        let signal_count = crate::server::test_graceful_shutdown_signal_count();
        assert!(
            signal_count.load(Ordering::SeqCst) >= 1,
            "expected at least one graceful shutdown signal"
        );
    }

    #[tokio::test]
    async fn test_port_and_port_accessor_after_start() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let mut service = PrometheusService::new(addr);

        assert_eq!(service.port(), 0);
        let accessor = service.port_accessor();
        assert_eq!(accessor.load(Ordering::SeqCst), 0);

        service.start().await.unwrap();
        let port = service.port();
        assert!(port > 0);
        assert_eq!(accessor.load(Ordering::SeqCst), port);

        service.stop().await.unwrap();
    }

    #[test]
    fn test_as_metrics_collector_returns_metrics_instance() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9090);
        let service = PrometheusService::new(addr);
        let collector = service.as_metrics_collector().unwrap();
        collector.increment_exchanges("route-a");
        let output = service.metrics.gather();
        assert!(output.contains("camel_exchanges_total"));
        assert!(output.contains("route-a"));
    }

    #[test]
    fn test_unknown_internal_status_maps_to_failed() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9090);
        let service = PrometheusService::new(addr);
        service.status_arc().store(9, Ordering::SeqCst);
        assert_eq!(service.status(), ServiceStatus::Failed);
    }

    #[tokio::test]
    async fn test_health_source_injection() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9090);
        let mut service = PrometheusService::new(addr);

        assert!(service.health_source().is_none());

        let source = Arc::new(MockHealthSource {
            readiness: HealthStatus::Healthy,
        });

        service.set_health_source(source);
        assert!(service.health_source().is_some());

        let status = service.health_source().unwrap().readiness().await;
        assert_eq!(status, HealthStatus::Healthy);
    }

    #[test]
    fn test_prometheus_service_with_socket_addr() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9091);
        let service = PrometheusService::new(addr);
        assert_eq!(service.name(), "prometheus");
    }

    #[tokio::test]
    async fn test_double_start_returns_error() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let mut service = PrometheusService::new(addr);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        let result = service.start().await;
        assert!(result.is_err(), "second start() should return an error");
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("already started"),
            "error should mention 'already started', got: {err_msg}"
        );

        service.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_start_allowed_again_after_stop() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        let mut service = PrometheusService::new(addr);

        service.start().await.unwrap();
        service.stop().await.unwrap();

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_bind_failure_marks_failed_and_allows_retry() {
        // Keep the port occupied by an active listener so the service's bind is rejected.
        let occupied = std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
        let taken = occupied.local_addr().unwrap();

        let mut service = PrometheusService::new(taken);
        let result = service.start().await;
        assert!(result.is_err(), "start() on an occupied port should fail");
        assert_eq!(service.status(), ServiceStatus::Failed);
        assert_eq!(service.port(), 0, "no port should be recorded on bind failure");

        // A failed start must release the start guard, so retrying on a free port succeeds.
        service.addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);
        assert!(service.port() > 0);

        service.stop().await.unwrap();
        drop(occupied);
    }
}