Skip to main content

camel_prometheus/
service.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthReport, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::task::JoinHandle;
9
10type HealthChecker = Arc<dyn Fn() -> HealthReport + Send + Sync>;
11
12pub struct PrometheusService {
13    addr: SocketAddr,
14    metrics: Arc<PrometheusMetrics>,
15    server_handle: Option<JoinHandle<()>>,
16    /// The actual bound port (set after start(), 0 before)
17    bound_port: Arc<AtomicU16>,
18    /// Current service status (Stopped=0, Started=1, Failed=2)
19    status: Arc<AtomicU8>,
20    health_checker: Option<HealthChecker>,
21}
22
23impl PrometheusService {
24    pub fn new(addr: SocketAddr) -> Self {
25        Self {
26            addr,
27            metrics: Arc::new(PrometheusMetrics::new()),
28            server_handle: None,
29            bound_port: Arc::new(AtomicU16::new(0)),
30            status: Arc::new(AtomicU8::new(0)),
31            health_checker: None,
32        }
33    }
34
35    /// Returns the actual port the server is listening on (after start())
36    ///
37    /// Returns 0 if the service hasn't been started yet.
38    pub fn port(&self) -> u16 {
39        self.bound_port.load(Ordering::SeqCst)
40    }
41
42    /// Returns a cloneable accessor for the bound port.
43    ///
44    /// Use this when you need to read the port after the service has been
45    /// moved into a CamelContext or other container.
46    pub fn port_accessor(&self) -> Arc<AtomicU16> {
47        Arc::clone(&self.bound_port)
48    }
49
50    /// Set the health checker closure that will be called to get system health.
51    ///
52    /// The closure should capture a reference to CamelContext's health_check method.
53    pub fn set_health_checker(&mut self, checker: HealthChecker) {
54        self.health_checker = Some(checker);
55    }
56
57    /// Get a clone of the health checker if one is set.
58    pub fn health_checker(&self) -> Option<HealthChecker> {
59        self.health_checker.clone()
60    }
61}
62
63#[async_trait]
64impl Lifecycle for PrometheusService {
65    fn name(&self) -> &str {
66        "prometheus"
67    }
68
69    fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
70        Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
71    }
72
73    fn status(&self) -> ServiceStatus {
74        match self.status.load(Ordering::SeqCst) {
75            0 => ServiceStatus::Stopped,
76            1 => ServiceStatus::Started,
77            2 => ServiceStatus::Failed,
78            _ => ServiceStatus::Failed,
79        }
80    }
81
82    async fn start(&mut self) -> Result<(), CamelError> {
83        use tokio::net::TcpListener;
84
85        let listener = TcpListener::bind(self.addr).await.map_err(|e| {
86            self.status.store(2, Ordering::SeqCst);
87            CamelError::Io(e.to_string())
88        })?;
89
90        let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
91            self.status.store(2, Ordering::SeqCst);
92            CamelError::Io(e.to_string())
93        })?;
94
95        self.bound_port.store(actual_port, Ordering::SeqCst);
96
97        let metrics = Arc::clone(&self.metrics);
98        let health_checker = self.health_checker.clone();
99
100        let handle = tokio::spawn(async move {
101            match health_checker {
102                Some(checker) => {
103                    crate::MetricsServer::run_with_listener_and_health_checker(
104                        listener, metrics, checker,
105                    )
106                    .await;
107                }
108                None => {
109                    crate::MetricsServer::run_with_listener(listener, metrics).await;
110                }
111            }
112        });
113
114        self.server_handle = Some(handle);
115        self.status.store(1, Ordering::SeqCst);
116        Ok(())
117    }
118
119    async fn stop(&mut self) -> Result<(), CamelError> {
120        if let Some(handle) = self.server_handle.take() {
121            handle.abort();
122        }
123        self.status.store(0, Ordering::SeqCst);
124        Ok(())
125    }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;

    /// Socket address on the IPv4 wildcard interface (0.0.0.0) with `port`.
    fn wildcard_addr(port: u16) -> SocketAddr {
        SocketAddr::from((Ipv4Addr::UNSPECIFIED, port))
    }

    /// Constructing a service must not require starting it, and the service
    /// reports its fixed name.
    #[test]
    fn test_create_prometheus_service() {
        let service = PrometheusService::new(wildcard_addr(9090));
        assert_eq!(service.name(), "prometheus");
    }

    /// Status goes Stopped -> Started -> Stopped across start()/stop().
    /// Port 0 lets the OS pick a free port.
    #[tokio::test]
    async fn test_prometheus_service_status_transitions() {
        let mut service = PrometheusService::new(wildcard_addr(0));
        assert_eq!(service.status(), ServiceStatus::Stopped);

        let started = service.start().await;
        started.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        let stopped = service.stop().await;
        stopped.unwrap();
        assert_eq!(service.status(), ServiceStatus::Stopped);
    }

    /// A health checker can be injected, retrieved, and invoked.
    #[test]
    fn test_health_checker_injection() {
        use camel_api::HealthStatus;

        let mut service = PrometheusService::new(wildcard_addr(9090));

        // Nothing is configured on a freshly built service.
        assert!(service.health_checker().is_none());

        // Install a checker that always reports a healthy system.
        let always_healthy: HealthChecker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![],
            ..Default::default()
        });
        service.set_health_checker(always_healthy);

        let stored = service.health_checker();
        assert!(stored.is_some());

        // Invoke the retrieved checker and inspect its report.
        let run_check = service.health_checker().unwrap();
        let report = run_check();
        assert_eq!(report.status, HealthStatus::Healthy);
    }

    /// Construction also works with a loopback address.
    #[test]
    fn test_prometheus_service_with_socket_addr() {
        let loopback = SocketAddr::from((Ipv4Addr::LOCALHOST, 9091));
        let service = PrometheusService::new(loopback);
        assert_eq!(service.name(), "prometheus");
    }
}