camel_prometheus/
service.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthChecker, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::task::JoinHandle;
9
10pub struct PrometheusService {
11 addr: SocketAddr,
12 metrics: Arc<PrometheusMetrics>,
13 server_handle: Option<JoinHandle<()>>,
14 bound_port: Arc<AtomicU16>,
16 status: Arc<AtomicU8>,
18 health_checker: Option<HealthChecker>,
19}
20
21impl PrometheusService {
22 pub fn new(addr: SocketAddr) -> Self {
23 Self {
24 addr,
25 metrics: Arc::new(PrometheusMetrics::new()),
26 server_handle: None,
27 bound_port: Arc::new(AtomicU16::new(0)),
28 status: Arc::new(AtomicU8::new(0)),
29 health_checker: None,
30 }
31 }
32
33 pub fn port(&self) -> u16 {
37 self.bound_port.load(Ordering::SeqCst)
38 }
39
40 pub fn port_accessor(&self) -> Arc<AtomicU16> {
45 Arc::clone(&self.bound_port)
46 }
47
48 pub fn status_arc(&self) -> Arc<AtomicU8> {
49 Arc::clone(&self.status)
50 }
51
52 pub fn set_health_checker(&mut self, checker: HealthChecker) {
56 self.health_checker = Some(checker);
57 }
58
59 pub fn health_checker(&self) -> Option<HealthChecker> {
61 self.health_checker.clone()
62 }
63}
64
65#[async_trait]
66impl Lifecycle for PrometheusService {
67 fn name(&self) -> &str {
68 "prometheus"
69 }
70
71 fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
72 Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
73 }
74
75 fn status(&self) -> ServiceStatus {
76 match self.status.load(Ordering::SeqCst) {
77 0 => ServiceStatus::Stopped,
78 1 => ServiceStatus::Started,
79 2 => ServiceStatus::Failed,
80 _ => ServiceStatus::Failed,
81 }
82 }
83
84 async fn start(&mut self) -> Result<(), CamelError> {
85 use tokio::net::TcpListener;
86
87 let listener = TcpListener::bind(self.addr).await.map_err(|e| {
88 self.status.store(2, Ordering::SeqCst);
89 CamelError::Io(e.to_string())
90 })?;
91
92 let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
93 self.status.store(2, Ordering::SeqCst);
94 CamelError::Io(e.to_string())
95 })?;
96
97 self.bound_port.store(actual_port, Ordering::SeqCst);
98
99 let metrics = Arc::clone(&self.metrics);
100 let health_checker = self.health_checker.clone();
101
102 let handle = tokio::spawn(async move {
103 crate::MetricsServer::run_with_listener_and_health_checker(
104 listener,
105 metrics,
106 health_checker,
107 )
108 .await;
109 });
110
111 self.server_handle = Some(handle);
112 self.status.store(1, Ordering::SeqCst);
113 Ok(())
114 }
115
116 async fn stop(&mut self) -> Result<(), CamelError> {
117 if let Some(handle) = self.server_handle.take() {
118 handle.abort();
119 }
120 self.status.store(0, Ordering::SeqCst);
121 Ok(())
122 }
123}
124
125#[cfg(test)]
126mod tests {
127 use super::*;
128 use camel_api::HealthReport;
129 use std::net::{IpAddr, Ipv4Addr};
130 use std::sync::atomic::Ordering;
131
132 #[test]
133 fn test_create_prometheus_service() {
134 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9090);
135 let service = PrometheusService::new(addr);
136 assert_eq!(service.name(), "prometheus");
137 }
138
139 #[tokio::test]
140 async fn test_prometheus_service_status_transitions() {
141 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
142 let mut service = PrometheusService::new(addr);
143
144 assert_eq!(service.status(), ServiceStatus::Stopped);
145
146 service.start().await.unwrap();
147 assert_eq!(service.status(), ServiceStatus::Started);
148
149 service.stop().await.unwrap();
150 assert_eq!(service.status(), ServiceStatus::Stopped);
151 }
152
153 #[tokio::test]
154 async fn test_port_and_port_accessor_after_start() {
155 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0);
156 let mut service = PrometheusService::new(addr);
157
158 assert_eq!(service.port(), 0);
159 let accessor = service.port_accessor();
160 assert_eq!(accessor.load(Ordering::SeqCst), 0);
161
162 service.start().await.unwrap();
163 let port = service.port();
164 assert!(port > 0);
165 assert_eq!(accessor.load(Ordering::SeqCst), port);
166
167 service.stop().await.unwrap();
168 }
169
170 #[test]
171 fn test_as_metrics_collector_returns_metrics_instance() {
172 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9090);
173 let service = PrometheusService::new(addr);
174 let collector = service.as_metrics_collector().unwrap();
175 collector.increment_exchanges("route-a");
176 let output = service.metrics.gather();
177 assert!(output.contains("camel_exchanges_total"));
178 assert!(output.contains("route-a"));
179 }
180
181 #[test]
182 fn test_unknown_internal_status_maps_to_failed() {
183 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9090);
184 let service = PrometheusService::new(addr);
185 service.status_arc().store(9, Ordering::SeqCst);
186 assert_eq!(service.status(), ServiceStatus::Failed);
187 }
188
189 #[test]
190 fn test_health_checker_injection() {
191 use camel_api::HealthStatus;
192
193 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9090);
194 let mut service = PrometheusService::new(addr);
195
196 assert!(service.health_checker().is_none());
198
199 let checker = Arc::new(|| HealthReport {
201 status: HealthStatus::Healthy,
202 services: vec![],
203 ..Default::default()
204 });
205
206 service.set_health_checker(checker);
207 assert!(service.health_checker().is_some());
208
209 let report = service.health_checker().unwrap()();
211 assert_eq!(report.status, HealthStatus::Healthy);
212 }
213
214 #[test]
215 fn test_prometheus_service_with_socket_addr() {
216 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9091);
217 let service = PrometheusService::new(addr);
218 assert_eq!(service.name(), "prometheus");
219 }
220}