rustkernel_core/observability/metrics.rs

//! Kernel Metrics
//!
//! Prometheus-compatible metrics for kernel performance monitoring.
//!
//! # Metrics
//!
//! - `rustkernel_messages_total` - Messages processed per kernel (labelled by `kernel_id`, `domain`, `status`)
//! - `rustkernel_message_latency_seconds` - Message processing latency histogram
//! - `rustkernel_queue_depth` - Message queue depth per kernel
//! - `rustkernel_kernels_registered` / `rustkernel_kernels_active` - Registered and active kernel gauges
//! - `rustkernel_messages_total_global` - Total messages processed across all kernels
//! - `rustkernel_gpu_memory_bytes` / `rustkernel_gpu_memory_peak_bytes` - Current and peak GPU memory usage
//!
//! # Example
//!
//! ```rust,ignore
//! use rustkernel_core::observability::metrics::{KernelMetrics, MetricsConfig};
//! use std::time::Duration;
//!
//! // Initialize metrics
//! let config = MetricsConfig::prometheus(9090);
//! config.init().await?;
//!
//! // Record a kernel execution
//! let metrics = KernelMetrics::new("graph/pagerank", "GraphAnalytics");
//! metrics.record_execution(Duration::from_micros(150), true);
//! ```

use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Metrics configuration
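///
/// A sketch of a custom configuration using struct-update syntax (the values
/// shown are illustrative, not recommended defaults):
///
/// ```rust,ignore
/// use rustkernel_core::observability::metrics::MetricsConfig;
///
/// let config = MetricsConfig {
///     prometheus_port: Some(9100),
///     latency_buckets: vec![0.001, 0.01, 0.1, 1.0],
///     ..Default::default()
/// };
/// ```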
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsConfig {
    /// Enable metrics collection
    pub enabled: bool,
    /// Prometheus endpoint port
    pub prometheus_port: Option<u16>,
    /// Metrics push gateway URL
    pub push_gateway: Option<String>,
    /// Push interval for gateway
    pub push_interval: Duration,
    /// Include default process metrics
    pub include_process_metrics: bool,
    /// Include default runtime metrics
    pub include_runtime_metrics: bool,
    /// Histogram buckets for latency (in seconds)
    pub latency_buckets: Vec<f64>,
}

impl Default for MetricsConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            prometheus_port: Some(9090),
            push_gateway: None,
            push_interval: Duration::from_secs(15),
            include_process_metrics: true,
            include_runtime_metrics: true,
            latency_buckets: vec![
                0.000_1, // 100μs
                0.000_5, // 500μs
                0.001,   // 1ms
                0.005,   // 5ms
                0.01,    // 10ms
                0.05,    // 50ms
                0.1,     // 100ms
                0.5,     // 500ms
                1.0,     // 1s
                5.0,     // 5s
            ],
        }
    }
}

impl MetricsConfig {
    /// Create config for Prometheus scraping
    pub fn prometheus(port: u16) -> Self {
        Self {
            prometheus_port: Some(port),
            ..Default::default()
        }
    }

    /// Create config for push gateway
    pub fn push_gateway(url: impl Into<String>) -> Self {
        Self {
            prometheus_port: None,
            push_gateway: Some(url.into()),
            ..Default::default()
        }
    }

    /// Initialize metrics collection
    #[cfg(feature = "metrics")]
    pub async fn init(&self) -> crate::error::Result<()> {
        use ::metrics_exporter_prometheus::PrometheusBuilder;

        if !self.enabled {
            return Ok(());
        }

        if let Some(port) = self.prometheus_port {
            let builder = PrometheusBuilder::new();
            builder
                .with_http_listener(([0, 0, 0, 0], port))
                .install()
                .map_err(|e| crate::error::KernelError::ConfigError(e.to_string()))?;
        }

        Ok(())
    }

    /// No-op init when metrics feature is disabled
    #[cfg(not(feature = "metrics"))]
    pub async fn init(&self) -> crate::error::Result<()> {
        Ok(())
    }
}

/// Metrics exporter handle
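///
/// A minimal usage sketch (assumes an async caller that can propagate
/// `crate::error::Result` with `?`):
///
/// ```rust,ignore
/// use rustkernel_core::observability::metrics::{MetricsConfig, MetricsExporter};
///
/// let exporter = MetricsExporter::new(MetricsConfig::prometheus(9090));
/// exporter.start().await?;
/// ```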
pub struct MetricsExporter {
    /// The metrics configuration
    #[allow(dead_code)]
    config: MetricsConfig,
}

impl MetricsExporter {
    /// Create a new metrics exporter
    pub fn new(config: MetricsConfig) -> Self {
        Self { config }
    }

    /// Start the metrics server
    #[cfg(feature = "metrics")]
    pub async fn start(&self) -> crate::error::Result<()> {
        self.config.init().await
    }

    /// No-op start when metrics feature is disabled
    #[cfg(not(feature = "metrics"))]
    pub async fn start(&self) -> crate::error::Result<()> {
        Ok(())
    }
}

/// Kernel-specific metrics
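///
/// A minimal usage sketch based on the methods below:
///
/// ```rust,ignore
/// use rustkernel_core::observability::metrics::KernelMetrics;
/// use std::time::Duration;
///
/// let metrics = KernelMetrics::new("graph/pagerank", "GraphAnalytics");
/// metrics.record_execution(Duration::from_micros(150), true);
/// metrics.set_queue_depth(8);
///
/// println!("avg latency: {:.1}µs", metrics.avg_latency_us());
/// println!("success rate: {:.2}", metrics.success_rate());
/// ```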
#[derive(Debug)]
pub struct KernelMetrics {
    /// Kernel ID
    pub kernel_id: String,
    /// Domain
    pub domain: String,
    /// Total messages processed
    pub messages_total: AtomicU64,
    /// Successful messages
    pub messages_success: AtomicU64,
    /// Failed messages
    pub messages_failed: AtomicU64,
    /// Total processing time in nanoseconds
    pub processing_time_ns: AtomicU64,
    /// Current queue depth
    pub queue_depth: AtomicU64,
}

impl KernelMetrics {
    /// Create new metrics for a kernel
    pub fn new(kernel_id: impl Into<String>, domain: impl Into<String>) -> Self {
        Self {
            kernel_id: kernel_id.into(),
            domain: domain.into(),
            messages_total: AtomicU64::new(0),
            messages_success: AtomicU64::new(0),
            messages_failed: AtomicU64::new(0),
            processing_time_ns: AtomicU64::new(0),
            queue_depth: AtomicU64::new(0),
        }
    }

    /// Record a message execution
    pub fn record_execution(&self, latency: Duration, success: bool) {
        self.messages_total.fetch_add(1, Ordering::Relaxed);

        if success {
            self.messages_success.fetch_add(1, Ordering::Relaxed);
        } else {
            self.messages_failed.fetch_add(1, Ordering::Relaxed);
        }

        self.processing_time_ns
            .fetch_add(latency.as_nanos() as u64, Ordering::Relaxed);

        // Record to global metrics if available
        #[cfg(feature = "metrics")]
        {
            use ::metrics::{counter, histogram};

            counter!("rustkernel_messages_total",
                "kernel_id" => self.kernel_id.clone(),
                "domain" => self.domain.clone(),
                "status" => if success { "success" } else { "error" }
            )
            .increment(1);

            histogram!("rustkernel_message_latency_seconds",
                "kernel_id" => self.kernel_id.clone(),
                "domain" => self.domain.clone()
            )
            .record(latency.as_secs_f64());
        }
    }

    /// Update queue depth
    pub fn set_queue_depth(&self, depth: u64) {
        self.queue_depth.store(depth, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_queue_depth",
                "kernel_id" => self.kernel_id.clone(),
                "domain" => self.domain.clone()
            )
            .set(depth as f64);
        }
    }

    /// Get average latency in microseconds
    pub fn avg_latency_us(&self) -> f64 {
        let total = self.messages_total.load(Ordering::Relaxed);
        if total == 0 {
            return 0.0;
        }
        let time_ns = self.processing_time_ns.load(Ordering::Relaxed);
        (time_ns as f64 / total as f64) / 1000.0
    }

    /// Get success rate (0.0 - 1.0)
    pub fn success_rate(&self) -> f64 {
        let total = self.messages_total.load(Ordering::Relaxed);
        if total == 0 {
            return 1.0;
        }
        let success = self.messages_success.load(Ordering::Relaxed);
        success as f64 / total as f64
    }

    /// Get throughput (messages per second) over a given duration
    pub fn throughput(&self, duration: Duration) -> f64 {
        let total = self.messages_total.load(Ordering::Relaxed);
        total as f64 / duration.as_secs_f64()
    }
}

/// Global metrics for the entire runtime
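///
/// Cloning is cheap: clones share the same underlying counters via `Arc`.
///
/// A minimal usage sketch based on the methods below:
///
/// ```rust,ignore
/// use rustkernel_core::observability::metrics::RuntimeMetrics;
///
/// let metrics = RuntimeMetrics::new();
/// metrics.record_kernel_registered();
/// metrics.record_kernel_activated();
/// metrics.set_gpu_memory(512 * 1024 * 1024);
///
/// assert_eq!(metrics.gpu_memory(), 512 * 1024 * 1024);
/// ```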
pub struct RuntimeMetrics {
    inner: Arc<RuntimeMetricsInner>,
}

struct RuntimeMetricsInner {
    /// Total kernels registered
    pub kernels_registered: AtomicU64,
    /// Active kernel instances
    pub kernels_active: AtomicU64,
    /// Total messages processed across all kernels
    pub messages_total: AtomicU64,
    /// GPU memory usage in bytes
    pub gpu_memory_bytes: AtomicU64,
    /// Peak GPU memory usage
    pub gpu_memory_peak_bytes: AtomicU64,
}

impl RuntimeMetrics {
    /// Create new runtime metrics
    pub fn new() -> Self {
        Self {
            inner: Arc::new(RuntimeMetricsInner {
                kernels_registered: AtomicU64::new(0),
                kernels_active: AtomicU64::new(0),
                messages_total: AtomicU64::new(0),
                gpu_memory_bytes: AtomicU64::new(0),
                gpu_memory_peak_bytes: AtomicU64::new(0),
            }),
        }
    }

    /// Record kernel registration
    pub fn record_kernel_registered(&self) {
        self.inner
            .kernels_registered
            .fetch_add(1, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_kernels_registered")
                .set(self.inner.kernels_registered.load(Ordering::Relaxed) as f64);
        }
    }

    /// Record kernel activation
    pub fn record_kernel_activated(&self) {
        self.inner.kernels_active.fetch_add(1, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_kernels_active")
                .set(self.inner.kernels_active.load(Ordering::Relaxed) as f64);
        }
    }

    /// Record kernel deactivation
    pub fn record_kernel_deactivated(&self) {
        self.inner.kernels_active.fetch_sub(1, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_kernels_active")
                .set(self.inner.kernels_active.load(Ordering::Relaxed) as f64);
        }
    }

    /// Record message processed
    pub fn record_message(&self) {
        self.inner.messages_total.fetch_add(1, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::counter;
            counter!("rustkernel_messages_total_global").increment(1);
        }
    }

    /// Update GPU memory usage
    pub fn set_gpu_memory(&self, bytes: u64) {
        self.inner.gpu_memory_bytes.store(bytes, Ordering::Relaxed);

        // Track the peak atomically so concurrent updates cannot lose a higher value.
        self.inner
            .gpu_memory_peak_bytes
            .fetch_max(bytes, Ordering::Relaxed);

        #[cfg(feature = "metrics")]
        {
            use ::metrics::gauge;
            gauge!("rustkernel_gpu_memory_bytes").set(bytes as f64);
            gauge!("rustkernel_gpu_memory_peak_bytes")
                .set(self.inner.gpu_memory_peak_bytes.load(Ordering::Relaxed) as f64);
        }
    }

    /// Get current GPU memory usage
    pub fn gpu_memory(&self) -> u64 {
        self.inner.gpu_memory_bytes.load(Ordering::Relaxed)
    }

    /// Get peak GPU memory usage
    pub fn gpu_memory_peak(&self) -> u64 {
        self.inner.gpu_memory_peak_bytes.load(Ordering::Relaxed)
    }
}

impl Default for RuntimeMetrics {
    fn default() -> Self {
        Self::new()
    }
}

impl Clone for RuntimeMetrics {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_kernel_metrics() {
        let metrics = KernelMetrics::new("graph/pagerank", "GraphAnalytics");

        metrics.record_execution(Duration::from_micros(100), true);
        metrics.record_execution(Duration::from_micros(200), true);
        metrics.record_execution(Duration::from_micros(300), false);

        assert_eq!(metrics.messages_total.load(Ordering::Relaxed), 3);
        assert_eq!(metrics.messages_success.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.messages_failed.load(Ordering::Relaxed), 1);

        assert!((metrics.avg_latency_us() - 200.0).abs() < 1.0);
        assert!((metrics.success_rate() - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_runtime_metrics() {
        let metrics = RuntimeMetrics::new();

        metrics.record_kernel_registered();
        metrics.record_kernel_registered();
        metrics.record_kernel_activated();

        assert_eq!(metrics.inner.kernels_registered.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.inner.kernels_active.load(Ordering::Relaxed), 1);

        metrics.set_gpu_memory(1024 * 1024);
        assert_eq!(metrics.gpu_memory(), 1024 * 1024);
    }

    #[test]
    fn test_metrics_config() {
        let config = MetricsConfig::prometheus(9090);
        assert_eq!(config.prometheus_port, Some(9090));
        assert!(config.enabled);

        let push_config = MetricsConfig::push_gateway("http://localhost:9091");
        assert!(push_config.push_gateway.is_some());
        assert!(push_config.prometheus_port.is_none());
    }
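
    // An additional sketch of a test (not in the original suite) exercising
    // throughput() and set_queue_depth(); the kernel id is illustrative.
    #[test]
    fn test_throughput_and_queue_depth() {
        let metrics = KernelMetrics::new("graph/pagerank", "GraphAnalytics");
        for _ in 0..10 {
            metrics.record_execution(Duration::from_micros(50), true);
        }
        // 10 messages over a 2s window -> 5 messages/second.
        assert!((metrics.throughput(Duration::from_secs(2)) - 5.0).abs() < 1e-9);

        metrics.set_queue_depth(42);
        assert_eq!(metrics.queue_depth.load(Ordering::Relaxed), 42);
    }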
}