ringkernel_core/telemetry_pipeline.rs

//! Real-time telemetry pipeline for streaming metrics.
//!
//! This module provides a real-time metrics pipeline that allows
//! subscribers to receive continuous updates about kernel performance.
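//!
//! A minimal usage sketch follows; it is fenced as `ignore` because the crate
//! path is assumed here and the example is not compiled as a doctest:
//!
//! ```ignore
//! use ringkernel_core::telemetry_pipeline::{
//!     TelemetryConfig, TelemetryEvent, TelemetryPipeline,
//! };
//!
//! #[tokio::main]
//! async fn main() {
//!     let pipeline = TelemetryPipeline::new(TelemetryConfig::default());
//!     let mut events = pipeline.subscribe();
//!
//!     // Spawn the collection loop, then consume a few broadcast events.
//!     let handle = pipeline.start();
//!     for _ in 0..3 {
//!         match events.recv().await {
//!             Ok(TelemetryEvent::Alert(alert)) => {
//!                 eprintln!("{:?}: {}", alert.severity, alert.message);
//!             }
//!             Ok(_) => {}
//!             Err(_) => break,
//!         }
//!     }
//!
//!     pipeline.stop();
//!     handle.await.ok();
//! }
//! ```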

use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;

use crate::runtime::KernelId;
use crate::telemetry::{KernelMetrics, LatencyHistogram, TelemetryBuffer};

/// Configuration for the telemetry pipeline.
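///
/// Individual fields can be overridden on top of the defaults; a sketch (not
/// compiled as a doctest):
///
/// ```ignore
/// let config = TelemetryConfig {
///     collection_interval_ms: 50,        // sample twice as often as the default
///     latency_alert_threshold_us: 5_000, // alert at 5 ms instead of 10 ms
///     ..TelemetryConfig::default()
/// };
/// ```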
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// Collection interval in milliseconds.
    pub collection_interval_ms: u64,
    /// Maximum number of samples to retain in history.
    pub max_history_samples: usize,
    /// Channel buffer size for subscribers.
    pub channel_buffer_size: usize,
    /// Whether to enable latency histograms.
    pub enable_histograms: bool,
    /// Alert threshold for message drop rate (0.0-1.0).
    pub drop_rate_alert_threshold: f64,
    /// Alert threshold for average latency in microseconds.
    pub latency_alert_threshold_us: u64,
}

impl Default for TelemetryConfig {
    fn default() -> Self {
        Self {
            collection_interval_ms: 100,
            max_history_samples: 1000,
            channel_buffer_size: 256,
            enable_histograms: true,
            drop_rate_alert_threshold: 0.01,    // 1%
            latency_alert_threshold_us: 10_000, // 10ms
        }
    }
}

/// A telemetry event that is broadcast to subscribers.
#[derive(Debug, Clone)]
pub enum TelemetryEvent {
    /// Periodic metrics snapshot.
    MetricsSnapshot(MetricsSnapshot),
    /// Alert when thresholds are exceeded.
    Alert(TelemetryAlert),
    /// Kernel state change event.
    KernelStateChange {
        /// Kernel identifier.
        kernel_id: KernelId,
        /// Previous state.
        previous: String,
        /// New state.
        new: String,
    },
}

/// A snapshot of metrics at a point in time.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// Timestamp when snapshot was taken.
    pub timestamp: Instant,
    /// Metrics for each kernel.
    pub kernel_metrics: HashMap<KernelId, KernelMetrics>,
    /// Aggregate metrics across all kernels.
    pub aggregate: AggregateMetrics,
}

/// Aggregate metrics across all kernels.
#[derive(Debug, Clone, Default)]
pub struct AggregateMetrics {
    /// Total messages processed across all kernels.
    pub total_messages_processed: u64,
    /// Total messages dropped across all kernels.
    pub total_messages_dropped: u64,
    /// Average latency across all kernels.
    pub avg_latency_us: f64,
    /// Minimum latency across all kernels.
    pub min_latency_us: u64,
    /// Maximum latency across all kernels.
    pub max_latency_us: u64,
    /// Total throughput (messages/sec).
    pub throughput: f64,
    /// Number of active kernels.
    pub active_kernels: usize,
    /// Total GPU memory used.
    pub total_gpu_memory: u64,
}

/// An alert when telemetry thresholds are exceeded.
#[derive(Debug, Clone)]
pub struct TelemetryAlert {
    /// Alert severity.
    pub severity: AlertSeverity,
    /// Alert type.
    pub alert_type: AlertType,
    /// Human-readable message.
    pub message: String,
    /// Related kernel (if applicable).
    pub kernel_id: Option<KernelId>,
    /// Timestamp when alert was generated.
    pub timestamp: Instant,
}

/// Alert severity level.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlertSeverity {
    /// Informational.
    Info,
    /// Warning - may indicate a problem.
    Warning,
    /// Error - action required.
    Error,
    /// Critical - immediate action required.
    Critical,
}

/// Type of alert.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlertType {
    /// Message drop rate exceeded threshold.
    HighDropRate,
    /// Latency exceeded threshold.
    HighLatency,
    /// Queue approaching capacity.
    QueueNearFull,
    /// Kernel error occurred.
    KernelError,
    /// Memory pressure detected.
    MemoryPressure,
}

/// Real-time telemetry pipeline.
///
/// Collects metrics from kernels and broadcasts them to subscribers.
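///
/// A registration-lifecycle sketch (`source` is assumed to be an
/// `Arc<dyn MetricsSource>` supplied by the caller; not compiled as a doctest):
///
/// ```ignore
/// let pipeline = TelemetryPipeline::new(TelemetryConfig::default());
/// pipeline.register_source(source.clone());
///
/// let handle = pipeline.start();
/// // ... later, when the kernel shuts down:
/// pipeline.unregister_source(source.kernel_id());
/// pipeline.stop();
/// ```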
pub struct TelemetryPipeline {
    /// Configuration.
    config: TelemetryConfig,
    /// Running state.
    running: Arc<AtomicBool>,
    /// Broadcast sender for events.
    sender: broadcast::Sender<TelemetryEvent>,
    /// Registered metrics sources.
    sources: Arc<RwLock<HashMap<KernelId, Arc<dyn MetricsSource>>>>,
    /// Historical snapshots.
    history: Arc<RwLock<Vec<MetricsSnapshot>>>,
    /// Pipeline start time.
    start_time: Instant,
    /// Sequence number for events.
    #[allow(dead_code)]
    sequence: AtomicU64,
}

/// Trait for providing metrics from a kernel.
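///
/// A sketch of an implementation backed by a stored ID and an atomic flag;
/// `KernelMetrics::default()` is an assumption here and stands in for reading
/// live counters (not compiled as a doctest):
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// struct StaticSource {
///     id: KernelId,
///     active: AtomicBool,
/// }
///
/// impl MetricsSource for StaticSource {
///     fn get_metrics(&self) -> KernelMetrics {
///         // A real source would read live counters here.
///         KernelMetrics::default()
///     }
///
///     fn kernel_id(&self) -> &KernelId {
///         &self.id
///     }
///
///     fn is_active(&self) -> bool {
///         self.active.load(Ordering::Relaxed)
///     }
/// }
/// ```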
pub trait MetricsSource: Send + Sync {
    /// Get current metrics.
    fn get_metrics(&self) -> KernelMetrics;

    /// Get kernel ID.
    fn kernel_id(&self) -> &KernelId;

    /// Check if kernel is active.
    fn is_active(&self) -> bool;
}

impl TelemetryPipeline {
    /// Create a new telemetry pipeline.
    pub fn new(config: TelemetryConfig) -> Self {
        let (sender, _) = broadcast::channel(config.channel_buffer_size);

        Self {
            config,
            running: Arc::new(AtomicBool::new(false)),
            sender,
            sources: Arc::new(RwLock::new(HashMap::new())),
            history: Arc::new(RwLock::new(Vec::new())),
            start_time: Instant::now(),
            sequence: AtomicU64::new(0),
        }
    }

    /// Subscribe to telemetry events.
    ///
    /// Receivers that fall more than `channel_buffer_size` events behind will
    /// observe `broadcast::error::RecvError::Lagged` and skip ahead, per
    /// `tokio::sync::broadcast` semantics.
    pub fn subscribe(&self) -> broadcast::Receiver<TelemetryEvent> {
        self.sender.subscribe()
    }

    /// Register a metrics source.
    pub fn register_source(&self, source: Arc<dyn MetricsSource>) {
        let kernel_id = source.kernel_id().clone();
        self.sources.write().insert(kernel_id, source);
    }

    /// Unregister a metrics source.
    pub fn unregister_source(&self, kernel_id: &KernelId) {
        self.sources.write().remove(kernel_id);
    }

    /// Start the telemetry collection loop.
    pub fn start(&self) -> tokio::task::JoinHandle<()> {
        self.running.store(true, Ordering::Release);

        let running = Arc::clone(&self.running);
        let sources = Arc::clone(&self.sources);
        let history = Arc::clone(&self.history);
        let sender = self.sender.clone();
        let config = self.config.clone();
        let start_time = self.start_time;

        tokio::spawn(async move {
            let interval = Duration::from_millis(config.collection_interval_ms);

            while running.load(Ordering::Acquire) {
                // Collect metrics
                let snapshot = Self::collect_snapshot(&sources, start_time, &config);

                // Check for alerts
                let alerts = Self::check_alerts(&snapshot, &config);

                // Store in history
                {
                    let mut hist = history.write();
                    hist.push(snapshot.clone());
                    if hist.len() > config.max_history_samples {
                        hist.remove(0);
                    }
                }

                // Broadcast snapshot
                let _ = sender.send(TelemetryEvent::MetricsSnapshot(snapshot));

                // Broadcast alerts
                for alert in alerts {
                    let _ = sender.send(TelemetryEvent::Alert(alert));
                }

                tokio::time::sleep(interval).await;
            }
        })
    }

    /// Stop the telemetry collection loop.
    pub fn stop(&self) {
        self.running.store(false, Ordering::Release);
    }

    /// Get the latest snapshot.
    pub fn latest_snapshot(&self) -> Option<MetricsSnapshot> {
        self.history.read().last().cloned()
    }

    /// Get historical snapshots.
    pub fn history(&self) -> Vec<MetricsSnapshot> {
        self.history.read().clone()
    }

    /// Get aggregate metrics over a time range.
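    ///
    /// Returns `None` when no snapshots fall within the window; a sketch (not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// if let Some(agg) = pipeline.aggregate_over(Duration::from_secs(10)) {
    ///     println!("throughput over the last 10s: {:.1} msg/s", agg.throughput);
    /// }
    /// ```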
    pub fn aggregate_over(&self, duration: Duration) -> Option<AggregateMetrics> {
        let history = self.history.read();
        let now = Instant::now();

        // Compare snapshot ages instead of computing `now - duration`, which
        // can panic if the window reaches before the earliest representable
        // instant.
        let relevant: Vec<_> = history
            .iter()
            .filter(|s| now.duration_since(s.timestamp) <= duration)
            .collect();

        if relevant.is_empty() {
            return None;
        }

        let mut aggregate = AggregateMetrics::default();

        for snapshot in &relevant {
            aggregate.total_messages_processed += snapshot.aggregate.total_messages_processed;
            aggregate.total_messages_dropped += snapshot.aggregate.total_messages_dropped;
            // Treat 0 as "unset" so the default value never pins the minimum,
            // mirroring the guard in `collect_snapshot`.
            if snapshot.aggregate.min_latency_us < aggregate.min_latency_us
                || aggregate.min_latency_us == 0
            {
                aggregate.min_latency_us = snapshot.aggregate.min_latency_us;
            }
            aggregate.max_latency_us = aggregate
                .max_latency_us
                .max(snapshot.aggregate.max_latency_us);
        }

        // Average metrics
        let count = relevant.len() as f64;
        aggregate.avg_latency_us = relevant
            .iter()
            .map(|s| s.aggregate.avg_latency_us)
            .sum::<f64>()
            / count;
        aggregate.throughput = relevant.iter().map(|s| s.aggregate.throughput).sum::<f64>() / count;
        aggregate.active_kernels = relevant
            .last()
            .map(|s| s.aggregate.active_kernels)
            .unwrap_or(0);
        aggregate.total_gpu_memory = relevant
            .last()
            .map(|s| s.aggregate.total_gpu_memory)
            .unwrap_or(0);

        Some(aggregate)
    }

    /// Collect a metrics snapshot.
    fn collect_snapshot(
        sources: &RwLock<HashMap<KernelId, Arc<dyn MetricsSource>>>,
        start_time: Instant,
        _config: &TelemetryConfig,
    ) -> MetricsSnapshot {
        let sources = sources.read();
        let mut kernel_metrics = HashMap::new();
        let mut aggregate = AggregateMetrics::default();

        let elapsed = start_time.elapsed().as_secs_f64();

        for (kernel_id, source) in sources.iter() {
            if source.is_active() {
                aggregate.active_kernels += 1;
            }

            let metrics = source.get_metrics();

            aggregate.total_messages_processed += metrics.telemetry.messages_processed;
            aggregate.total_messages_dropped += metrics.telemetry.messages_dropped;
            aggregate.total_gpu_memory += metrics.gpu_memory_used;

            if metrics.telemetry.min_latency_us < aggregate.min_latency_us
                || aggregate.min_latency_us == 0
            {
                aggregate.min_latency_us = metrics.telemetry.min_latency_us;
            }
            if metrics.telemetry.max_latency_us > aggregate.max_latency_us {
                aggregate.max_latency_us = metrics.telemetry.max_latency_us;
            }

            kernel_metrics.insert(kernel_id.clone(), metrics);
        }

        // Calculate averages
        if !kernel_metrics.is_empty() {
            aggregate.avg_latency_us = kernel_metrics
                .values()
                .map(|m| m.telemetry.avg_latency_us())
                .sum::<f64>()
                / kernel_metrics.len() as f64;

            if elapsed > 0.0 {
                aggregate.throughput = aggregate.total_messages_processed as f64 / elapsed;
            }
        }

        MetricsSnapshot {
            timestamp: Instant::now(),
            kernel_metrics,
            aggregate,
        }
    }

    /// Check for alert conditions.
    fn check_alerts(snapshot: &MetricsSnapshot, config: &TelemetryConfig) -> Vec<TelemetryAlert> {
        let mut alerts = Vec::new();

        for (kernel_id, metrics) in &snapshot.kernel_metrics {
            // Check drop rate
            let drop_rate = metrics.telemetry.drop_rate();
            if drop_rate > config.drop_rate_alert_threshold {
                alerts.push(TelemetryAlert {
                    severity: if drop_rate > 0.1 {
                        AlertSeverity::Critical
                    } else if drop_rate > 0.05 {
                        AlertSeverity::Error
                    } else {
                        AlertSeverity::Warning
                    },
                    alert_type: AlertType::HighDropRate,
                    message: format!(
                        "Kernel {} drop rate is {:.2}%",
                        kernel_id,
                        drop_rate * 100.0
                    ),
                    kernel_id: Some(kernel_id.clone()),
                    timestamp: Instant::now(),
                });
            }

            // Check latency
            let avg_latency = metrics.telemetry.avg_latency_us() as u64;
            if avg_latency > config.latency_alert_threshold_us {
                alerts.push(TelemetryAlert {
                    severity: if avg_latency > config.latency_alert_threshold_us * 10 {
                        AlertSeverity::Critical
                    } else if avg_latency > config.latency_alert_threshold_us * 5 {
                        AlertSeverity::Error
                    } else {
                        AlertSeverity::Warning
                    },
                    alert_type: AlertType::HighLatency,
                    message: format!("Kernel {} average latency is {}µs", kernel_id, avg_latency),
                    kernel_id: Some(kernel_id.clone()),
                    timestamp: Instant::now(),
                });
            }

            // Check for errors
            if metrics.telemetry.last_error != 0 {
                alerts.push(TelemetryAlert {
                    severity: AlertSeverity::Error,
                    alert_type: AlertType::KernelError,
                    message: format!(
                        "Kernel {} reported error code {}",
                        kernel_id, metrics.telemetry.last_error
                    ),
                    kernel_id: Some(kernel_id.clone()),
                    timestamp: Instant::now(),
                });
            }
        }

        alerts
    }
}

/// Metrics collector that aggregates metrics from multiple kernels.
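///
/// A usage sketch mirroring the unit tests below (`KernelId::new` taking a
/// string name is assumed, as in those tests; not compiled as a doctest):
///
/// ```ignore
/// let collector = MetricsCollector::new();
/// let kernel_id = KernelId::new("worker-0");
///
/// collector.record_message_processed(&kernel_id, 120);
/// collector.record_message_dropped(&kernel_id);
///
/// let telemetry = collector.get_telemetry(&kernel_id).unwrap();
/// assert_eq!(telemetry.messages_processed, 1);
/// assert_eq!(telemetry.messages_dropped, 1);
/// ```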
#[derive(Default)]
pub struct MetricsCollector {
    /// Per-kernel telemetry.
    kernel_telemetry: RwLock<HashMap<KernelId, TelemetryBuffer>>,
    /// Per-kernel histograms.
    kernel_histograms: RwLock<HashMap<KernelId, LatencyHistogram>>,
    /// Start time.
    start_time: RwLock<Option<Instant>>,
}

impl MetricsCollector {
    /// Create a new metrics collector.
    pub fn new() -> Self {
        Self {
            kernel_telemetry: RwLock::new(HashMap::new()),
            kernel_histograms: RwLock::new(HashMap::new()),
            start_time: RwLock::new(Some(Instant::now())),
        }
    }

    /// Record a message processed event.
    pub fn record_message_processed(&self, kernel_id: &KernelId, latency_us: u64) {
        let mut telemetry = self.kernel_telemetry.write();
        let entry = telemetry.entry(kernel_id.clone()).or_default();

        entry.messages_processed += 1;
        entry.total_latency_us += latency_us;
        entry.min_latency_us = entry.min_latency_us.min(latency_us);
        entry.max_latency_us = entry.max_latency_us.max(latency_us);

        // Record in histogram
        let mut histograms = self.kernel_histograms.write();
        let histogram = histograms.entry(kernel_id.clone()).or_default();
        histogram.record(latency_us);
    }

    /// Record a message dropped event.
    pub fn record_message_dropped(&self, kernel_id: &KernelId) {
        let mut telemetry = self.kernel_telemetry.write();
        let entry = telemetry.entry(kernel_id.clone()).or_default();
        entry.messages_dropped += 1;
    }

    /// Record an error.
    pub fn record_error(&self, kernel_id: &KernelId, error_code: u32) {
        let mut telemetry = self.kernel_telemetry.write();
        let entry = telemetry.entry(kernel_id.clone()).or_default();
        entry.last_error = error_code;
    }

    /// Get telemetry for a kernel.
    pub fn get_telemetry(&self, kernel_id: &KernelId) -> Option<TelemetryBuffer> {
        self.kernel_telemetry.read().get(kernel_id).copied()
    }

    /// Get histogram for a kernel.
    pub fn get_histogram(&self, kernel_id: &KernelId) -> Option<LatencyHistogram> {
        self.kernel_histograms.read().get(kernel_id).cloned()
    }

    /// Get aggregate telemetry across all kernels.
    pub fn get_aggregate(&self) -> TelemetryBuffer {
        let telemetry = self.kernel_telemetry.read();
        let mut aggregate = TelemetryBuffer::new();

        for buffer in telemetry.values() {
            aggregate.merge(buffer);
        }

        aggregate
    }

    /// Reset all metrics.
    pub fn reset(&self) {
        self.kernel_telemetry.write().clear();
        self.kernel_histograms.write().clear();
        *self.start_time.write() = Some(Instant::now());
    }

    /// Get elapsed time since start.
    pub fn elapsed(&self) -> Duration {
        self.start_time
            .read()
            .map(|t| t.elapsed())
            .unwrap_or_default()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_telemetry_config_default() {
        let config = TelemetryConfig::default();
        assert_eq!(config.collection_interval_ms, 100);
        assert_eq!(config.max_history_samples, 1000);
    }

    #[test]
    fn test_metrics_collector() {
        let collector = MetricsCollector::new();
        let kernel_id = KernelId::new("test");

        collector.record_message_processed(&kernel_id, 100);
        collector.record_message_processed(&kernel_id, 200);
        collector.record_message_dropped(&kernel_id);

        let telemetry = collector.get_telemetry(&kernel_id).unwrap();
        assert_eq!(telemetry.messages_processed, 2);
        assert_eq!(telemetry.messages_dropped, 1);
        assert_eq!(telemetry.min_latency_us, 100);
        assert_eq!(telemetry.max_latency_us, 200);
    }

    #[test]
    fn test_aggregate_metrics() {
        let collector = MetricsCollector::new();

        let kernel1 = KernelId::new("kernel1");
        let kernel2 = KernelId::new("kernel2");

        collector.record_message_processed(&kernel1, 100);
        collector.record_message_processed(&kernel2, 200);

        let aggregate = collector.get_aggregate();
        assert_eq!(aggregate.messages_processed, 2);
    }
}