use anyhow::Result;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::Instrument;
/// Errors produced by the WebRTC-over-QUIC bridge.
#[derive(Error, Debug)]
pub enum BridgeError {
/// The bridge is not set up for the requested operation
/// (e.g. no transport has been attached).
#[error("Configuration error: {0}")]
ConfigError(String),
/// Sending, receiving, or (de)serializing a stream packet failed.
#[error("Stream error: {0}")]
StreamError(String),
}
/// Kind of stream carried over the bridge.
///
/// Determines the one-byte wire tag (see [`stream_tags`]) and the
/// scheduling priority (see [`StreamType::priority`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StreamType {
/// Audio media.
Audio,
/// Video media.
Video,
/// Non-realtime application data.
Data,
/// Screen-share video.
ScreenShare,
/// RTCP control/feedback traffic.
RtcpFeedback,
}
/// One-byte wire tags prefixed to each serialized packet to identify its
/// stream type (see `RtpPacket::to_tagged_bytes` / `from_tagged_bytes`).
pub mod stream_tags {
pub const AUDIO: u8 = 0x21;
pub const VIDEO: u8 = 0x22;
pub const SCREEN_SHARE: u8 = 0x23;
pub const RTCP_FEEDBACK: u8 = 0x24;
pub const DATA: u8 = 0x25;
}
impl StreamType {
    /// Scheduling priority of this stream class; lower values are more urgent.
    /// Audio and RTCP feedback share the top priority.
    #[must_use]
    pub const fn priority(&self) -> u8 {
        match self {
            Self::Audio | Self::RtcpFeedback => 1,
            Self::Video => 2,
            Self::ScreenShare => 3,
            Self::Data => 4,
        }
    }

    /// Returns `true` for latency-sensitive traffic — everything except
    /// bulk `Data`.
    #[must_use]
    pub const fn is_realtime(&self) -> bool {
        !matches!(self, Self::Data)
    }

    /// Maps this stream type to its one-byte wire tag from [`stream_tags`].
    #[must_use]
    pub const fn to_tag(&self) -> u8 {
        match self {
            Self::Audio => stream_tags::AUDIO,
            Self::Video => stream_tags::VIDEO,
            Self::ScreenShare => stream_tags::SCREEN_SHARE,
            Self::RtcpFeedback => stream_tags::RTCP_FEEDBACK,
            Self::Data => stream_tags::DATA,
        }
    }

    /// Inverse of [`Self::to_tag`]; yields `None` for unknown tag bytes.
    #[must_use]
    pub const fn from_tag(tag: u8) -> Option<Self> {
        Some(match tag {
            stream_tags::AUDIO => Self::Audio,
            stream_tags::VIDEO => Self::Video,
            stream_tags::SCREEN_SHARE => Self::ScreenShare,
            stream_tags::RTCP_FEEDBACK => Self::RtcpFeedback,
            stream_tags::DATA => Self::Data,
            _ => return None,
        })
    }
}
/// An RTP-style packet carried over the QUIC bridge.
///
/// Note: on the wire this is serialized with `postcard` (see `to_bytes`),
/// not as the raw binary RTP header layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RtpPacket {
/// RTP version; `new` sets this to 2.
pub version: u8,
/// Padding flag; `new` sets `false`.
pub padding: bool,
/// Header-extension flag; `new` sets `false`.
pub extension: bool,
/// Number of CSRC identifiers; `new` sets 0.
pub csrc_count: u8,
/// Marker bit; `new` sets `false`.
pub marker: bool,
/// RTP payload type identifier.
pub payload_type: u8,
/// Per-stream sequence number.
pub sequence_number: u16,
/// Media timestamp.
pub timestamp: u32,
/// Synchronization source identifier.
pub ssrc: u32,
/// Opaque payload bytes; capped at 1188 bytes by `new`.
pub payload: Vec<u8>,
/// Stream classification; drives the wire tag used when framing.
pub stream_type: StreamType,
}
impl RtpPacket {
    /// Builds a packet with standard defaults (version 2, no padding, no
    /// extension, zero CSRCs, marker unset).
    ///
    /// # Errors
    /// Fails when `payload` exceeds 1188 bytes — presumably chosen so the
    /// 12-byte nominal header (see [`Self::size`]) plus payload stays within
    /// the 1200-byte packet budget; confirm against the framing layer.
    pub fn new(
        payload_type: u8,
        sequence_number: u16,
        timestamp: u32,
        ssrc: u32,
        payload: Vec<u8>,
        stream_type: StreamType,
    ) -> Result<Self> {
        const MAX_PAYLOAD_SIZE: usize = 1188;
        anyhow::ensure!(
            payload.len() <= MAX_PAYLOAD_SIZE,
            "Payload size {} exceeds maximum {}",
            payload.len(),
            MAX_PAYLOAD_SIZE
        );
        Ok(Self {
            version: 2,
            padding: false,
            extension: false,
            csrc_count: 0,
            marker: false,
            payload_type,
            sequence_number,
            timestamp,
            ssrc,
            payload,
            stream_type,
        })
    }

    /// Serializes the whole packet with `postcard`.
    ///
    /// # Errors
    /// Returns an error when `postcard` serialization fails.
    pub fn to_bytes(&self) -> Result<Vec<u8>> {
        postcard::to_stdvec(self)
            .map_err(|e| anyhow::anyhow!("Failed to serialize RTP packet: {}", e))
    }

    /// Deserializes a packet from `postcard` bytes, rejecting empty or
    /// oversized (> 1200-byte) input before decoding.
    ///
    /// # Errors
    /// Returns an error for empty input, oversized input, or a `postcard`
    /// decode failure.
    pub fn from_bytes(data: &[u8]) -> Result<Self> {
        const MAX_PACKET_SIZE: usize = 1200;
        anyhow::ensure!(!data.is_empty(), "Cannot deserialize empty data");
        anyhow::ensure!(
            data.len() <= MAX_PACKET_SIZE,
            "Data size {} exceeds maximum packet size {}",
            data.len(),
            MAX_PACKET_SIZE
        );
        postcard::from_bytes(data)
            .map_err(|e| anyhow::anyhow!("Failed to deserialize RTP packet: {}", e))
    }

    /// Nominal wire size: 12 bytes of fixed header fields plus the payload.
    #[must_use]
    pub fn size(&self) -> usize {
        self.payload.len() + 12
    }

    /// Serializes the packet and prefixes the one-byte stream-type tag.
    ///
    /// # Errors
    /// Propagates serialization failures from [`Self::to_bytes`].
    pub fn to_tagged_bytes(&self) -> Result<Vec<u8>> {
        let body = self.to_bytes()?;
        let mut framed = Vec::with_capacity(body.len() + 1);
        framed.push(self.stream_type.to_tag());
        framed.extend_from_slice(&body);
        Ok(framed)
    }

    /// Decodes a tag-prefixed frame produced by [`Self::to_tagged_bytes`].
    /// The tag byte is authoritative: it overrides whatever `stream_type`
    /// was embedded in the serialized body.
    ///
    /// # Errors
    /// Returns an error for empty input, an unknown tag byte, or a body
    /// that fails [`Self::from_bytes`].
    pub fn from_tagged_bytes(data: &[u8]) -> Result<Self> {
        let (&tag, body) = match data.split_first() {
            Some(parts) => parts,
            None => return Err(anyhow::anyhow!("Cannot deserialize empty data")),
        };
        let stream_type = StreamType::from_tag(tag)
            .ok_or_else(|| anyhow::anyhow!("Invalid stream type tag: 0x{:02X}", tag))?;
        let mut packet = Self::from_bytes(body)?;
        packet.stream_type = stream_type;
        Ok(packet)
    }
}
/// Per-stream transport tuning parameters; see the `audio`, `video`, and
/// `screen_share` presets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
/// Stream class this configuration applies to.
pub stream_type: StreamType,
/// Preferred steady-state bitrate, in bits per second.
pub target_bitrate_bps: u32,
/// Upper bitrate bound, in bits per second.
pub max_bitrate_bps: u32,
/// Maximum tolerated latency, in milliseconds.
pub max_latency_ms: u32,
}
impl StreamConfig {
#[must_use]
pub fn audio() -> Self {
Self {
stream_type: StreamType::Audio,
target_bitrate_bps: 64_000,
max_bitrate_bps: 128_000,
max_latency_ms: 50,
}
}
#[must_use]
pub fn video() -> Self {
Self {
stream_type: StreamType::Video,
target_bitrate_bps: 1_000_000,
max_bitrate_bps: 2_000_000,
max_latency_ms: 150,
}
}
#[must_use]
pub fn screen_share() -> Self {
Self {
stream_type: StreamType::ScreenShare,
target_bitrate_bps: 500_000,
max_bitrate_bps: 1_500_000,
max_latency_ms: 200,
}
}
}
/// Configuration for [`WebRtcQuicBridge`].
#[derive(Debug, Clone)]
pub struct QuicBridgeConfig {
/// Largest tagged, serialized packet (in bytes) that `send_rtp_packet`
/// will accept; defaults to 1200.
pub max_packet_size: usize,
}
impl Default for QuicBridgeConfig {
    /// Defaults to a 1200-byte cap, matching the `MAX_PACKET_SIZE` limit
    /// enforced by `RtpPacket::from_bytes`.
    fn default() -> Self {
        const DEFAULT_MAX_PACKET_SIZE: usize = 1200;
        Self {
            max_packet_size: DEFAULT_MAX_PACKET_SIZE,
        }
    }
}
/// Bridges RTP-style packets onto an ant-quic transport, prefixing each
/// packet with its stream-type tag.
pub struct WebRtcQuicBridge {
// Size limits and other bridge settings.
config: QuicBridgeConfig,
// Underlying QUIC transport; `None` until attached via `with_transport`,
// in which case send/receive fail with `BridgeError::ConfigError`.
transport: Option<crate::transport::AntQuicTransport>,
}
impl WebRtcQuicBridge {
#[must_use]
pub fn new(config: QuicBridgeConfig) -> Self {
Self {
config,
transport: None,
}
}
#[must_use]
pub fn with_transport(
config: QuicBridgeConfig,
transport: crate::transport::AntQuicTransport,
) -> Self {
Self {
config,
transport: Some(transport),
}
}
pub async fn send_rtp_packet(&self, packet: &RtpPacket) -> Result<(), BridgeError> {
let span = tracing::debug_span!(
"send_rtp_packet",
stream_type = ?packet.stream_type,
priority = packet.stream_type.priority(),
seq_num = packet.sequence_number
);
let _enter = span.enter();
let transport = self
.transport
.as_ref()
.ok_or_else(|| BridgeError::ConfigError("No transport configured".to_string()))?;
let data = packet
.to_tagged_bytes()
.map_err(|e| BridgeError::StreamError(format!("Failed to serialize packet: {}", e)))?;
if data.len() > self.config.max_packet_size {
return Err(BridgeError::StreamError(format!(
"Packet size {} exceeds maximum {}",
data.len(),
self.config.max_packet_size
)));
}
transport
.send_bytes(&data)
.await
.map_err(|e| BridgeError::StreamError(format!("Failed to send packet: {}", e)))?;
tracing::debug!(
"Sent RTP packet of size {} bytes with type tag 0x{:02X}",
data.len(),
packet.stream_type.to_tag()
);
Ok(())
}
pub async fn receive_rtp_packet(&self) -> Result<RtpPacket, BridgeError> {
let span = tracing::debug_span!("receive_rtp_packet");
let _enter = span.enter();
let transport = self
.transport
.as_ref()
.ok_or_else(|| BridgeError::ConfigError("No transport configured".to_string()))?;
let data = transport
.receive_bytes()
.await
.map_err(|e| BridgeError::StreamError(format!("Failed to receive: {}", e)))?;
let packet = RtpPacket::from_tagged_bytes(&data).map_err(|e| {
BridgeError::StreamError(format!("Failed to deserialize packet with tag: {}", e))
})?;
tracing::debug!(
"Received RTP packet of size {} bytes, stream_type={:?}, seq={}",
data.len(),
packet.stream_type,
packet.sequence_number
);
Ok(packet)
}
pub async fn bridge_track(&self, _track_id: &str) -> Result<(), BridgeError> {
Ok(())
}
}
impl Default for WebRtcQuicBridge {
    /// Default bridge: default packet-size limits, no transport attached.
    fn default() -> Self {
        Self::new(QuicBridgeConfig::default())
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_quic_bridge_send_rtp_packet() {
        let bridge = WebRtcQuicBridge::default();
        let packet = RtpPacket::new(
            96,
            1000,
            12345,
            0xDEADBEEF,
            vec![1, 2, 3, 4],
            StreamType::Audio,
        )
        .expect("Failed to create packet");
        // A default bridge has no transport, so sending must fail with a
        // configuration error (previously the result was silently dropped).
        let result = bridge.send_rtp_packet(&packet).await;
        assert!(matches!(result, Err(BridgeError::ConfigError(_))));
    }

    #[tokio::test]
    async fn test_quic_bridge_receive_rtp_packet() {
        let bridge = WebRtcQuicBridge::default();
        let result = bridge.receive_rtp_packet().await;
        assert!(result.is_err());
        assert!(matches!(result, Err(BridgeError::ConfigError(_))));
    }

    #[tokio::test]
    async fn test_quic_bridge_bridge_track() {
        let bridge = WebRtcQuicBridge::default();
        let result = bridge.bridge_track("audio-track").await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_stream_type_to_tag() {
        // Covers every variant, including RtcpFeedback (previously missed).
        assert_eq!(StreamType::Audio.to_tag(), stream_tags::AUDIO);
        assert_eq!(StreamType::Video.to_tag(), stream_tags::VIDEO);
        assert_eq!(StreamType::ScreenShare.to_tag(), stream_tags::SCREEN_SHARE);
        assert_eq!(StreamType::RtcpFeedback.to_tag(), stream_tags::RTCP_FEEDBACK);
        assert_eq!(StreamType::Data.to_tag(), stream_tags::DATA);
    }

    #[test]
    fn test_stream_type_from_tag() {
        assert_eq!(
            StreamType::from_tag(stream_tags::AUDIO),
            Some(StreamType::Audio)
        );
        assert_eq!(
            StreamType::from_tag(stream_tags::VIDEO),
            Some(StreamType::Video)
        );
        assert_eq!(
            StreamType::from_tag(stream_tags::SCREEN_SHARE),
            Some(StreamType::ScreenShare)
        );
        assert_eq!(
            StreamType::from_tag(stream_tags::RTCP_FEEDBACK),
            Some(StreamType::RtcpFeedback)
        );
        assert_eq!(
            StreamType::from_tag(stream_tags::DATA),
            Some(StreamType::Data)
        );
        assert_eq!(StreamType::from_tag(0xFF), None);
    }

    #[test]
    fn test_tagged_bytes_roundtrip() {
        let packet = RtpPacket::new(
            96,
            1234,
            56789,
            0xABCDEF01,
            vec![0x01, 0x02, 0x03, 0x04, 0x05],
            StreamType::Audio,
        )
        .expect("Failed to create packet");
        let tagged = packet.to_tagged_bytes().expect("Failed to serialize");
        assert_eq!(tagged[0], stream_tags::AUDIO);
        let restored = RtpPacket::from_tagged_bytes(&tagged).expect("Failed to deserialize");
        assert_eq!(restored.payload_type, 96);
        assert_eq!(restored.sequence_number, 1234);
        assert_eq!(restored.timestamp, 56789);
        assert_eq!(restored.ssrc, 0xABCDEF01);
        assert_eq!(restored.stream_type, StreamType::Audio);
    }

    #[test]
    fn test_tagged_bytes_video() {
        let packet = RtpPacket::new(
            98,
            5000,
            100000,
            0x12345678,
            vec![0xAA, 0xBB],
            StreamType::Video,
        )
        .expect("Failed to create packet");
        let tagged = packet.to_tagged_bytes().expect("Failed to serialize");
        assert_eq!(tagged[0], stream_tags::VIDEO);
        let restored = RtpPacket::from_tagged_bytes(&tagged).expect("Failed to deserialize");
        assert_eq!(restored.stream_type, StreamType::Video);
    }

    #[test]
    fn test_tagged_bytes_invalid_tag() {
        let invalid = vec![0xFF, 0x00, 0x01];
        let result = RtpPacket::from_tagged_bytes(&invalid);
        assert!(result.is_err());
    }

    #[test]
    fn test_tagged_bytes_empty() {
        let result = RtpPacket::from_tagged_bytes(&[]);
        assert!(result.is_err());
    }

    #[test]
    fn test_stream_type_priority() {
        assert_eq!(StreamType::Audio.priority(), 1);
        // RTCP feedback shares the top priority with audio.
        assert_eq!(StreamType::RtcpFeedback.priority(), 1);
        assert_eq!(StreamType::Video.priority(), 2);
        assert_eq!(StreamType::ScreenShare.priority(), 3);
        assert_eq!(StreamType::Data.priority(), 4);
        assert!(StreamType::Audio.priority() < StreamType::Video.priority());
        assert!(StreamType::Video.priority() < StreamType::Data.priority());
    }

    #[test]
    fn test_stream_type_is_realtime() {
        assert!(StreamType::Audio.is_realtime());
        assert!(StreamType::Video.is_realtime());
        assert!(StreamType::ScreenShare.is_realtime());
        assert!(StreamType::RtcpFeedback.is_realtime());
        assert!(!StreamType::Data.is_realtime());
    }

    #[test]
    fn test_tagged_bytes_all_stream_types() {
        // Exhaustive over all five variants (RtcpFeedback was previously
        // missing from this table).
        for (stream_type, expected_tag) in &[
            (StreamType::Audio, stream_tags::AUDIO),
            (StreamType::Video, stream_tags::VIDEO),
            (StreamType::ScreenShare, stream_tags::SCREEN_SHARE),
            (StreamType::RtcpFeedback, stream_tags::RTCP_FEEDBACK),
            (StreamType::Data, stream_tags::DATA),
        ] {
            let packet =
                RtpPacket::new(96, 1000, 10000, 0x12345678, vec![0x01, 0x02], *stream_type)
                    .expect("Failed to create packet");
            let tagged = packet.to_tagged_bytes().expect("Failed to tag");
            assert_eq!(
                tagged[0], *expected_tag,
                "Tag mismatch for {:?}",
                stream_type
            );
            let restored = RtpPacket::from_tagged_bytes(&tagged).expect("Failed to restore");
            assert_eq!(
                restored.stream_type, *stream_type,
                "Type mismatch for {:?}",
                stream_type
            );
        }
    }

    #[test]
    fn test_tagged_bytes_preserves_payload() {
        let original_payload = vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06];
        let packet = RtpPacket::new(
            96,
            1000,
            10000,
            0x12345678,
            original_payload.clone(),
            StreamType::Audio,
        )
        .expect("Failed to create packet");
        let tagged = packet.to_tagged_bytes().expect("Failed to tag");
        let restored = RtpPacket::from_tagged_bytes(&tagged).expect("Failed to restore");
        assert_eq!(restored.payload, original_payload);
        assert_eq!(restored.version, 2);
        assert_eq!(restored.payload_type, 96);
        assert_eq!(restored.sequence_number, 1000);
        assert_eq!(restored.timestamp, 10000);
        assert_eq!(restored.ssrc, 0x12345678);
    }
}