Skip to main content

camel_prometheus/
service.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthChecker, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::task::JoinHandle;
9
10pub struct PrometheusService {
11    addr: SocketAddr,
12    metrics: Arc<PrometheusMetrics>,
13    server_handle: Option<JoinHandle<()>>,
14    /// The actual bound port (set after start(), 0 before)
15    bound_port: Arc<AtomicU16>,
16    /// Current service status (Stopped=0, Started=1, Failed=2)
17    status: Arc<AtomicU8>,
18    health_checker: Option<HealthChecker>,
19}
20
21impl PrometheusService {
22    pub fn new(addr: SocketAddr) -> Self {
23        Self {
24            addr,
25            metrics: Arc::new(PrometheusMetrics::new()),
26            server_handle: None,
27            bound_port: Arc::new(AtomicU16::new(0)),
28            status: Arc::new(AtomicU8::new(0)),
29            health_checker: None,
30        }
31    }
32
33    /// Returns the actual port the server is listening on (after start())
34    ///
35    /// Returns 0 if the service hasn't been started yet.
36    pub fn port(&self) -> u16 {
37        self.bound_port.load(Ordering::SeqCst)
38    }
39
40    /// Returns a cloneable accessor for the bound port.
41    ///
42    /// Use this when you need to read the port after the service has been
43    /// moved into a CamelContext or other container.
44    pub fn port_accessor(&self) -> Arc<AtomicU16> {
45        Arc::clone(&self.bound_port)
46    }
47
48    pub fn status_arc(&self) -> Arc<AtomicU8> {
49        Arc::clone(&self.status)
50    }
51
52    /// Set the health checker closure that will be called to get system health.
53    ///
54    /// The closure should capture a reference to CamelContext's health_check method.
55    pub fn set_health_checker(&mut self, checker: HealthChecker) {
56        self.health_checker = Some(checker);
57    }
58
59    /// Get a clone of the health checker if one is set.
60    pub fn health_checker(&self) -> Option<HealthChecker> {
61        self.health_checker.clone()
62    }
63}
64
65#[async_trait]
66impl Lifecycle for PrometheusService {
67    fn name(&self) -> &str {
68        "prometheus"
69    }
70
71    fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
72        Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
73    }
74
75    fn status(&self) -> ServiceStatus {
76        match self.status.load(Ordering::SeqCst) {
77            0 => ServiceStatus::Stopped,
78            1 => ServiceStatus::Started,
79            2 => ServiceStatus::Failed,
80            _ => ServiceStatus::Failed,
81        }
82    }
83
84    async fn start(&mut self) -> Result<(), CamelError> {
85        use tokio::net::TcpListener;
86
87        let listener = TcpListener::bind(self.addr).await.map_err(|e| {
88            self.status.store(2, Ordering::SeqCst);
89            CamelError::Io(e.to_string())
90        })?;
91
92        let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
93            self.status.store(2, Ordering::SeqCst);
94            CamelError::Io(e.to_string())
95        })?;
96
97        self.bound_port.store(actual_port, Ordering::SeqCst);
98
99        let metrics = Arc::clone(&self.metrics);
100        let health_checker = self.health_checker.clone();
101
102        let handle = tokio::spawn(async move {
103            crate::MetricsServer::run_with_listener_and_health_checker(
104                listener,
105                metrics,
106                health_checker,
107            )
108            .await;
109        });
110
111        self.server_handle = Some(handle);
112        self.status.store(1, Ordering::SeqCst);
113        Ok(())
114    }
115
116    async fn stop(&mut self) -> Result<(), CamelError> {
117        if let Some(handle) = self.server_handle.take() {
118            handle.abort();
119        }
120        self.status.store(0, Ordering::SeqCst);
121        Ok(())
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{HealthReport, HealthStatus};
    use std::net::{IpAddr, Ipv4Addr};

    /// A service built for the wildcard address reports the fixed name.
    #[test]
    fn test_create_prometheus_service() {
        let service =
            PrometheusService::new(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9090));
        assert_eq!(service.name(), "prometheus");
    }

    /// Status goes Stopped -> Started -> Stopped across start()/stop().
    #[tokio::test]
    async fn test_prometheus_service_status_transitions() {
        // Port 0 lets the OS choose a free port, so the test cannot collide
        // with anything already listening.
        let wildcard = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
        let mut service = PrometheusService::new(wildcard);
        assert_eq!(service.status(), ServiceStatus::Stopped);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Stopped);
    }

    /// A health checker can be injected, retrieved and invoked.
    #[test]
    fn test_health_checker_injection() {
        let mut service =
            PrometheusService::new(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9090));

        // Nothing is installed on a fresh service.
        assert!(service.health_checker().is_none());

        // Install a checker that always reports a healthy system.
        let always_healthy = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![],
            ..Default::default()
        });
        service.set_health_checker(always_healthy);
        assert!(service.health_checker().is_some());

        // The retrieved clone must be callable and yield the injected report.
        let check = service.health_checker().unwrap();
        let report = check();
        assert_eq!(report.status, HealthStatus::Healthy);
    }

    /// A service built for the loopback address reports the fixed name.
    #[test]
    fn test_prometheus_service_with_socket_addr() {
        let loopback = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9091);
        let service = PrometheusService::new(loopback);
        assert_eq!(service.name(), "prometheus");
    }
}