//! Metrics for rustkernel (`rustkernel_core/observability/metrics.rs`): exporter
//! configuration plus atomic per-kernel and runtime-wide counters.

use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct MetricsConfig {
34 pub enabled: bool,
36 pub prometheus_port: Option<u16>,
38 pub push_gateway: Option<String>,
40 pub push_interval: Duration,
42 pub include_process_metrics: bool,
44 pub include_runtime_metrics: bool,
46 pub latency_buckets: Vec<f64>,
48}
49
50impl Default for MetricsConfig {
51 fn default() -> Self {
52 Self {
53 enabled: true,
54 prometheus_port: Some(9090),
55 push_gateway: None,
56 push_interval: Duration::from_secs(15),
57 include_process_metrics: true,
58 include_runtime_metrics: true,
59 latency_buckets: vec![
60 0.000_1, 0.000_5, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, ],
71 }
72 }
73}
74
75impl MetricsConfig {
76 pub fn prometheus(port: u16) -> Self {
78 Self {
79 prometheus_port: Some(port),
80 ..Default::default()
81 }
82 }
83
84 pub fn push_gateway(url: impl Into<String>) -> Self {
86 Self {
87 prometheus_port: None,
88 push_gateway: Some(url.into()),
89 ..Default::default()
90 }
91 }
92
93 #[cfg(feature = "metrics")]
95 pub async fn init(&self) -> crate::error::Result<()> {
96 use ::metrics_exporter_prometheus::PrometheusBuilder;
97
98 if !self.enabled {
99 return Ok(());
100 }
101
102 if let Some(port) = self.prometheus_port {
103 let builder = PrometheusBuilder::new();
104 builder
105 .with_http_listener(([0, 0, 0, 0], port))
106 .install()
107 .map_err(|e| crate::error::KernelError::ConfigError(e.to_string()))?;
108 }
109
110 Ok(())
111 }
112
113 #[cfg(not(feature = "metrics"))]
115 pub async fn init(&self) -> crate::error::Result<()> {
116 Ok(())
117 }
118}
119
120pub struct MetricsExporter {
122 #[allow(dead_code)]
124 config: MetricsConfig,
125}
126
127impl MetricsExporter {
128 pub fn new(config: MetricsConfig) -> Self {
130 Self { config }
131 }
132
133 #[cfg(feature = "metrics")]
135 pub async fn start(&self) -> crate::error::Result<()> {
136 self.config.init().await
137 }
138
139 #[cfg(not(feature = "metrics"))]
141 pub async fn start(&self) -> crate::error::Result<()> {
142 Ok(())
143 }
144}
145
/// Per-kernel execution counters.
///
/// Each counter is an `AtomicU64` updated with `Ordering::Relaxed`. Individual
/// counters are exact, but reading several of them is not an atomic snapshot.
/// With the `metrics` feature enabled, updates are also forwarded to the
/// global `metrics` recorder, labelled with `kernel_id` and `domain`.
#[derive(Debug)]
pub struct KernelMetrics {
    /// Kernel identifier, used as the `kernel_id` metric label.
    pub kernel_id: String,
    /// Kernel domain, used as the `domain` metric label.
    pub domain: String,
    /// Executions recorded, successful or not.
    pub messages_total: AtomicU64,
    /// Executions recorded as successful.
    pub messages_success: AtomicU64,
    /// Executions recorded as failed.
    pub messages_failed: AtomicU64,
    /// Sum of all recorded execution latencies, in nanoseconds.
    pub processing_time_ns: AtomicU64,
    /// Most recent value passed to `set_queue_depth`.
    pub queue_depth: AtomicU64,
}

impl KernelMetrics {
    /// Create zeroed counters for `kernel_id` in `domain`.
    pub fn new(kernel_id: impl Into<String>, domain: impl Into<String>) -> Self {
        Self {
            kernel_id: kernel_id.into(),
            domain: domain.into(),
            messages_total: AtomicU64::new(0),
            messages_success: AtomicU64::new(0),
            messages_failed: AtomicU64::new(0),
            processing_time_ns: AtomicU64::new(0),
            queue_depth: AtomicU64::new(0),
        }
    }

    /// Record one execution that took `latency` and succeeded or failed.
    pub fn record_execution(&self, latency: Duration, success: bool) {
        self.messages_total.fetch_add(1, Ordering::Relaxed);

        if success {
            self.messages_success.fetch_add(1, Ordering::Relaxed);
        } else {
            self.messages_failed.fetch_add(1, Ordering::Relaxed);
        }

        // `as_nanos` returns u128. Clamp to u64::MAX rather than letting `as`
        // silently keep only the low 64 bits of an enormous duration.
        let latency_ns = latency.as_nanos().min(u128::from(u64::MAX)) as u64;
        self.processing_time_ns
            .fetch_add(latency_ns, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::{counter, histogram};

            counter!("rustkernel_messages_total",
                "kernel_id" => self.kernel_id.clone(),
                "domain" => self.domain.clone(),
                "status" => if success { "success" } else { "error" }
            )
            .increment(1);

            histogram!("rustkernel_message_latency_seconds",
                "kernel_id" => self.kernel_id.clone(),
                "domain" => self.domain.clone()
            )
            .record(latency.as_secs_f64());
        }
    }

    /// Store the current queue depth (and mirror it to a gauge when enabled).
    pub fn set_queue_depth(&self, depth: u64) {
        self.queue_depth.store(depth, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_queue_depth",
                "kernel_id" => self.kernel_id.clone(),
                "domain" => self.domain.clone()
            )
            .set(depth as f64);
        }
    }

    /// Mean recorded latency in microseconds, or `0.0` if nothing was recorded.
    pub fn avg_latency_us(&self) -> f64 {
        let total = self.messages_total.load(Ordering::Relaxed);
        if total == 0 {
            return 0.0;
        }
        let time_ns = self.processing_time_ns.load(Ordering::Relaxed);
        (time_ns as f64 / total as f64) / 1000.0
    }

    /// Fraction of executions that succeeded, or `1.0` if nothing was recorded.
    pub fn success_rate(&self) -> f64 {
        let total = self.messages_total.load(Ordering::Relaxed);
        if total == 0 {
            return 1.0;
        }
        let success = self.messages_success.load(Ordering::Relaxed);
        success as f64 / total as f64
    }

    /// Recorded executions per second over `duration`.
    ///
    /// Returns `0.0` for a zero `duration`. Dividing by zero would otherwise
    /// yield `inf`, or `NaN` when no messages were recorded.
    pub fn throughput(&self, duration: Duration) -> f64 {
        if duration.is_zero() {
            return 0.0;
        }
        let total = self.messages_total.load(Ordering::Relaxed);
        total as f64 / duration.as_secs_f64()
    }
}
253
/// Runtime-wide counters: registered/active kernels, messages, and GPU memory.
///
/// Cloning is cheap: every clone shares the same counters through an [`Arc`].
/// With the `metrics` feature enabled, updates are also mirrored into the
/// global `metrics` recorder.
#[derive(Debug, Clone, Default)]
pub struct RuntimeMetrics {
    inner: Arc<RuntimeMetricsInner>,
}

/// Shared counter storage behind [`RuntimeMetrics`].
#[derive(Debug, Default)]
struct RuntimeMetricsInner {
    /// Kernels counted by `record_kernel_registered` (only ever incremented).
    pub kernels_registered: AtomicU64,
    /// Activations minus deactivations, floored at zero.
    pub kernels_active: AtomicU64,
    /// Messages counted by `record_message`.
    pub messages_total: AtomicU64,
    /// Last value passed to `set_gpu_memory`, in bytes.
    pub gpu_memory_bytes: AtomicU64,
    /// Largest value ever passed to `set_gpu_memory`, in bytes.
    pub gpu_memory_peak_bytes: AtomicU64,
}

impl RuntimeMetrics {
    /// Create zeroed counters; equivalent to `RuntimeMetrics::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Count one kernel registration.
    pub fn record_kernel_registered(&self) {
        self.inner
            .kernels_registered
            .fetch_add(1, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_kernels_registered")
                .set(self.inner.kernels_registered.load(Ordering::Relaxed) as f64);
        }
    }

    /// Count one kernel becoming active.
    pub fn record_kernel_activated(&self) {
        self.inner.kernels_active.fetch_add(1, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_kernels_active")
                .set(self.inner.kernels_active.load(Ordering::Relaxed) as f64);
        }
    }

    /// Count one kernel becoming inactive.
    ///
    /// The active count saturates at zero. An unmatched deactivation leaves it
    /// at 0 instead of wrapping around to `u64::MAX`.
    pub fn record_kernel_deactivated(&self) {
        // `checked_sub` returns `None` at zero, which makes `fetch_update` leave
        // the value unchanged. The resulting `Err` is expected and ignored.
        let _ = self
            .inner
            .kernels_active
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1));

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_kernels_active")
                .set(self.inner.kernels_active.load(Ordering::Relaxed) as f64);
        }
    }

    /// Count one message processed by the runtime.
    pub fn record_message(&self) {
        self.inner.messages_total.fetch_add(1, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::counter;
            counter!("rustkernel_messages_total_global").increment(1);
        }
    }

    /// Record current GPU memory usage in bytes and update the peak.
    pub fn set_gpu_memory(&self, bytes: u64) {
        self.inner.gpu_memory_bytes.store(bytes, Ordering::Relaxed);

        // `fetch_max` performs the compare-and-raise atomically. A separate
        // load + store lets a concurrent smaller value overwrite a larger peak.
        self.inner
            .gpu_memory_peak_bytes
            .fetch_max(bytes, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_gpu_memory_bytes").set(bytes as f64);
            gauge!("rustkernel_gpu_memory_peak_bytes")
                .set(self.inner.gpu_memory_peak_bytes.load(Ordering::Relaxed) as f64);
        }
    }

    /// Last recorded GPU memory usage, in bytes.
    pub fn gpu_memory(&self) -> u64 {
        self.inner.gpu_memory_bytes.load(Ordering::Relaxed)
    }

    /// Highest recorded GPU memory usage, in bytes.
    pub fn gpu_memory_peak(&self) -> u64 {
        self.inner.gpu_memory_peak_bytes.load(Ordering::Relaxed)
    }
}
379
#[cfg(test)]
mod tests {
    use super::*;

    /// Relaxed read of a counter, to keep the assertions compact.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    #[test]
    fn test_kernel_metrics() {
        let kernel = KernelMetrics::new("graph/pagerank", "GraphAnalytics");

        // Two successes and one failure at 100/200/300 µs: mean 200 µs, ~2/3 success.
        let runs = [(100, true), (200, true), (300, false)];
        for (micros, ok) in runs {
            kernel.record_execution(Duration::from_micros(micros), ok);
        }

        assert_eq!(read(&kernel.messages_total), 3);
        assert_eq!(read(&kernel.messages_success), 2);
        assert_eq!(read(&kernel.messages_failed), 1);

        let avg_us = kernel.avg_latency_us();
        let rate = kernel.success_rate();
        assert!((avg_us - 200.0).abs() < 1.0);
        assert!((rate - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_runtime_metrics() {
        let runtime = RuntimeMetrics::new();

        (0..2).for_each(|_| runtime.record_kernel_registered());
        runtime.record_kernel_activated();

        assert_eq!(read(&runtime.inner.kernels_registered), 2);
        assert_eq!(read(&runtime.inner.kernels_active), 1);

        const ONE_MIB: u64 = 1 << 20;
        runtime.set_gpu_memory(ONE_MIB);
        assert_eq!(runtime.gpu_memory(), ONE_MIB);
    }

    #[test]
    fn test_metrics_config() {
        let scrape = MetricsConfig::prometheus(9090);
        assert!(scrape.enabled);
        assert_eq!(scrape.prometheus_port, Some(9090));

        let push = MetricsConfig::push_gateway("http://localhost:9091");
        assert!(push.prometheus_port.is_none());
        assert!(push.push_gateway.is_some());
    }
}