camel_prometheus/
service.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthReport, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::task::JoinHandle;
9
10type HealthChecker = Arc<dyn Fn() -> HealthReport + Send + Sync>;
11
12pub struct PrometheusService {
13 addr: SocketAddr,
14 metrics: Arc<PrometheusMetrics>,
15 server_handle: Option<JoinHandle<()>>,
16 bound_port: Arc<AtomicU16>,
18 status: Arc<AtomicU8>,
20 health_checker: Option<HealthChecker>,
21}
22
23impl PrometheusService {
24 pub fn new(addr: SocketAddr) -> Self {
25 Self {
26 addr,
27 metrics: Arc::new(PrometheusMetrics::new()),
28 server_handle: None,
29 bound_port: Arc::new(AtomicU16::new(0)),
30 status: Arc::new(AtomicU8::new(0)),
31 health_checker: None,
32 }
33 }
34
35 pub fn port(&self) -> u16 {
39 self.bound_port.load(Ordering::SeqCst)
40 }
41
42 pub fn port_accessor(&self) -> Arc<AtomicU16> {
47 Arc::clone(&self.bound_port)
48 }
49
50 pub fn set_health_checker(&mut self, checker: HealthChecker) {
54 self.health_checker = Some(checker);
55 }
56
57 pub fn health_checker(&self) -> Option<HealthChecker> {
59 self.health_checker.clone()
60 }
61}
62
63#[async_trait]
64impl Lifecycle for PrometheusService {
65 fn name(&self) -> &str {
66 "prometheus"
67 }
68
69 fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
70 Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
71 }
72
73 fn status(&self) -> ServiceStatus {
74 match self.status.load(Ordering::SeqCst) {
75 0 => ServiceStatus::Stopped,
76 1 => ServiceStatus::Started,
77 2 => ServiceStatus::Failed,
78 _ => ServiceStatus::Failed,
79 }
80 }
81
82 async fn start(&mut self) -> Result<(), CamelError> {
83 use tokio::net::TcpListener;
84
85 let listener = TcpListener::bind(self.addr).await.map_err(|e| {
86 self.status.store(2, Ordering::SeqCst);
87 CamelError::Io(e.to_string())
88 })?;
89
90 let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
91 self.status.store(2, Ordering::SeqCst);
92 CamelError::Io(e.to_string())
93 })?;
94
95 self.bound_port.store(actual_port, Ordering::SeqCst);
96
97 let metrics = Arc::clone(&self.metrics);
98 let health_checker = self.health_checker.clone();
99
100 let handle = tokio::spawn(async move {
101 match health_checker {
102 Some(checker) => {
103 crate::MetricsServer::run_with_listener_and_health_checker(
104 listener, metrics, checker,
105 )
106 .await;
107 }
108 None => {
109 crate::MetricsServer::run_with_listener(listener, metrics).await;
110 }
111 }
112 });
113
114 self.server_handle = Some(handle);
115 self.status.store(1, Ordering::SeqCst);
116 Ok(())
117 }
118
119 async fn stop(&mut self) -> Result<(), CamelError> {
120 if let Some(handle) = self.server_handle.take() {
121 handle.abort();
122 }
123 self.status.store(0, Ordering::SeqCst);
124 Ok(())
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Socket address on the wildcard interface (`0.0.0.0`) for `port`.
    fn wildcard(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port)
    }

    /// Socket address on the loopback interface (`127.0.0.1`) for `port`.
    fn loopback(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// A freshly constructed service reports the expected name.
    #[test]
    fn test_create_prometheus_service() {
        let service = PrometheusService::new(wildcard(9090));
        assert_eq!(service.name(), "prometheus");
    }

    /// Status goes Stopped -> Started -> Stopped across a start/stop cycle.
    #[tokio::test]
    async fn test_prometheus_service_status_transitions() {
        // Port 0 asks the OS for any free port, so the test cannot collide
        // with another listener.
        let mut service = PrometheusService::new(wildcard(0));
        assert_eq!(service.status(), ServiceStatus::Stopped);

        service.start().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Started);

        service.stop().await.unwrap();
        assert_eq!(service.status(), ServiceStatus::Stopped);
    }

    /// A health checker can be installed, retrieved and invoked.
    #[test]
    fn test_health_checker_injection() {
        use camel_api::HealthStatus;

        let mut service = PrometheusService::new(wildcard(9090));
        assert!(service.health_checker().is_none());

        let checker: HealthChecker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: Vec::new(),
            ..Default::default()
        });
        service.set_health_checker(checker);
        assert!(service.health_checker().is_some());

        let installed = service.health_checker().unwrap();
        let report = installed();
        assert_eq!(report.status, HealthStatus::Healthy);
    }

    /// Construction also works with a loopback address.
    #[test]
    fn test_prometheus_service_with_socket_addr() {
        let service = PrometheusService::new(loopback(9091));
        assert_eq!(service.name(), "prometheus");
    }
}