camel_prometheus/
service.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthChecker, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::task::JoinHandle;
9
10pub struct PrometheusService {
11 addr: SocketAddr,
12 metrics: Arc<PrometheusMetrics>,
13 server_handle: Option<JoinHandle<()>>,
14 bound_port: Arc<AtomicU16>,
16 status: Arc<AtomicU8>,
18 health_checker: Option<HealthChecker>,
19}
20
21impl PrometheusService {
22 pub fn new(addr: SocketAddr) -> Self {
23 Self {
24 addr,
25 metrics: Arc::new(PrometheusMetrics::new()),
26 server_handle: None,
27 bound_port: Arc::new(AtomicU16::new(0)),
28 status: Arc::new(AtomicU8::new(0)),
29 health_checker: None,
30 }
31 }
32
33 pub fn port(&self) -> u16 {
37 self.bound_port.load(Ordering::SeqCst)
38 }
39
40 pub fn port_accessor(&self) -> Arc<AtomicU16> {
45 Arc::clone(&self.bound_port)
46 }
47
48 pub fn status_arc(&self) -> Arc<AtomicU8> {
49 Arc::clone(&self.status)
50 }
51
52 pub fn set_health_checker(&mut self, checker: HealthChecker) {
56 self.health_checker = Some(checker);
57 }
58
59 pub fn health_checker(&self) -> Option<HealthChecker> {
61 self.health_checker.clone()
62 }
63}
64
65#[async_trait]
66impl Lifecycle for PrometheusService {
67 fn name(&self) -> &str {
68 "prometheus"
69 }
70
71 fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
72 Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
73 }
74
75 fn status(&self) -> ServiceStatus {
76 match self.status.load(Ordering::SeqCst) {
77 0 => ServiceStatus::Stopped,
78 1 => ServiceStatus::Started,
79 2 => ServiceStatus::Failed,
80 _ => ServiceStatus::Failed,
81 }
82 }
83
84 async fn start(&mut self) -> Result<(), CamelError> {
85 use tokio::net::TcpListener;
86
87 let listener = TcpListener::bind(self.addr).await.map_err(|e| {
88 self.status.store(2, Ordering::SeqCst);
89 CamelError::Io(e.to_string())
90 })?;
91
92 let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
93 self.status.store(2, Ordering::SeqCst);
94 CamelError::Io(e.to_string())
95 })?;
96
97 self.bound_port.store(actual_port, Ordering::SeqCst);
98
99 let metrics = Arc::clone(&self.metrics);
100 let health_checker = self.health_checker.clone();
101
102 let handle = tokio::spawn(async move {
103 crate::MetricsServer::run_with_listener_and_health_checker(
104 listener,
105 metrics,
106 health_checker,
107 )
108 .await;
109 });
110
111 self.server_handle = Some(handle);
112 self.status.store(1, Ordering::SeqCst);
113 Ok(())
114 }
115
116 async fn stop(&mut self) -> Result<(), CamelError> {
117 if let Some(handle) = self.server_handle.take() {
118 handle.abort();
119 }
120 self.status.store(0, Ordering::SeqCst);
121 Ok(())
122 }
123}
124
125#[cfg(test)]
126mod tests {
127 use super::*;
128 use camel_api::HealthReport;
129 use std::net::{IpAddr, Ipv4Addr};
130
131 #[test]
132 fn test_create_prometheus_service() {
133 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9090);
134 let service = PrometheusService::new(addr);
135 assert_eq!(service.name(), "prometheus");
136 }
137
138 #[tokio::test]
139 async fn test_prometheus_service_status_transitions() {
140 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
141 let mut service = PrometheusService::new(addr);
142
143 assert_eq!(service.status(), ServiceStatus::Stopped);
144
145 service.start().await.unwrap();
146 assert_eq!(service.status(), ServiceStatus::Started);
147
148 service.stop().await.unwrap();
149 assert_eq!(service.status(), ServiceStatus::Stopped);
150 }
151
152 #[test]
153 fn test_health_checker_injection() {
154 use camel_api::HealthStatus;
155
156 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9090);
157 let mut service = PrometheusService::new(addr);
158
159 assert!(service.health_checker().is_none());
161
162 let checker = Arc::new(|| HealthReport {
164 status: HealthStatus::Healthy,
165 services: vec![],
166 ..Default::default()
167 });
168
169 service.set_health_checker(checker);
170 assert!(service.health_checker().is_some());
171
172 let report = service.health_checker().unwrap()();
174 assert_eq!(report.status, HealthStatus::Healthy);
175 }
176
177 #[test]
178 fn test_prometheus_service_with_socket_addr() {
179 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9091);
180 let service = PrometheusService::new(addr);
181 assert_eq!(service.name(), "prometheus");
182 }
183}