mecha10-video 0.1.25

WebRTC video streaming for Mecha10 - camera frame capture and broadcasting
Documentation
//! Camera frame publisher with dual-path support
//!
//! Handles publishing camera frames to multiple destinations:
//! - WebRTC for low-latency streaming to dashboards
//! - Optional secondary channel (e.g., Redis) for telemetry/recording
//!
//! Uses Arc-based zero-copy sharing to efficiently send the same frame data
//! to multiple consumers without duplication.
use anyhow::Result;
#[cfg(feature = "diagnostics")]
use mecha10_diagnostics::prelude::StreamingCollector;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::debug;

use crate::frame::{CameraFrame, ImageFormat};

/// Type alias for secondary publisher channel (e.g., Redis telemetry)
///
/// This is the sender half; `CameraPublisher::publish` pushes one message per
/// frame into it.
///
/// Tuple format: (camera_id, image_bytes_arc, format, width, height, timestamp)
///
/// The image bytes travel as `Arc<Vec<u8>>`: the receiver gets a handle to the
/// same buffer that is sent on the WebRTC path, not a copy of the pixels.
pub type SecondaryPublisherChannel = mpsc::Sender<(String, Arc<Vec<u8>>, ImageFormat, u32, u32, u64)>;

/// Camera frame publisher
///
/// Provides efficient dual-path publishing:
/// 1. WebRTC channel for low-latency streaming
/// 2. Optional secondary channel for telemetry/recording
///
/// Both paths are handed the same `Arc`-wrapped image buffer, so the frame
/// bytes are not duplicated per destination.
///
/// # Example
///
/// ```no_run
/// use mecha10_video::{CameraPublisher, ImageFormat};
/// use tokio::sync::mpsc;
///
/// # #[tokio::main]
/// # async fn main() -> anyhow::Result<()> {
/// let (webrtc_tx, webrtc_rx) = mpsc::channel(1);
/// let (redis_tx, redis_rx) = mpsc::channel(50);
///
/// # #[cfg(not(feature = "diagnostics"))]
/// let publisher = CameraPublisher::new(webrtc_tx)
///     .with_secondary_publisher(redis_tx);
///
/// // Publish frame (zero-copy Arc sharing)
/// publisher.publish(
///     "camera0".to_string(),
///     vec![0u8; 640 * 480 * 3],
///     ImageFormat::Rgb,
///     640,
///     480,
///     12345,
/// )?;
/// # Ok(())
/// # }
/// ```
pub struct CameraPublisher {
    /// Primary path: sender for frames destined for the WebRTC server
    webrtc_tx: mpsc::Sender<CameraFrame>,
    /// Optional secondary publisher (e.g., Redis channel); `None` until
    /// `with_secondary_publisher` is called
    /// Tuple format: (camera_id, image_bytes_arc, format, width, height, timestamp)
    secondary_tx: Option<SecondaryPublisherChannel>,
    #[cfg(feature = "diagnostics")]
    /// Streaming diagnostics collector that `publish` reports sent/dropped
    /// frames to; the field only exists with the `diagnostics` feature
    diagnostics: Option<Arc<StreamingCollector>>,
}

impl CameraPublisher {
    /// Create a new camera publisher
    ///
    /// # Arguments
    /// * `webrtc_tx` - Channel for sending frames to WebRTC server
    ///
    /// # Example (without diagnostics)
    /// ```no_run
    /// use mecha10_video::CameraPublisher;
    /// use tokio::sync::mpsc;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let (webrtc_tx, webrtc_rx) = mpsc::channel(1);
    /// # #[cfg(not(feature = "diagnostics"))]
    /// let publisher = CameraPublisher::new(webrtc_tx);
    /// # }
    /// ```
    #[cfg(not(feature = "diagnostics"))]
    pub fn new(webrtc_tx: mpsc::Sender<CameraFrame>) -> Self {
        Self {
            webrtc_tx,
            secondary_tx: None,
        }
    }

    /// Build a publisher that feeds the WebRTC path and reports to diagnostics
    ///
    /// No secondary publisher is attached; chain `with_secondary_publisher`
    /// to add one. This constructor is compiled only when the `diagnostics`
    /// feature is enabled.
    ///
    /// # Arguments
    /// * `webrtc_tx` - Channel for sending frames to WebRTC server
    /// * `diagnostics` - Streaming diagnostics collector
    ///
    /// # Example (with diagnostics feature)
    /// ```no_run
    /// use mecha10_video::CameraPublisher;
    /// # #[cfg(feature = "diagnostics")]
    /// use mecha10_diagnostics::prelude::StreamingCollector;
    /// use tokio::sync::mpsc;
    /// use std::sync::Arc;
    ///
    /// # #[cfg(feature = "diagnostics")]
    /// # #[tokio::main]
    /// # async fn main() {
    /// let (webrtc_tx, webrtc_rx) = mpsc::channel(1);
    /// let diagnostics = Arc::new(StreamingCollector::new("my-node"));
    /// let publisher = CameraPublisher::new(webrtc_tx, diagnostics);
    /// # }
    /// ```
    #[cfg(feature = "diagnostics")]
    pub fn new(webrtc_tx: mpsc::Sender<CameraFrame>, diagnostics: Arc<StreamingCollector>) -> Self {
        // The collector is always present when built through this constructor.
        let diagnostics = Some(diagnostics);
        CameraPublisher {
            webrtc_tx,
            secondary_tx: None,
            diagnostics,
        }
    }

    /// Attach a secondary publisher (e.g., for Redis telemetry)
    ///
    /// Consumes the publisher and returns it with the secondary channel set,
    /// replacing any channel attached earlier. The secondary path is handed
    /// the same Arc-wrapped frame data as the WebRTC path, enabling zero-copy
    /// sharing.
    ///
    /// # Arguments
    /// * `tx` - Channel for secondary publishing (best-effort, drops on full)
    ///
    /// # Example
    /// ```no_run
    /// use mecha10_video::CameraPublisher;
    /// use tokio::sync::mpsc;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let (webrtc_tx, webrtc_rx) = mpsc::channel(1);
    /// let (redis_tx, redis_rx) = mpsc::channel(50);
    ///
    /// # #[cfg(not(feature = "diagnostics"))]
    /// let publisher = CameraPublisher::new(webrtc_tx)
    ///     .with_secondary_publisher(redis_tx);
    /// # }
    /// ```
    pub fn with_secondary_publisher(self, tx: SecondaryPublisherChannel) -> Self {
        let mut publisher = self;
        publisher.secondary_tx = Some(tx);
        publisher
    }

    /// Publish a camera frame to all configured destinations
    ///
    /// This method:
    /// 1. Wraps image bytes in Arc for zero-copy sharing
    /// 2. Sends CameraFrame to WebRTC channel
    /// 3. Sends raw data to secondary channel (if configured)
    /// 4. Updates diagnostics (if enabled)
    ///
    /// Both sends are non-blocking (`try_send`): a full channel drops the
    /// frame instead of stalling frame capture.
    ///
    /// # Arguments
    /// * `camera_id` - Camera identifier (e.g., "CameraFront")
    /// * `image_bytes` - Raw image data (JPEG or RGB)
    /// * `format` - Image format (JPEG or RGB)
    /// * `width` - Image width in pixels
    /// * `height` - Image height in pixels
    /// * `timestamp` - Frame timestamp in microseconds
    ///
    /// # Returns
    /// * `Ok(PublishStats)` - Publishing statistics. A full WebRTC channel is
    ///   not an error: the frame is dropped on that path and WebRTC is not
    ///   counted in `destinations`. `secondary_dropped` is set when a
    ///   configured secondary channel is full or closed.
    /// * `Err(e)` - If WebRTC channel is closed. The frame is then not
    ///   forwarded to the secondary publisher either.
    ///
    /// # Example
    /// ```no_run
    /// use mecha10_video::{CameraPublisher, ImageFormat};
    /// use tokio::sync::mpsc;
    ///
    /// # #[tokio::main]
    /// # async fn main() -> anyhow::Result<()> {
    /// # let (webrtc_tx, webrtc_rx) = mpsc::channel(1);
    /// # #[cfg(not(feature = "diagnostics"))]
    /// # let publisher = CameraPublisher::new(webrtc_tx);
    /// let stats = publisher.publish(
    ///     "camera0".to_string(),
    ///     vec![0u8; 640 * 480 * 3],
    ///     ImageFormat::Rgb,
    ///     640,
    ///     480,
    ///     12345,
    /// )?;
    ///
    /// println!("Published to {} destinations", stats.destinations);
    /// # Ok(())
    /// # }
    /// ```
    pub fn publish(
        &self,
        camera_id: String,
        image_bytes: Vec<u8>,
        format: ImageFormat,
        width: u32,
        height: u32,
        timestamp: u64,
    ) -> Result<PublishStats> {
        // Wrap in Arc once for zero-copy sharing
        let image_bytes_arc = Arc::new(image_bytes);
        let frame_size = image_bytes_arc.len() as u64;

        let mut stats = PublishStats {
            frame_size_bytes: frame_size,
            destinations: 0,
            secondary_dropped: false,
        };

        // Clone the camera id only when a secondary publisher will consume it;
        // with a single destination the original String moves into the frame
        // and no per-frame allocation is needed.
        let secondary = self.secondary_tx.as_ref().map(|tx| (tx, camera_id.clone()));

        // 1. Send to WebRTC (primary path)
        let webrtc_frame = CameraFrame {
            camera_id,
            width,
            height,
            timestamp,
            image_bytes: Arc::clone(&image_bytes_arc),
            format,
        };

        match self.webrtc_tx.try_send(webrtc_frame) {
            Ok(()) => {
                stats.destinations += 1;

                #[cfg(feature = "diagnostics")]
                if let Some(diag) = &self.diagnostics {
                    diag.record_frame_sent(frame_size);
                }
            }
            Err(mpsc::error::TrySendError::Full(_)) => {
                #[cfg(feature = "diagnostics")]
                if let Some(diag) = &self.diagnostics {
                    diag.record_frame_dropped();
                }

                // Best-effort: drop frame if channel full (no dashboard or dashboard is slow)
                // This prevents backpressure from blocking frame capture
                debug!("WebRTC channel full, dropping frame");
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                // Channel closed indicates WebRTC server failure - this is fatal
                return Err(anyhow::anyhow!("WebRTC channel closed"));
            }
        }

        // 2. Send to secondary publisher (best-effort)
        if let Some((tx, camera_id)) = secondary {
            match tx.try_send((camera_id, image_bytes_arc, format, width, height, timestamp)) {
                Ok(()) => {
                    stats.destinations += 1;
                }
                Err(mpsc::error::TrySendError::Full(_)) => {
                    // Best-effort - drop frame if channel full
                    debug!("Secondary publisher channel full, dropping frame");
                    stats.secondary_dropped = true;
                }
                Err(mpsc::error::TrySendError::Closed(_)) => {
                    // Secondary channel closed - not fatal, but the frame was
                    // still lost on this path, so surface that in the stats
                    debug!("Secondary publisher channel closed");
                    stats.secondary_dropped = true;
                }
            }
        }

        Ok(stats)
    }
}

/// Statistics from publishing a camera frame
///
/// Returned by [`CameraPublisher::publish`]. It is plain data, so it can be
/// compared with `==` and zero-initialised with `Default`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PublishStats {
    /// Size of the published frame in bytes
    pub frame_size_bytes: u64,
    /// Number of destinations the frame was successfully queued to
    /// (WebRTC and, if configured, the secondary publisher)
    pub destinations: usize,
    /// Whether the secondary publisher dropped the frame
    pub secondary_dropped: bool,
}