Skip to main content

camel_prometheus/
service.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthSource, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::sync::oneshot;
9use tokio::task::JoinHandle;
10use tokio::time::{Duration, timeout};
11use tracing::{debug, info, warn};
12
/// Lifecycle-managed service that runs the crate's `MetricsServer` on a TCP
/// listener bound to a configured address.
///
/// The service owns the metrics registry, the handle of the spawned server
/// task and the channel used to ask that task to shut down.
pub struct PrometheusService {
    /// Address handed to `TcpListener::bind` in `start()`. A port of 0 lets
    /// the OS choose; the chosen port is then published via `bound_port`.
    addr: SocketAddr,
    /// Metrics registry shared with the server task and exposed through
    /// `as_metrics_collector()`.
    metrics: Arc<PrometheusMetrics>,
    /// Join handle of the spawned server task; set by `start()` and taken
    /// by `stop()`.
    server_handle: Option<JoinHandle<()>>,
    /// Sender half of the one-shot shutdown channel; `stop()` takes it and
    /// sends `()` to request shutdown of the server task.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// The actual bound port (set after start(), 0 before).
    ///
    /// Held in an `Arc` so `port_accessor()` can hand out clones that stay
    /// readable after the service has been moved elsewhere.
    bound_port: Arc<AtomicU16>,
    /// Current service status (Stopped=0, Started=1, Failed=2).
    ///
    /// Any other stored value is reported as `Failed` by `status()`.
    status: Arc<AtomicU8>,
    /// Optional health source; a clone is passed to the server task when
    /// the service is started.
    health_source: Option<Arc<dyn HealthSource>>,
    /// Guard to prevent double-start: set by `start()`, cleared by `stop()`
    /// and by the failure paths of `start()`.
    started: AtomicBool,
}
26
27impl PrometheusService {
28    pub fn new(addr: SocketAddr) -> Self {
29        Self {
30            addr,
31            metrics: Arc::new(PrometheusMetrics::new()),
32            server_handle: None,
33            shutdown_tx: None,
34            bound_port: Arc::new(AtomicU16::new(0)),
35            status: Arc::new(AtomicU8::new(0)),
36            health_source: None,
37            started: AtomicBool::new(false),
38        }
39    }
40
41    /// Returns the actual port the server is listening on (after start())
42    ///
43    /// Returns 0 if the service hasn't been started yet.
44    pub fn port(&self) -> u16 {
45        self.bound_port.load(Ordering::SeqCst)
46    }
47
48    /// Returns a cloneable accessor for the bound port.
49    ///
50    /// Use this when you need to read the port after the service has been
51    /// moved into a CamelContext or other container.
52    pub fn port_accessor(&self) -> Arc<AtomicU16> {
53        Arc::clone(&self.bound_port)
54    }
55
56    pub fn status_arc(&self) -> Arc<AtomicU8> {
57        Arc::clone(&self.status)
58    }
59
60    pub fn set_health_source(&mut self, source: Arc<dyn HealthSource>) {
61        self.health_source = Some(source);
62    }
63
64    pub fn health_source(&self) -> Option<Arc<dyn HealthSource>> {
65        self.health_source.clone()
66    }
67}
68
69#[async_trait]
70impl Lifecycle for PrometheusService {
71    fn name(&self) -> &str {
72        "prometheus"
73    }
74
75    fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
76        Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
77    }
78
79    fn status(&self) -> ServiceStatus {
80        match self.status.load(Ordering::SeqCst) {
81            0 => ServiceStatus::Stopped,
82            1 => ServiceStatus::Started,
83            2 => ServiceStatus::Failed,
84            _ => ServiceStatus::Failed,
85        }
86    }
87
88    async fn start(&mut self) -> Result<(), CamelError> {
89        use tokio::net::TcpListener;
90
91        if self
92            .started
93            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
94            .is_err()
95        {
96            return Err(CamelError::Config(
97                "PrometheusService already started".to_string(),
98            ));
99        }
100
101        let listener = TcpListener::bind(self.addr).await.map_err(|e| {
102            self.status.store(2, Ordering::SeqCst);
103            self.started.store(false, Ordering::SeqCst);
104            CamelError::Io(e.to_string())
105        })?;
106
107        let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
108            self.status.store(2, Ordering::SeqCst);
109            self.started.store(false, Ordering::SeqCst);
110            CamelError::Io(e.to_string())
111        })?;
112
113        self.bound_port.store(actual_port, Ordering::SeqCst);
114
115        let metrics = Arc::clone(&self.metrics);
116        let health_source = self.health_source.clone();
117        let (shutdown_tx, shutdown_rx) = oneshot::channel();
118
119        let handle = tokio::spawn(async move {
120            if let Err(err) =
121                crate::MetricsServer::run_with_listener_and_health_source_with_shutdown(
122                    listener,
123                    metrics,
124                    health_source,
125                    shutdown_rx,
126                )
127                .await
128            {
129                warn!("prometheus metrics server exited with error: {err}");
130            }
131        });
132
133        self.shutdown_tx = Some(shutdown_tx);
134        self.server_handle = Some(handle);
135        self.status.store(1, Ordering::SeqCst);
136        info!(port = %actual_port, "prometheus metrics service started");
137        Ok(())
138    }
139
140    async fn stop(&mut self) -> Result<(), CamelError> {
141        if let Some(tx) = self.shutdown_tx.take() {
142            let _ = tx.send(());
143        }
144
145        if let Some(handle) = self.server_handle.take() {
146            let mut handle = handle;
147            match timeout(Duration::from_secs(5), &mut handle).await {
148                Ok(join_result) => {
149                    if let Err(e) = join_result {
150                        return Err(CamelError::Io(format!(
151                            "prometheus server task join failed: {e}"
152                        )));
153                    }
154                }
155                Err(_) => {
156                    debug!("prometheus server shutdown timed out; aborting task");
157                    handle.abort();
158                    let _ = handle.await;
159                }
160            }
161        }
162        self.status.store(0, Ordering::SeqCst);
163        self.started.store(false, Ordering::SeqCst);
164        debug!("prometheus metrics service stopped");
165        Ok(())
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::HealthStatus;
    use std::net::Ipv4Addr;
    use std::sync::atomic::Ordering;

    /// Health source whose readiness is configurable; the other probes are
    /// always healthy.
    struct MockHealthSource {
        readiness: HealthStatus,
    }

    #[async_trait]
    impl HealthSource for MockHealthSource {
        async fn liveness(&self) -> HealthStatus {
            HealthStatus::Healthy
        }

        async fn readiness(&self) -> HealthStatus {
            self.readiness
        }

        async fn startup(&self) -> HealthStatus {
            HealthStatus::Healthy
        }
    }

    /// Creates a service configured for the given IPv4 address and port.
    fn service_on(ip: Ipv4Addr, port: u16) -> PrometheusService {
        PrometheusService::new(SocketAddr::from((ip, port)))
    }

    #[test]
    fn test_create_prometheus_service() {
        let service = service_on(Ipv4Addr::new(0, 0, 0, 0), 9090);
        assert_eq!(service.name(), "prometheus");
    }

    #[tokio::test]
    async fn test_prometheus_service_status_transitions() {
        let mut service = service_on(Ipv4Addr::new(0, 0, 0, 0), 0);
        assert_eq!(service.status(), ServiceStatus::Stopped);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Stopped);
    }

    #[tokio::test]
    async fn test_stop_uses_graceful_shutdown_signal() {
        crate::server::test_reset_graceful_shutdown_observability();

        let mut service = service_on(Ipv4Addr::LOCALHOST, 0);
        service.start().await.unwrap();
        service.stop().await.unwrap();

        let signal_count = crate::server::test_graceful_shutdown_signal_count();
        let observed = signal_count.load(Ordering::SeqCst);
        assert!(
            observed >= 1,
            "expected at least one graceful shutdown signal"
        );
    }

    #[tokio::test]
    async fn test_port_and_port_accessor_after_start() {
        let mut service = service_on(Ipv4Addr::LOCALHOST, 0);

        // Nothing is bound yet, so both views of the port read zero.
        assert_eq!(service.port(), 0);
        let accessor = service.port_accessor();
        assert_eq!(accessor.load(Ordering::SeqCst), 0);

        service.start().await.unwrap();

        let port = service.port();
        assert!(port > 0);
        assert_eq!(accessor.load(Ordering::SeqCst), port);

        service.stop().await.unwrap();
    }

    #[test]
    fn test_as_metrics_collector_returns_metrics_instance() {
        let service = service_on(Ipv4Addr::LOCALHOST, 9090);

        service
            .as_metrics_collector()
            .unwrap()
            .increment_exchanges("route-a");

        let output = service.metrics.gather();
        assert!(output.contains("camel_exchanges_total"));
        assert!(output.contains("route-a"));
    }

    #[test]
    fn test_unknown_internal_status_maps_to_failed() {
        let service = service_on(Ipv4Addr::LOCALHOST, 9090);
        let raw_status = service.status_arc();
        raw_status.store(9, Ordering::SeqCst);
        assert_eq!(service.status(), ServiceStatus::Failed);
    }

    #[tokio::test]
    async fn test_health_source_injection() {
        let mut service = service_on(Ipv4Addr::new(0, 0, 0, 0), 9090);
        assert!(service.health_source().is_none());

        service.set_health_source(Arc::new(MockHealthSource {
            readiness: HealthStatus::Healthy,
        }));

        let injected = service.health_source();
        assert!(injected.is_some());

        let status = injected.unwrap().readiness().await;
        assert_eq!(status, HealthStatus::Healthy);
    }

    #[test]
    fn test_prometheus_service_with_socket_addr() {
        let service = service_on(Ipv4Addr::new(127, 0, 0, 1), 9091);
        assert_eq!(service.name(), "prometheus");
    }

    #[tokio::test]
    async fn test_double_start_returns_error() {
        let mut service = service_on(Ipv4Addr::LOCALHOST, 0);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        let second_attempt = service.start().await;
        assert!(
            second_attempt.is_err(),
            "second start() should return an error"
        );
        let err_msg = second_attempt.unwrap_err().to_string();
        assert!(
            err_msg.contains("already started"),
            "error should mention 'already started', got: {err_msg}"
        );

        service.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_start_allowed_again_after_stop() {
        let mut service = service_on(Ipv4Addr::LOCALHOST, 0);

        service.start().await.unwrap();
        service.stop().await.unwrap();

        // A full stop releases the start guard, so a restart must succeed.
        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
    }
}