Skip to main content

camel_prometheus/
service.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthChecker, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::task::JoinHandle;
9use tracing::{debug, info};
10
11pub struct PrometheusService {
12    addr: SocketAddr,
13    metrics: Arc<PrometheusMetrics>,
14    server_handle: Option<JoinHandle<()>>,
15    /// The actual bound port (set after start(), 0 before)
16    bound_port: Arc<AtomicU16>,
17    /// Current service status (Stopped=0, Started=1, Failed=2)
18    status: Arc<AtomicU8>,
19    health_checker: Option<HealthChecker>,
20}
21
22impl PrometheusService {
23    pub fn new(addr: SocketAddr) -> Self {
24        Self {
25            addr,
26            metrics: Arc::new(PrometheusMetrics::new()),
27            server_handle: None,
28            bound_port: Arc::new(AtomicU16::new(0)),
29            status: Arc::new(AtomicU8::new(0)),
30            health_checker: None,
31        }
32    }
33
34    /// Returns the actual port the server is listening on (after start())
35    ///
36    /// Returns 0 if the service hasn't been started yet.
37    pub fn port(&self) -> u16 {
38        self.bound_port.load(Ordering::SeqCst)
39    }
40
41    /// Returns a cloneable accessor for the bound port.
42    ///
43    /// Use this when you need to read the port after the service has been
44    /// moved into a CamelContext or other container.
45    pub fn port_accessor(&self) -> Arc<AtomicU16> {
46        Arc::clone(&self.bound_port)
47    }
48
49    pub fn status_arc(&self) -> Arc<AtomicU8> {
50        Arc::clone(&self.status)
51    }
52
53    /// Set the health checker closure that will be called to get system health.
54    ///
55    /// The closure should capture a reference to CamelContext's health_check method.
56    pub fn set_health_checker(&mut self, checker: HealthChecker) {
57        self.health_checker = Some(checker);
58    }
59
60    /// Get a clone of the health checker if one is set.
61    pub fn health_checker(&self) -> Option<HealthChecker> {
62        self.health_checker.clone()
63    }
64}
65
66#[async_trait]
67impl Lifecycle for PrometheusService {
68    fn name(&self) -> &str {
69        "prometheus"
70    }
71
72    fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
73        Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
74    }
75
76    fn status(&self) -> ServiceStatus {
77        match self.status.load(Ordering::SeqCst) {
78            0 => ServiceStatus::Stopped,
79            1 => ServiceStatus::Started,
80            2 => ServiceStatus::Failed,
81            _ => ServiceStatus::Failed,
82        }
83    }
84
85    async fn start(&mut self) -> Result<(), CamelError> {
86        use tokio::net::TcpListener;
87
88        let listener = TcpListener::bind(self.addr).await.map_err(|e| {
89            self.status.store(2, Ordering::SeqCst);
90            CamelError::Io(e.to_string())
91        })?;
92
93        let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
94            self.status.store(2, Ordering::SeqCst);
95            CamelError::Io(e.to_string())
96        })?;
97
98        self.bound_port.store(actual_port, Ordering::SeqCst);
99
100        let metrics = Arc::clone(&self.metrics);
101        let health_checker = self.health_checker.clone();
102
103        let handle = tokio::spawn(async move {
104            crate::MetricsServer::run_with_listener_and_health_checker(
105                listener,
106                metrics,
107                health_checker,
108            )
109            .await;
110        });
111
112        self.server_handle = Some(handle);
113        self.status.store(1, Ordering::SeqCst);
114        info!(port = %actual_port, "prometheus metrics service started");
115        Ok(())
116    }
117
118    async fn stop(&mut self) -> Result<(), CamelError> {
119        if let Some(handle) = self.server_handle.take() {
120            handle.abort();
121        }
122        self.status.store(0, Ordering::SeqCst);
123        debug!("prometheus metrics service stopped");
124        Ok(())
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131    use camel_api::HealthReport;
132    use std::net::{IpAddr, Ipv4Addr};
133    use std::sync::atomic::Ordering;
134
135    #[test]
136    fn test_create_prometheus_service() {
137        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9090);
138        let service = PrometheusService::new(addr);
139        assert_eq!(service.name(), "prometheus");
140    }
141
142    #[tokio::test]
143    async fn test_prometheus_service_status_transitions() {
144        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
145        let mut service = PrometheusService::new(addr);
146
147        assert_eq!(service.status(), ServiceStatus::Stopped);
148
149        service.start().await.unwrap();
150        assert_eq!(service.status(), ServiceStatus::Started);
151
152        service.stop().await.unwrap();
153        assert_eq!(service.status(), ServiceStatus::Stopped);
154    }
155
156    #[tokio::test]
157    async fn test_port_and_port_accessor_after_start() {
158        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
159        let mut service = PrometheusService::new(addr);
160
161        assert_eq!(service.port(), 0);
162        let accessor = service.port_accessor();
163        assert_eq!(accessor.load(Ordering::SeqCst), 0);
164
165        service.start().await.unwrap();
166        let port = service.port();
167        assert!(port > 0);
168        assert_eq!(accessor.load(Ordering::SeqCst), port);
169
170        service.stop().await.unwrap();
171    }
172
173    #[test]
174    fn test_as_metrics_collector_returns_metrics_instance() {
175        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9090);
176        let service = PrometheusService::new(addr);
177        let collector = service.as_metrics_collector().unwrap();
178        collector.increment_exchanges("route-a");
179        let output = service.metrics.gather();
180        assert!(output.contains("camel_exchanges_total"));
181        assert!(output.contains("route-a"));
182    }
183
184    #[test]
185    fn test_unknown_internal_status_maps_to_failed() {
186        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9090);
187        let service = PrometheusService::new(addr);
188        service.status_arc().store(9, Ordering::SeqCst);
189        assert_eq!(service.status(), ServiceStatus::Failed);
190    }
191
192    #[test]
193    fn test_health_checker_injection() {
194        use camel_api::HealthStatus;
195
196        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9090);
197        let mut service = PrometheusService::new(addr);
198
199        // Initially no health checker
200        assert!(service.health_checker().is_none());
201
202        // Inject health checker
203        let checker = Arc::new(|| HealthReport {
204            status: HealthStatus::Healthy,
205            services: vec![],
206            ..Default::default()
207        });
208
209        service.set_health_checker(checker);
210        assert!(service.health_checker().is_some());
211
212        // Call the checker
213        let report = service.health_checker().unwrap()();
214        assert_eq!(report.status, HealthStatus::Healthy);
215    }
216
217    #[test]
218    fn test_prometheus_service_with_socket_addr() {
219        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9091);
220        let service = PrometheusService::new(addr);
221        assert_eq!(service.name(), "prometheus");
222    }
223}