mecha10-diagnostics 0.1.25

Diagnostics and metrics collection for the Mecha10 robotics framework
Documentation
//! Streaming pipeline diagnostics collector
//!
//! Tracks frame pipeline metrics, encoding performance, latency, and bandwidth.

use crate::metrics::{Counter, Gauge, Histogram, RateCalculator};
use crate::topics::*;
use crate::types::*;
use mecha10_core::prelude::*;
use mecha10_core::topics::Topic;
use std::sync::Mutex;
use std::time::Instant;

/// Streaming diagnostics collector
///
/// Accumulates frame-pipeline counters, an encode-time histogram and
/// bandwidth figures for a single stream `source`, and publishes them as
/// diagnostic messages when one of the `publish_*` methods is called.
///
/// Designed for minimal overhead on hot paths:
/// - Every recording method takes `&self` and only bumps a counter or gauge
/// - Encode-time histogram updates use a non-blocking `try_lock`, so a sample
///   is skipped rather than stalling the caller while the lock is held
/// - `publish_all` rate-limits itself to at most one round per second
pub struct StreamingCollector {
    // Identifier passed to `DiagnosticMessage::new` for every published message.
    source: String,

    // Pipeline counters, updated through `&self` on the hot path
    // (presumably atomic — confirm against `crate::metrics::Counter`).
    frames_received: Counter,
    frames_encoded: Counter,
    frames_sent: Counter,
    frames_dropped: Counter,

    // Queue depth gauge: last value passed to `set_encoder_queue_depth`.
    encoder_queue_depth: Gauge,

    // Bandwidth tracking: cumulative payload bytes reported by `record_frame_sent`.
    bytes_sent: Counter,

    // Rate calculators: `fps_calculator` is fed the cumulative
    // `frames_received` count, `bandwidth_calculator` the cumulative
    // `bytes_sent` count.
    fps_calculator: RateCalculator,
    bandwidth_calculator: RateCalculator,

    // Encoding performance. Samples are encode times in microseconds.
    // The histogram needs `&mut` access, hence the mutex; it is `None` when
    // `Histogram::for_duration()` failed at construction, in which case
    // timing statistics are reported as zero.
    encoding_histogram: Mutex<Option<Histogram>>,
    // Number of encodes that took longer than 50_000 µs (50 ms).
    slow_frames: Counter,

    // Time of the last round actually published by `publish_all`
    // (used for its once-per-second rate limit).
    last_publish: Mutex<Instant>,
}

impl StreamingCollector {
    /// Create a new streaming collector
    pub fn new(source: impl Into<String>) -> Self {
        Self {
            source: source.into(),
            frames_received: Counter::new(),
            frames_encoded: Counter::new(),
            frames_sent: Counter::new(),
            frames_dropped: Counter::new(),
            encoder_queue_depth: Gauge::new(),
            bytes_sent: Counter::new(),
            fps_calculator: RateCalculator::new(),
            bandwidth_calculator: RateCalculator::new(),
            encoding_histogram: Mutex::new(Histogram::for_duration().ok()),
            slow_frames: Counter::new(),
            last_publish: Mutex::new(Instant::now()),
        }
    }

    // ===== Hot Path Methods (minimal overhead) =====

    /// Record frame received from camera
    #[inline]
    pub fn record_frame_received(&self) {
        self.frames_received.inc();
    }

    /// Record frame encoded
    #[inline]
    pub fn record_frame_encoded(&self, encode_time_us: u64) {
        self.frames_encoded.inc();

        // Record encoding time in histogram (requires lock but off hot path)
        if let Ok(mut hist) = self.encoding_histogram.try_lock() {
            if let Some(h) = hist.as_mut() {
                h.record(encode_time_us);
            }
        }

        // Track slow frames (>50ms for 20fps target)
        if encode_time_us > 50_000 {
            self.slow_frames.inc();
        }
    }

    /// Record frame sent via WebRTC
    #[inline]
    pub fn record_frame_sent(&self, frame_size_bytes: u64) {
        self.frames_sent.inc();
        self.bytes_sent.add(frame_size_bytes);
    }

    /// Record frame dropped
    #[inline]
    pub fn record_frame_dropped(&self) {
        self.frames_dropped.inc();
    }

    /// Update encoder queue depth
    #[inline]
    pub fn set_encoder_queue_depth(&self, depth: usize) {
        self.encoder_queue_depth.set(depth as u64);
    }

    // ===== Publishing Methods =====

    /// Publish pipeline metrics (call periodically, e.g., every 1-5 seconds)
    pub async fn publish_pipeline_metrics(&self, ctx: &Context) -> Result<()> {
        let frames_received = self.frames_received.get();
        let frames_encoded = self.frames_encoded.get();
        let frames_sent = self.frames_sent.get();
        let frames_dropped = self.frames_dropped.get();
        let bytes_sent = self.bytes_sent.get();

        // Calculate rates
        let fps = self.fps_calculator.calculate(frames_received).unwrap_or(0.0);
        let bytes_per_second = self.bandwidth_calculator.calculate(bytes_sent).unwrap_or(0.0) as u64;

        let metrics = StreamingPipelineMetrics {
            frames_received,
            frames_encoded,
            frames_sent,
            frames_dropped,
            bytes_per_second,
            fps,
        };

        let msg = DiagnosticMessage::new(&self.source, metrics);
        ctx.publish_to(
            Topic::<DiagnosticMessage<StreamingPipelineMetrics>>::new(TOPIC_DIAGNOSTICS_STREAMING_PIPELINE),
            &msg,
        )
        .await?;

        Ok(())
    }

    /// Publish encoding metrics
    pub async fn publish_encoding_metrics(&self, ctx: &Context) -> Result<()> {
        let total_frames = self.frames_encoded.get();
        let slow_frames = self.slow_frames.get();
        let queue_depth = self.encoder_queue_depth.get() as usize;

        // Get histogram stats
        let (avg, p50, p95, p99, max) = {
            let hist_lock = self.encoding_histogram.lock().unwrap();
            if let Some(hist) = hist_lock.as_ref() {
                if hist.count() > 0 {
                    (
                        hist.mean() / 1000.0, // Convert to ms
                        hist.p50() as f64 / 1000.0,
                        hist.p95() as f64 / 1000.0,
                        hist.p99() as f64 / 1000.0,
                        hist.max() as f64 / 1000.0,
                    )
                } else {
                    (0.0, 0.0, 0.0, 0.0, 0.0)
                }
            } else {
                (0.0, 0.0, 0.0, 0.0, 0.0)
            }
        };

        let metrics = EncodingMetrics {
            total_frames,
            slow_frames,
            avg_encode_time_ms: avg,
            p50_encode_time_ms: p50,
            p95_encode_time_ms: p95,
            p99_encode_time_ms: p99,
            max_encode_time_ms: max,
            queue_depth,
        };

        let msg = DiagnosticMessage::new(&self.source, metrics);
        ctx.publish_to(
            Topic::<DiagnosticMessage<EncodingMetrics>>::new(TOPIC_DIAGNOSTICS_STREAMING_ENCODING),
            &msg,
        )
        .await?;

        Ok(())
    }

    /// Publish bandwidth metrics
    pub async fn publish_bandwidth_metrics(&self, ctx: &Context, target_bitrate_bps: u64) -> Result<()> {
        let bytes_sent = self.bytes_sent.get();
        let frames_sent = self.frames_sent.get();

        let bytes_per_second = self.bandwidth_calculator.calculate(bytes_sent).unwrap_or(0.0) as u64;
        let bitrate_bps = bytes_per_second * 8;

        let avg_frame_size = if frames_sent > 0 { bytes_sent / frames_sent } else { 0 };

        let utilization = if target_bitrate_bps > 0 {
            (bitrate_bps as f64 / target_bitrate_bps as f64).min(1.0)
        } else {
            0.0
        };

        let metrics = BandwidthMetrics {
            bitrate_bps,
            target_bitrate_bps,
            avg_frame_size_bytes: avg_frame_size,
            total_bytes_sent: bytes_sent,
            utilization,
        };

        let msg = DiagnosticMessage::new(&self.source, metrics);
        ctx.publish_to(
            Topic::<DiagnosticMessage<BandwidthMetrics>>::new(TOPIC_DIAGNOSTICS_STREAMING_BANDWIDTH),
            &msg,
        )
        .await?;

        Ok(())
    }

    /// Publish all streaming metrics (convenience method)
    pub async fn publish_all(&self, ctx: &Context, target_bitrate_bps: u64) -> Result<()> {
        // Only publish if enough time has elapsed (rate limiting)
        let should_publish = {
            let mut last = self.last_publish.lock().unwrap();
            let elapsed = last.elapsed();
            if elapsed.as_secs() >= 1 {
                *last = Instant::now();
                true
            } else {
                false
            }
        };

        if !should_publish {
            return Ok(());
        }

        self.publish_pipeline_metrics(ctx).await?;
        self.publish_encoding_metrics(ctx).await?;
        self.publish_bandwidth_metrics(ctx, target_bitrate_bps).await?;

        Ok(())
    }

    /// Reset all metrics (useful for testing)
    pub fn reset(&self) {
        self.frames_received.reset();
        self.frames_encoded.reset();
        self.frames_sent.reset();
        self.frames_dropped.reset();
        self.bytes_sent.reset();
        self.slow_frames.reset();
        self.encoder_queue_depth.set(0);
        self.fps_calculator.reset();
        self.bandwidth_calculator.reset();

        if let Ok(mut hist) = self.encoding_histogram.lock() {
            if let Some(h) = hist.as_mut() {
                h.reset();
            }
        }
    }
}