Skip to main content

camel_prometheus/
service.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthChecker, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::task::JoinHandle;
9
10pub struct PrometheusService {
11    addr: SocketAddr,
12    metrics: Arc<PrometheusMetrics>,
13    server_handle: Option<JoinHandle<()>>,
14    /// The actual bound port (set after start(), 0 before)
15    bound_port: Arc<AtomicU16>,
16    /// Current service status (Stopped=0, Started=1, Failed=2)
17    status: Arc<AtomicU8>,
18    health_checker: Option<HealthChecker>,
19}
20
21impl PrometheusService {
22    pub fn new(addr: SocketAddr) -> Self {
23        Self {
24            addr,
25            metrics: Arc::new(PrometheusMetrics::new()),
26            server_handle: None,
27            bound_port: Arc::new(AtomicU16::new(0)),
28            status: Arc::new(AtomicU8::new(0)),
29            health_checker: None,
30        }
31    }
32
33    /// Returns the actual port the server is listening on (after start())
34    ///
35    /// Returns 0 if the service hasn't been started yet.
36    pub fn port(&self) -> u16 {
37        self.bound_port.load(Ordering::SeqCst)
38    }
39
40    /// Returns a cloneable accessor for the bound port.
41    ///
42    /// Use this when you need to read the port after the service has been
43    /// moved into a CamelContext or other container.
44    pub fn port_accessor(&self) -> Arc<AtomicU16> {
45        Arc::clone(&self.bound_port)
46    }
47
48    pub fn status_arc(&self) -> Arc<AtomicU8> {
49        Arc::clone(&self.status)
50    }
51
52    /// Set the health checker closure that will be called to get system health.
53    ///
54    /// The closure should capture a reference to CamelContext's health_check method.
55    pub fn set_health_checker(&mut self, checker: HealthChecker) {
56        self.health_checker = Some(checker);
57    }
58
59    /// Get a clone of the health checker if one is set.
60    pub fn health_checker(&self) -> Option<HealthChecker> {
61        self.health_checker.clone()
62    }
63}
64
65#[async_trait]
66impl Lifecycle for PrometheusService {
67    fn name(&self) -> &str {
68        "prometheus"
69    }
70
71    fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
72        Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
73    }
74
75    fn status(&self) -> ServiceStatus {
76        match self.status.load(Ordering::SeqCst) {
77            0 => ServiceStatus::Stopped,
78            1 => ServiceStatus::Started,
79            2 => ServiceStatus::Failed,
80            _ => ServiceStatus::Failed,
81        }
82    }
83
84    async fn start(&mut self) -> Result<(), CamelError> {
85        use tokio::net::TcpListener;
86
87        let listener = TcpListener::bind(self.addr).await.map_err(|e| {
88            self.status.store(2, Ordering::SeqCst);
89            CamelError::Io(e.to_string())
90        })?;
91
92        let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
93            self.status.store(2, Ordering::SeqCst);
94            CamelError::Io(e.to_string())
95        })?;
96
97        self.bound_port.store(actual_port, Ordering::SeqCst);
98
99        let metrics = Arc::clone(&self.metrics);
100        let health_checker = self.health_checker.clone();
101
102        let handle = tokio::spawn(async move {
103            crate::MetricsServer::run_with_listener_and_health_checker(
104                listener,
105                metrics,
106                health_checker,
107            )
108            .await;
109        });
110
111        self.server_handle = Some(handle);
112        self.status.store(1, Ordering::SeqCst);
113        Ok(())
114    }
115
116    async fn stop(&mut self) -> Result<(), CamelError> {
117        if let Some(handle) = self.server_handle.take() {
118            handle.abort();
119        }
120        self.status.store(0, Ordering::SeqCst);
121        Ok(())
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::HealthReport;
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::atomic::Ordering;

    /// Builds an IPv4 socket address from an IP and a port.
    fn socket(ip: Ipv4Addr, port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(ip), port)
    }

    /// Constructing a service on 0.0.0.0:9090 yields the fixed service name.
    #[test]
    fn test_create_prometheus_service() {
        let service = PrometheusService::new(socket(Ipv4Addr::UNSPECIFIED, 9090));
        assert_eq!(service.name(), "prometheus");
    }

    /// Status goes Stopped -> Started -> Stopped across start() and stop().
    #[tokio::test]
    async fn test_prometheus_service_status_transitions() {
        let mut service = PrometheusService::new(socket(Ipv4Addr::UNSPECIFIED, 0));
        assert_eq!(service.status(), ServiceStatus::Stopped);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Stopped);
    }

    /// Both port() and a previously obtained accessor observe the bound port.
    #[tokio::test]
    async fn test_port_and_port_accessor_after_start() {
        let mut service = PrometheusService::new(socket(Ipv4Addr::LOCALHOST, 0));

        // Before start both views report 0.
        assert_eq!(service.port(), 0);
        let accessor = service.port_accessor();
        assert_eq!(accessor.load(Ordering::SeqCst), 0);

        service.start().await.unwrap();

        // After start the OS-assigned port is visible through both views.
        let port = service.port();
        assert!(port > 0);
        assert_eq!(accessor.load(Ordering::SeqCst), port);

        service.stop().await.unwrap();
    }

    /// The collector returned by the service writes into the service's own registry.
    #[test]
    fn test_as_metrics_collector_returns_metrics_instance() {
        let service = PrometheusService::new(socket(Ipv4Addr::LOCALHOST, 9090));

        let collector = service.as_metrics_collector().unwrap();
        collector.increment_exchanges("route-a");

        let output = service.metrics.gather();
        assert!(output.contains("camel_exchanges_total"));
        assert!(output.contains("route-a"));
    }

    /// A status code outside 0..=2 is reported as Failed.
    #[test]
    fn test_unknown_internal_status_maps_to_failed() {
        let service = PrometheusService::new(socket(Ipv4Addr::LOCALHOST, 9090));

        let raw_status = service.status_arc();
        raw_status.store(9, Ordering::SeqCst);

        assert_eq!(service.status(), ServiceStatus::Failed);
    }

    /// A health checker can be installed, retrieved and invoked.
    #[test]
    fn test_health_checker_injection() {
        use camel_api::HealthStatus;

        let mut service = PrometheusService::new(socket(Ipv4Addr::UNSPECIFIED, 9090));

        // Nothing installed on a fresh service.
        assert!(service.health_checker().is_none());

        // Install a checker that always reports healthy.
        let checker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![],
            ..Default::default()
        });
        service.set_health_checker(checker);
        assert!(service.health_checker().is_some());

        // Invoke the retrieved checker and inspect its report.
        let retrieved = service.health_checker().unwrap();
        let report = retrieved();
        assert_eq!(report.status, HealthStatus::Healthy);
    }

    /// Constructing a service on 127.0.0.1:9091 yields the fixed service name.
    #[test]
    fn test_prometheus_service_with_socket_addr() {
        let service = PrometheusService::new(socket(Ipv4Addr::LOCALHOST, 9091));
        assert_eq!(service.name(), "prometheus");
    }
}