camel_prometheus/
service.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthChecker, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::task::JoinHandle;
9use tracing::{debug, info};
10
11pub struct PrometheusService {
12 addr: SocketAddr,
13 metrics: Arc<PrometheusMetrics>,
14 server_handle: Option<JoinHandle<()>>,
15 bound_port: Arc<AtomicU16>,
17 status: Arc<AtomicU8>,
19 health_checker: Option<HealthChecker>,
20}
21
22impl PrometheusService {
23 pub fn new(addr: SocketAddr) -> Self {
24 Self {
25 addr,
26 metrics: Arc::new(PrometheusMetrics::new()),
27 server_handle: None,
28 bound_port: Arc::new(AtomicU16::new(0)),
29 status: Arc::new(AtomicU8::new(0)),
30 health_checker: None,
31 }
32 }
33
34 pub fn port(&self) -> u16 {
38 self.bound_port.load(Ordering::SeqCst)
39 }
40
41 pub fn port_accessor(&self) -> Arc<AtomicU16> {
46 Arc::clone(&self.bound_port)
47 }
48
49 pub fn status_arc(&self) -> Arc<AtomicU8> {
50 Arc::clone(&self.status)
51 }
52
53 pub fn set_health_checker(&mut self, checker: HealthChecker) {
57 self.health_checker = Some(checker);
58 }
59
60 pub fn health_checker(&self) -> Option<HealthChecker> {
62 self.health_checker.clone()
63 }
64}
65
66#[async_trait]
67impl Lifecycle for PrometheusService {
68 fn name(&self) -> &str {
69 "prometheus"
70 }
71
72 fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
73 Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
74 }
75
76 fn status(&self) -> ServiceStatus {
77 match self.status.load(Ordering::SeqCst) {
78 0 => ServiceStatus::Stopped,
79 1 => ServiceStatus::Started,
80 2 => ServiceStatus::Failed,
81 _ => ServiceStatus::Failed,
82 }
83 }
84
85 async fn start(&mut self) -> Result<(), CamelError> {
86 use tokio::net::TcpListener;
87
88 let listener = TcpListener::bind(self.addr).await.map_err(|e| {
89 self.status.store(2, Ordering::SeqCst);
90 CamelError::Io(e.to_string())
91 })?;
92
93 let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
94 self.status.store(2, Ordering::SeqCst);
95 CamelError::Io(e.to_string())
96 })?;
97
98 self.bound_port.store(actual_port, Ordering::SeqCst);
99
100 let metrics = Arc::clone(&self.metrics);
101 let health_checker = self.health_checker.clone();
102
103 let handle = tokio::spawn(async move {
104 crate::MetricsServer::run_with_listener_and_health_checker(
105 listener,
106 metrics,
107 health_checker,
108 )
109 .await;
110 });
111
112 self.server_handle = Some(handle);
113 self.status.store(1, Ordering::SeqCst);
114 info!(port = %actual_port, "prometheus metrics service started");
115 Ok(())
116 }
117
118 async fn stop(&mut self) -> Result<(), CamelError> {
119 if let Some(handle) = self.server_handle.take() {
120 handle.abort();
121 }
122 self.status.store(0, Ordering::SeqCst);
123 debug!("prometheus metrics service stopped");
124 Ok(())
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{HealthReport, HealthStatus};
    use std::net::Ipv4Addr;
    use std::sync::atomic::Ordering;

    /// Builds a `0.0.0.0:<port>` socket address.
    fn unspecified(port: u16) -> SocketAddr {
        SocketAddr::from((Ipv4Addr::UNSPECIFIED, port))
    }

    /// Builds a `127.0.0.1:<port>` socket address.
    fn loopback(port: u16) -> SocketAddr {
        SocketAddr::from((Ipv4Addr::LOCALHOST, port))
    }

    #[test]
    fn test_create_prometheus_service() {
        let service = PrometheusService::new(unspecified(9090));
        assert_eq!(service.name(), "prometheus");
    }

    #[tokio::test]
    async fn test_prometheus_service_status_transitions() {
        // Port 0 lets the OS choose a free port for the test run.
        let mut service = PrometheusService::new(unspecified(0));
        assert_eq!(service.status(), ServiceStatus::Stopped);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Stopped);
    }

    #[tokio::test]
    async fn test_port_and_port_accessor_after_start() {
        let mut service = PrometheusService::new(loopback(0));

        // Before start, both views of the port report the initial 0.
        assert_eq!(service.port(), 0);
        let accessor = service.port_accessor();
        assert_eq!(accessor.load(Ordering::SeqCst), 0);

        service.start().await.unwrap();

        // After start, the accessor taken earlier sees the bound port too.
        let port = service.port();
        assert!(port > 0);
        assert_eq!(accessor.load(Ordering::SeqCst), port);

        service.stop().await.unwrap();
    }

    #[test]
    fn test_as_metrics_collector_returns_metrics_instance() {
        let service = PrometheusService::new(loopback(9090));

        let collector = service.as_metrics_collector().unwrap();
        collector.increment_exchanges("route-a");

        // The increment made through the collector must be visible through
        // the service's own metrics handle.
        let output = service.metrics.gather();
        assert!(output.contains("camel_exchanges_total"));
        assert!(output.contains("route-a"));
    }

    #[test]
    fn test_unknown_internal_status_maps_to_failed() {
        let service = PrometheusService::new(loopback(9090));
        service.status_arc().store(9, Ordering::SeqCst);
        assert_eq!(service.status(), ServiceStatus::Failed);
    }

    #[test]
    fn test_health_checker_injection() {
        let mut service = PrometheusService::new(unspecified(9090));
        assert!(service.health_checker().is_none());

        let checker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: vec![],
            ..Default::default()
        });
        service.set_health_checker(checker);
        assert!(service.health_checker().is_some());

        let installed = service.health_checker().unwrap();
        let report = installed();
        assert_eq!(report.status, HealthStatus::Healthy);
    }

    #[test]
    fn test_prometheus_service_with_socket_addr() {
        let service = PrometheusService::new(loopback(9091));
        assert_eq!(service.name(), "prometheus");
    }
}