/// Cache-line-sized block of message-processing counters.
///
/// `#[repr(C, align(64))]` pins the field layout and pads the struct to one
/// 64-byte cache line, so the buffer can be shared across an ABI boundary
/// without false sharing between adjacent buffers.
#[derive(Debug, Clone, Copy)]
#[repr(C, align(64))]
pub struct TelemetryBuffer {
    pub messages_processed: u64,
    pub messages_dropped: u64,
    pub total_latency_us: u64,
    pub min_latency_us: u64,
    pub max_latency_us: u64,
    pub input_queue_depth: u32,
    pub output_queue_depth: u32,
    pub last_error: u32,
    pub _reserved: [u32; 3],
}

// Layout guard: the struct must stay exactly one 64-byte cache line.
const _: () = assert!(std::mem::size_of::<TelemetryBuffer>() == 64);

impl TelemetryBuffer {
    /// All counters zeroed. `min_latency_us` starts at `u64::MAX` so the
    /// first recorded sample always becomes the minimum.
    pub const fn new() -> Self {
        Self {
            messages_processed: 0,
            messages_dropped: 0,
            total_latency_us: 0,
            min_latency_us: u64::MAX,
            max_latency_us: 0,
            input_queue_depth: 0,
            output_queue_depth: 0,
            last_error: 0,
            _reserved: [0; 3],
        }
    }

    /// Mean per-message latency in microseconds; 0.0 when nothing has been
    /// processed yet (avoids a 0/0 NaN).
    pub fn avg_latency_us(&self) -> f64 {
        match self.messages_processed {
            0 => 0.0,
            n => self.total_latency_us as f64 / n as f64,
        }
    }

    /// Messages per second over `elapsed_secs`; 0.0 for non-positive spans.
    pub fn throughput(&self, elapsed_secs: f64) -> f64 {
        if elapsed_secs <= 0.0 {
            return 0.0;
        }
        self.messages_processed as f64 / elapsed_secs
    }

    /// Fraction of attempted messages that were dropped, in `[0.0, 1.0]`;
    /// 0.0 when no messages were attempted at all.
    pub fn drop_rate(&self) -> f64 {
        let attempted = self.messages_processed + self.messages_dropped;
        if attempted == 0 {
            return 0.0;
        }
        self.messages_dropped as f64 / attempted as f64
    }

    /// Restore the freshly-constructed state (see [`TelemetryBuffer::new`]).
    pub fn reset(&mut self) {
        *self = TelemetryBuffer::new();
    }

    /// Fold `other`'s counters into `self`.
    pub fn merge(&mut self, other: &TelemetryBuffer) {
        self.messages_processed += other.messages_processed;
        self.messages_dropped += other.messages_dropped;
        self.total_latency_us += other.total_latency_us;
        if other.min_latency_us < self.min_latency_us {
            self.min_latency_us = other.min_latency_us;
        }
        if other.max_latency_us > self.max_latency_us {
            self.max_latency_us = other.max_latency_us;
        }
        // Queue depths are point-in-time gauges, not counters: take the
        // most recent snapshot rather than summing.
        self.input_queue_depth = other.input_queue_depth;
        self.output_queue_depth = other.output_queue_depth;
        // Keep the latest non-zero error code (0 means "no error").
        if other.last_error != 0 {
            self.last_error = other.last_error;
        }
    }
}

impl Default for TelemetryBuffer {
    fn default() -> Self {
        Self::new()
    }
}
/// Point-in-time metrics snapshot for one kernel, pairing the raw telemetry
/// counters with host-side bookkeeping (timing, transfer, memory figures).
#[derive(Debug, Clone)]
pub struct KernelMetrics {
    pub telemetry: TelemetryBuffer,
    pub kernel_id: String,
    pub collected_at: std::time::Instant,
    pub uptime: std::time::Duration,
    pub invocations: u64,
    pub bytes_to_device: u64,
    pub bytes_from_device: u64,
    pub gpu_memory_used: u64,
    pub host_memory_used: u64,
}

impl Default for KernelMetrics {
    /// Empty snapshot stamped with the current instant. `Instant` has no
    /// zero value, so this impl cannot be derived.
    fn default() -> Self {
        Self {
            telemetry: TelemetryBuffer::default(),
            kernel_id: String::new(),
            collected_at: std::time::Instant::now(),
            uptime: std::time::Duration::ZERO,
            invocations: 0,
            bytes_to_device: 0,
            bytes_from_device: 0,
            gpu_memory_used: 0,
            host_memory_used: 0,
        }
    }
}

impl KernelMetrics {
    /// Fresh snapshot for `kernel_id` with every counter zeroed.
    pub fn new(kernel_id: impl Into<String>) -> Self {
        let mut metrics = Self::default();
        metrics.kernel_id = kernel_id.into();
        metrics
    }

    /// Combined host->device plus device->host transfer rate in bytes/sec
    /// over `uptime`; 0.0 while uptime is zero.
    pub fn transfer_bandwidth(&self) -> f64 {
        let secs = self.uptime.as_secs_f64();
        if secs <= 0.0 {
            return 0.0;
        }
        (self.bytes_to_device + self.bytes_from_device) as f64 / secs
    }

    /// One-line human-readable digest of the key telemetry figures.
    pub fn summary(&self) -> String {
        let elapsed = self.uptime.as_secs_f64();
        format!(
            "Kernel {} - Processed: {}, Dropped: {}, Avg Latency: {:.2}µs, Throughput: {:.2}/s",
            self.kernel_id,
            self.telemetry.messages_processed,
            self.telemetry.messages_dropped,
            self.telemetry.avg_latency_us(),
            self.telemetry.throughput(elapsed),
        )
    }
}
/// Bucketed latency histogram with ascending microsecond boundaries.
///
/// `counts[i]` tallies samples with `value <= buckets[i]` (first matching
/// boundary wins); samples above the last boundary land in `overflow`.
/// Boundaries must be sorted ascending — `record` and `percentile` give
/// meaningless results otherwise (checked via `debug_assert` on build).
#[derive(Debug, Clone)]
pub struct LatencyHistogram {
    /// Ascending inclusive upper bounds of each bucket, in microseconds.
    pub buckets: Vec<u64>,
    /// Sample count per bucket; runs parallel to `buckets`.
    pub counts: Vec<u64>,
    /// Samples larger than the last bucket boundary.
    pub overflow: u64,
}

impl LatencyHistogram {
    /// Default decade boundaries from 1µs to 1s.
    pub fn new() -> Self {
        Self::with_buckets(vec![1, 10, 100, 1_000, 10_000, 100_000, 1_000_000])
    }

    /// Empty histogram over the given ascending bucket boundaries.
    pub fn with_buckets(buckets: Vec<u64>) -> Self {
        // record()/percentile() assume sorted boundaries; catch misuse early
        // in debug builds without adding a release-mode cost.
        debug_assert!(
            buckets.windows(2).all(|w| w[0] <= w[1]),
            "bucket boundaries must be sorted ascending"
        );
        let counts = vec![0; buckets.len()];
        Self {
            buckets,
            counts,
            overflow: 0,
        }
    }

    /// Record one sample: increments the first bucket whose boundary is
    /// `>= value_us`, or `overflow` when the value exceeds all boundaries.
    pub fn record(&mut self, value_us: u64) {
        match self.buckets.iter().position(|&b| value_us <= b) {
            Some(i) => self.counts[i] += 1,
            None => self.overflow += 1,
        }
    }

    /// Total number of recorded samples, including overflow.
    pub fn total(&self) -> u64 {
        self.counts.iter().sum::<u64>() + self.overflow
    }

    /// Upper bound of the bucket containing the `p`-th percentile sample
    /// (`p` in `0.0..=100.0`). Returns 0 for an empty histogram. When the
    /// percentile falls into `overflow`, returns last boundary + 1 as a
    /// "beyond all buckets" sentinel.
    pub fn percentile(&self, p: f64) -> u64 {
        let total = self.total();
        if total == 0 {
            return 0;
        }
        // 1-based rank of the target sample. Clamp to at least 1 so p == 0
        // resolves to the bucket holding the smallest sample instead of
        // unconditionally returning the first (possibly empty) bucket.
        let target = ((total as f64 * p / 100.0).ceil() as u64).max(1);
        let mut cumulative = 0u64;
        for (&bucket, &count) in self.buckets.iter().zip(self.counts.iter()) {
            cumulative += count;
            if cumulative >= target {
                return bucket;
            }
        }
        // The target rank lives in the overflow region.
        self.buckets.last().map(|b| b + 1).unwrap_or(0)
    }
}

impl Default for LatencyHistogram {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_telemetry_buffer_size() {
        // Must match the compile-time layout guard: one 64-byte cache line.
        assert_eq!(std::mem::size_of::<TelemetryBuffer>(), 64);
    }

    #[test]
    fn test_avg_latency() {
        let mut tb = TelemetryBuffer::new();
        assert_eq!(tb.avg_latency_us(), 0.0);
        tb.messages_processed = 10;
        tb.total_latency_us = 1000;
        assert_eq!(tb.avg_latency_us(), 100.0);
    }

    #[test]
    fn test_throughput() {
        let tb = TelemetryBuffer {
            messages_processed: 1000,
            ..TelemetryBuffer::new()
        };
        assert_eq!(tb.throughput(1.0), 1000.0);
        assert_eq!(tb.throughput(2.0), 500.0);
        assert_eq!(tb.throughput(0.0), 0.0);
    }

    #[test]
    fn test_drop_rate() {
        let tb = TelemetryBuffer {
            messages_processed: 90,
            messages_dropped: 10,
            ..TelemetryBuffer::new()
        };
        assert!((tb.drop_rate() - 0.1).abs() < 0.001);
    }

    #[test]
    fn test_merge() {
        let mut left = TelemetryBuffer {
            messages_processed: 100,
            min_latency_us: 10,
            max_latency_us: 100,
            ..TelemetryBuffer::new()
        };
        let right = TelemetryBuffer {
            messages_processed: 50,
            min_latency_us: 5,
            max_latency_us: 200,
            ..TelemetryBuffer::new()
        };
        left.merge(&right);
        assert_eq!(left.messages_processed, 150);
        assert_eq!(left.min_latency_us, 5);
        assert_eq!(left.max_latency_us, 200);
    }

    #[test]
    fn test_histogram_percentile() {
        let mut hist = LatencyHistogram::with_buckets(vec![10, 50, 100, 500]);
        for _ in 0..80 {
            hist.record(5);
        }
        for _ in 0..15 {
            hist.record(30);
        }
        for _ in 0..5 {
            hist.record(200);
        }
        assert_eq!(hist.percentile(50.0), 10);
        assert_eq!(hist.percentile(90.0), 50);
        assert_eq!(hist.percentile(99.0), 500);
    }
}