/// Fixed-size block of message counters and latency statistics.
///
/// The layout is `repr(C)` with 64-byte alignment, and the compile-time
/// assertion below pins the size to exactly 64 bytes. Adding or resizing a
/// field therefore requires shrinking `_reserved` accordingly.
///
/// NOTE(review): the 64-byte size/alignment presumably matches a cache line
/// and the `repr(C)` layout presumably exists so the buffer can be shared with
/// non-Rust code — confirm against the producer of these buffers.
#[derive(Debug, Clone, Copy)]
#[repr(C, align(64))]
pub struct TelemetryBuffer {
    /// Count of messages processed.
    pub messages_processed: u64,
    /// Count of messages dropped.
    pub messages_dropped: u64,
    /// Sum of all observed latencies, in microseconds.
    pub total_latency_us: u64,
    /// Smallest observed latency in microseconds; `u64::MAX` while no sample
    /// has been seen, so that any real sample wins a `min` comparison.
    pub min_latency_us: u64,
    /// Largest observed latency in microseconds; `0` while empty.
    pub max_latency_us: u64,
    /// Most recent input queue depth.
    pub input_queue_depth: u32,
    /// Most recent output queue depth.
    pub output_queue_depth: u32,
    /// Last error code; [`TelemetryBuffer::merge`] treats `0` as "no error".
    pub last_error: u32,
    /// Padding that brings the struct to exactly 64 bytes.
    pub _reserved: [u32; 3],
}

// Compile-time guard: the layout above must stay exactly 64 bytes.
const _: () = assert!(std::mem::size_of::<TelemetryBuffer>() == 64);

impl TelemetryBuffer {
    /// Creates an empty buffer: every counter is zero and `min_latency_us`
    /// is `u64::MAX`.
    pub const fn new() -> Self {
        Self {
            messages_processed: 0,
            messages_dropped: 0,
            total_latency_us: 0,
            min_latency_us: u64::MAX,
            max_latency_us: 0,
            input_queue_depth: 0,
            output_queue_depth: 0,
            last_error: 0,
            _reserved: [0; 3],
        }
    }

    /// Returns the mean latency per processed message in microseconds, or
    /// `0.0` when nothing has been processed yet.
    pub fn avg_latency_us(&self) -> f64 {
        if self.messages_processed == 0 {
            return 0.0;
        }
        self.total_latency_us as f64 / self.messages_processed as f64
    }

    /// Returns processed messages per second over `elapsed_secs`.
    ///
    /// Yields `0.0` when `elapsed_secs` is zero, negative, or NaN.
    pub fn throughput(&self, elapsed_secs: f64) -> f64 {
        // Testing `> 0.0` (instead of rejecting `<= 0.0`) also sends NaN to
        // the fallback branch, since every comparison with NaN is false.
        if elapsed_secs > 0.0 {
            self.messages_processed as f64 / elapsed_secs
        } else {
            0.0
        }
    }

    /// Returns the fraction of messages that were dropped, in `[0.0, 1.0]`,
    /// or `0.0` when no messages have been seen at all.
    pub fn drop_rate(&self) -> f64 {
        // Widen before adding so the sum of two u64 counters cannot overflow.
        let total = u128::from(self.messages_processed) + u128::from(self.messages_dropped);
        if total == 0 {
            0.0
        } else {
            self.messages_dropped as f64 / total as f64
        }
    }

    /// Resets the buffer to the state produced by [`TelemetryBuffer::new`].
    pub fn reset(&mut self) {
        *self = Self::new();
    }

    /// Folds `other` into `self`.
    ///
    /// * Counters and the latency sum are added, saturating at `u64::MAX`
    ///   instead of overflowing.
    /// * `min_latency_us` / `max_latency_us` take the min / max of both sides.
    /// * Queue depths are overwritten with `other`'s values (not summed).
    /// * `last_error` is overwritten only when `other.last_error` is non-zero.
    pub fn merge(&mut self, other: &TelemetryBuffer) {
        self.messages_processed = self
            .messages_processed
            .saturating_add(other.messages_processed);
        self.messages_dropped = self.messages_dropped.saturating_add(other.messages_dropped);
        self.total_latency_us = self.total_latency_us.saturating_add(other.total_latency_us);

        self.min_latency_us = self.min_latency_us.min(other.min_latency_us);
        self.max_latency_us = self.max_latency_us.max(other.max_latency_us);

        self.input_queue_depth = other.input_queue_depth;
        self.output_queue_depth = other.output_queue_depth;

        // A zero code on `other` must not erase an error already recorded here.
        if other.last_error != 0 {
            self.last_error = other.last_error;
        }
    }
}

impl Default for TelemetryBuffer {
    /// Equivalent to [`TelemetryBuffer::new`].
    fn default() -> Self {
        Self::new()
    }
}
107
108#[derive(Debug, Clone)]
110pub struct KernelMetrics {
111 pub telemetry: TelemetryBuffer,
113
114 pub kernel_id: String,
116
117 pub collected_at: std::time::Instant,
119
120 pub uptime: std::time::Duration,
122
123 pub invocations: u64,
125
126 pub bytes_to_device: u64,
128
129 pub bytes_from_device: u64,
131
132 pub gpu_memory_used: u64,
134
135 pub host_memory_used: u64,
137
138 pub messages_sent: u64,
140
141 pub messages_received: u64,
143
144 pub input_queue_depth: usize,
146
147 pub output_queue_depth: usize,
149
150 pub state: crate::runtime::KernelState,
152
153 pub gpu_launched: bool,
155}
156
157impl Default for KernelMetrics {
158 fn default() -> Self {
159 Self {
160 telemetry: TelemetryBuffer::default(),
161 kernel_id: String::new(),
162 collected_at: std::time::Instant::now(),
163 uptime: std::time::Duration::ZERO,
164 invocations: 0,
165 bytes_to_device: 0,
166 bytes_from_device: 0,
167 gpu_memory_used: 0,
168 host_memory_used: 0,
169 messages_sent: 0,
170 messages_received: 0,
171 input_queue_depth: 0,
172 output_queue_depth: 0,
173 state: crate::runtime::KernelState::Created,
174 gpu_launched: false,
175 }
176 }
177}
178
179impl KernelMetrics {
180 pub fn new(kernel_id: impl Into<String>) -> Self {
182 Self {
183 kernel_id: kernel_id.into(),
184 ..Default::default()
185 }
186 }
187
188 pub fn transfer_bandwidth(&self) -> f64 {
190 let total_bytes = self.bytes_to_device + self.bytes_from_device;
191 let secs = self.uptime.as_secs_f64();
192 if secs > 0.0 {
193 total_bytes as f64 / secs
194 } else {
195 0.0
196 }
197 }
198
199 pub fn summary(&self) -> String {
201 format!(
202 "Kernel {} - Processed: {}, Dropped: {}, Avg Latency: {:.2}µs, Throughput: {:.2}/s",
203 self.kernel_id,
204 self.telemetry.messages_processed,
205 self.telemetry.messages_dropped,
206 self.telemetry.avg_latency_us(),
207 self.telemetry.throughput(self.uptime.as_secs_f64())
208 )
209 }
210}
211
/// Bucketed latency histogram.
///
/// `buckets[i]` is the inclusive upper bound (in microseconds) of bucket `i`
/// and `counts[i]` is the number of samples recorded into it. Samples larger
/// than every boundary are tallied in `overflow`.
#[derive(Debug, Clone)]
pub struct LatencyHistogram {
    /// Inclusive upper bounds of the buckets, in ascending order.
    pub buckets: Vec<u64>,
    /// Sample count per bucket; parallel to `buckets`.
    pub counts: Vec<u64>,
    /// Number of samples that exceeded the last boundary.
    pub overflow: u64,
}

impl LatencyHistogram {
    /// Creates a histogram with decade boundaries from 1 to 1,000,000.
    pub fn new() -> Self {
        Self::with_buckets(vec![1, 10, 100, 1_000, 10_000, 100_000, 1_000_000])
    }

    /// Creates a histogram with the given bucket upper bounds.
    ///
    /// The boundaries are sorted ascending and de-duplicated, because both
    /// [`LatencyHistogram::record`] (first matching boundary wins) and
    /// [`LatencyHistogram::percentile`] (cumulative walk) are only meaningful
    /// over strictly increasing boundaries. Already-sorted, unique input is
    /// stored unchanged.
    pub fn with_buckets(mut buckets: Vec<u64>) -> Self {
        buckets.sort_unstable();
        buckets.dedup();
        let counts = vec![0; buckets.len()];
        Self {
            buckets,
            counts,
            overflow: 0,
        }
    }

    /// Records one sample into the first bucket whose boundary is
    /// `>= value_us`, or into `overflow` when no bucket is large enough.
    pub fn record(&mut self, value_us: u64) {
        // Zipping avoids indexing, so mismatched `buckets`/`counts` lengths
        // (both fields are public) cannot cause an out-of-bounds panic.
        for (&boundary, count) in self.buckets.iter().zip(self.counts.iter_mut()) {
            if value_us <= boundary {
                *count += 1;
                return;
            }
        }
        self.overflow += 1;
    }

    /// Returns the total number of recorded samples, including overflow.
    pub fn total(&self) -> u64 {
        self.counts.iter().sum::<u64>() + self.overflow
    }

    /// Returns an upper bound for the `p`-th percentile (`p` in `0..=100`).
    ///
    /// The result is the boundary of the bucket containing the target rank.
    /// If that rank lies among the overflow samples, the result is the last
    /// boundary plus one (saturating). Returns `0` for an empty histogram, or
    /// when there are no buckets at all.
    ///
    /// `p` is clamped to `[0, 100]`; NaN is treated like `0`. The target rank
    /// is at least 1, so `percentile(0.0)` reports the first *non-empty*
    /// bucket rather than unconditionally the first boundary.
    pub fn percentile(&self, p: f64) -> u64 {
        let total = self.total();
        if total == 0 {
            return 0;
        }

        let p = p.clamp(0.0, 100.0);
        // Float-to-int `as` saturates and maps NaN to 0; the clamp then forces
        // the rank into the valid 1..=total range.
        let target = ((total as f64 * p / 100.0).ceil() as u64).clamp(1, total);

        let mut cumulative = 0u64;
        for (&boundary, &count) in self.buckets.iter().zip(&self.counts) {
            cumulative += count;
            if cumulative >= target {
                return boundary;
            }
        }

        // The target rank falls among the overflow samples.
        self.buckets.last().map_or(0, |last| last.saturating_add(1))
    }
}

impl Default for LatencyHistogram {
    /// Equivalent to [`LatencyHistogram::new`].
    fn default() -> Self {
        Self::new()
    }
}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// The buffer must occupy exactly 64 bytes.
    #[test]
    fn test_telemetry_buffer_size() {
        let size = std::mem::size_of::<TelemetryBuffer>();
        assert_eq!(size, 64);
    }

    /// Average latency is 0 for an empty buffer and total/processed otherwise.
    #[test]
    fn test_avg_latency() {
        let empty = TelemetryBuffer::new();
        assert_eq!(empty.avg_latency_us(), 0.0);

        let loaded = TelemetryBuffer {
            messages_processed: 10,
            total_latency_us: 1000,
            ..TelemetryBuffer::new()
        };
        assert_eq!(loaded.avg_latency_us(), 100.0);
    }

    /// Throughput divides processed messages by elapsed seconds; zero elapsed
    /// time yields zero.
    #[test]
    fn test_throughput() {
        let tb = TelemetryBuffer {
            messages_processed: 1000,
            ..TelemetryBuffer::new()
        };

        let cases = [(1.0, 1000.0), (2.0, 500.0), (0.0, 0.0)];
        for &(elapsed_secs, expected) in &cases {
            assert_eq!(tb.throughput(elapsed_secs), expected);
        }
    }

    /// 10 dropped out of 100 total is a 10% drop rate.
    #[test]
    fn test_drop_rate() {
        let tb = TelemetryBuffer {
            messages_processed: 90,
            messages_dropped: 10,
            ..TelemetryBuffer::new()
        };

        let deviation = (tb.drop_rate() - 0.1).abs();
        assert!(deviation < 0.001);
    }

    /// Merging sums the processed counter and widens the min/max latency range.
    #[test]
    fn test_merge() {
        let mut target = TelemetryBuffer {
            messages_processed: 100,
            min_latency_us: 10,
            max_latency_us: 100,
            ..TelemetryBuffer::new()
        };
        let incoming = TelemetryBuffer {
            messages_processed: 50,
            min_latency_us: 5,
            max_latency_us: 200,
            ..TelemetryBuffer::new()
        };

        target.merge(&incoming);

        assert_eq!(target.messages_processed, 150);
        assert_eq!(target.min_latency_us, 5);
        assert_eq!(target.max_latency_us, 200);
    }

    /// Percentiles resolve to the upper bound of the bucket holding the rank.
    #[test]
    fn test_histogram_percentile() {
        let mut hist = LatencyHistogram::with_buckets(vec![10, 50, 100, 500]);

        // (number of samples, sample value)
        let samples: [(u32, u64); 3] = [(80, 5), (15, 30), (5, 200)];
        for &(repeat, value_us) in &samples {
            for _ in 0..repeat {
                hist.record(value_us);
            }
        }

        let expectations: [(f64, u64); 3] = [(50.0, 10), (90.0, 50), (99.0, 500)];
        for &(p, expected) in &expectations {
            assert_eq!(hist.percentile(p), expected);
        }
    }
}