1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use prometheus::{Registry, TextEncoder};
6
7use crate::metrics::METRICS;
8#[cfg(feature = "system-metrics")]
9use crate::process::SystemMetrics;
10
/// Configuration for the Prometheus HTTP exporter.
///
/// Usually produced by `PrometheusConfig::default()` or through `PrometheusBuilder`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PrometheusConfig {
    /// Socket address the exporter listens on.
    pub bind_address: SocketAddr,
    /// Request path that serves the metrics (e.g. `/metrics`); other paths get `404`.
    pub metrics_path: String,
    /// Whether a system-metrics collector is created and appended to each scrape.
    #[cfg(feature = "system-metrics")]
    pub include_system_metrics: bool,
    /// Requested refresh interval for system metrics, in seconds.
    ///
    /// NOTE(review): nothing in this module reads this value — system metrics are
    /// refreshed on every scrape instead. Confirm whether another module uses it.
    #[cfg(feature = "system-metrics")]
    pub system_metrics_interval: u64,
}
25
26impl Default for PrometheusConfig {
27 fn default() -> Self {
28 Self {
29 bind_address: "127.0.0.1:9090".parse().expect("Invalid default address"),
30 metrics_path: "/metrics".to_string(),
31 #[cfg(feature = "system-metrics")]
32 include_system_metrics: true,
33 #[cfg(feature = "system-metrics")]
34 system_metrics_interval: 15,
35 }
36 }
37}
38
/// HTTP exporter that serves Prometheus metrics gathered from a registry.
///
/// Construct it with `PrometheusServer::new`, `PrometheusServer::with_registry`,
/// or through `PrometheusBuilder`, then run it with `PrometheusServer::start`.
#[derive(Debug)]
pub struct PrometheusServer {
    /// Listen address, metrics path and (feature-gated) system-metrics settings.
    config: PrometheusConfig,
    /// Registry whose metric families are gathered on every scrape.
    registry: Arc<Registry>,
    /// Optional collector that is refreshed and whose metrics are appended on every scrape.
    #[cfg(feature = "system-metrics")]
    system_metrics: Option<SystemMetrics>,
}
47
48impl PrometheusServer {
49 pub fn new(config: PrometheusConfig) -> crate::Result<Self> {
54 let registry = METRICS.registry();
55
56 #[cfg(feature = "system-metrics")]
57 let system_metrics = if config.include_system_metrics {
58 let sys_metrics = SystemMetrics::new()?;
59 Some(sys_metrics)
60 } else {
61 None
62 };
63
64 Ok(Self {
65 config,
66 registry,
67 #[cfg(feature = "system-metrics")]
68 system_metrics,
69 })
70 }
71
72 #[must_use]
74 pub const fn with_registry(config: PrometheusConfig, registry: Arc<Registry>) -> Self {
75 Self {
76 config,
77 registry,
78 #[cfg(feature = "system-metrics")]
79 system_metrics: None,
80 }
81 }
82
83 fn create_metrics_handler(
85 registry: Arc<Registry>,
86 #[cfg(feature = "system-metrics")] system_metrics: Option<SystemMetrics>,
87 ) -> impl Fn() -> String {
88 move || {
89 let encoder = TextEncoder::new();
90
91 #[cfg(feature = "system-metrics")]
93 let mut metric_families = registry.gather();
94 #[cfg(not(feature = "system-metrics"))]
95 let metric_families = registry.gather();
96
97 #[cfg(feature = "system-metrics")]
99 if let Some(ref sys_metrics) = system_metrics {
100 if let Err(e) = sys_metrics.update_metrics() {
102 tracing::warn!("Failed to update system metrics: {e}");
103 }
104
105 let sys_registry = sys_metrics.registry();
106 let mut sys_families = sys_registry.gather();
107 metric_families.append(&mut sys_families);
108 }
109
110 encoder
112 .encode_to_string(&metric_families)
113 .unwrap_or_else(|e| {
114 tracing::error!("Failed to encode metrics: {e}");
115 format!("Failed to encode metrics: {e}")
116 })
117 }
118 }
119
120 pub async fn start(
125 self,
126 shutdown_signal: impl std::future::Future<Output = ()> + Send + 'static,
127 ) -> crate::Result<()> {
128 let binding = self.config.bind_address;
130 let registry_clone = Arc::<Registry>::clone(&self.registry);
131
132 #[cfg(feature = "system-metrics")]
134 let metrics_handler =
135 Self::create_metrics_handler(registry_clone, self.system_metrics.clone());
136
137 #[cfg(not(feature = "system-metrics"))]
138 let metrics_handler = Self::create_metrics_handler(registry_clone);
139
140 let path = self.config.metrics_path.clone();
142
143 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
145
146 let server_handle = tokio::spawn(async move {
148 use std::io::{Read, Write};
150 use std::net::TcpListener;
151
152 let listener = match TcpListener::bind(binding) {
154 Ok(listener) => {
155 if let Err(e) = listener.set_nonblocking(true) {
157 tracing::error!("Failed to set non-blocking mode: {e}");
158 return;
159 }
160 listener
161 }
162 Err(e) => {
163 tracing::error!("Failed to bind TCP listener: {e}");
164 return;
165 }
166 };
167 tracing::info!("Started Prometheus server on {} at path {}", binding, path);
168
169 loop {
171 if shutdown_rx.try_recv().is_ok() {
173 tracing::info!("Shutdown signal received, stopping Prometheus server");
174 break;
175 }
176
177 match listener.accept() {
179 Ok((mut stream, _)) => {
180 let mut buffer = [0; 1024];
182 match stream.read(&mut buffer) {
183 Ok(0) => {}
184 Ok(bytes_read) => {
185 let request = String::from_utf8_lossy(&buffer[..bytes_read]);
187
188 if request.contains(&format!("GET {path} HTTP")) {
190 let metrics = metrics_handler();
192
193 let response = format!(
195 "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {}\r\n\r\n{}",
196 metrics.len(),
197 metrics
198 );
199
200 if let Err(e) = stream.write_all(response.as_bytes()) {
201 tracing::error!("Failed to write response: {e}");
202 }
203 } else {
204 let response = "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 9\r\n\r\nNot Found";
206 if let Err(e) = stream.write_all(response.as_bytes()) {
207 tracing::error!("Failed to write response: {e}");
208 }
209 }
210 }
211 Err(e) => {
212 tracing::error!("Failed to read from stream: {e}");
213 }
214 }
215 }
216 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
217 tokio::time::sleep(Duration::from_millis(10)).await;
219 }
220 Err(e) => {
221 tracing::error!("Failed to accept connection: {e}");
222 tokio::time::sleep(Duration::from_millis(100)).await;
224 }
225 }
226 }
227
228 tracing::info!("Prometheus server stopped");
229 });
230
231 shutdown_signal.await;
233
234 let _ = shutdown_tx.send(());
236
237 match tokio::time::timeout(Duration::from_secs(5), server_handle).await {
239 Ok(result) => {
240 if let Err(e) = result {
241 tracing::error!("Server task failed: {e}");
242 }
243 }
244 Err(_) => {
245 tracing::warn!("Server shutdown timed out after 5 seconds");
246 }
247 }
248
249 Ok(())
250 }
251}
252
/// Fluent builder for a `PrometheusServer`.
///
/// Starts from `PrometheusConfig::default()`.
#[derive(Debug)]
pub struct PrometheusBuilder {
    /// Configuration accumulated by the builder methods.
    config: PrometheusConfig,
}
258
259impl PrometheusBuilder {
260 #[must_use]
262 pub fn new() -> Self {
263 Self {
264 config: PrometheusConfig::default(),
265 }
266 }
267
268 #[must_use]
270 pub const fn bind_address(mut self, addr: SocketAddr) -> Self {
271 self.config.bind_address = addr;
272 self
273 }
274
275 #[must_use]
277 pub fn metrics_path<S: Into<String>>(mut self, path: S) -> Self {
278 self.config.metrics_path = path.into();
279 self
280 }
281
282 #[cfg(feature = "system-metrics")]
284 #[must_use]
285 pub const fn system_metrics(mut self, enabled: bool) -> Self {
286 self.config.include_system_metrics = enabled;
287 self
288 }
289
290 #[cfg(feature = "system-metrics")]
292 #[must_use]
293 pub const fn system_metrics_interval(mut self, seconds: u64) -> Self {
294 self.config.system_metrics_interval = seconds;
295 self
296 }
297
298 pub fn build_with_cdk_metrics(self) -> crate::Result<PrometheusServer> {
303 PrometheusServer::new(self.config)
304 }
305
306 #[must_use]
308 pub fn build_with_registry(self, registry: Arc<Registry>) -> PrometheusServer {
309 PrometheusServer::with_registry(self.config, registry)
310 }
311}
312
313impl Default for PrometheusBuilder {
314 fn default() -> Self {
315 Self::new()
316 }
317}