use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
use bytes::Bytes;
use libwebrtc::prelude::*;
use libwebrtc::video_frame::FrameMetadata;
use libwebrtc::video_source::native::NativeVideoSource;
use tokio::sync::watch;
use tracing::{debug, error, warn};
use crate::RawChannel;
use crate::img2yuv::{ImageEncoding, ImageMessage, Yuv420Buffer};
use crate::throttler::Throttler;
/// Smallest width or height (measured after rounding down to an even value) that
/// `validate_frame_dimensions` accepts; smaller frames are rejected as `TooSmall`.
const MIN_VIDEO_DIMENSION: u32 = 16;
/// Interval passed to the `Throttler` that rate-limits the "frame too small" warning.
const TOO_SMALL_WARN_INTERVAL: Duration = Duration::from_secs(30);
/// Message schemas that this module knows how to decode into video frames.
///
/// Each variant corresponds to one `(message encoding, schema name)` pair recognised by
/// `detect_video_schema`. The ROS variants only exist when the matching `img2yuv-ros1` /
/// `img2yuv-ros2` cargo feature is enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// Every variant name ends in `Image`, which is the pattern `clippy::enum_variant_names` flags.
#[allow(clippy::enum_variant_names)]
pub enum VideoInputSchema {
/// `foxglove.CompressedImage` with `protobuf` message encoding.
FoxgloveCompressedImage,
/// `foxglove.RawImage` with `protobuf` message encoding.
FoxgloveRawImage,
/// `sensor_msgs/CompressedImage` with `ros1` message encoding.
#[cfg(feature = "img2yuv-ros1")]
Ros1CompressedImage,
/// `sensor_msgs/Image` with `ros1` message encoding.
#[cfg(feature = "img2yuv-ros1")]
Ros1Image,
/// `sensor_msgs/msg/CompressedImage` with `cdr` message encoding.
#[cfg(feature = "img2yuv-ros2")]
Ros2CompressedImage,
/// `sensor_msgs/msg/Image` with `cdr` message encoding.
#[cfg(feature = "img2yuv-ros2")]
Ros2Image,
}
/// Maps a message encoding and schema name to the matching [`VideoInputSchema`].
///
/// Both strings are compared exactly. The ROS 1 and ROS 2 pairs are only recognised when
/// the `img2yuv-ros1` / `img2yuv-ros2` features are enabled. Any other combination yields
/// `None`.
fn detect_video_schema(encoding: &str, schema_name: &str) -> Option<VideoInputSchema> {
    let schema = match (encoding, schema_name) {
        ("protobuf", "foxglove.CompressedImage") => VideoInputSchema::FoxgloveCompressedImage,
        ("protobuf", "foxglove.RawImage") => VideoInputSchema::FoxgloveRawImage,
        #[cfg(feature = "img2yuv-ros1")]
        ("ros1", "sensor_msgs/CompressedImage") => VideoInputSchema::Ros1CompressedImage,
        #[cfg(feature = "img2yuv-ros1")]
        ("ros1", "sensor_msgs/Image") => VideoInputSchema::Ros1Image,
        #[cfg(feature = "img2yuv-ros2")]
        ("cdr", "sensor_msgs/msg/CompressedImage") => VideoInputSchema::Ros2CompressedImage,
        #[cfg(feature = "img2yuv-ros2")]
        ("cdr", "sensor_msgs/msg/Image") => VideoInputSchema::Ros2Image,
        // Unsupported pair: nothing to transcode.
        _ => return None,
    };
    Some(schema)
}
/// Determines which supported video input schema, if any, a channel carries.
///
/// A channel without a schema is treated as having an empty schema name. Returns `None`
/// when the channel's message encoding / schema name pair is not a supported image type.
pub fn get_video_input_schema(channel: &RawChannel) -> Option<VideoInputSchema> {
    let schema_name = channel
        .schema()
        .map_or("", |schema| schema.name.as_str());
    let encoding = channel.message_encoding();
    detect_video_schema(encoding, schema_name)
}
/// Descriptive information about a transcoded image, produced alongside each video frame.
///
/// The publisher's background task compares consecutive values (hence `PartialEq`) and only
/// stores and announces a new one when it differs from the previous frame's metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct VideoMetadata {
/// Encoding reported by the source image (`image.encoding()`).
pub(crate) encoding: ImageEncoding,
/// Frame id copied from the source image message.
pub(crate) frame_id: String,
}
/// Newtype around libwebrtc's `I420Buffer` so it can implement the crate's `Yuv420Buffer`
/// trait and be used as a conversion target.
struct I420Yuv420(I420Buffer);
/// Every accessor simply forwards to the wrapped `I420Buffer`.
impl Yuv420Buffer for I420Yuv420 {
/// Returns `(width, height)` as reported by the wrapped buffer.
fn dimensions(&self) -> (u32, u32) {
(self.0.width(), self.0.height())
}
/// Returns the wrapped buffer's three plane slices exactly as `I420Buffer::data` yields them
/// (presumably in Y, U, V order — TODO confirm against libwebrtc).
fn yuv(&self) -> (&[u8], &[u8], &[u8]) {
self.0.data()
}
/// Mutable counterpart of `yuv`, forwarding to `I420Buffer::data_mut`.
fn yuv_mut(&mut self) -> (&mut [u8], &mut [u8], &mut [u8]) {
self.0.data_mut()
}
/// Returns the per-plane strides exactly as `I420Buffer::strides` yields them.
fn yuv_strides(&self) -> (u32, u32, u32) {
self.0.strides()
}
}
/// Reasons a queued message could not be turned into a video frame.
#[derive(Debug, thiserror::Error)]
enum VideoEncodeError {
/// The payload could not be decoded or converted into an `ImageMessage`; carries the
/// stringified underlying error.
#[error("failed to decode image message: {0}")]
Decode(String),
/// Probing the image dimensions or converting the pixels to YUV420 failed. Also used
/// (with `Error::ZeroSized`) when a dimension becomes zero after even-alignment.
#[error("failed to convert image to YUV420: {0}")]
YuvConversion(#[from] crate::img2yuv::Error),
/// The even-aligned frame is smaller than `MIN_VIDEO_DIMENSION` in at least one
/// direction. `width` / `height` hold the original, unaligned dimensions.
#[error(
"frame {width}x{height} is below the minimum encoder size {MIN_VIDEO_DIMENSION}x{MIN_VIDEO_DIMENSION}"
)]
TooSmall { width: u32, height: u32 },
}
/// Handle for feeding image messages into a background transcoding task.
///
/// `new` spawns the task; `send` queues `(payload, log_time_ns)` pairs for it; `metadata`
/// exposes the metadata of the most recent successfully transcoded frame.
pub(crate) struct VideoPublisher {
/// Producer side of the bounded frame queue.
tx: flume::Sender<(Bytes, u64)>,
/// Receiver handle kept on the producer side so `send` can evict a queued frame when the
/// queue is full.
rx: flume::Receiver<(Bytes, u64)>,
// NOTE(review): never read in the code visible here; presumably retained so the source
// lives as long as the publisher — confirm before removing.
#[allow(dead_code)]
video_source: NativeVideoSource,
/// Latest metadata stored by the background task; empty until a frame has been
/// transcoded successfully.
metadata: Arc<ArcSwapOption<VideoMetadata>>,
}
impl VideoPublisher {
    /// Maximum number of frames that may wait in the queue for the transcoding task.
    const CHANNEL_CAPACITY: usize = 2;

    /// Creates a publisher and spawns the background task that transcodes queued frames.
    ///
    /// Uses `tokio::spawn`, so it has to be called from within a Tokio runtime.
    ///
    /// * `video_source` – receives every frame that was built successfully.
    /// * `input_schema` – selects how queued payloads are decoded.
    /// * `video_metadata_tx` – marked as changed each time the frame metadata differs from
    ///   that of the previous successful frame.
    pub fn new(
        video_source: NativeVideoSource,
        input_schema: VideoInputSchema,
        video_metadata_tx: watch::Sender<()>,
    ) -> Self {
        let (tx, rx) = flume::bounded::<(Bytes, u64)>(Self::CHANNEL_CAPACITY);
        let metadata: Arc<ArcSwapOption<VideoMetadata>> = Arc::new(ArcSwapOption::empty());

        tokio::spawn(Self::run_transcoder(
            rx.clone(),
            video_source.clone(),
            input_schema,
            Arc::clone(&metadata),
            video_metadata_tx,
        ));

        Self {
            tx,
            rx,
            video_source,
            metadata,
        }
    }

    /// Body of the background task: pulls queued frames until the queue reports an error
    /// and transcodes each one on the blocking thread pool.
    async fn run_transcoder(
        frames: flume::Receiver<(Bytes, u64)>,
        source: NativeVideoSource,
        input_schema: VideoInputSchema,
        published: Arc<ArcSwapOption<VideoMetadata>>,
        video_metadata_tx: watch::Sender<()>,
    ) {
        let mut last_metadata: Option<VideoMetadata> = None;
        let mut too_small_throttler = Throttler::new(TOO_SMALL_WARN_INTERVAL);

        while let Ok((data, log_time_ns)) = frames.recv_async().await {
            // Decoding and colour conversion are CPU-bound, so keep them off the async workers.
            let frame_source = source.clone();
            let joined = tokio::task::spawn_blocking(move || {
                transcode_and_publish(input_schema, &frame_source, &data, log_time_ns)
            })
            .await;

            // NOTE(review): a `JoinError` may also represent a cancelled task, not only a
            // panic — the message below covers both.
            let outcome = match joined {
                Ok(outcome) => outcome,
                Err(e) => {
                    error!("video encode task panicked: {e}");
                    continue;
                }
            };

            match outcome {
                Ok(new_metadata) => {
                    // Only publish and notify when the metadata actually changed.
                    if last_metadata.as_ref() == Some(&new_metadata) {
                        continue;
                    }
                    last_metadata = Some(new_metadata.clone());
                    published.store(Some(Arc::new(new_metadata)));
                    video_metadata_tx.send_modify(|_| {});
                }
                Err(VideoEncodeError::TooSmall { width, height }) => {
                    // A stream of undersized frames would otherwise flood the log.
                    if too_small_throttler.try_acquire() {
                        warn!(
                            "video frame {width}x{height} is below the minimum encoder size {MIN_VIDEO_DIMENSION}x{MIN_VIDEO_DIMENSION}; dropping frame"
                        );
                    }
                }
                Err(e) => {
                    debug!("video encode error: {e}");
                }
            }
        }
    }

    /// Returns the metadata of the most recent successfully transcoded frame, or `None`
    /// if no frame has been transcoded yet.
    pub fn metadata(&self) -> arc_swap::Guard<Option<Arc<VideoMetadata>>> {
        self.metadata.load()
    }

    /// Queues an image message for transcoding without blocking.
    ///
    /// When the queue is full, one queued frame is removed and the send is retried once;
    /// the results of both the removal and the retry are deliberately ignored, so under
    /// contention the new frame may be dropped.
    ///
    /// NOTE(review): because the publisher itself holds a receiver (`self.rx`), the
    /// `Disconnected` arm presumably cannot fire while `self` is alive — confirm against
    /// flume's disconnect semantics.
    pub fn send(&self, data: Bytes, log_time_ns: u64) {
        if let Err(rejected) = self.tx.try_send((data, log_time_ns)) {
            match rejected {
                flume::TrySendError::Full(pending) => {
                    let _ = self.rx.try_recv();
                    let _ = self.tx.try_send(pending);
                }
                flume::TrySendError::Disconnected(_) => {
                    warn!("video publisher channel closed");
                }
            }
        }
    }
}
/// Decodes one queued payload, converts it into a video frame and hands that frame to
/// `video_source`.
///
/// Returns the metadata describing the frame. If decoding or frame construction fails, the
/// error is returned and nothing is captured.
fn transcode_and_publish(
    input_schema: VideoInputSchema,
    video_source: &NativeVideoSource,
    data: &[u8],
    log_time_ns: u64,
) -> Result<VideoMetadata, VideoEncodeError> {
    decode_image_message(input_schema, data)
        .and_then(|image_msg| build_video_frame(image_msg, log_time_ns))
        .map(|(frame, metadata)| {
            video_source.capture_frame(&frame);
            metadata
        })
}
fn build_video_frame(
image_msg: ImageMessage<'_>,
log_time_ns: u64,
) -> Result<(VideoFrame<I420Buffer>, VideoMetadata), VideoEncodeError> {
let metadata = VideoMetadata {
encoding: image_msg.image.encoding(),
frame_id: image_msg.frame_id.clone(),
};
let (width, height) = image_msg
.image
.probe_dimensions()
.map_err(VideoEncodeError::YuvConversion)?;
let (width, height) = validate_frame_dimensions(width, height)?;
let mut buffer = I420Yuv420(I420Buffer::new(width, height));
image_msg
.image
.to_yuv420(&mut buffer)
.map_err(VideoEncodeError::YuvConversion)?;
let timestamp_ns = match image_msg.timestamp {
Some(ts) => ts.total_nanos(),
None => log_time_ns,
};
let frame = VideoFrame {
rotation: VideoRotation::VideoRotation0,
timestamp_us: (timestamp_ns / 1000) as i64,
frame_metadata: Some(FrameMetadata {
user_timestamp: Some(timestamp_ns),
frame_id: None,
}),
buffer: buffer.0,
};
Ok((frame, metadata))
}
/// Rounds both dimensions down to the nearest even value and checks they are usable.
///
/// Returns the even-aligned `(width, height)` on success.
///
/// # Errors
///
/// * `VideoEncodeError::YuvConversion(Error::ZeroSized)` if either aligned dimension is zero
///   (that is, the original was 0 or 1).
/// * `VideoEncodeError::TooSmall`, carrying the original unaligned dimensions, if either
///   aligned dimension is below `MIN_VIDEO_DIMENSION`.
fn validate_frame_dimensions(width: u32, height: u32) -> Result<(u32, u32), VideoEncodeError> {
    // Clearing the lowest bit rounds down to an even number.
    let aligned = (width & !1, height & !1);
    match aligned {
        (0, _) | (_, 0) => Err(VideoEncodeError::YuvConversion(
            crate::img2yuv::Error::ZeroSized,
        )),
        (w, h) if w < MIN_VIDEO_DIMENSION || h < MIN_VIDEO_DIMENSION => {
            Err(VideoEncodeError::TooSmall { width, height })
        }
        _ => Ok(aligned),
    }
}
/// Decodes a raw payload according to `input_schema` and converts it into an `ImageMessage`.
///
/// The returned message may borrow from `data`.
///
/// # Errors
///
/// Returns `VideoEncodeError::Decode`, carrying the stringified cause, if either the wire
/// decoding or the conversion into `ImageMessage` fails.
fn decode_image_message<'a>(
    input_schema: VideoInputSchema,
    data: &'a [u8],
) -> Result<ImageMessage<'a>, VideoEncodeError> {
    // Both decoding and conversion failures are reported the same way: by their message.
    fn decode_failure<E: ToString>(cause: E) -> VideoEncodeError {
        VideoEncodeError::Decode(cause.to_string())
    }

    match input_schema {
        VideoInputSchema::FoxgloveCompressedImage => {
            <crate::messages::CompressedImage as crate::Decode>::decode(data)
                .map_err(decode_failure)
                .and_then(|msg| ImageMessage::try_from(msg).map_err(decode_failure))
        }
        VideoInputSchema::FoxgloveRawImage => {
            <crate::messages::RawImage as crate::Decode>::decode(data)
                .map_err(decode_failure)
                .and_then(|msg| ImageMessage::try_from(msg).map_err(decode_failure))
        }
        #[cfg(feature = "img2yuv-ros1")]
        VideoInputSchema::Ros1CompressedImage => {
            crate::img2yuv::ros1::Ros1CompressedImage::decode(data)
                .map_err(decode_failure)
                .and_then(|msg| ImageMessage::try_from(msg).map_err(decode_failure))
        }
        #[cfg(feature = "img2yuv-ros1")]
        VideoInputSchema::Ros1Image => crate::img2yuv::ros1::Ros1Image::decode(data)
            .map_err(decode_failure)
            .and_then(|msg| ImageMessage::try_from(msg).map_err(decode_failure)),
        #[cfg(feature = "img2yuv-ros2")]
        VideoInputSchema::Ros2CompressedImage => {
            crate::img2yuv::ros2::Ros2CompressedImage::decode(data)
                .map_err(decode_failure)
                .and_then(|msg| ImageMessage::try_from(msg).map_err(decode_failure))
        }
        #[cfg(feature = "img2yuv-ros2")]
        VideoInputSchema::Ros2Image => crate::img2yuv::ros2::Ros2Image::decode(data)
            .map_err(decode_failure)
            .and_then(|msg| ImageMessage::try_from(msg).map_err(decode_failure)),
    }
}
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use super::*;
    use crate::img2yuv::{Image, ImageMessage, RawImage, RawImageEncoding};
    use crate::messages::Timestamp;

    /// Builds a 16x16 RGB8 image message with every byte set to 128 and the given timestamp.
    fn make_image_message(timestamp: Option<Timestamp>) -> ImageMessage<'static> {
        const WIDTH: u32 = 16;
        const HEIGHT: u32 = 16;
        const STRIDE: u32 = WIDTH * 3;

        let pixels = vec![128u8; (STRIDE * HEIGHT) as usize];
        let image = Image::Raw(RawImage {
            encoding: RawImageEncoding::Rgb8,
            width: WIDTH,
            height: HEIGHT,
            stride: STRIDE,
            data: Cow::Owned(pixels),
        });
        ImageMessage {
            timestamp,
            frame_id: "camera_optical_frame".to_string(),
            image,
        }
    }

    // --- build_video_frame: timestamp handling -------------------------------------------

    #[test]
    fn build_video_frame_propagates_image_timestamp_as_user_timestamp() {
        let ts = Timestamp::new(1_700_000_000, 123_456_789);
        let expected_ns = ts.total_nanos();
        // Unrelated to the image timestamp, so using the wrong source would be caught.
        let log_time_ns = 42;

        let image_msg = make_image_message(Some(ts));
        let (frame, metadata) = build_video_frame(image_msg, log_time_ns).expect("build frame");

        let meta = frame.frame_metadata.expect("frame_metadata must be set");
        assert_eq!(
            meta.user_timestamp,
            Some(expected_ns),
            "user_timestamp should be the original RawImage timestamp, not log_time"
        );
        assert_eq!(
            meta.frame_id, None,
            "frame_id is not used for this end-to-end timestamp path"
        );
        assert_eq!(frame.timestamp_us, (expected_ns / 1000) as i64);

        assert_eq!(metadata.frame_id, "camera_optical_frame");
        assert_eq!(
            metadata.encoding,
            ImageEncoding::Raw(RawImageEncoding::Rgb8)
        );
    }

    #[test]
    fn build_video_frame_falls_back_to_log_time_when_image_has_no_timestamp() {
        let log_time_ns = 9_876_543_210u64;

        let image_msg = make_image_message(None);
        let (frame, _metadata) = build_video_frame(image_msg, log_time_ns).expect("build frame");

        let meta = frame.frame_metadata.expect("frame_metadata must be set");
        assert_eq!(
            meta.user_timestamp,
            Some(log_time_ns),
            "without an image timestamp, fall back to the message log time"
        );
        assert_eq!(frame.timestamp_us, (log_time_ns / 1000) as i64);
    }

    // --- detect_video_schema -------------------------------------------------------------

    #[test]
    fn test_foxglove_compressed_image() {
        let detected = detect_video_schema("protobuf", "foxglove.CompressedImage");
        assert_eq!(detected, Some(VideoInputSchema::FoxgloveCompressedImage));
    }

    #[test]
    fn test_foxglove_raw_image() {
        let detected = detect_video_schema("protobuf", "foxglove.RawImage");
        assert_eq!(detected, Some(VideoInputSchema::FoxgloveRawImage));
    }

    #[cfg(feature = "img2yuv-ros1")]
    #[test]
    fn test_ros1_compressed_image() {
        let detected = detect_video_schema("ros1", "sensor_msgs/CompressedImage");
        assert_eq!(detected, Some(VideoInputSchema::Ros1CompressedImage));
    }

    #[cfg(feature = "img2yuv-ros1")]
    #[test]
    fn test_ros1_image() {
        let detected = detect_video_schema("ros1", "sensor_msgs/Image");
        assert_eq!(detected, Some(VideoInputSchema::Ros1Image));
    }

    #[cfg(feature = "img2yuv-ros2")]
    #[test]
    fn test_ros2_compressed_image() {
        let detected = detect_video_schema("cdr", "sensor_msgs/msg/CompressedImage");
        assert_eq!(detected, Some(VideoInputSchema::Ros2CompressedImage));
    }

    #[cfg(feature = "img2yuv-ros2")]
    #[test]
    fn test_ros2_image() {
        let detected = detect_video_schema("cdr", "sensor_msgs/msg/Image");
        assert_eq!(detected, Some(VideoInputSchema::Ros2Image));
    }

    #[test]
    fn test_unknown_schema() {
        let unsupported = [("json", "SomeCustomType"), ("protobuf", "foxglove.Pose")];
        for (encoding, schema_name) in unsupported {
            assert_eq!(detect_video_schema(encoding, schema_name), None);
        }
    }

    // --- validate_frame_dimensions -------------------------------------------------------

    #[test]
    fn validate_frame_dimensions_rejects_too_small() {
        // The error must report the original dimensions, not the even-aligned ones.
        let undersized: [(u32, u32); 4] = [(15, 15), (15, 16), (16, 15), (15, 17)];
        for (w, h) in undersized {
            let err = validate_frame_dimensions(w, h).unwrap_err();
            assert!(matches!(
                err,
                VideoEncodeError::TooSmall { width, height } if width == w && height == h
            ));
        }
    }

    #[test]
    fn validate_frame_dimensions_accepts_at_minimum() {
        let cases: [((u32, u32), (u32, u32)); 3] = [
            ((16, 16), (16, 16)),
            ((17, 17), (16, 16)),
            ((1920, 1080), (1920, 1080)),
        ];
        for ((w, h), expected) in cases {
            assert_eq!(validate_frame_dimensions(w, h).unwrap(), expected);
        }
    }

    #[test]
    fn validate_frame_dimensions_rejects_zero_after_alignment() {
        let degenerate: [(u32, u32); 3] = [(1, 1), (0, 16), (16, 0)];
        for (w, h) in degenerate {
            let err = validate_frame_dimensions(w, h).unwrap_err();
            assert!(matches!(
                err,
                VideoEncodeError::YuvConversion(crate::img2yuv::Error::ZeroSized)
            ));
        }
    }
}