Skip to main content

camel_prometheus/
service.rs

1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthReport, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::task::JoinHandle;
9
10type HealthChecker = Arc<dyn Fn() -> HealthReport + Send + Sync>;
11
12pub struct PrometheusService {
13    addr: SocketAddr,
14    metrics: Arc<PrometheusMetrics>,
15    server_handle: Option<JoinHandle<()>>,
16    /// The actual bound port (set after start(), 0 before)
17    bound_port: Arc<AtomicU16>,
18    /// Current service status (Stopped=0, Started=1, Failed=2)
19    status: Arc<AtomicU8>,
20    health_checker: Option<HealthChecker>,
21}
22
23impl PrometheusService {
24    pub fn new(port: u16) -> Self {
25        Self {
26            addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port),
27            metrics: Arc::new(PrometheusMetrics::new()),
28            server_handle: None,
29            bound_port: Arc::new(AtomicU16::new(0)),
30            status: Arc::new(AtomicU8::new(0)),
31            health_checker: None,
32        }
33    }
34
35    /// Returns the actual port the server is listening on (after start())
36    ///
37    /// Returns 0 if the service hasn't been started yet.
38    pub fn port(&self) -> u16 {
39        self.bound_port.load(Ordering::SeqCst)
40    }
41
42    /// Returns a cloneable accessor for the bound port.
43    ///
44    /// Use this when you need to read the port after the service has been
45    /// moved into a CamelContext or other container.
46    pub fn port_accessor(&self) -> Arc<AtomicU16> {
47        Arc::clone(&self.bound_port)
48    }
49
50    /// Set the health checker closure that will be called to get system health.
51    ///
52    /// The closure should capture a reference to CamelContext's health_check method.
53    pub fn set_health_checker(&mut self, checker: HealthChecker) {
54        self.health_checker = Some(checker);
55    }
56
57    /// Get a clone of the health checker if one is set.
58    pub fn health_checker(&self) -> Option<HealthChecker> {
59        self.health_checker.clone()
60    }
61}
62
63#[async_trait]
64impl Lifecycle for PrometheusService {
65    fn name(&self) -> &str {
66        "prometheus"
67    }
68
69    fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
70        Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
71    }
72
73    fn status(&self) -> ServiceStatus {
74        match self.status.load(Ordering::SeqCst) {
75            0 => ServiceStatus::Stopped,
76            1 => ServiceStatus::Started,
77            2 => ServiceStatus::Failed,
78            _ => ServiceStatus::Failed,
79        }
80    }
81
82    async fn start(&mut self) -> Result<(), CamelError> {
83        use tokio::net::TcpListener;
84
85        let listener = TcpListener::bind(self.addr).await.map_err(|e| {
86            self.status.store(2, Ordering::SeqCst);
87            CamelError::Io(e.to_string())
88        })?;
89
90        let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
91            self.status.store(2, Ordering::SeqCst);
92            CamelError::Io(e.to_string())
93        })?;
94
95        self.bound_port.store(actual_port, Ordering::SeqCst);
96
97        let metrics = Arc::clone(&self.metrics);
98        let health_checker = self.health_checker.clone();
99
100        let handle = tokio::spawn(async move {
101            match health_checker {
102                Some(checker) => {
103                    crate::MetricsServer::run_with_listener_and_health_checker(
104                        listener, metrics, checker,
105                    )
106                    .await;
107                }
108                None => {
109                    crate::MetricsServer::run_with_listener(listener, metrics).await;
110                }
111            }
112        });
113
114        self.server_handle = Some(handle);
115        self.status.store(1, Ordering::SeqCst);
116        Ok(())
117    }
118
119    async fn stop(&mut self) -> Result<(), CamelError> {
120        if let Some(handle) = self.server_handle.take() {
121            handle.abort();
122        }
123        self.status.store(0, Ordering::SeqCst);
124        Ok(())
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131
132    #[test]
133    fn test_create_prometheus_service() {
134        let service = PrometheusService::new(9090);
135        assert_eq!(service.name(), "prometheus");
136    }
137
138    #[tokio::test]
139    async fn test_prometheus_service_status_transitions() {
140        let mut service = PrometheusService::new(0);
141
142        assert_eq!(service.status(), ServiceStatus::Stopped);
143
144        service.start().await.unwrap();
145        assert_eq!(service.status(), ServiceStatus::Started);
146
147        service.stop().await.unwrap();
148        assert_eq!(service.status(), ServiceStatus::Stopped);
149    }
150
151    #[test]
152    fn test_health_checker_injection() {
153        use camel_api::HealthStatus;
154
155        let mut service = PrometheusService::new(9090);
156
157        // Initially no health checker
158        assert!(service.health_checker().is_none());
159
160        // Inject health checker
161        let checker = Arc::new(|| HealthReport {
162            status: HealthStatus::Healthy,
163            services: vec![],
164            ..Default::default()
165        });
166
167        service.set_health_checker(checker);
168        assert!(service.health_checker().is_some());
169
170        // Call the checker
171        let report = service.health_checker().unwrap()();
172        assert_eq!(report.status, HealthStatus::Healthy);
173    }
174}