// ringkernel_core/telemetry.rs
1//! Telemetry and metrics collection for kernel monitoring.
2//!
3//! This module provides structures for collecting performance metrics
4//! from GPU kernels, including throughput, latency, and error tracking.
5
/// Telemetry buffer (64 bytes, cache-line aligned).
///
/// This structure is updated by the GPU kernel and read by the host
/// for monitoring and debugging purposes. The `#[repr(C, align(64))]`
/// layout keeps the buffer on a single cache line and fixes the field
/// order so the device-side writer and host-side reader agree.
#[derive(Debug, Clone, Copy)]
#[repr(C, align(64))]
pub struct TelemetryBuffer {
    /// Total messages processed successfully.
    pub messages_processed: u64,
    /// Total messages dropped (queue full, timeout, etc.).
    pub messages_dropped: u64,
    /// Sum of processing latencies in microseconds.
    pub total_latency_us: u64,
    /// Minimum processing latency in microseconds (`u64::MAX` until the first sample).
    pub min_latency_us: u64,
    /// Maximum processing latency in microseconds.
    pub max_latency_us: u64,
    /// Current input queue depth.
    pub input_queue_depth: u32,
    /// Current output queue depth.
    pub output_queue_depth: u32,
    /// Last error code (0 = no error).
    pub last_error: u32,
    /// Reserved for alignment (pad to 64 bytes).
    pub _reserved: [u32; 3],
}

// Verify size at compile time: 5 * 8 (u64) + 3 * 4 (u32) + 3 * 4 (pad) = 64 bytes.
const _: () = assert!(std::mem::size_of::<TelemetryBuffer>() == 64);

impl TelemetryBuffer {
    /// Create a new telemetry buffer with all counters at their initial state.
    ///
    /// `min_latency_us` starts at `u64::MAX` so the first recorded sample
    /// always becomes the minimum.
    pub const fn new() -> Self {
        Self {
            messages_processed: 0,
            messages_dropped: 0,
            total_latency_us: 0,
            min_latency_us: u64::MAX,
            max_latency_us: 0,
            input_queue_depth: 0,
            output_queue_depth: 0,
            last_error: 0,
            _reserved: [0; 3],
        }
    }

    /// Calculate average latency in microseconds.
    ///
    /// Returns `0.0` when no messages have been processed.
    pub fn avg_latency_us(&self) -> f64 {
        if self.messages_processed == 0 {
            0.0
        } else {
            self.total_latency_us as f64 / self.messages_processed as f64
        }
    }

    /// Get throughput (messages per second) given elapsed time.
    ///
    /// Returns `0.0` for zero or negative `elapsed_secs` to avoid a
    /// division by zero (or a nonsensical negative rate).
    pub fn throughput(&self, elapsed_secs: f64) -> f64 {
        if elapsed_secs <= 0.0 {
            0.0
        } else {
            self.messages_processed as f64 / elapsed_secs
        }
    }

    /// Get drop rate (0.0 to 1.0).
    ///
    /// Returns `0.0` when no messages have been seen at all.
    pub fn drop_rate(&self) -> f64 {
        // Saturate: a corrupt or extremely long-lived pair of counters must
        // not overflow and panic in debug builds.
        let total = self.messages_processed.saturating_add(self.messages_dropped);
        if total == 0 {
            0.0
        } else {
            self.messages_dropped as f64 / total as f64
        }
    }

    /// Reset all counters to initial state.
    pub fn reset(&mut self) {
        *self = Self::new();
    }

    /// Merge another telemetry buffer into this one.
    ///
    /// Counters are summed (saturating, so aggregating long-lived or
    /// corrupted device counters never panics in debug builds), latency
    /// extrema are combined, and point-in-time fields take `other`'s value.
    pub fn merge(&mut self, other: &TelemetryBuffer) {
        self.messages_processed = self.messages_processed.saturating_add(other.messages_processed);
        self.messages_dropped = self.messages_dropped.saturating_add(other.messages_dropped);
        self.total_latency_us = self.total_latency_us.saturating_add(other.total_latency_us);
        self.min_latency_us = self.min_latency_us.min(other.min_latency_us);
        self.max_latency_us = self.max_latency_us.max(other.max_latency_us);
        // Queue depths are point-in-time, take latest
        self.input_queue_depth = other.input_queue_depth;
        self.output_queue_depth = other.output_queue_depth;
        // Last error takes the most recent non-zero
        if other.last_error != 0 {
            self.last_error = other.last_error;
        }
    }
}

impl Default for TelemetryBuffer {
    fn default() -> Self {
        Self::new()
    }
}
107
/// Extended metrics for detailed monitoring.
///
/// Combines the raw GPU-side [`TelemetryBuffer`] with host-side bookkeeping:
/// transfer counters, memory usage, queue depths, and lifecycle state.
/// Collected as a point-in-time snapshot (see `collected_at`).
#[derive(Debug, Clone)]
pub struct KernelMetrics {
    /// Basic telemetry from GPU.
    pub telemetry: TelemetryBuffer,

    /// Kernel identifier.
    pub kernel_id: String,

    /// Timestamp when metrics were collected (host monotonic clock).
    pub collected_at: std::time::Instant,

    /// Time since kernel was launched.
    pub uptime: std::time::Duration,

    /// Number of kernel invocations (for event-driven mode).
    pub invocations: u64,

    /// Total bytes transferred to device.
    pub bytes_to_device: u64,

    /// Total bytes transferred from device.
    pub bytes_from_device: u64,

    /// GPU memory usage in bytes.
    pub gpu_memory_used: u64,

    /// Host memory usage in bytes.
    pub host_memory_used: u64,

    /// Total messages sent to this kernel.
    pub messages_sent: u64,

    /// Total messages received from this kernel.
    pub messages_received: u64,

    /// Current number of messages in the input queue.
    /// NOTE(review): host-side `usize` counterpart of `TelemetryBuffer::input_queue_depth` (`u32`) — confirm which is authoritative.
    pub input_queue_depth: usize,

    /// Current number of messages in the output queue.
    pub output_queue_depth: usize,

    /// Current kernel state.
    pub state: crate::runtime::KernelState,

    /// Whether the kernel has been launched on the GPU.
    pub gpu_launched: bool,
}
156
157impl Default for KernelMetrics {
158    fn default() -> Self {
159        Self {
160            telemetry: TelemetryBuffer::default(),
161            kernel_id: String::new(),
162            collected_at: std::time::Instant::now(),
163            uptime: std::time::Duration::ZERO,
164            invocations: 0,
165            bytes_to_device: 0,
166            bytes_from_device: 0,
167            gpu_memory_used: 0,
168            host_memory_used: 0,
169            messages_sent: 0,
170            messages_received: 0,
171            input_queue_depth: 0,
172            output_queue_depth: 0,
173            state: crate::runtime::KernelState::Created,
174            gpu_launched: false,
175        }
176    }
177}
178
179impl KernelMetrics {
180    /// Create new metrics for a kernel.
181    pub fn new(kernel_id: impl Into<String>) -> Self {
182        Self {
183            kernel_id: kernel_id.into(),
184            ..Default::default()
185        }
186    }
187
188    /// Calculate transfer bandwidth (bytes/sec).
189    pub fn transfer_bandwidth(&self) -> f64 {
190        let total_bytes = self.bytes_to_device + self.bytes_from_device;
191        let secs = self.uptime.as_secs_f64();
192        if secs > 0.0 {
193            total_bytes as f64 / secs
194        } else {
195            0.0
196        }
197    }
198
199    /// Get summary as a formatted string.
200    pub fn summary(&self) -> String {
201        format!(
202            "Kernel {} - Processed: {}, Dropped: {}, Avg Latency: {:.2}µs, Throughput: {:.2}/s",
203            self.kernel_id,
204            self.telemetry.messages_processed,
205            self.telemetry.messages_dropped,
206            self.telemetry.avg_latency_us(),
207            self.telemetry.throughput(self.uptime.as_secs_f64())
208        )
209    }
210}
211
/// Histogram for latency distribution.
///
/// Bucket boundaries are kept sorted ascending; a recorded value lands in
/// the first bucket whose boundary is `>=` the value, or in `overflow`
/// when it exceeds every boundary.
#[derive(Debug, Clone)]
pub struct LatencyHistogram {
    /// Bucket boundaries in microseconds, sorted ascending.
    pub buckets: Vec<u64>,
    /// Counts for each bucket (parallel to `buckets`).
    pub counts: Vec<u64>,
    /// Count of values above last bucket.
    pub overflow: u64,
}

impl LatencyHistogram {
    /// Create a new histogram with default buckets.
    pub fn new() -> Self {
        // Default buckets: 1µs, 10µs, 100µs, 1ms, 10ms, 100ms, 1s
        Self::with_buckets(vec![1, 10, 100, 1_000, 10_000, 100_000, 1_000_000])
    }

    /// Create with custom bucket boundaries.
    ///
    /// Boundaries are sorted and deduplicated so `record` and `percentile`
    /// behave correctly even if the caller passes them out of order.
    pub fn with_buckets(mut buckets: Vec<u64>) -> Self {
        buckets.sort_unstable();
        buckets.dedup();
        let counts = vec![0; buckets.len()];
        Self {
            buckets,
            counts,
            overflow: 0,
        }
    }

    /// Record a latency value.
    pub fn record(&mut self, value_us: u64) {
        // Binary search for the first boundary >= value_us; buckets are
        // sorted by construction, so this matches a linear "value <= boundary"
        // scan in O(log n).
        let idx = self.buckets.partition_point(|&boundary| boundary < value_us);
        match self.counts.get_mut(idx) {
            Some(count) => *count += 1,
            // Larger than every boundary: falls in the overflow region.
            None => self.overflow += 1,
        }
    }

    /// Get total count of recorded values, including overflow.
    pub fn total(&self) -> u64 {
        self.counts.iter().sum::<u64>() + self.overflow
    }

    /// Get the bucket boundary containing the `p`-th percentile.
    ///
    /// `p` is clamped to `[0.0, 100.0]`. Returns 0 for an empty histogram;
    /// returns last bucket boundary + 1 when the percentile falls in the
    /// overflow region.
    pub fn percentile(&self, p: f64) -> u64 {
        let total = self.total();
        if total == 0 {
            return 0;
        }

        // Clamp so out-of-range percentiles don't return the overflow sentinel.
        let p = p.clamp(0.0, 100.0);
        let target = (total as f64 * p / 100.0).ceil() as u64;
        let mut cumulative = 0u64;

        for (i, &count) in self.counts.iter().enumerate() {
            cumulative += count;
            if cumulative >= target {
                return self.buckets[i];
            }
        }

        // Return last bucket boundary + 1 for overflow
        self.buckets.last().map(|b| b + 1).unwrap_or(0)
    }
}

impl Default for LatencyHistogram {
    fn default() -> Self {
        Self::new()
    }
}
283
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_telemetry_buffer_size() {
        // Must occupy exactly one cache line.
        assert_eq!(std::mem::size_of::<TelemetryBuffer>(), 64);
    }

    #[test]
    fn test_avg_latency() {
        let empty = TelemetryBuffer::new();
        assert_eq!(empty.avg_latency_us(), 0.0);

        let tb = TelemetryBuffer {
            messages_processed: 10,
            total_latency_us: 1000,
            ..TelemetryBuffer::new()
        };
        assert_eq!(tb.avg_latency_us(), 100.0);
    }

    #[test]
    fn test_throughput() {
        let tb = TelemetryBuffer {
            messages_processed: 1000,
            ..TelemetryBuffer::new()
        };

        assert_eq!(tb.throughput(1.0), 1000.0);
        assert_eq!(tb.throughput(2.0), 500.0);
        // Zero elapsed time must not divide by zero.
        assert_eq!(tb.throughput(0.0), 0.0);
    }

    #[test]
    fn test_drop_rate() {
        let tb = TelemetryBuffer {
            messages_processed: 90,
            messages_dropped: 10,
            ..TelemetryBuffer::new()
        };

        assert!((tb.drop_rate() - 0.1).abs() < 0.001);
    }

    #[test]
    fn test_merge() {
        let mut accumulator = TelemetryBuffer {
            messages_processed: 100,
            min_latency_us: 10,
            max_latency_us: 100,
            ..TelemetryBuffer::new()
        };
        let sample = TelemetryBuffer {
            messages_processed: 50,
            min_latency_us: 5,
            max_latency_us: 200,
            ..TelemetryBuffer::new()
        };

        accumulator.merge(&sample);

        assert_eq!(accumulator.messages_processed, 150);
        assert_eq!(accumulator.min_latency_us, 5);
        assert_eq!(accumulator.max_latency_us, 200);
    }

    #[test]
    fn test_histogram_percentile() {
        let mut hist = LatencyHistogram::with_buckets(vec![10, 50, 100, 500]);

        // 80 samples <= 10µs, 15 samples <= 50µs, 5 samples <= 500µs.
        for (value, repeats) in [(5u64, 80), (30, 15), (200, 5)] {
            for _ in 0..repeats {
                hist.record(value);
            }
        }

        assert_eq!(hist.percentile(50.0), 10); // p50 = 10µs bucket
        assert_eq!(hist.percentile(90.0), 50); // p90 = 50µs bucket
        assert_eq!(hist.percentile(99.0), 500); // p99 = 500µs bucket
    }
}