camel_prometheus/
service.rs1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU8, AtomicU16, Ordering};
4
5use crate::PrometheusMetrics;
6use async_trait::async_trait;
7use camel_api::{CamelError, HealthReport, Lifecycle, MetricsCollector, ServiceStatus};
8use tokio::task::JoinHandle;
9
10type HealthChecker = Arc<dyn Fn() -> HealthReport + Send + Sync>;
11
12pub struct PrometheusService {
13 addr: SocketAddr,
14 metrics: Arc<PrometheusMetrics>,
15 server_handle: Option<JoinHandle<()>>,
16 bound_port: Arc<AtomicU16>,
18 status: Arc<AtomicU8>,
20 health_checker: Option<HealthChecker>,
21}
22
23impl PrometheusService {
24 pub fn new(port: u16) -> Self {
25 Self {
26 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port),
27 metrics: Arc::new(PrometheusMetrics::new()),
28 server_handle: None,
29 bound_port: Arc::new(AtomicU16::new(0)),
30 status: Arc::new(AtomicU8::new(0)),
31 health_checker: None,
32 }
33 }
34
35 pub fn port(&self) -> u16 {
39 self.bound_port.load(Ordering::SeqCst)
40 }
41
42 pub fn port_accessor(&self) -> Arc<AtomicU16> {
47 Arc::clone(&self.bound_port)
48 }
49
50 pub fn set_health_checker(&mut self, checker: HealthChecker) {
54 self.health_checker = Some(checker);
55 }
56
57 pub fn health_checker(&self) -> Option<HealthChecker> {
59 self.health_checker.clone()
60 }
61}
62
63#[async_trait]
64impl Lifecycle for PrometheusService {
65 fn name(&self) -> &str {
66 "prometheus"
67 }
68
69 fn as_metrics_collector(&self) -> Option<Arc<dyn MetricsCollector>> {
70 Some(Arc::clone(&self.metrics) as Arc<dyn MetricsCollector>)
71 }
72
73 fn status(&self) -> ServiceStatus {
74 match self.status.load(Ordering::SeqCst) {
75 0 => ServiceStatus::Stopped,
76 1 => ServiceStatus::Started,
77 2 => ServiceStatus::Failed,
78 _ => ServiceStatus::Failed,
79 }
80 }
81
82 async fn start(&mut self) -> Result<(), CamelError> {
83 use tokio::net::TcpListener;
84
85 let listener = TcpListener::bind(self.addr).await.map_err(|e| {
86 self.status.store(2, Ordering::SeqCst);
87 CamelError::Io(e.to_string())
88 })?;
89
90 let actual_port = listener.local_addr().map(|addr| addr.port()).map_err(|e| {
91 self.status.store(2, Ordering::SeqCst);
92 CamelError::Io(e.to_string())
93 })?;
94
95 self.bound_port.store(actual_port, Ordering::SeqCst);
96
97 let metrics = Arc::clone(&self.metrics);
98 let health_checker = self.health_checker.clone();
99
100 let handle = tokio::spawn(async move {
101 match health_checker {
102 Some(checker) => {
103 crate::MetricsServer::run_with_listener_and_health_checker(
104 listener, metrics, checker,
105 )
106 .await;
107 }
108 None => {
109 crate::MetricsServer::run_with_listener(listener, metrics).await;
110 }
111 }
112 });
113
114 self.server_handle = Some(handle);
115 self.status.store(1, Ordering::SeqCst);
116 Ok(())
117 }
118
119 async fn stop(&mut self) -> Result<(), CamelError> {
120 if let Some(handle) = self.server_handle.take() {
121 handle.abort();
122 }
123 self.status.store(0, Ordering::SeqCst);
124 Ok(())
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed service reports the fixed lifecycle name.
    #[test]
    fn test_create_prometheus_service() {
        let svc = PrometheusService::new(9090);
        assert_eq!(svc.name(), "prometheus");
    }

    /// Status goes Stopped -> Started -> Stopped across a start/stop cycle.
    #[tokio::test]
    async fn test_prometheus_service_status_transitions() {
        // Port 0 avoids clashing with anything already listening on a fixed port.
        let mut svc = PrometheusService::new(0);
        assert_eq!(svc.status(), ServiceStatus::Stopped);

        svc.start().await.unwrap();
        assert_eq!(svc.status(), ServiceStatus::Started);

        svc.stop().await.unwrap();
        assert_eq!(svc.status(), ServiceStatus::Stopped);
    }

    /// A health checker can be installed, read back, and invoked.
    #[test]
    fn test_health_checker_injection() {
        use camel_api::HealthStatus;

        let mut svc = PrometheusService::new(9090);
        assert!(svc.health_checker().is_none());

        let healthy: HealthChecker = Arc::new(|| HealthReport {
            status: HealthStatus::Healthy,
            services: Vec::new(),
            ..Default::default()
        });
        svc.set_health_checker(healthy);
        assert!(svc.health_checker().is_some());

        let installed = svc.health_checker().unwrap();
        let report = installed();
        assert_eq!(report.status, HealthStatus::Healthy);
    }
}