1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use prometheus::{Registry, TextEncoder};
use crate::metrics::METRICS;
#[cfg(feature = "system-metrics")]
use crate::process::SystemMetrics;
/// Configuration for the Prometheus server
#[derive(Debug, Clone)]
pub struct PrometheusConfig {
/// Address to bind the server to (default: "127.0.0.1:9090")
pub bind_address: SocketAddr,
/// Path to serve metrics on (default: "/metrics")
pub metrics_path: String,
/// Whether to include system metrics (default: true if feature enabled)
#[cfg(feature = "system-metrics")]
pub include_system_metrics: bool,
/// How often to update system metrics in seconds (default: 15)
#[cfg(feature = "system-metrics")]
pub system_metrics_interval: u64,
}
impl Default for PrometheusConfig {
fn default() -> Self {
Self {
bind_address: "127.0.0.1:9090".parse().expect("Invalid default address"),
metrics_path: "/metrics".to_string(),
#[cfg(feature = "system-metrics")]
include_system_metrics: true,
#[cfg(feature = "system-metrics")]
system_metrics_interval: 15,
}
}
}
/// Prometheus metrics server
#[derive(Debug)]
pub struct PrometheusServer {
config: PrometheusConfig,
registry: Arc<Registry>,
#[cfg(feature = "system-metrics")]
system_metrics: Option<SystemMetrics>,
}
impl PrometheusServer {
/// Create a new Prometheus server with CDK metrics
///
/// # Errors
/// Returns an error if system metrics cannot be created (when enabled)
pub fn new(config: PrometheusConfig) -> crate::Result<Self> {
let registry = METRICS.registry();
#[cfg(feature = "system-metrics")]
let system_metrics = if config.include_system_metrics {
let sys_metrics = SystemMetrics::new()?;
Some(sys_metrics)
} else {
None
};
Ok(Self {
config,
registry,
#[cfg(feature = "system-metrics")]
system_metrics,
})
}
/// Create a new Prometheus server with custom registry
#[must_use]
pub const fn with_registry(config: PrometheusConfig, registry: Arc<Registry>) -> Self {
Self {
config,
registry,
#[cfg(feature = "system-metrics")]
system_metrics: None,
}
}
/// Create a metrics handler function that gathers and encodes metrics
fn create_metrics_handler(
registry: Arc<Registry>,
#[cfg(feature = "system-metrics")] system_metrics: Option<SystemMetrics>,
) -> impl Fn() -> String {
move || {
let encoder = TextEncoder::new();
// Collect metrics from our registry
#[cfg(feature = "system-metrics")]
let mut metric_families = registry.gather();
#[cfg(not(feature = "system-metrics"))]
let metric_families = registry.gather();
// Add system metrics if available
#[cfg(feature = "system-metrics")]
if let Some(ref sys_metrics) = system_metrics {
// Update system metrics before collection
if let Err(e) = sys_metrics.update_metrics() {
tracing::warn!("Failed to update system metrics: {e}");
}
let sys_registry = sys_metrics.registry();
let mut sys_families = sys_registry.gather();
metric_families.append(&mut sys_families);
}
// Encode metrics to string
encoder
.encode_to_string(&metric_families)
.unwrap_or_else(|e| {
tracing::error!("Failed to encode metrics: {e}");
format!("Failed to encode metrics: {e}")
})
}
}
/// Start the Prometheus HTTP server
///
/// # Errors
/// This function always returns Ok as errors are handled internally
pub async fn start(
self,
shutdown_signal: impl std::future::Future<Output = ()> + Send + 'static,
) -> crate::Result<()> {
// Create and start the exporter
let binding = self.config.bind_address;
let registry_clone = Arc::<Registry>::clone(&self.registry);
// Create a handler that exposes our registry
#[cfg(feature = "system-metrics")]
let metrics_handler =
Self::create_metrics_handler(registry_clone, self.system_metrics.clone());
#[cfg(not(feature = "system-metrics"))]
let metrics_handler = Self::create_metrics_handler(registry_clone);
// Start the exporter in a background task
let path = self.config.metrics_path.clone();
// Create a channel for signaling the server task to shutdown
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
// Spawn the server task
let server_handle = tokio::spawn(async move {
// We're using a simple HTTP server to expose our metrics
use std::io::{Read, Write};
use std::net::TcpListener;
// Create a TCP listener
let listener = match TcpListener::bind(binding) {
Ok(listener) => {
// Set non-blocking mode to allow for shutdown checking
if let Err(e) = listener.set_nonblocking(true) {
tracing::error!("Failed to set non-blocking mode: {e}");
return;
}
listener
}
Err(e) => {
tracing::error!("Failed to bind TCP listener: {e}");
return;
}
};
tracing::info!("Started Prometheus server on {} at path {}", binding, path);
// Accept connections with shutdown signal handling
loop {
// Check for shutdown signal
if shutdown_rx.try_recv().is_ok() {
tracing::info!("Shutdown signal received, stopping Prometheus server");
break;
}
// Try to accept a connection (non-blocking)
match listener.accept() {
Ok((mut stream, _)) => {
// Handle the connection
let mut buffer = [0; 1024];
match stream.read(&mut buffer) {
Ok(0) => {}
Ok(bytes_read) => {
// Convert the buffer to a string
let request = String::from_utf8_lossy(&buffer[..bytes_read]);
// Check if the request is for our metrics path
if request.contains(&format!("GET {path} HTTP")) {
// Get the metrics
let metrics = metrics_handler();
// Write the response
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {}\r\n\r\n{}",
metrics.len(),
metrics
);
if let Err(e) = stream.write_all(response.as_bytes()) {
tracing::error!("Failed to write response: {e}");
}
} else {
// Write a 404 response
let response = "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 9\r\n\r\nNot Found";
if let Err(e) = stream.write_all(response.as_bytes()) {
tracing::error!("Failed to write response: {e}");
}
}
}
Err(e) => {
tracing::error!("Failed to read from stream: {e}");
}
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// No connection available, continue the loop
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(e) => {
tracing::error!("Failed to accept connection: {e}");
// Add a small delay to prevent busy looping on persistent errors
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
tracing::info!("Prometheus server stopped");
});
// Wait for the shutdown signal
shutdown_signal.await;
// Signal the server to shutdown
let _ = shutdown_tx.send(());
// Wait for the server task to complete (with a timeout)
match tokio::time::timeout(Duration::from_secs(5), server_handle).await {
Ok(result) => {
if let Err(e) = result {
tracing::error!("Server task failed: {e}");
}
}
Err(_) => {
tracing::warn!("Server shutdown timed out after 5 seconds");
}
}
Ok(())
}
}
/// Builder for easy Prometheus server setup
#[derive(Debug)]
pub struct PrometheusBuilder {
config: PrometheusConfig,
}
impl PrometheusBuilder {
/// Create a new builder with default configuration
#[must_use]
pub fn new() -> Self {
Self {
config: PrometheusConfig::default(),
}
}
/// Set the bind address
#[must_use]
pub const fn bind_address(mut self, addr: SocketAddr) -> Self {
self.config.bind_address = addr;
self
}
/// Set the metrics path
#[must_use]
pub fn metrics_path<S: Into<String>>(mut self, path: S) -> Self {
self.config.metrics_path = path.into();
self
}
/// Enable or disable system metrics
#[cfg(feature = "system-metrics")]
#[must_use]
pub const fn system_metrics(mut self, enabled: bool) -> Self {
self.config.include_system_metrics = enabled;
self
}
/// Set system metrics update interval
#[cfg(feature = "system-metrics")]
#[must_use]
pub const fn system_metrics_interval(mut self, seconds: u64) -> Self {
self.config.system_metrics_interval = seconds;
self
}
/// Build the server with specific CDK metrics instance
///
/// # Errors
/// Returns an error if system metrics cannot be created (when enabled)
pub fn build_with_cdk_metrics(self) -> crate::Result<PrometheusServer> {
PrometheusServer::new(self.config)
}
/// Build the server with custom registry
#[must_use]
pub fn build_with_registry(self, registry: Arc<Registry>) -> PrometheusServer {
PrometheusServer::with_registry(self.config, registry)
}
}
impl Default for PrometheusBuilder {
fn default() -> Self {
Self::new()
}
}