Skip to main content

camel_prometheus/
service.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthChecker, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio::time::{Duration, timeout};
11use tracing::{debug, info, warn};
12
13pub struct PrometheusService {
14    addr: SocketAddr,
15    metrics: Arc<PrometheusMetrics>,
16    server_handle: Option<JoinHandle<()>>,
17    shutdown_tx: Option<oneshot::Sender<()>>,
18    /// The actual bound port (set after start(), 0 before)
19    bound_port: Arc<AtomicU16>,
20    /// Current service status (Stopped=0, Started=1, Failed=2)
21    status: Arc<AtomicU8>,
22    health_checker: Option<HealthChecker>,
23    /// Guard to prevent double-start
24    started: AtomicBool,
25}
26
27impl PrometheusService {
28    pub fn new(addr: SocketAddr) -> Self {
29        Self {
30            addr,
31            metrics: Arc::new(PrometheusMetrics::new()),
32            server_handle: None,
33            shutdown_tx: None,
34            bound_port: Arc::new(AtomicU16::new(0)),
35            status: Arc::new(AtomicU8::new(0)),
36            health_checker: None,
37            started: AtomicBool::new(false),
38        }
39    }
40
41    /// Returns the actual port the server is listening on (after start())
42    ///
43    /// Returns 0 if the service hasn't been started yet.
44    pub fn port(&self) -> u16 {
45        self.bound_port.load(Ordering::SeqCst)
46    }
47
48    /// Returns a cloneable accessor for the bound port.
49    ///
50    /// Use this when you need to read the port after the service has been
51    /// moved into a CamelContext or other container.
52    pub fn port_accessor(&self) -> Arc<AtomicU16> {
53        Arc::clone(&self.bound_port)
54    }
55
56    pub fn status_arc(&self) -> Arc<AtomicU8> {
57        Arc::clone(&self.status)
58    }
59
60    /// Set the health checker closure that will be called to get system health.
61    ///
62    /// The closure should capture a reference to CamelContext's health_check method.
63    pub fn set_health_checker(&mut self, checker: HealthChecker) {
64        self.health_checker = Some(checker);
65    }
66
67    /// Get a clone of the health checker if one is set.
68    pub fn health_checker(&self) -> Option<HealthChecker> {
69        self.health_checker.clone()
70    }
71}
72
73#[async_trait]
74impl Lifecycle for PrometheusService {
75    fn name(&self) -> &str {
76        "prometheus"
77    }
78
79    fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
80        Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
81    }
82
83    fn status(&self) -> ServiceStatus {
84        match self.status.load(Ordering::SeqCst) {
85            0 => ServiceStatus::Stopped,
86            1 => ServiceStatus::Started,
87            2 => ServiceStatus::Failed,
88            _ => ServiceStatus::Failed,
89        }
90    }
91
92    async fn start(&mut self) -> Result<(), CamelError> {
93        use tokio::net::TcpListener;
94
95        if self
96            .started
97            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
98            .is_err()
99        {
100            return Err(CamelError::Config(
101                "PrometheusService already started".to_string(),
102            ));
103        }
104
105        let listener = TcpListener::bind(self.addr).await.map_err(|e| {
106            self.status.store(2, Ordering::SeqCst);
107            self.started.store(false, Ordering::SeqCst);
108            CamelError::Io(e.to_string())
109        })?;
110
111        let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
112            self.status.store(2, Ordering::SeqCst);
113            self.started.store(false, Ordering::SeqCst);
114            CamelError::Io(e.to_string())
115        })?;
116
117        self.bound_port.store(actual_port, Ordering::SeqCst);
118
119        let metrics = Arc::clone(&self.metrics);
120        let health_checker = self.health_checker.clone();
121        let (shutdown_tx, shutdown_rx) = oneshot::channel();
122
123        let handle = tokio::spawn(async move {
124            if let Err(err) =
125                crate::MetricsServer::run_with_listener_and_health_checker_with_shutdown(
126                    listener,
127                    metrics,
128                    health_checker,
129                    shutdown_rx,
130                )
131                .await
132            {
133                warn!("prometheus metrics server exited with error: {err}");
134            }
135        });
136
137        self.shutdown_tx = Some(shutdown_tx);
138        self.server_handle = Some(handle);
139        self.status.store(1, Ordering::SeqCst);
140        info!(port = %actual_port, "prometheus metrics service started");
141        Ok(())
142    }
143
144    async fn stop(&mut self) -> Result<(), CamelError> {
145        if let Some(tx) = self.shutdown_tx.take() {
146            let _ = tx.send(());
147        }
148
149        if let Some(handle) = self.server_handle.take() {
150            let mut handle = handle;
151            match timeout(Duration::from_secs(5), &mut handle).await {
152                Ok(join_result) => {
153                    if let Err(e) = join_result {
154                        return Err(CamelError::Io(format!(
155                            "prometheus server task join failed: {e}"
156                        )));
157                    }
158                }
159                Err(_) => {
160                    debug!("prometheus server shutdown timed out; aborting task");
161                    handle.abort();
162                    let _ = handle.await;
163                }
164            }
165        }
166        self.status.store(0, Ordering::SeqCst);
167        self.started.store(false, Ordering::SeqCst);
168        debug!("prometheus metrics service stopped");
169        Ok(())
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::HealthReport;
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::atomic::Ordering;

    /// Builds an IPv4 socket address from an IP and a port.
    fn v4(ip: Ipv4Addr, port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(ip), port)
    }

    /// Builds a service bound to localhost on the given port (0 = ephemeral).
    fn localhost_service(port: u16) -> PrometheusService {
        PrometheusService::new(v4(Ipv4Addr::LOCALHOST, port))
    }

    #[test]
    fn test_create_prometheus_service() {
        let service = PrometheusService::new(v4(Ipv4Addr::new(0, 0, 0, 0), 9090));
        assert_eq!(service.name(), "prometheus");
    }

    #[tokio::test]
    async fn test_prometheus_service_status_transitions() {
        let mut service = PrometheusService::new(v4(Ipv4Addr::new(0, 0, 0, 0), 0));

        // Stopped -> Started -> Stopped.
        assert_eq!(service.status(), ServiceStatus::Stopped);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Stopped);
    }

    #[tokio::test]
    async fn test_stop_uses_graceful_shutdown_signal() {
        crate::server::test_reset_graceful_shutdown_observability();

        let mut service = localhost_service(0);
        service.start().await.unwrap();

        service.stop().await.unwrap();

        let observed = crate::server::test_graceful_shutdown_signal_count();
        assert!(
            observed.load(Ordering::SeqCst) >= 1,
            "expected at least one graceful shutdown signal"
        );
    }

    #[tokio::test]
    async fn test_port_and_port_accessor_after_start() {
        let mut service = localhost_service(0);

        // Before start both views report 0.
        assert_eq!(service.port(), 0);
        let accessor = service.port_accessor();
        assert_eq!(accessor.load(Ordering::SeqCst), 0);

        // After start both views report the same non-zero port.
        service.start().await.unwrap();
        let port = service.port();
        assert!(port > 0);
        assert_eq!(accessor.load(Ordering::SeqCst), port);

        service.stop().await.unwrap();
    }

    #[test]
    fn test_as_metrics_collector_returns_metrics_instance() {
        let service = localhost_service(9090);
        let collector = service.as_metrics_collector().unwrap();
        collector.increment_exchanges("route-a");

        let output = service.metrics.gather();
        assert!(output.contains("camel_exchanges_total"));
        assert!(output.contains("route-a"));
    }

    #[test]
    fn test_unknown_internal_status_maps_to_failed() {
        let service = localhost_service(9090);
        let raw_status = service.status_arc();
        raw_status.store(9, Ordering::SeqCst);
        assert_eq!(service.status(), ServiceStatus::Failed);
    }

    #[test]
    fn test_health_checker_injection() {
        use camel_api::HealthStatus;

        let mut service = PrometheusService::new(v4(Ipv4Addr::new(0, 0, 0, 0), 9090));

        // Nothing is installed on a fresh service.
        assert!(service.health_checker().is_none());

        // Install a checker that always reports healthy.
        let checker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![],
            ..Default::default()
        });

        service.set_health_checker(checker);
        assert!(service.health_checker().is_some());

        // Invoke the installed checker through the accessor.
        let installed = service.health_checker().unwrap();
        let report = installed();
        assert_eq!(report.status, HealthStatus::Healthy);
    }

    #[test]
    fn test_prometheus_service_with_socket_addr() {
        let service = PrometheusService::new(v4(Ipv4Addr::new(127, 0, 0, 1), 9091));
        assert_eq!(service.name(), "prometheus");
    }

    #[tokio::test]
    async fn test_double_start_returns_error() {
        let mut service = localhost_service(0);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        let result = service.start().await;
        assert!(result.is_err(), "second start() should return an error");
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("already started"),
            "error should mention 'already started', got: {err_msg}"
        );

        service.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_start_allowed_again_after_stop() {
        let mut service = localhost_service(0);

        service.start().await.unwrap();
        service.stop().await.unwrap();

        // A stopped service must accept a new start.
        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
    }
}