use anyhow::Result;
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Errors surfaced by the WebRTC-to-QUIC bridge defined in this file.
///
/// Both variants carry a human-readable message that is interpolated into
/// the `Display` output declared by the `#[error(...)]` attributes.
#[derive(Error, Debug)]
pub enum BridgeError {
/// The bridge is not set up for the requested operation. In this file it is
/// produced when a send or receive is attempted without a transport attached.
#[error("Configuration error: {0}")]
ConfigError(String),
/// A packet could not be serialized, exceeded the configured size limit,
/// could not be sent or received, or could not be deserialized.
#[error("Stream error: {0}")]
StreamError(String),
}
/// Kind of stream a packet or stream configuration belongs to.
///
/// NOTE(review): the type derives `Serialize`/`Deserialize` and is embedded in
/// `RtpPacket`, which is encoded with bincode; the variant order presumably
/// affects that encoding, so confirm before reordering or inserting variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StreamType {
/// Audio stream.
Audio,
/// Video stream.
Video,
/// Data stream; the only kind that `is_realtime` reports as non-realtime.
Data,
/// Screen-sharing stream.
ScreenShare,
}
impl StreamType {
    /// Returns the numeric priority rank assigned to this stream kind.
    ///
    /// The mapping is fixed: audio is `1`, video is `2`, screen share is `3`
    /// and data is `4`.
    ///
    /// NOTE(review): presumably a lower number means "served first" — confirm
    /// against whatever consumes this value.
    #[must_use]
    pub const fn priority(&self) -> u8 {
        match *self {
            Self::Data => 4,
            Self::ScreenShare => 3,
            Self::Video => 2,
            Self::Audio => 1,
        }
    }

    /// Reports whether this stream kind is treated as realtime.
    ///
    /// Audio, video and screen share are realtime; data is not.
    #[must_use]
    pub const fn is_realtime(&self) -> bool {
        match *self {
            Self::Audio | Self::Video | Self::ScreenShare => true,
            Self::Data => false,
        }
    }
}
/// A single RTP-style packet together with the kind of stream it belongs to.
///
/// The whole struct is what `to_bytes` serializes and `from_bytes` restores.
///
/// NOTE(review): the field names follow the RTP fixed header (RFC 3550);
/// confirm that is the intended model before relying on field widths.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RtpPacket {
/// Protocol version; `RtpPacket::new` always stores `2`.
pub version: u8,
/// Padding flag; `RtpPacket::new` stores `false`.
pub padding: bool,
/// Header-extension flag; `RtpPacket::new` stores `false`.
pub extension: bool,
/// CSRC count; `RtpPacket::new` stores `0`.
pub csrc_count: u8,
/// Marker flag; `RtpPacket::new` stores `false`.
pub marker: bool,
/// Payload type identifier supplied by the caller.
pub payload_type: u8,
/// Sequence number supplied by the caller.
pub sequence_number: u16,
/// Timestamp supplied by the caller.
pub timestamp: u32,
/// Synchronization source identifier supplied by the caller.
pub ssrc: u32,
/// Payload bytes; `RtpPacket::new` rejects payloads longer than 1188 bytes.
pub payload: Vec<u8>,
/// Kind of stream this packet belongs to.
pub stream_type: StreamType,
}
impl RtpPacket {
    /// Builds a packet from caller-supplied header values and payload.
    ///
    /// The remaining header fields are fixed: `version` is `2`, `csrc_count`
    /// is `0`, and `padding`, `extension` and `marker` are all `false`.
    ///
    /// NOTE(review): the 1188-byte payload limit here and the 1200-byte limit
    /// in `from_bytes` differ by the 12 bytes that `size` counts as header,
    /// while `to_bytes` emits a bincode encoding whose length is not derived
    /// from `size` — confirm that a packet accepted here always fits the
    /// `from_bytes` limit once serialized.
    ///
    /// # Errors
    ///
    /// Returns an error when `payload` is longer than 1188 bytes.
    pub fn new(
        payload_type: u8,
        sequence_number: u16,
        timestamp: u32,
        ssrc: u32,
        payload: Vec<u8>,
        stream_type: StreamType,
    ) -> Result<Self> {
        const MAX_PAYLOAD_SIZE: usize = 1188;

        let payload_len = payload.len();
        if payload_len > MAX_PAYLOAD_SIZE {
            anyhow::bail!(
                "Payload size {} exceeds maximum {}",
                payload_len,
                MAX_PAYLOAD_SIZE
            );
        }

        let packet = Self {
            // Fixed header defaults.
            version: 2,
            padding: false,
            extension: false,
            csrc_count: 0,
            marker: false,
            // Caller-supplied values.
            payload_type,
            sequence_number,
            timestamp,
            ssrc,
            payload,
            stream_type,
        };
        Ok(packet)
    }

    /// Serializes the whole packet with bincode.
    ///
    /// # Errors
    ///
    /// Returns an error wrapping the bincode failure message.
    pub fn to_bytes(&self) -> Result<Vec<u8>> {
        match bincode::serialize(self) {
            Ok(encoded) => Ok(encoded),
            Err(e) => Err(anyhow::anyhow!("Failed to serialize RTP packet: {}", e)),
        }
    }

    /// Restores a packet from bytes previously produced by `to_bytes`.
    ///
    /// # Errors
    ///
    /// Returns an error when `data` is empty, when it is longer than 1200
    /// bytes, or when bincode cannot decode it.
    pub fn from_bytes(data: &[u8]) -> Result<Self> {
        const MAX_PACKET_SIZE: usize = 1200;

        // Reject obviously invalid input before handing it to the decoder.
        match data.len() {
            0 => anyhow::bail!("Cannot deserialize empty data"),
            len if len > MAX_PACKET_SIZE => anyhow::bail!(
                "Data size {} exceeds maximum packet size {}",
                len,
                MAX_PACKET_SIZE
            ),
            _ => {}
        }

        let decoded: Self = bincode::deserialize(data)
            .map_err(|e| anyhow::anyhow!("Failed to deserialize RTP packet: {}", e))?;
        Ok(decoded)
    }

    /// Returns the payload length plus a fixed 12 bytes.
    ///
    /// NOTE(review): the 12 presumably stands for a fixed RTP header; this is
    /// not the length of the buffer returned by `to_bytes` — confirm which
    /// size callers expect.
    #[must_use]
    pub fn size(&self) -> usize {
        const HEADER_LEN: usize = 12;
        HEADER_LEN + self.payload.len()
    }
}
/// Bitrate and latency settings for one kind of stream.
///
/// Ready-made values are available from `StreamConfig::audio`,
/// `StreamConfig::video` and `StreamConfig::screen_share`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
/// Kind of stream these settings apply to.
pub stream_type: StreamType,
/// Target bitrate, in bits per second (per the `_bps` suffix).
pub target_bitrate_bps: u32,
/// Maximum bitrate, in bits per second (per the `_bps` suffix).
pub max_bitrate_bps: u32,
/// Maximum latency, in milliseconds (per the `_ms` suffix).
pub max_latency_ms: u32,
}
impl StreamConfig {
#[must_use]
pub fn audio() -> Self {
Self {
stream_type: StreamType::Audio,
target_bitrate_bps: 64_000,
max_bitrate_bps: 128_000,
max_latency_ms: 50,
}
}
#[must_use]
pub fn video() -> Self {
Self {
stream_type: StreamType::Video,
target_bitrate_bps: 1_000_000,
max_bitrate_bps: 2_000_000,
max_latency_ms: 150,
}
}
#[must_use]
pub fn screen_share() -> Self {
Self {
stream_type: StreamType::ScreenShare,
target_bitrate_bps: 500_000,
max_bitrate_bps: 1_500_000,
max_latency_ms: 200,
}
}
}
/// Settings for `WebRtcQuicBridge`.
#[derive(Debug, Clone)]
pub struct QuicBridgeConfig {
/// Largest serialized packet, in bytes, that `send_rtp_packet` will hand to
/// the transport; larger packets are rejected. The `Default` value is 1200.
pub max_packet_size: usize,
}
impl Default for QuicBridgeConfig {
fn default() -> Self {
Self {
max_packet_size: 1200,
}
}
}
/// Sends and receives `RtpPacket`s over an optional QUIC transport.
pub struct WebRtcQuicBridge {
// Size limit applied to serialized packets before they are sent.
config: QuicBridgeConfig,
// `None` when built with `new` (or `default`); `Some` when built with
// `with_transport`. Send and receive fail with `ConfigError` while `None`.
transport: Option<crate::transport::AntQuicTransport>,
}
impl WebRtcQuicBridge {
    /// Creates a bridge with the given configuration and no transport.
    ///
    /// Sending or receiving on such a bridge fails with
    /// `BridgeError::ConfigError`.
    #[must_use]
    pub fn new(config: QuicBridgeConfig) -> Self {
        Self {
            transport: None,
            config,
        }
    }

    /// Creates a bridge with the given configuration and transport attached.
    #[must_use]
    pub fn with_transport(
        config: QuicBridgeConfig,
        transport: crate::transport::AntQuicTransport,
    ) -> Self {
        Self {
            transport: Some(transport),
            config,
        }
    }

    /// Borrows the attached transport, or reports that none is configured.
    fn require_transport(&self) -> Result<&crate::transport::AntQuicTransport, BridgeError> {
        match &self.transport {
            Some(transport) => Ok(transport),
            None => Err(BridgeError::ConfigError(
                "No transport configured".to_string(),
            )),
        }
    }

    /// Serializes `packet` and sends it over the attached transport.
    ///
    /// # Errors
    ///
    /// * `BridgeError::ConfigError` when no transport is attached (checked
    ///   before anything else).
    /// * `BridgeError::StreamError` when serialization fails, when the
    ///   serialized packet is larger than `config.max_packet_size`, or when
    ///   the transport reports a send failure.
    pub async fn send_rtp_packet(&self, packet: &RtpPacket) -> Result<(), BridgeError> {
        let transport = self.require_transport()?;

        let bytes = match packet.to_bytes() {
            Ok(bytes) => bytes,
            Err(e) => {
                return Err(BridgeError::StreamError(format!(
                    "Failed to serialize packet: {}",
                    e
                )))
            }
        };

        // The limit applies to the serialized form, not to the raw payload.
        let limit = self.config.max_packet_size;
        if bytes.len() > limit {
            return Err(BridgeError::StreamError(format!(
                "Packet size {} exceeds maximum {}",
                bytes.len(),
                limit
            )));
        }

        if let Err(e) = transport.send_bytes(&bytes).await {
            return Err(BridgeError::StreamError(format!(
                "Failed to send packet: {}",
                e
            )));
        }

        tracing::debug!("Sent RTP packet of size {} bytes", bytes.len());
        Ok(())
    }

    /// Receives bytes from the attached transport and decodes them into a packet.
    ///
    /// # Errors
    ///
    /// * `BridgeError::ConfigError` when no transport is attached.
    /// * `BridgeError::StreamError` when the transport reports a receive
    ///   failure or the received bytes cannot be decoded.
    pub async fn receive_rtp_packet(&self) -> Result<RtpPacket, BridgeError> {
        let transport = self.require_transport()?;

        let bytes = match transport.receive_bytes().await {
            Ok(bytes) => bytes,
            Err(e) => {
                return Err(BridgeError::StreamError(format!(
                    "Failed to receive: {}",
                    e
                )))
            }
        };

        let packet = match RtpPacket::from_bytes(&bytes) {
            Ok(packet) => packet,
            Err(e) => {
                return Err(BridgeError::StreamError(format!(
                    "Failed to deserialize packet: {}",
                    e
                )))
            }
        };

        tracing::debug!("Received RTP packet of size {} bytes", bytes.len());
        Ok(packet)
    }

    /// Placeholder for bridging a track: ignores `_track_id` and always
    /// returns `Ok(())` without doing any work.
    pub async fn bridge_track(&self, _track_id: &str) -> Result<(), BridgeError> {
        Ok(())
    }
}
impl Default for WebRtcQuicBridge {
fn default() -> Self {
Self::new(QuicBridgeConfig::default())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Sending through a bridge without a transport must fail with a
    /// configuration error rather than being silently ignored.
    #[tokio::test]
    async fn test_quic_bridge_send_rtp_packet() {
        let bridge = WebRtcQuicBridge::default();
        let packet = RtpPacket::new(
            96,
            1000,
            12345,
            0xDEADBEEF,
            vec![1, 2, 3, 4],
            StreamType::Audio,
        )
        .expect("Failed to create packet");

        // The default bridge has no transport, so the send must be rejected
        // before any serialization or I/O happens.
        let result = bridge.send_rtp_packet(&packet).await;
        assert!(result.is_err());
        assert!(matches!(result, Err(BridgeError::ConfigError(_))));
    }

    /// Receiving through a bridge without a transport must fail with a
    /// configuration error.
    #[tokio::test]
    async fn test_quic_bridge_receive_rtp_packet() {
        let bridge = WebRtcQuicBridge::default();
        let result = bridge.receive_rtp_packet().await;
        assert!(result.is_err());
        assert!(matches!(result, Err(BridgeError::ConfigError(_))));
    }

    /// `bridge_track` is currently a no-op and must succeed.
    #[tokio::test]
    async fn test_quic_bridge_bridge_track() {
        let bridge = WebRtcQuicBridge::default();
        let result = bridge.bridge_track("audio-track").await;
        assert!(result.is_ok());
    }
}