use crate::error::{NetError, NetResult};
use crate::smpte2110::rtp::{RtpHeader, RtpPacket, MAX_RTP_PAYLOAD};
use crate::smpte2110::timing::{FrameRate, ScanType};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::collections::HashMap;
pub const RTP_PAYLOAD_TYPE_VIDEO: u8 = 96;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PixelFormat {
YCbCr422_10bit,
YCbCr422_8bit,
YCbCr444_10bit,
YCbCr444_12bit,
Rgb10bit,
Rgb12bit,
Rgb8bit,
}
impl PixelFormat {
#[must_use]
pub const fn sampling(&self) -> &'static str {
match self {
Self::YCbCr422_10bit | Self::YCbCr422_8bit => "YCbCr-4:2:2",
Self::YCbCr444_10bit | Self::YCbCr444_12bit => "YCbCr-4:4:4",
Self::Rgb10bit | Self::Rgb12bit | Self::Rgb8bit => "RGB",
}
}
#[must_use]
pub const fn bit_depth(&self) -> u8 {
match self {
Self::YCbCr422_8bit | Self::Rgb8bit => 8,
Self::YCbCr422_10bit | Self::YCbCr444_10bit | Self::Rgb10bit => 10,
Self::YCbCr444_12bit | Self::Rgb12bit => 12,
}
}
#[must_use]
pub const fn components(&self) -> u8 {
match self {
Self::YCbCr422_10bit | Self::YCbCr422_8bit => 2, Self::YCbCr444_10bit
| Self::YCbCr444_12bit
| Self::Rgb10bit
| Self::Rgb12bit
| Self::Rgb8bit => 3,
}
}
#[must_use]
pub const fn pixel_group_size(&self) -> usize {
match self {
Self::YCbCr422_10bit | Self::YCbCr422_8bit => 2, Self::YCbCr444_10bit
| Self::YCbCr444_12bit
| Self::Rgb10bit
| Self::Rgb12bit
| Self::Rgb8bit => 1,
}
}
#[must_use]
pub const fn bytes_per_pixel_group(&self) -> usize {
match self {
Self::YCbCr422_8bit => 4, Self::YCbCr422_10bit => 5, Self::YCbCr444_10bit => 4, Self::YCbCr444_12bit => 5, Self::Rgb8bit => 3, Self::Rgb10bit => 4, Self::Rgb12bit => 5, }
}
#[must_use]
pub const fn sdp_format(&self) -> &'static str {
match self {
Self::YCbCr422_10bit => "UYVP", Self::YCbCr422_8bit => "UYVY", Self::YCbCr444_10bit => "v210", Self::YCbCr444_12bit => "v410", Self::Rgb10bit => "r210", Self::Rgb12bit => "R12B", Self::Rgb8bit => "RGB8", }
}
}
#[derive(Debug, Clone)]
pub struct VideoConfig {
pub width: u32,
pub height: u32,
pub frame_rate: FrameRate,
pub pixel_format: PixelFormat,
pub scan_type: ScanType,
pub packets_per_line: u32,
pub max_payload_size: usize,
}
impl Default for VideoConfig {
fn default() -> Self {
Self {
width: 1920,
height: 1080,
frame_rate: FrameRate::FPS_25,
pixel_format: PixelFormat::YCbCr422_10bit,
scan_type: ScanType::Progressive,
packets_per_line: 1,
max_payload_size: MAX_RTP_PAYLOAD,
}
}
}
impl VideoConfig {
#[must_use]
pub fn bytes_per_line(&self) -> usize {
let pixels_per_line = self.width as usize;
let pg_size = self.pixel_format.pixel_group_size();
let bytes_per_pg = self.pixel_format.bytes_per_pixel_group();
(pixels_per_line / pg_size) * bytes_per_pg
}
#[must_use]
pub fn active_lines(&self) -> u32 {
match self.scan_type {
ScanType::Progressive => self.height,
ScanType::InterlacedField1 | ScanType::InterlacedField2 => self.height / 2,
}
}
#[must_use]
pub fn frame_size_bytes(&self) -> usize {
self.bytes_per_line() * self.active_lines() as usize
}
#[must_use]
pub fn bitrate(&self) -> u64 {
let frame_size = self.frame_size_bytes() as u64;
let fps = self.frame_rate.as_f64();
(frame_size * 8) * (fps as u64)
}
}
#[derive(Debug, Clone, Copy)]
pub struct VideoHeaderExtension {
pub extended_sequence: u16,
pub line_and_field: u16,
pub offset_and_continuation: u16,
}
impl VideoHeaderExtension {
#[must_use]
pub const fn new(line_number: u16, field_id: bool, offset: u16, continuation: bool) -> Self {
let line_and_field = (line_number & 0x7FFF) | (if field_id { 0x8000 } else { 0 });
let offset_and_continuation =
((offset & 0x0FFF) << 4) | (if continuation { 0x01 } else { 0 });
Self {
extended_sequence: 0,
line_and_field,
offset_and_continuation,
}
}
#[must_use]
pub const fn line_number(&self) -> u16 {
self.line_and_field & 0x7FFF
}
#[must_use]
pub const fn field_id(&self) -> bool {
(self.line_and_field & 0x8000) != 0
}
#[must_use]
pub const fn offset(&self) -> u16 {
(self.offset_and_continuation >> 4) & 0x0FFF
}
#[must_use]
pub const fn is_continuation(&self) -> bool {
(self.offset_and_continuation & 0x01) != 0
}
pub fn serialize(&self, buf: &mut BytesMut) {
buf.put_u16(self.extended_sequence);
buf.put_u16(self.line_and_field);
buf.put_u16(self.offset_and_continuation);
}
pub fn parse(data: &[u8]) -> NetResult<Self> {
if data.len() < 6 {
return Err(NetError::parse(0, "Video header extension too short"));
}
let mut cursor = &data[..];
let extended_sequence = cursor.get_u16();
let line_and_field = cursor.get_u16();
let offset_and_continuation = cursor.get_u16();
Ok(Self {
extended_sequence,
line_and_field,
offset_and_continuation,
})
}
}
#[derive(Debug, Clone)]
pub struct VideoPacket {
pub header: RtpHeader,
pub video_extension: VideoHeaderExtension,
pub line_data: Bytes,
}
impl VideoPacket {
#[must_use]
pub fn new(header: RtpHeader, video_extension: VideoHeaderExtension, line_data: Bytes) -> Self {
Self {
header,
video_extension,
line_data,
}
}
pub fn from_rtp(rtp_packet: &RtpPacket) -> NetResult<Self> {
let ext_data = rtp_packet
.header
.extension_data
.as_ref()
.ok_or_else(|| NetError::protocol("Missing video header extension"))?;
let video_extension = VideoHeaderExtension::parse(&ext_data.data)?;
Ok(Self {
header: rtp_packet.header.clone(),
video_extension,
line_data: rtp_packet.payload.clone(),
})
}
#[must_use]
pub fn to_rtp(&self) -> RtpPacket {
let mut ext_data = BytesMut::with_capacity(6);
self.video_extension.serialize(&mut ext_data);
let mut header = self.header.clone();
header.extension = true;
header.extension_data = Some(crate::smpte2110::rtp::RtpHeaderExtension {
profile: 0x0100, data: ext_data.freeze(),
});
RtpPacket {
header,
payload: self.line_data.clone(),
}
}
}
pub struct VideoEncoder {
config: VideoConfig,
current_timestamp: u32,
sequence_number: u16,
ssrc: u32,
}
impl VideoEncoder {
#[must_use]
pub fn new(config: VideoConfig, ssrc: u32) -> Self {
Self {
config,
current_timestamp: rand::random(),
sequence_number: rand::random(),
ssrc,
}
}
pub fn encode_frame(&mut self, frame_data: &[u8]) -> NetResult<Vec<VideoPacket>> {
let expected_size = self.config.frame_size_bytes();
if frame_data.len() != expected_size {
return Err(NetError::protocol(format!(
"Frame size mismatch: expected {} bytes, got {}",
expected_size,
frame_data.len()
)));
}
let mut packets = Vec::new();
let bytes_per_line = self.config.bytes_per_line();
let active_lines = self.config.active_lines();
let field_id = matches!(self.config.scan_type, ScanType::InterlacedField2);
for line_num in 0..active_lines {
let line_start = (line_num as usize) * bytes_per_line;
let line_end = line_start + bytes_per_line;
let line_data = &frame_data[line_start..line_end];
let max_payload = self.config.max_payload_size;
let packets_this_line = (bytes_per_line + max_payload - 1) / max_payload;
for packet_idx in 0..packets_this_line {
let offset = packet_idx * max_payload;
let remaining = bytes_per_line - offset;
let payload_size = remaining.min(max_payload);
let payload = Bytes::copy_from_slice(&line_data[offset..offset + payload_size]);
let continuation = packet_idx > 0;
let marker = packet_idx == packets_this_line - 1 && line_num == active_lines - 1;
let video_ext = VideoHeaderExtension::new(
line_num as u16,
field_id,
offset as u16,
continuation,
);
let header = RtpHeader {
padding: false,
extension: true,
csrc_count: 0,
marker,
payload_type: RTP_PAYLOAD_TYPE_VIDEO,
sequence_number: self.sequence_number,
timestamp: self.current_timestamp,
ssrc: self.ssrc,
csrcs: Vec::new(),
extension_data: None, };
packets.push(VideoPacket::new(header, video_ext, payload));
self.sequence_number = self.sequence_number.wrapping_add(1);
}
}
let timestamp_increment =
(90000 * self.config.frame_rate.denominator) / self.config.frame_rate.numerator;
self.current_timestamp = self.current_timestamp.wrapping_add(timestamp_increment);
Ok(packets)
}
#[must_use]
pub const fn current_timestamp(&self) -> u32 {
self.current_timestamp
}
#[must_use]
pub const fn config(&self) -> &VideoConfig {
&self.config
}
}
#[derive(Debug)]
pub struct VideoDecoder {
config: VideoConfig,
frame_buffer: HashMap<u32, FrameAssembler>,
max_buffered_frames: usize,
}
impl VideoDecoder {
#[must_use]
pub fn new(config: VideoConfig) -> Self {
Self {
config,
frame_buffer: HashMap::new(),
max_buffered_frames: 4,
}
}
pub fn process_rtp_packet(&mut self, rtp_packet: &RtpPacket) -> NetResult<()> {
let video_packet = VideoPacket::from_rtp(rtp_packet)?;
let timestamp = video_packet.header.timestamp;
let assembler = self
.frame_buffer
.entry(timestamp)
.or_insert_with(|| FrameAssembler::new(self.config.clone()));
assembler.add_packet(video_packet)?;
if self.frame_buffer.len() > self.max_buffered_frames {
if let Some(oldest_ts) = self.frame_buffer.keys().min().copied() {
self.frame_buffer.remove(&oldest_ts);
}
}
Ok(())
}
pub fn get_frame(&mut self, timestamp: u32) -> Option<Vec<u8>> {
if let Some(assembler) = self.frame_buffer.get(×tamp) {
if assembler.is_complete() {
return self
.frame_buffer
.remove(×tamp)
.and_then(|a| a.get_frame());
}
}
None
}
pub fn get_completed_frames(&mut self) -> Vec<(u32, Vec<u8>)> {
let mut frames = Vec::new();
let completed: Vec<u32> = self
.frame_buffer
.iter()
.filter(|(_, a)| a.is_complete())
.map(|(ts, _)| *ts)
.collect();
for ts in completed {
if let Some(assembler) = self.frame_buffer.remove(&ts) {
if let Some(frame) = assembler.get_frame() {
frames.push((ts, frame));
}
}
}
frames
}
#[must_use]
pub const fn config(&self) -> &VideoConfig {
&self.config
}
}
#[derive(Debug)]
struct FrameAssembler {
config: VideoConfig,
lines: HashMap<u16, Vec<u8>>,
expected_lines: u32,
marker_received: bool,
}
impl FrameAssembler {
fn new(config: VideoConfig) -> Self {
let expected_lines = config.active_lines();
Self {
config,
lines: HashMap::new(),
expected_lines,
marker_received: false,
}
}
fn add_packet(&mut self, packet: VideoPacket) -> NetResult<()> {
let line_num = packet.video_extension.line_number();
let offset = packet.video_extension.offset() as usize;
let line_buffer = self
.lines
.entry(line_num)
.or_insert_with(|| vec![0u8; self.config.bytes_per_line()]);
let data_len = packet.line_data.len();
if offset + data_len <= line_buffer.len() {
line_buffer[offset..offset + data_len].copy_from_slice(&packet.line_data);
} else {
return Err(NetError::protocol("Packet data exceeds line buffer"));
}
if packet.header.marker {
self.marker_received = true;
}
Ok(())
}
fn is_complete(&self) -> bool {
self.marker_received && self.lines.len() == self.expected_lines as usize
}
fn get_frame(self) -> Option<Vec<u8>> {
if !self.is_complete() {
return None;
}
let mut frame = Vec::with_capacity(self.config.frame_size_bytes());
for line_num in 0..self.expected_lines {
if let Some(line_data) = self.lines.get(&(line_num as u16)) {
frame.extend_from_slice(line_data);
} else {
return None; }
}
Some(frame)
}
}
pub mod packing {
pub fn pack_ycbcr422_10bit(y0: u16, cb: u16, y1: u16, cr: u16) -> [u8; 5] {
let mut packed = [0u8; 5];
packed[0] = (cb >> 2) as u8;
packed[1] = ((cb & 0x03) << 6 | (y0 >> 4)) as u8;
packed[2] = ((y0 & 0x0F) << 4 | (cr >> 6)) as u8;
packed[3] = ((cr & 0x3F) << 2 | (y1 >> 8)) as u8;
packed[4] = (y1 & 0xFF) as u8;
packed
}
#[must_use]
pub fn unpack_ycbcr422_10bit(packed: &[u8; 5]) -> (u16, u16, u16, u16) {
let cb = (u16::from(packed[0]) << 2) | (u16::from(packed[1]) >> 6);
let y0 = ((u16::from(packed[1]) & 0x3F) << 4) | (u16::from(packed[2]) >> 4);
let cr = ((u16::from(packed[2]) & 0x0F) << 6) | (u16::from(packed[3]) >> 2);
let y1 = ((u16::from(packed[3]) & 0x03) << 8) | u16::from(packed[4]);
(y0, cb, y1, cr)
}
pub fn pack_rgb_10bit(r: u16, g: u16, b: u16) -> [u8; 4] {
let mut packed = [0u8; 4];
packed[0] = (r >> 2) as u8;
packed[1] = ((r & 0x03) << 6 | (g >> 4)) as u8;
packed[2] = ((g & 0x0F) << 4 | (b >> 6)) as u8;
packed[3] = ((b & 0x3F) << 2) as u8;
packed
}
#[must_use]
pub fn unpack_rgb_10bit(packed: &[u8; 4]) -> (u16, u16, u16) {
let r = (u16::from(packed[0]) << 2) | (u16::from(packed[1]) >> 6);
let g = ((u16::from(packed[1]) & 0x3F) << 4) | (u16::from(packed[2]) >> 4);
let b = ((u16::from(packed[2]) & 0x0F) << 6) | (u16::from(packed[3]) >> 2);
(r, g, b)
}
}
pub mod trs {
pub const SAV: [u8; 4] = [0xFF, 0x00, 0x00, 0xAB];
pub const EAV: [u8; 4] = [0xFF, 0x00, 0x00, 0x9D];
#[must_use]
pub fn is_sav(data: &[u8]) -> bool {
data.len() >= 4 && data[0..4] == SAV
}
#[must_use]
pub fn is_eav(data: &[u8]) -> bool {
data.len() >= 4 && data[0..4] == EAV
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pixel_format_properties() {
let fmt = PixelFormat::YCbCr422_10bit;
assert_eq!(fmt.sampling(), "YCbCr-4:2:2");
assert_eq!(fmt.bit_depth(), 10);
assert_eq!(fmt.pixel_group_size(), 2);
assert_eq!(fmt.bytes_per_pixel_group(), 5);
}
#[test]
fn test_video_config() {
let config = VideoConfig {
width: 1920,
height: 1080,
frame_rate: FrameRate::FPS_25,
pixel_format: PixelFormat::YCbCr422_10bit,
scan_type: ScanType::Progressive,
packets_per_line: 1,
max_payload_size: MAX_RTP_PAYLOAD,
};
assert_eq!(config.bytes_per_line(), 4800); assert_eq!(config.active_lines(), 1080);
assert!(config.bitrate() > 0);
}
#[test]
fn test_video_header_extension() {
let ext = VideoHeaderExtension::new(100, true, 256, false);
assert_eq!(ext.line_number(), 100);
assert!(ext.field_id());
assert_eq!(ext.offset(), 256);
assert!(!ext.is_continuation());
let mut buf = BytesMut::new();
ext.serialize(&mut buf);
assert_eq!(buf.len(), 6);
}
#[test]
fn test_ycbcr422_10bit_packing() {
let y0 = 512;
let cb = 256;
let y1 = 768;
let cr = 128;
let packed = packing::pack_ycbcr422_10bit(y0, cb, y1, cr);
let (y0_out, cb_out, y1_out, cr_out) = packing::unpack_ycbcr422_10bit(&packed);
assert_eq!(y0, y0_out);
assert_eq!(cb, cb_out);
assert_eq!(y1, y1_out);
assert_eq!(cr, cr_out);
}
#[test]
fn test_rgb_10bit_packing() {
let r = 512;
let g = 768;
let b = 256;
let packed = packing::pack_rgb_10bit(r, g, b);
let (r_out, g_out, b_out) = packing::unpack_rgb_10bit(&packed);
assert_eq!(r, r_out);
assert_eq!(g, g_out);
assert_eq!(b, b_out);
}
#[test]
fn test_trs_detection() {
assert!(trs::is_sav(&trs::SAV));
assert!(trs::is_eav(&trs::EAV));
assert!(!trs::is_sav(&[0, 0, 0, 0]));
}
#[test]
fn test_video_encoder() {
let config = VideoConfig {
width: 320,
height: 240,
frame_rate: FrameRate::FPS_25,
pixel_format: PixelFormat::YCbCr422_10bit,
scan_type: ScanType::Progressive,
packets_per_line: 1,
max_payload_size: MAX_RTP_PAYLOAD,
};
let mut encoder = VideoEncoder::new(config.clone(), 12345);
let frame_size = config.frame_size_bytes();
let frame_data = vec![0u8; frame_size];
let packets = encoder
.encode_frame(&frame_data)
.expect("should succeed in test");
assert!(!packets.is_empty());
assert_eq!(packets.len(), config.active_lines() as usize);
}
#[test]
fn test_frame_assembler() {
let config = VideoConfig {
width: 320,
height: 240,
frame_rate: FrameRate::FPS_25,
pixel_format: PixelFormat::YCbCr422_10bit,
scan_type: ScanType::Progressive,
packets_per_line: 1,
max_payload_size: MAX_RTP_PAYLOAD,
};
let mut assembler = FrameAssembler::new(config.clone());
assert!(!assembler.is_complete());
for line_num in 0..config.active_lines() {
let line_data = vec![0u8; config.bytes_per_line()];
let ext = VideoHeaderExtension::new(line_num as u16, false, 0, false);
let header = RtpHeader {
padding: false,
extension: true,
csrc_count: 0,
marker: line_num == config.active_lines() - 1,
payload_type: RTP_PAYLOAD_TYPE_VIDEO,
sequence_number: line_num as u16,
timestamp: 0,
ssrc: 12345,
csrcs: Vec::new(),
extension_data: None,
};
let packet = VideoPacket::new(header, ext, Bytes::from(line_data));
assembler
.add_packet(packet)
.expect("should succeed in test");
}
assert!(assembler.is_complete());
let frame = assembler.get_frame();
assert!(frame.is_some());
}
}