use anyhow::Result;
#[cfg(feature = "diagnostics")]
use mecha10_diagnostics::prelude::StreamingCollector;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::debug;
use crate::frame::{CameraFrame, ImageFormat};
pub type SecondaryPublisherChannel = mpsc::Sender<(String, Arc<Vec<u8>>, ImageFormat, u32, u32, u64)>;
pub struct CameraPublisher {
webrtc_tx: mpsc::Sender<CameraFrame>,
secondary_tx: Option<SecondaryPublisherChannel>,
#[cfg(feature = "diagnostics")]
diagnostics: Option<Arc<StreamingCollector>>,
}
impl CameraPublisher {
#[cfg(not(feature = "diagnostics"))]
pub fn new(webrtc_tx: mpsc::Sender<CameraFrame>) -> Self {
Self {
webrtc_tx,
secondary_tx: None,
}
}
#[cfg(feature = "diagnostics")]
pub fn new(webrtc_tx: mpsc::Sender<CameraFrame>, diagnostics: Arc<StreamingCollector>) -> Self {
Self {
webrtc_tx,
secondary_tx: None,
diagnostics: Some(diagnostics),
}
}
pub fn with_secondary_publisher(mut self, tx: SecondaryPublisherChannel) -> Self {
self.secondary_tx = Some(tx);
self
}
pub fn publish(
&self,
camera_id: String,
image_bytes: Vec<u8>,
format: ImageFormat,
width: u32,
height: u32,
timestamp: u64,
) -> Result<PublishStats> {
let image_bytes_arc = Arc::new(image_bytes);
let frame_size = image_bytes_arc.len() as u64;
let mut stats = PublishStats {
frame_size_bytes: frame_size,
destinations: 0,
secondary_dropped: false,
};
let webrtc_frame = CameraFrame {
camera_id: camera_id.clone(),
width,
height,
timestamp,
image_bytes: Arc::clone(&image_bytes_arc),
format,
};
match self.webrtc_tx.try_send(webrtc_frame) {
Ok(_) => {
stats.destinations += 1;
#[cfg(feature = "diagnostics")]
if let Some(diag) = &self.diagnostics {
diag.record_frame_sent(frame_size);
}
}
Err(mpsc::error::TrySendError::Full(_)) => {
#[cfg(feature = "diagnostics")]
if let Some(diag) = &self.diagnostics {
diag.record_frame_dropped();
}
debug!("WebRTC channel full, dropping frame");
}
Err(mpsc::error::TrySendError::Closed(_)) => {
return Err(anyhow::anyhow!("WebRTC channel closed"));
}
}
if let Some(tx) = &self.secondary_tx {
match tx.try_send((camera_id, image_bytes_arc, format, width, height, timestamp)) {
Ok(_) => {
stats.destinations += 1;
}
Err(mpsc::error::TrySendError::Full(_)) => {
debug!("Secondary publisher channel full, dropping frame");
stats.secondary_dropped = true;
}
Err(mpsc::error::TrySendError::Closed(_)) => {
debug!("Secondary publisher channel closed");
}
}
}
Ok(stats)
}
}
#[derive(Debug, Clone)]
pub struct PublishStats {
pub frame_size_bytes: u64,
pub destinations: usize,
pub secondary_dropped: bool,
}