use crate::media::ReceiveTimestampClock;
use crate::media::recorder::{Leg as RecLeg, Recorder};
use crate::media::transcoder::{RtpTiming, Transcoder, rewrite_dtmf_duration};
use anyhow::Result;
use audio_codec::CodecType as AudioCodecType;
use rustrtc::{
IceServer, PeerConnection, PeerConnectionEvent, RtpCodecParameters, RtpSender, TransportMode,
media::{
AudioFrame, MediaError, MediaKind, MediaSample, MediaStreamTrack, SampleStreamSource,
SampleStreamTrack,
},
rtp::RtcpPacket,
};
use std::{
collections::HashMap,
sync::{
Arc,
atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
},
};
use tokio::sync::{Mutex as AsyncMutex, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
pub type MediaSender = SampleStreamSource;
pub type PeerId = String;
const BRIDGE_OUTPUT_PEER: u8 = 0;
const BRIDGE_OUTPUT_FILE: u8 = 1;
const BRIDGE_OUTPUT_MUTED: u8 = 2;
struct OutputState {
mode: u8,
file_source: Option<crate::media::FileTrackPlaybackSource>,
next_rtp_timestamp: Option<u32>,
next_sequence_number: Option<u16>,
active_rtp_offset: Option<u32>,
active_seq_offset: Option<u16>,
}
pub struct PeerEntry {
pub pc: PeerConnection,
pub transport: rustrtc::TransportMode,
pub audio_sender: Option<MediaSender>,
pub codec: AudioCodecType,
pub dtmf_payload_type: Option<u8>,
pub dtmf_sink: Option<Arc<dyn Fn(char) + Send + Sync + 'static>>,
_output_state: Arc<AsyncMutex<OutputState>>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BridgeEndpoint {
Caller,
Callee,
}
struct LegStats {
packets: AtomicU64,
bytes: AtomicU64,
lost: AtomicU64,
dropped: AtomicU64,
}
impl LegStats {
fn new() -> Arc<Self> {
Arc::new(Self {
packets: AtomicU64::new(0),
bytes: AtomicU64::new(0),
lost: AtomicU64::new(0),
dropped: AtomicU64::new(0),
})
}
}
struct VideoForwardingTrack {
id: String,
inner: Arc<dyn MediaStreamTrack>,
payload_type: Arc<AtomicU8>,
payload_map: Arc<parking_lot::RwLock<HashMap<u8, u8>>>,
}
#[async_trait::async_trait]
impl MediaStreamTrack for VideoForwardingTrack {
fn id(&self) -> &str {
&self.id
}
fn kind(&self) -> MediaKind {
MediaKind::Video
}
fn state(&self) -> rustrtc::media::track::TrackState {
self.inner.state()
}
async fn recv(&self) -> rustrtc::media::error::MediaResult<MediaSample> {
loop {
let sample = self.inner.recv().await?;
let MediaSample::Video(mut frame) = sample else {
continue;
};
if matches!(frame.payload_type, Some(pt) if pt < 96) {
continue;
}
let target_payload_type = frame
.payload_type
.and_then(|pt| self.payload_map.read().get(&pt).copied())
.unwrap_or_else(|| self.payload_type.load(Ordering::Relaxed));
frame.payload_type = Some(target_payload_type);
frame.header_extension = None;
frame.raw_packet = None;
frame.csrcs.clear();
return Ok(MediaSample::Video(frame));
}
}
async fn request_key_frame(&self) -> rustrtc::media::error::MediaResult<()> {
self.inner.request_key_frame().await
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum LegTransport {
Caller,
Callee,
}
impl LegTransport {
const fn as_str(self) -> &'static str {
match self {
Self::Caller => "Caller",
Self::Callee => "Callee",
}
}
const fn endpoint(self) -> BridgeEndpoint {
match self {
Self::Caller => BridgeEndpoint::Caller,
Self::Callee => BridgeEndpoint::Callee,
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ForwardPath {
from: LegTransport,
to: LegTransport,
}
impl ForwardPath {
const fn new(from: LegTransport, to: LegTransport) -> Self {
Self { from, to }
}
const fn source_endpoint(self) -> BridgeEndpoint {
self.from.endpoint()
}
const fn should_strip_caller_audio_metadata(self) -> bool {
matches!(
(self.from, self.to),
(LegTransport::Caller, LegTransport::Callee)
)
}
}
impl std::fmt::Display for ForwardPath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}→{}", self.from.as_str(), self.to.as_str())
}
}
type DtmfHandler = Arc<dyn Fn(char) + Send + Sync + 'static>;
#[derive(Clone)]
struct BridgeDtmfSink {
endpoint: BridgeEndpoint,
payload_types: Vec<u8>,
handler: DtmfHandler,
}
#[derive(Clone, Debug, PartialEq, Eq)]
struct BridgePayloadMapping {
source_pt: u8,
target_pt: u8,
source_clock_rate: u32,
target_clock_rate: u32,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BridgeDtmfEventKey {
digit_code: u8,
rtp_timestamp: u32,
}
#[derive(Debug, Default)]
struct BridgeDtmfDetector {
last_event: Option<BridgeDtmfEventKey>,
}
impl BridgeDtmfDetector {
fn observe(&mut self, payload: &[u8], rtp_timestamp: u32) -> Option<char> {
if payload.len() < 4 {
return None;
}
let digit_code = payload[0];
let digit = crate::media::telephone_event::dtmf_code_to_char(digit_code)?;
let event = BridgeDtmfEventKey {
digit_code,
rtp_timestamp,
};
if self.last_event == Some(event) {
return None;
}
self.last_event = Some(event);
Some(digit)
}
}
fn output_mode_name(mode: u8) -> &'static str {
match mode {
BRIDGE_OUTPUT_PEER => "peer",
BRIDGE_OUTPUT_FILE => "file",
BRIDGE_OUTPUT_MUTED => "muted",
_ => "unknown",
}
}
fn frame_ticks_20ms(clock_rate: u32) -> u32 {
(clock_rate / 50).max(1)
}
fn scale_rtp_timestamp(rtp_timestamp: u32, source_rate: u32, target_rate: u32) -> u32 {
if source_rate == target_rate {
rtp_timestamp
} else {
(rtp_timestamp as u64 * target_rate as u64 / source_rate as u64) as u32
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BridgeSide {
Caller,
Callee,
}
pub struct BridgePeer {
id: String,
caller_pc: PeerConnection,
callee_pc: PeerConnection,
bridge_tasks: AsyncMutex<Vec<tokio::task::JoinHandle<()>>>,
sub_tasks: Arc<AsyncMutex<Vec<tokio::task::JoinHandle<()>>>>,
cancel_token: CancellationToken,
forwarding_started: AtomicBool,
caller_gate: Arc<AtomicBool>,
recorder: Option<Arc<parking_lot::RwLock<Option<Recorder>>>>,
recording_paused: Arc<AtomicBool>,
sipflow_tx: Option<mpsc::Sender<(RecLeg, MediaSample, u64)>>,
receive_clock: ReceiveTimestampClock,
dtmf_sink: Arc<parking_lot::RwLock<Option<BridgeDtmfSink>>>,
caller_send: Arc<AsyncMutex<Option<MediaSender>>>,
callee_send: Arc<AsyncMutex<Option<MediaSender>>>,
caller_output_state: Arc<AsyncMutex<OutputState>>,
callee_output_state: Arc<AsyncMutex<OutputState>>,
caller_output_mode: Arc<AtomicU8>,
callee_output_mode: Arc<AtomicU8>,
caller_video_send: Arc<AsyncMutex<Option<MediaSender>>>,
callee_video_send: Arc<AsyncMutex<Option<MediaSender>>>,
caller_track: AsyncMutex<Option<Arc<SampleStreamTrack>>>,
callee_track: AsyncMutex<Option<Arc<SampleStreamTrack>>>,
caller_sender_codec: Option<RtpCodecParameters>,
callee_sender_codec: Option<RtpCodecParameters>,
caller_video_codec: Option<RtpCodecParameters>,
callee_video_codec: Option<RtpCodecParameters>,
caller_video_sender: AsyncMutex<Option<Arc<RtpSender>>>,
callee_video_sender: AsyncMutex<Option<Arc<RtpSender>>>,
caller_video_payload_type: Arc<AtomicU8>,
callee_video_payload_type: Arc<AtomicU8>,
caller_video_payload_map: Arc<parking_lot::RwLock<HashMap<u8, u8>>>,
callee_video_payload_map: Arc<parking_lot::RwLock<HashMap<u8, u8>>>,
caller_to_callee_stats: Arc<LegStats>,
callee_to_caller_stats: Arc<LegStats>,
peers: Arc<AsyncMutex<std::collections::HashMap<PeerId, PeerEntry>>>,
routes: Arc<AsyncMutex<std::collections::HashMap<PeerId, std::collections::HashSet<PeerId>>>>,
callee_to_caller_transcoder: Arc<parking_lot::Mutex<Option<Transcoder>>>,
callee_to_caller_timing: Arc<parking_lot::Mutex<Option<RtpTiming>>>,
caller_to_callee_transcoder: Arc<parking_lot::Mutex<Option<Transcoder>>>,
caller_to_callee_timing: Arc<parking_lot::Mutex<Option<RtpTiming>>>,
callee_to_caller_dtmf_mapping: Arc<parking_lot::RwLock<Option<BridgePayloadMapping>>>,
caller_to_callee_dtmf_mapping: Arc<parking_lot::RwLock<Option<BridgePayloadMapping>>>,
rtp_timeout: Option<std::time::Duration>,
rtp_timeout_tx: Option<mpsc::Sender<String>>,
}
impl BridgePeer {
pub fn new(id: String, caller_pc: PeerConnection, callee_pc: PeerConnection) -> Self {
Self {
id,
caller_pc,
callee_pc,
bridge_tasks: AsyncMutex::new(Vec::new()),
sub_tasks: Arc::new(AsyncMutex::new(Vec::new())),
cancel_token: CancellationToken::new(),
forwarding_started: AtomicBool::new(false),
caller_gate: Arc::new(AtomicBool::new(false)),
recorder: None,
recording_paused: Arc::new(AtomicBool::new(false)),
sipflow_tx: None,
receive_clock: ReceiveTimestampClock::new(),
dtmf_sink: Arc::new(parking_lot::RwLock::new(None)),
caller_send: Arc::new(AsyncMutex::new(None)),
callee_send: Arc::new(AsyncMutex::new(None)),
caller_output_state: Arc::new(AsyncMutex::new(OutputState {
mode: BRIDGE_OUTPUT_PEER,
file_source: None,
next_rtp_timestamp: None,
next_sequence_number: None,
active_rtp_offset: None,
active_seq_offset: None,
})),
callee_output_state: Arc::new(AsyncMutex::new(OutputState {
mode: BRIDGE_OUTPUT_PEER,
file_source: None,
next_rtp_timestamp: None,
next_sequence_number: None,
active_rtp_offset: None,
active_seq_offset: None,
})),
caller_output_mode: Arc::new(AtomicU8::new(BRIDGE_OUTPUT_PEER)),
callee_output_mode: Arc::new(AtomicU8::new(BRIDGE_OUTPUT_PEER)),
caller_video_send: Arc::new(AsyncMutex::new(None)),
callee_video_send: Arc::new(AsyncMutex::new(None)),
caller_track: AsyncMutex::new(None),
callee_track: AsyncMutex::new(None),
caller_sender_codec: None,
callee_sender_codec: None,
caller_video_codec: None,
callee_video_codec: None,
caller_video_sender: AsyncMutex::new(None),
callee_video_sender: AsyncMutex::new(None),
caller_video_payload_type: Arc::new(AtomicU8::new(96)),
callee_video_payload_type: Arc::new(AtomicU8::new(96)),
caller_video_payload_map: Arc::new(parking_lot::RwLock::new(HashMap::new())),
callee_video_payload_map: Arc::new(parking_lot::RwLock::new(HashMap::new())),
caller_to_callee_stats: LegStats::new(),
callee_to_caller_stats: LegStats::new(),
peers: Arc::new(AsyncMutex::new(std::collections::HashMap::new())),
routes: Arc::new(AsyncMutex::new(std::collections::HashMap::new())),
callee_to_caller_transcoder: Arc::new(parking_lot::Mutex::new(None)),
callee_to_caller_timing: Arc::new(parking_lot::Mutex::new(None)),
caller_to_callee_transcoder: Arc::new(parking_lot::Mutex::new(None)),
caller_to_callee_timing: Arc::new(parking_lot::Mutex::new(None)),
callee_to_caller_dtmf_mapping: Arc::new(parking_lot::RwLock::new(None)),
caller_to_callee_dtmf_mapping: Arc::new(parking_lot::RwLock::new(None)),
rtp_timeout: None,
rtp_timeout_tx: None,
}
}
pub fn open_caller_gate(&self) {
self.caller_gate.store(true, Ordering::Release);
}
pub async fn setup_bridge_with_codecs(
&self,
caller_params: RtpCodecParameters,
callee_params: RtpCodecParameters,
) -> Result<()> {
let (caller_tx, caller_track, _) =
rustrtc::media::track::sample_track(MediaKind::Audio, 100);
*self.caller_track.lock().await = Some(caller_track.clone());
let _ = self.caller_pc().add_track(caller_track, caller_params);
*self.caller_send.lock().await = Some(caller_tx);
let (callee_tx, callee_track, _) =
rustrtc::media::track::sample_track(MediaKind::Audio, 100);
*self.callee_track.lock().await = Some(callee_track.clone());
let _ = self.callee_pc().add_track(callee_track, callee_params);
*self.callee_send.lock().await = Some(callee_tx);
let mut tasks = self.bridge_tasks.lock().await;
tasks.push(Self::spawn_file_output_clock(
self.id.clone(),
BridgeEndpoint::Caller,
self.caller_send.lock().await.clone(),
Arc::clone(&self.caller_output_state),
self.cancel_token.clone(),
));
tasks.push(Self::spawn_file_output_clock(
self.id.clone(),
BridgeEndpoint::Callee,
self.callee_send.lock().await.clone(),
Arc::clone(&self.callee_output_state),
self.cancel_token.clone(),
));
drop(tasks);
if let Some(ref caller_video_params) = self.caller_video_codec {
self.caller_video_payload_type
.store(caller_video_params.payload_type, Ordering::Relaxed);
let (caller_video_tx, caller_video_track, _) =
rustrtc::media::track::sample_track(MediaKind::Video, 100);
if let Ok(sender) = self
.caller_pc()
.add_track(caller_video_track, caller_video_params.clone())
{
*self.caller_video_sender.lock().await = Some(sender);
}
*self.caller_video_send.lock().await = Some(caller_video_tx);
debug!(bridge_id = %self.id, pt = caller_video_params.payload_type, clock_rate = caller_video_params.clock_rate, "Caller video sender setup complete");
} else {
debug!(bridge_id = %self.id, "Caller video sender NOT configured (no video codec)");
}
if let Some(ref callee_video_params) = self.callee_video_codec {
self.callee_video_payload_type
.store(callee_video_params.payload_type, Ordering::Relaxed);
let (callee_video_tx, callee_video_track, _) =
rustrtc::media::track::sample_track(MediaKind::Video, 100);
if let Ok(sender) = self
.callee_pc()
.add_track(callee_video_track, callee_video_params.clone())
{
*self.callee_video_sender.lock().await = Some(sender);
}
*self.callee_video_send.lock().await = Some(callee_video_tx);
debug!(bridge_id = %self.id, pt = callee_video_params.payload_type, clock_rate = callee_video_params.clock_rate, "Callee video sender setup complete");
}
Ok(())
}
pub async fn add_video_track(&self, pt: u8, clock_rate: u32) -> Result<()> {
let params = RtpCodecParameters {
payload_type: pt,
clock_rate,
channels: 0,
};
self.caller_video_payload_type
.store(params.payload_type, Ordering::Relaxed);
self.callee_video_payload_type
.store(params.payload_type, Ordering::Relaxed);
if self.caller_video_send.lock().await.is_none() {
let (tx, track, _) = rustrtc::media::track::sample_track(MediaKind::Video, 100);
let sender = self.caller_pc().add_track(track, params.clone())?;
*self.caller_video_sender.lock().await = Some(sender);
*self.caller_video_send.lock().await = Some(tx);
}
if self.callee_video_send.lock().await.is_none() {
let (tx, track, _) = rustrtc::media::track::sample_track(MediaKind::Video, 100);
let sender = self.callee_pc().add_track(track, params)?;
*self.callee_video_sender.lock().await = Some(sender);
*self.callee_video_send.lock().await = Some(tx);
}
debug!(
bridge_id = %self.id,
pt = pt,
clock_rate = clock_rate,
"Video tracks added dynamically to both bridge sides"
);
Ok(())
}
pub async fn has_video(&self) -> bool {
self.caller_video_send.lock().await.is_some()
&& self.callee_video_send.lock().await.is_some()
}
pub fn set_video_payload_types(
&self,
caller_params: Option<RtpCodecParameters>,
callee_params: Option<RtpCodecParameters>,
) {
if let Some(params) = caller_params.as_ref() {
self.caller_video_payload_type
.store(params.payload_type, Ordering::Relaxed);
debug!(
bridge_id = %self.id,
side = "caller",
pt = params.payload_type,
clock_rate = params.clock_rate,
"Video pass-through payload type updated"
);
}
if let Some(params) = callee_params.as_ref() {
self.callee_video_payload_type
.store(params.payload_type, Ordering::Relaxed);
debug!(
bridge_id = %self.id,
side = "callee",
pt = params.payload_type,
clock_rate = params.clock_rate,
"Video pass-through payload type updated"
);
}
}
fn video_payload_map_by_codec(
source_caps: &[rustrtc::VideoCapability],
target_caps: &[rustrtc::VideoCapability],
) -> HashMap<u8, u8> {
let mut map = HashMap::new();
for source in source_caps {
if let Some(target) = target_caps.iter().find(|target| {
source.codec_name.eq_ignore_ascii_case(&target.codec_name)
&& source.clock_rate == target.clock_rate
}) {
map.entry(source.payload_type)
.or_insert(target.payload_type);
}
}
map
}
pub fn set_video_payload_maps(
&self,
webrtc_caps: &[rustrtc::VideoCapability],
rtp_caps: &[rustrtc::VideoCapability],
) {
let rtp_to_webrtc = Self::video_payload_map_by_codec(rtp_caps, webrtc_caps);
let webrtc_to_rtp = Self::video_payload_map_by_codec(webrtc_caps, rtp_caps);
let caller_fallback = rtp_caps
.iter()
.find_map(|cap| rtp_to_webrtc.get(&cap.payload_type).copied());
let callee_fallback = webrtc_caps
.iter()
.find_map(|cap| webrtc_to_rtp.get(&cap.payload_type).copied());
if let Some(pt) = caller_fallback {
self.caller_video_payload_type.store(pt, Ordering::Relaxed);
}
if let Some(pt) = callee_fallback {
self.callee_video_payload_type.store(pt, Ordering::Relaxed);
}
*self.caller_video_payload_map.write() = rtp_to_webrtc;
*self.callee_video_payload_map.write() = webrtc_to_rtp;
let caller_map = self.caller_video_payload_map.read().clone();
let callee_map = self.callee_video_payload_map.read().clone();
debug!(
bridge_id = %self.id,
rtp_to_webrtc = ?caller_map,
webrtc_to_rtp = ?callee_map,
caller_fallback_pt = ?caller_fallback,
callee_fallback_pt = ?callee_fallback,
"Video pass-through payload maps updated"
);
}
pub async fn setup_bridge(&self) -> Result<()> {
let caller_params = self
.caller_sender_codec
.clone()
.unwrap_or(RtpCodecParameters {
payload_type: 111,
clock_rate: 48000,
channels: 2,
});
let callee_params = self
.callee_sender_codec
.clone()
.unwrap_or(RtpCodecParameters {
payload_type: 0,
clock_rate: 8000,
channels: 1,
});
self.setup_bridge_with_codecs(caller_params, callee_params)
.await
}
pub async fn start_bridge(&self) {
if self.forwarding_started.swap(true, Ordering::AcqRel) {
debug!(bridge_id = %self.id, "Media bridge forwarding already started");
return;
}
let bidirectional_task = self.spawn_bidirectional_forwarder();
let stats_task = {
let w2r = Arc::clone(&self.caller_to_callee_stats);
let r2w = Arc::clone(&self.callee_to_caller_stats);
let bridge_id = self.id.clone();
let cancel = self.cancel_token.clone();
let caller_pc = self.caller_pc.clone();
let callee_pc = self.callee_pc.clone();
let rtp_timeout = self.rtp_timeout;
let rtp_timeout_tx = self.rtp_timeout_tx.clone();
crate::utils::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval.tick().await;
let (mut prev_w_pkts, mut prev_w_lost, mut prev_w_bytes) = (0u64, 0u64, 0u64);
let (mut prev_r_pkts, mut prev_r_lost, mut prev_r_bytes) = (0u64, 0u64, 0u64);
let mut caller_silence_start: Option<std::time::Instant> = None;
let mut callee_silence_start: Option<std::time::Instant> = None;
let mut rtp_timeout_fired = false;
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = interval.tick() => {
let w_pkts = w2r.packets.load(Ordering::Relaxed);
let w_bytes = w2r.bytes.load(Ordering::Relaxed);
let w_lost = w2r.lost.load(Ordering::Relaxed);
let w_drop = w2r.dropped.load(Ordering::Relaxed);
let r_pkts = r2w.packets.load(Ordering::Relaxed);
let r_bytes = r2w.bytes.load(Ordering::Relaxed);
let r_lost = r2w.lost.load(Ordering::Relaxed);
let r_drop = r2w.dropped.load(Ordering::Relaxed);
let dw_pkts = w_pkts.saturating_sub(prev_w_pkts);
let dw_bytes = w_bytes.saturating_sub(prev_w_bytes);
let dw_lost = w_lost.saturating_sub(prev_w_lost);
let dr_pkts = r_pkts.saturating_sub(prev_r_pkts);
let dr_bytes = r_bytes.saturating_sub(prev_r_bytes);
let dr_lost = r_lost.saturating_sub(prev_r_lost);
let w_loss_pct = if dw_pkts + dw_lost > 0 {
dw_lost as f64 / (dw_pkts + dw_lost) as f64 * 100.0
} else { 0.0 };
let r_loss_pct = if dr_pkts + dr_lost > 0 {
dr_lost as f64 / (dr_pkts + dr_lost) as f64 * 100.0
} else { 0.0 };
debug!(
bridge_id = %bridge_id,
caller_to_callee_pps = dw_pkts,
caller_to_callee_kbps = dw_bytes * 8 / 5 / 1000,
caller_to_callee_loss = format!("{:.2}%", w_loss_pct),
caller_to_callee_drop = w_drop,
callee_to_caller_pps = dr_pkts,
callee_to_caller_kbps = dr_bytes * 8 / 5 / 1000,
callee_to_caller_loss = format!("{:.2}%", r_loss_pct),
callee_to_caller_drop = r_drop,
"Bridge leg stats [5s]"
);
if !rtp_timeout_fired
&& let Some(timeout) = rtp_timeout
{
if dw_pkts == 0 {
caller_silence_start.get_or_insert(std::time::Instant::now());
} else {
caller_silence_start = None;
}
if dr_pkts == 0 {
callee_silence_start.get_or_insert(std::time::Instant::now());
} else {
callee_silence_start = None;
}
if caller_silence_start.is_some_and(|s| s.elapsed() >= timeout) {
warn!(bridge_id = %bridge_id, ?timeout, "RTP timeout: caller side silent");
if let Some(ref tx) = rtp_timeout_tx {
let _ = tx.try_send("caller_silent".to_string());
}
rtp_timeout_fired = true;
} else if callee_silence_start.is_some_and(|s| s.elapsed() >= timeout) {
warn!(bridge_id = %bridge_id, ?timeout, "RTP timeout: callee side silent");
if let Some(ref tx) = rtp_timeout_tx {
let _ = tx.try_send("callee_silent".to_string());
}
rtp_timeout_fired = true;
}
}
(prev_w_pkts, prev_w_lost, prev_w_bytes) = (w_pkts, w_lost, w_bytes);
(prev_r_pkts, prev_r_lost, prev_r_bytes) = (r_pkts, r_lost, r_bytes);
}
}
}
})
};
let mut tasks = self.bridge_tasks.lock().await;
tasks.push(bidirectional_task);
tasks.push(stats_task);
tasks.push(self.spawn_peer_forward_loops());
}
pub async fn get_caller_sender(&self) -> Option<MediaSender> {
self.caller_send.lock().await.clone()
}
pub async fn get_callee_sender(&self) -> Option<MediaSender> {
self.callee_send.lock().await.clone()
}
pub fn set_dtmf_sink(
&self,
endpoint: BridgeEndpoint,
mut payload_types: Vec<u8>,
handler: Arc<dyn Fn(char) + Send + Sync + 'static>,
) {
payload_types.sort_unstable();
payload_types.dedup();
if payload_types.is_empty() {
warn!(
bridge_id = %self.id,
endpoint = ?endpoint,
"Bridge DTMF sink install skipped: no payload types provided"
);
return;
}
let mut sink = self.dtmf_sink.write();
*sink = Some(BridgeDtmfSink {
endpoint,
payload_types: payload_types.clone(),
handler,
});
debug!(
bridge_id = %self.id,
endpoint = ?endpoint,
payload_types = ?payload_types,
"Bridge DTMF sink installed"
);
}
pub async fn replace_output_with_file(
&self,
endpoint: BridgeEndpoint,
track: &crate::media::FileTrack,
) -> Result<()> {
match endpoint {
BridgeEndpoint::Caller => self.get_caller_sender().await,
BridgeEndpoint::Callee => self.get_callee_sender().await,
}
.ok_or_else(|| anyhow::anyhow!("bridge {:?} output sender is not ready", endpoint))?;
let source = track.create_playback_source().await?;
let mut state = self.output_state(endpoint).lock().await;
let old_source = state.file_source.replace(source);
state.mode = BRIDGE_OUTPUT_FILE;
state.active_rtp_offset = None;
state.active_seq_offset = None;
self.output_mode(endpoint)
.store(BRIDGE_OUTPUT_FILE, Ordering::Release);
drop(state);
drop(old_source);
info!(
bridge_id = %self.id,
endpoint = ?endpoint,
"Bridge output replaced with file source"
);
Ok(())
}
pub async fn replace_output_with_silence(
&self,
endpoint: BridgeEndpoint,
codec_info: crate::media::negotiate::CodecInfo,
) -> Result<()> {
match endpoint {
BridgeEndpoint::Caller => self.get_caller_sender().await,
BridgeEndpoint::Callee => self.get_callee_sender().await,
}
.ok_or_else(|| anyhow::anyhow!("bridge {:?} output sender is not ready", endpoint))?;
let track = crate::media::FileTrack::new(format!("{}-{:?}-silence", self.id, endpoint))
.with_loop(true)
.with_codec_info(codec_info);
let source = track.create_playback_source().await?;
let mut state = self.output_state(endpoint).lock().await;
let old_source = state.file_source.replace(source);
state.mode = BRIDGE_OUTPUT_FILE;
state.active_rtp_offset = None;
state.active_seq_offset = None;
self.output_mode(endpoint)
.store(BRIDGE_OUTPUT_FILE, Ordering::Release);
drop(state);
drop(old_source);
info!(
bridge_id = %self.id,
endpoint = ?endpoint,
"Bridge output replaced with silence source"
);
Ok(())
}
pub async fn replace_output_with_peer(&self, endpoint: BridgeEndpoint) {
let mut state = self.output_state(endpoint).lock().await;
state.mode = BRIDGE_OUTPUT_PEER;
let old_source = state.file_source.take();
self.output_mode(endpoint)
.store(BRIDGE_OUTPUT_PEER, Ordering::Release);
drop(state);
drop(old_source);
info!(
bridge_id = %self.id,
endpoint = ?endpoint,
"Bridge output replaced with peer source"
);
}
pub async fn mute_output(&self, endpoint: BridgeEndpoint) {
let mut state = self.output_state(endpoint).lock().await;
state.mode = BRIDGE_OUTPUT_MUTED;
let old_source = state.file_source.take();
self.output_mode(endpoint)
.store(BRIDGE_OUTPUT_MUTED, Ordering::Release);
drop(state);
drop(old_source);
info!(
bridge_id = %self.id,
endpoint = ?endpoint,
"Bridge output muted"
);
}
pub async fn add_peer(&self, peer_id: PeerId, entry: PeerEntry) -> Result<()> {
let mut peers = self.peers.lock().await;
if peers.contains_key(&peer_id) {
return Err(anyhow::anyhow!(
"Peer {} already exists in bridge {}",
peer_id,
self.id
));
}
peers.insert(peer_id, entry);
Ok(())
}
pub async fn remove_peer(&self, peer_id: &PeerId) -> Option<PeerEntry> {
let mut peers = self.peers.lock().await;
let mut routes = self.routes.lock().await;
routes.remove(peer_id);
for dests in routes.values_mut() {
dests.remove(peer_id);
}
peers.remove(peer_id)
}
pub async fn set_route(
&self,
from_peer: PeerId,
to_peers: std::collections::HashSet<PeerId>,
) -> Result<()> {
let peers = self.peers.lock().await;
for dest in &to_peers {
if !peers.contains_key(dest) {
return Err(anyhow::anyhow!("Route destination peer {} not found", dest));
}
}
drop(peers);
let mut routes = self.routes.lock().await;
routes.insert(from_peer, to_peers);
Ok(())
}
pub fn set_transcoder(
&self,
from_endpoint: BridgeEndpoint,
source: audio_codec::CodecType,
target: audio_codec::CodecType,
target_pt: u8,
) {
let (transcoder_slot, timing_slot) = match from_endpoint {
BridgeEndpoint::Callee => (
&self.callee_to_caller_transcoder,
&self.callee_to_caller_timing,
),
BridgeEndpoint::Caller => (
&self.caller_to_callee_transcoder,
&self.caller_to_callee_timing,
),
};
let source_cr = source.clock_rate();
let target_cr = target.clock_rate();
*transcoder_slot.lock() = Some(Transcoder::new(source, target, target_pt));
if source_cr != target_cr {
*timing_slot.lock() = Some(RtpTiming::default());
} else {
timing_slot.lock().take();
}
info!(
bridge_id = %self.id,
from = ?from_endpoint,
?source,
?target,
"Bridge transcoder configured"
);
}
pub fn clear_transcoder(&self, from_endpoint: BridgeEndpoint) {
let (transcoder_slot, timing_slot) = match from_endpoint {
BridgeEndpoint::Callee => (
&self.callee_to_caller_transcoder,
&self.callee_to_caller_timing,
),
BridgeEndpoint::Caller => (
&self.caller_to_callee_transcoder,
&self.caller_to_callee_timing,
),
};
*transcoder_slot.lock() = None;
*timing_slot.lock() = None;
}
pub fn set_dtmf_mapping(
&self,
from_endpoint: BridgeEndpoint,
source_pt: u8,
source_clock_rate: u32,
target_pt: u8,
target_clock_rate: u32,
) {
let mapping_slot = match from_endpoint {
BridgeEndpoint::Callee => &self.callee_to_caller_dtmf_mapping,
BridgeEndpoint::Caller => &self.caller_to_callee_dtmf_mapping,
};
*mapping_slot.write() = Some(BridgePayloadMapping {
source_pt,
target_pt,
source_clock_rate,
target_clock_rate,
});
info!(
bridge_id = %self.id,
from = ?from_endpoint,
source_pt,
source_clock_rate,
target_pt,
target_clock_rate,
"Bridge DTMF mapping configured"
);
}
pub fn clear_dtmf_mapping(&self, from_endpoint: BridgeEndpoint) {
let mapping_slot = match from_endpoint {
BridgeEndpoint::Callee => &self.callee_to_caller_dtmf_mapping,
BridgeEndpoint::Caller => &self.caller_to_callee_dtmf_mapping,
};
mapping_slot.write().take();
}
fn output_state(&self, endpoint: BridgeEndpoint) -> &Arc<AsyncMutex<OutputState>> {
match endpoint {
BridgeEndpoint::Caller => &self.caller_output_state,
BridgeEndpoint::Callee => &self.callee_output_state,
}
}
fn output_mode(&self, endpoint: BridgeEndpoint) -> &Arc<AtomicU8> {
match endpoint {
BridgeEndpoint::Caller => &self.caller_output_mode,
BridgeEndpoint::Callee => &self.callee_output_mode,
}
}
fn spawn_file_output_clock(
bridge_id: String,
endpoint: BridgeEndpoint,
sender: Option<MediaSender>,
output_state: Arc<AsyncMutex<OutputState>>,
cancel_token: CancellationToken,
) -> tokio::task::JoinHandle<()> {
crate::utils::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(20));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
break;
}
_ = interval.tick() => {
let sample = {
let mut guard = output_state.lock().await;
if guard.mode != BRIDGE_OUTPUT_FILE {
continue;
}
let Some(source) = guard.file_source.as_mut() else {
guard.mode = BRIDGE_OUTPUT_MUTED;
continue;
};
source.next_audio_sample()
};
let Some(mut sample) = sample else {
let mut guard = output_state.lock().await;
guard.mode = BRIDGE_OUTPUT_MUTED;
guard.file_source.take();
guard.active_rtp_offset = None;
guard.active_seq_offset = None;
debug!(
bridge_id = %bridge_id,
endpoint = ?endpoint,
"Bridge file output completed"
);
continue;
};
if let MediaSample::Audio(frame) = &mut sample {
let mut guard = output_state.lock().await;
let src_seq = frame.sequence_number.unwrap_or_default();
let src_ts = frame.rtp_timestamp;
let seq_offset = match guard.active_seq_offset {
Some(offset) => offset,
None => {
let expected = guard.next_sequence_number.unwrap_or(src_seq);
let offset = expected.wrapping_sub(src_seq);
guard.active_seq_offset = Some(offset);
offset
}
};
let ts_offset = match guard.active_rtp_offset {
Some(offset) => offset,
None => {
let expected = guard.next_rtp_timestamp.unwrap_or(src_ts);
let offset = expected.wrapping_sub(src_ts);
guard.active_rtp_offset = Some(offset);
offset
}
};
let mapped_seq = src_seq.wrapping_add(seq_offset);
let mapped_ts = src_ts.wrapping_add(ts_offset);
frame.sequence_number = Some(mapped_seq);
frame.rtp_timestamp = mapped_ts;
guard.next_sequence_number = Some(mapped_seq.wrapping_add(1));
guard.next_rtp_timestamp =
Some(mapped_ts.wrapping_add(frame_ticks_20ms(frame.clock_rate)));
}
let Some(sender) = sender.as_ref() else {
warn!(
bridge_id = %bridge_id,
endpoint = ?endpoint,
"Bridge file output sender is unavailable"
);
break;
};
match sender.try_send(sample.clone()) {
Ok(()) => {}
Err(MediaError::WouldBlock) => {
if let Err(error) = sender.send(sample).await {
warn!(
bridge_id = %bridge_id,
endpoint = ?endpoint,
error = %error,
"Bridge file output send failed"
);
break;
}
}
Err(error) => {
warn!(
bridge_id = %bridge_id,
endpoint = ?endpoint,
error = ?error,
"Bridge file output send failed"
);
break;
}
}
}
}
}
})
}
fn spawn_peer_forward_loops(&self) -> tokio::task::JoinHandle<()> {
let peers_map = self.peers.clone();
let routes_map = self.routes.clone();
let cancel_token = self.cancel_token.clone();
let bridge_id = self.id.clone();
let dtmf_sink = Arc::clone(&self.dtmf_sink);
let recorder = self.recorder.clone();
let _recording_paused = self.recording_paused.clone();
crate::utils::spawn(async move {
let peers = peers_map.lock().await;
let extra_peers: Vec<(PeerId, PeerConnection)> = peers
.iter()
.filter(|(id, _)| *id != "webrtc" && *id != "rtp")
.map(|(id, entry)| (id.clone(), entry.pc.clone()))
.collect();
drop(peers);
for (peer_id, pc) in extra_peers {
let routes = routes_map.clone();
let _dtmf = Arc::clone(&dtmf_sink);
let _rec = recorder.clone();
let cancel = cancel_token.child_token();
let pid = peer_id.clone();
let bid = bridge_id.clone();
let peers_ref = peers_map.clone();
crate::utils::spawn(async move {
let mut recv = Box::pin(pc.recv());
loop {
tokio::select! {
_ = cancel.cancelled() => break,
event = &mut recv => {
match event {
Some(PeerConnectionEvent::Track(transceiver)) => {
if let Some(receiver) = transceiver.receiver() {
let track = receiver.track();
let _is_video = transceiver.kind() == rustrtc::MediaKind::Video;
let route_dests: Vec<PeerId> = routes.lock().await
.get(&pid)
.cloned()
.unwrap_or_default()
.into_iter().collect();
if !route_dests.is_empty() {
let track_id = track.id().to_string();
let peers_for_forward = peers_ref.clone();
let dests_for_forward = route_dests.clone();
let pid_clone = pid.clone();
let bid_clone = bid.clone();
crate::utils::spawn(async move {
loop {
let sample = match track.recv().await {
Ok(s) => s,
Err(_) => break,
};
let guard = peers_for_forward.lock().await;
for dest in &dests_for_forward {
if let Some(entry) = guard.get(dest) {
if let Some(ref sender) = entry.audio_sender {
let _ = sender.try_send(sample.clone());
}
}
}
drop(guard);
}
debug!(bridge_id = %bid_clone, peer_id = %pid_clone, track = %track_id, "N-peer forward task ended");
});
}
}
recv = Box::pin(pc.recv());
}
Some(_) => {
recv = Box::pin(pc.recv());
}
None => break,
}
}
}
}
debug!(bridge_id = %bid, peer_id = %pid, "N-peer recv loop ended");
});
}
})
}
pub async fn get_caller_track(&self) -> Option<Arc<SampleStreamTrack>> {
self.caller_track.lock().await.clone()
}
pub async fn get_callee_track(&self) -> Option<Arc<SampleStreamTrack>> {
self.callee_track.lock().await.clone()
}
fn spawn_bidirectional_forwarder(&self) -> tokio::task::JoinHandle<()> {
let caller_pc = self.caller_pc.clone();
let callee_pc = self.callee_pc.clone();
let callee_send = Arc::downgrade(&self.callee_send);
let caller_send = Arc::downgrade(&self.caller_send);
let callee_output_mode = Arc::clone(&self.callee_output_mode);
let caller_output_mode = Arc::clone(&self.caller_output_mode);
let cancel_token = self.cancel_token.clone();
let bridge_id = self.id.clone();
let w2r_stats = Arc::clone(&self.caller_to_callee_stats);
let r2w_stats = Arc::clone(&self.callee_to_caller_stats);
let recorder = self.recorder.clone();
let recording_paused = self.recording_paused.clone();
let sipflow_tx = self.sipflow_tx.clone();
let receive_clock = self.receive_clock.clone();
let dtmf_sink = Arc::clone(&self.dtmf_sink);
let caller_to_callee_transcoder = Arc::clone(&self.caller_to_callee_transcoder);
let caller_to_callee_timing = Arc::clone(&self.caller_to_callee_timing);
let callee_to_caller_transcoder = Arc::clone(&self.callee_to_caller_transcoder);
let callee_to_caller_timing = Arc::clone(&self.callee_to_caller_timing);
let caller_to_callee_dtmf_mapping = Arc::clone(&self.caller_to_callee_dtmf_mapping);
let callee_to_caller_dtmf_mapping = Arc::clone(&self.callee_to_caller_dtmf_mapping);
let caller_video_payload_type = Arc::clone(&self.caller_video_payload_type);
let callee_video_payload_type = Arc::clone(&self.callee_video_payload_type);
let caller_video_payload_map = Arc::clone(&self.caller_video_payload_map);
let callee_video_payload_map = Arc::clone(&self.callee_video_payload_map);
let caller_gate = Arc::clone(&self.caller_gate);
let sub_tasks = self.sub_tasks.clone();
crate::utils::spawn(async move {
let mut caller_recv = Box::pin(caller_pc.recv());
let mut callee_recv = Box::pin(callee_pc.recv());
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
debug!(bridge_id = %bridge_id, "Bidirectional forwarder cancelled");
break;
}
event = &mut caller_recv => {
match event {
Some(PeerConnectionEvent::Track(transceiver)) => {
if let Some(receiver) = transceiver.receiver() {
let track = receiver.track();
let is_video = transceiver.kind() == rustrtc::MediaKind::Video;
let payload_types = transceiver
.get_payload_map()
.keys()
.copied()
.collect::<Vec<_>>();
info!(
bridge_id = %bridge_id,
direction = "Caller->Callee",
transceiver_id = transceiver.id(),
kind = ?transceiver.kind(),
mid = ?transceiver.mid(),
receiver_ssrc = receiver.ssrc(),
track_id = %track.id(),
payload_types = ?payload_types,
"Bridge received track event"
);
let sender = if is_video {
let target_transceiver = callee_pc
.get_transceivers()
.into_iter()
.find(|t| t.kind() == rustrtc::MediaKind::Video);
if let Some(target_transceiver) = target_transceiver {
if let Some(existing_sender) = target_transceiver.sender() {
let forwarding_track: Arc<dyn MediaStreamTrack> =
Arc::new(VideoForwardingTrack {
id: format!("{}-caller-to-callee-video", bridge_id),
inner: track.clone(),
payload_type: Arc::clone(&callee_video_payload_type),
payload_map: Arc::clone(&callee_video_payload_map),
});
let mut sender_builder = rustrtc::RtpSender::builder(
forwarding_track,
existing_sender.ssrc(),
)
.stream_id(existing_sender.stream_id().to_string())
.params(existing_sender.params());
let cname_val = existing_sender.cname();
if !cname_val.starts_with("rustrtc-cname-") {
sender_builder = sender_builder.cname(cname_val.to_string());
}
let sender = sender_builder.build();
target_transceiver.set_sender(Some(sender.clone()));
let h = Self::spawn_pli_forwarder(
bridge_id.clone(),
sender,
track.clone(),
cancel_token.clone(),
"Callee PLI -> Caller source",
);
sub_tasks.lock().await.push(h);
if let Err(e) = track.request_key_frame().await {
debug!(
bridge_id = %bridge_id,
source_track = %track.id(),
error = %e,
"Initial Caller video keyframe request failed"
);
}
debug!(
bridge_id = %bridge_id,
source_track = %track.id(),
target_ssrc = existing_sender.ssrc(),
"Wired Caller->Callee video forwarding track"
);
} else {
warn!(bridge_id = %bridge_id, "Callee video transceiver has no sender for forwarding track");
}
} else {
warn!(bridge_id = %bridge_id, "Callee video transceiver not found for forwarding track");
}
caller_recv = Box::pin(caller_pc.recv());
continue;
} else {
callee_send.clone()
};
let h = Self::forward_track_to_sender(
bridge_id.clone(),
track,
sender,
Arc::clone(&callee_output_mode),
cancel_token.clone(),
ForwardPath::new(LegTransport::Caller, LegTransport::Callee),
Arc::clone(&w2r_stats),
if !is_video { recorder.clone() } else { None },
if !is_video { Some(RecLeg::A) } else { None },
if !is_video { sipflow_tx.clone() } else { None },
receive_clock.clone(),
recording_paused.clone(),
Arc::clone(&dtmf_sink),
Some(Arc::clone(&caller_to_callee_transcoder)),
Some(Arc::clone(&caller_to_callee_timing)),
Some(Arc::clone(&caller_to_callee_dtmf_mapping)),
Some(Arc::clone(&caller_gate)),
);
if let Ok(mut st) = sub_tasks.try_lock() {
st.push(h);
}
} else {
warn!(
bridge_id = %bridge_id,
direction = "Caller->Callee",
transceiver_id = transceiver.id(),
kind = ?transceiver.kind(),
mid = ?transceiver.mid(),
"Bridge received track event without receiver"
);
}
caller_recv = Box::pin(caller_pc.recv());
}
Some(_) => {
caller_recv = Box::pin(caller_pc.recv());
}
None => {
debug!(bridge_id = %bridge_id, "Caller PeerConnection closed");
break;
}
}
}
event = &mut callee_recv => {
match event {
Some(PeerConnectionEvent::Track(transceiver)) => {
if let Some(receiver) = transceiver.receiver() {
let track = receiver.track();
let is_video = transceiver.kind() == rustrtc::MediaKind::Video;
let payload_types = transceiver
.get_payload_map()
.keys()
.copied()
.collect::<Vec<_>>();
info!(
bridge_id = %bridge_id,
direction = "Callee->Caller",
transceiver_id = transceiver.id(),
kind = ?transceiver.kind(),
mid = ?transceiver.mid(),
receiver_ssrc = receiver.ssrc(),
track_id = %track.id(),
payload_types = ?payload_types,
"Bridge received track event"
);
let sender = if is_video {
let target_transceiver = caller_pc
.get_transceivers()
.into_iter()
.find(|t| t.kind() == rustrtc::MediaKind::Video);
if let Some(target_transceiver) = target_transceiver {
if let Some(existing_sender) = target_transceiver.sender() {
let forwarding_track: Arc<dyn MediaStreamTrack> =
Arc::new(VideoForwardingTrack {
id: format!("{}-callee-to-caller-video", bridge_id),
inner: track.clone(),
payload_type: Arc::clone(&caller_video_payload_type),
payload_map: Arc::clone(&caller_video_payload_map),
});
let mut sender_builder = rustrtc::RtpSender::builder(
forwarding_track,
existing_sender.ssrc(),
)
.stream_id(existing_sender.stream_id().to_string())
.params(existing_sender.params());
let cname_val = existing_sender.cname();
if !cname_val.starts_with("rustrtc-cname-") {
sender_builder = sender_builder.cname(cname_val.to_string());
}
let sender = sender_builder.build();
target_transceiver.set_sender(Some(sender.clone()));
let h = Self::spawn_pli_forwarder(
bridge_id.clone(),
sender,
track.clone(),
cancel_token.clone(),
"Caller PLI -> Callee source",
);
sub_tasks.lock().await.push(h);
if let Err(e) = track.request_key_frame().await {
debug!(
bridge_id = %bridge_id,
source_track = %track.id(),
error = %e,
"Initial Callee video keyframe request failed"
);
}
debug!(
bridge_id = %bridge_id,
source_track = %track.id(),
target_ssrc = existing_sender.ssrc(),
"Wired Callee->Caller video forwarding track"
);
} else {
warn!(bridge_id = %bridge_id, "Caller video transceiver has no sender for forwarding track");
}
} else {
warn!(bridge_id = %bridge_id, "Caller video transceiver not found for forwarding track");
}
callee_recv = Box::pin(callee_pc.recv());
continue;
} else {
caller_send.clone()
};
let h = Self::forward_track_to_sender(
bridge_id.clone(),
track,
sender,
Arc::clone(&caller_output_mode),
cancel_token.clone(),
ForwardPath::new(LegTransport::Callee, LegTransport::Caller),
Arc::clone(&r2w_stats),
if !is_video { recorder.clone() } else { None },
if !is_video { Some(RecLeg::B) } else { None },
if !is_video { sipflow_tx.clone() } else { None },
receive_clock.clone(),
recording_paused.clone(),
Arc::clone(&dtmf_sink),
Some(Arc::clone(&callee_to_caller_transcoder)),
Some(Arc::clone(&callee_to_caller_timing)),
Some(Arc::clone(&callee_to_caller_dtmf_mapping)),
None, );
if let Ok(mut st) = sub_tasks.try_lock() {
st.push(h);
}
} else {
warn!(
bridge_id = %bridge_id,
direction = "Callee->Caller",
transceiver_id = transceiver.id(),
kind = ?transceiver.kind(),
mid = ?transceiver.mid(),
"Bridge received track event without receiver"
);
}
callee_recv = Box::pin(callee_pc.recv());
}
Some(_) => {
callee_recv = Box::pin(callee_pc.recv());
}
None => {
debug!(bridge_id = %bridge_id, "Callee PeerConnection closed");
break;
}
}
}
}
}
})
}
#[allow(clippy::too_many_arguments)]
async fn run_forward_loop(
bridge_id: String,
track: Arc<dyn MediaStreamTrack>,
sender_weak: std::sync::Weak<AsyncMutex<Option<MediaSender>>>,
output_mode: Arc<AtomicU8>,
cancel_token: CancellationToken,
path: ForwardPath,
leg_stats: Arc<LegStats>,
recorder: Option<Arc<parking_lot::RwLock<Option<Recorder>>>>,
recorder_leg: Option<RecLeg>,
sipflow_tx: Option<mpsc::Sender<(RecLeg, MediaSample, u64)>>,
receive_clock: ReceiveTimestampClock,
recording_paused: Arc<AtomicBool>,
dtmf_sink: Arc<parking_lot::RwLock<Option<BridgeDtmfSink>>>,
transcoder: Option<Arc<parking_lot::Mutex<Option<Transcoder>>>>,
transcoder_timing: Option<Arc<parking_lot::Mutex<Option<RtpTiming>>>>,
dtmf_mapping: Option<Arc<parking_lot::RwLock<Option<BridgePayloadMapping>>>>,
gate: Option<Arc<AtomicBool>>,
) {
let sender = if let Some(strong) = sender_weak.upgrade() {
let guard = strong.lock().await;
guard.clone()
} else {
warn!(bridge_id = %bridge_id, direction = %path, "Sender channel no longer available");
return;
};
if sender.is_none() {
warn!(bridge_id = %bridge_id, direction = %path, "No sender channel available — video/audio sender was not configured");
return;
}
let sender = sender.unwrap();
let is_video = track.kind() == MediaKind::Video;
let mut dtmf_detector = BridgeDtmfDetector::default();
let mut packet_count: u64 = 0;
let mut last_seq: Option<u16> = None;
let mut stats_packets: u64 = 0;
let mut stats_bytes: u64 = 0;
let mut stats_last_pt: Option<u8> = None;
let mut stats_last_port: Option<u16> = None;
let mut stats_interval = tokio::time::interval(std::time::Duration::from_secs(1));
stats_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
'outer: loop {
tokio::select! {
_ = cancel_token.cancelled() => {
break;
}
_ = stats_interval.tick(), if is_video && packet_count > 0 => {
trace!(
bridge_id = %bridge_id,
direction = %path,
packets_per_sec = stats_packets,
bytes_per_sec = stats_bytes,
payload_type = ?stats_last_pt,
remote_port = ?stats_last_port,
total_packets = packet_count,
"Video stats"
);
stats_packets = 0;
stats_bytes = 0;
}
sample_result = track.recv() => {
match sample_result {
Ok(sample) => {
packet_count += 1;
let received_at_micros = receive_clock.now_micros();
if !is_video {
if let (Some(rec), Some(leg)) = (&recorder, recorder_leg)
&& !recording_paused.load(std::sync::atomic::Ordering::Relaxed)
&& let Some(mut guard) = rec.try_write()
&& let Some(r) = guard.as_mut()
{
let _ = r.write_sample(
leg,
&sample,
None,
None,
None::<AudioCodecType>,
);
}
if let (Some(tx), Some(leg)) = (&sipflow_tx, recorder_leg) {
let _ = tx.try_send((leg, sample.clone(), received_at_micros));
}
}
if !is_video {
Self::observe_dtmf_sample(
&dtmf_sink,
path.source_endpoint(),
&sample,
&mut dtmf_detector,
);
}
let mode = output_mode.load(Ordering::Acquire);
if !is_video && mode != BRIDGE_OUTPUT_PEER {
if packet_count == 1 {
debug!(
bridge_id = %bridge_id,
direction = %path,
output_source = output_mode_name(mode),
"Peer media suppressed by bridge output source"
);
}
continue;
}
if let Some(ref g) = gate {
if !g.load(Ordering::Acquire) {
continue; }
}
if packet_count == 1 {
debug!(bridge_id = %bridge_id, direction = %path, kind = ?sample.kind(), "First media sample forwarded");
}
let (sample_bytes, sample_seq) = match &sample {
MediaSample::Audio(a) => (a.data.len() as u64, a.sequence_number),
MediaSample::Video(v) => (v.data.len() as u64, v.sequence_number),
};
leg_stats.packets.fetch_add(1, Ordering::Relaxed);
leg_stats.bytes.fetch_add(sample_bytes, Ordering::Relaxed);
if let Some(seq) = sample_seq {
if let Some(prev) = last_seq {
let gap = seq.wrapping_sub(prev.wrapping_add(1));
if gap > 0 && gap < 512 {
leg_stats.lost.fetch_add(gap as u64, Ordering::Relaxed);
}
}
last_seq = Some(seq);
}
let samples_to_send: Vec<MediaSample> = match sample {
MediaSample::Audio(mut a) => {
if path.should_strip_caller_audio_metadata() {
a.header_extension = None;
a.raw_packet = None;
a.marker = false;
}
let mapped_dtmf = Self::rewrite_dtmf_sample(
&mut a,
dtmf_mapping.as_deref(),
transcoder_timing.as_deref(),
);
let transcoded = transcoder.as_ref().and_then(|tx_arc| {
let mut guard = tx_arc.lock();
let tx = guard.as_mut()?;
let is_dtmf = dtmf_sink
.read()
.as_ref()
.filter(|s| s.endpoint == path.source_endpoint())
.map_or(false, |s| {
a.payload_type
.is_some_and(|pt| s.payload_types.contains(&pt))
});
if mapped_dtmf || is_dtmf {
return None;
}
let frame = AudioFrame {
rtp_timestamp: a.rtp_timestamp,
clock_rate: a.clock_rate,
data: a.data.clone(),
sequence_number: a.sequence_number,
payload_type: a.payload_type,
marker: a.marker,
source_addr: a.source_addr,
..Default::default()
};
let mut output = tx.transcode(&frame);
if let Some(ref timing_arc) = transcoder_timing {
if let Some(ref mut timing) = *timing_arc.lock() {
timing.rewrite(
&mut output,
tx.source_clock_rate(),
tx.target_clock_rate(),
tx.target_pt(),
);
}
}
Some(MediaSample::Audio(output))
});
match transcoded {
Some(ts) => vec![ts],
None => vec![MediaSample::Audio(a)],
}
}
MediaSample::Video(mut v) => {
if is_video {
if matches!(v.payload_type, Some(pt) if pt < 96) {
debug!(
bridge_id = %bridge_id,
direction = %path,
pt = ?v.payload_type,
"Dropping non-video payload type on video track"
);
vec![]
} else {
stats_packets += 1;
stats_bytes += v.data.len() as u64;
stats_last_pt = v.payload_type;
if let Some(addr) = v.source_addr {
stats_last_port = Some(addr.port());
}
v.header_extension = None;
v.raw_packet = None;
v.csrcs.clear();
if matches!(
(path.from, path.to),
(LegTransport::Caller, LegTransport::Callee)
) {
v.sequence_number = None;
}
vec![MediaSample::Video(v)]
}
} else {
v.sequence_number = None;
v.payload_type = None;
vec![MediaSample::Video(v)]
}
}
};
for sample in samples_to_send {
match sender.try_send(sample.clone()) {
Ok(()) => {}
Err(MediaError::WouldBlock) => {
if let Err(e) = sender.send(sample).await {
warn!(bridge_id = %bridge_id, direction = %path, error = %e, "Failed to forward media sample");
break 'outer;
}
}
Err(e) => {
leg_stats.dropped.fetch_add(1, Ordering::Relaxed);
warn!(bridge_id = %bridge_id, direction = %path, error = ?e, "Failed to forward media sample (kind mismatch or closed)");
break 'outer;
}
}
}
}
Err(e) => {
debug!(bridge_id = %bridge_id, direction = %path, error = %e, "Track recv ended");
break;
}
}
}
}
}
}
fn rewrite_dtmf_sample(
frame: &mut AudioFrame,
mapping_slot: Option<&parking_lot::RwLock<Option<BridgePayloadMapping>>>,
timing_slot: Option<&parking_lot::Mutex<Option<RtpTiming>>>,
) -> bool {
let Some(mapping_slot) = mapping_slot else {
return false;
};
let Some(mapping) = mapping_slot.read().clone() else {
return false;
};
if frame.payload_type != Some(mapping.source_pt) {
return false;
}
if mapping.source_clock_rate != mapping.target_clock_rate {
frame.data = rewrite_dtmf_duration(
&frame.data,
mapping.source_clock_rate,
mapping.target_clock_rate,
);
}
let used_shared_timing = if let Some(timing_slot) = timing_slot {
let mut guard = timing_slot.lock();
if let Some(timing) = guard.as_mut() {
timing.rewrite(
frame,
mapping.source_clock_rate,
mapping.target_clock_rate,
mapping.target_pt,
);
true
} else {
false
}
} else {
false
};
if !used_shared_timing {
frame.payload_type = Some(mapping.target_pt);
frame.clock_rate = mapping.target_clock_rate;
if mapping.source_clock_rate != mapping.target_clock_rate {
frame.rtp_timestamp = scale_rtp_timestamp(
frame.rtp_timestamp,
mapping.source_clock_rate,
mapping.target_clock_rate,
);
}
}
true
}
fn observe_dtmf_sample(
dtmf_sink: &Arc<parking_lot::RwLock<Option<BridgeDtmfSink>>>,
endpoint: BridgeEndpoint,
sample: &MediaSample,
detector: &mut BridgeDtmfDetector,
) {
let Some(sink) = dtmf_sink.read().clone() else {
return;
};
if sink.endpoint != endpoint {
return;
}
let MediaSample::Audio(frame) = sample else {
return;
};
let Some(frame_pt) = frame.payload_type else {
return;
};
if !sink.payload_types.contains(&frame_pt) {
return;
}
debug!(
rtp_ts = frame.rtp_timestamp,
data_len = frame.data.len(),
first_byte = frame.data.first().copied().unwrap_or(0),
"DTMF observe: PT matched, calling detector"
);
if let Some(digit) = detector.observe(&frame.data, frame.rtp_timestamp) {
info!(digit = %digit, endpoint = ?sink.endpoint, "DTMF digit detected");
(sink.handler)(digit);
}
}
fn spawn_pli_forwarder(
bridge_id: String,
sender: Arc<RtpSender>,
source_track: Arc<dyn MediaStreamTrack>,
cancel_token: CancellationToken,
label: &'static str,
) -> tokio::task::JoinHandle<()> {
let mut rtcp_rx = sender.subscribe_rtcp();
crate::utils::spawn(async move {
loop {
tokio::select! {
_ = cancel_token.cancelled() => break,
result = rtcp_rx.recv() => {
match result {
Ok(RtcpPacket::PictureLossIndication(_)) | Ok(RtcpPacket::FullIntraRequest(_)) => {
if let Err(e) = source_track.request_key_frame().await {
warn!(bridge_id = %bridge_id, label = %label, error = %e, "PLI forward: request_key_frame failed");
} else {
debug!(bridge_id = %bridge_id, label = %label, "Forwarded PLI to source keyframe request");
}
}
Err(_) => break,
_ => {}
}
}
}
}
})
}
#[allow(clippy::too_many_arguments)]
fn forward_track_to_sender(
bridge_id: String,
track: Arc<dyn MediaStreamTrack>,
sender_weak: std::sync::Weak<AsyncMutex<Option<MediaSender>>>,
output_mode: Arc<AtomicU8>,
cancel_token: CancellationToken,
path: ForwardPath,
leg_stats: Arc<LegStats>,
recorder: Option<Arc<parking_lot::RwLock<Option<Recorder>>>>,
recorder_leg: Option<RecLeg>,
sipflow_tx: Option<mpsc::Sender<(RecLeg, MediaSample, u64)>>,
receive_clock: ReceiveTimestampClock,
recording_paused: Arc<AtomicBool>,
dtmf_sink: Arc<parking_lot::RwLock<Option<BridgeDtmfSink>>>,
transcoder: Option<Arc<parking_lot::Mutex<Option<Transcoder>>>>,
transcoder_timing: Option<Arc<parking_lot::Mutex<Option<RtpTiming>>>>,
dtmf_mapping: Option<Arc<parking_lot::RwLock<Option<BridgePayloadMapping>>>>,
gate: Option<Arc<AtomicBool>>,
) -> tokio::task::JoinHandle<()> {
crate::utils::spawn(async move {
Self::run_forward_loop(
bridge_id,
track,
sender_weak,
output_mode,
cancel_token,
path,
leg_stats,
recorder,
recorder_leg,
sipflow_tx,
receive_clock,
recording_paused,
dtmf_sink,
transcoder,
transcoder_timing,
dtmf_mapping,
gate,
)
.await;
})
}
pub fn caller_pc(&self) -> &PeerConnection {
&self.caller_pc
}
pub fn callee_pc(&self) -> &PeerConnection {
&self.callee_pc
}
pub async fn stop(&self) {
debug!(bridge_id = %self.id, "Stopping bridge");
self.cancel_token.cancel();
self.caller_pc.close();
self.callee_pc.close();
let mut tasks = self.bridge_tasks.lock().await;
for task in tasks.drain(..) {
let _ = task.await;
}
let mut sub = self.sub_tasks.lock().await;
for task in sub.drain(..) {
let _ = task.await;
}
}
fn close_sync(&self) {
self.cancel_token.cancel();
self.caller_pc.close();
self.callee_pc.close();
}
}
impl Drop for BridgePeer {
fn drop(&mut self) {
debug!(bridge_id = %self.id, "BridgePeer dropping, closing PeerConnections");
self.close_sync();
}
}
pub struct BridgePeerBuilder {
bridge_id: String,
caller_config: Option<rustrtc::RtcConfiguration>,
callee_config: Option<rustrtc::RtcConfiguration>,
rtp_port_range: (u16, u16),
enable_latching: bool,
probation_max_packets: Option<u8>,
external_ip: Option<String>,
bind_ip: Option<String>,
caller_audio_capabilities: Option<Vec<rustrtc::config::AudioCapability>>,
callee_audio_capabilities: Option<Vec<rustrtc::config::AudioCapability>>,
caller_video_capabilities: Option<Vec<rustrtc::config::VideoCapability>>,
callee_video_capabilities: Option<Vec<rustrtc::config::VideoCapability>>,
caller_sender_codec: Option<RtpCodecParameters>,
callee_sender_codec: Option<RtpCodecParameters>,
rtp_sdp_compatibility: rustrtc::config::SdpCompatibilityMode,
ice_servers: Vec<IceServer>,
recorder: Option<Arc<parking_lot::RwLock<Option<Recorder>>>>,
recording_paused: Arc<AtomicBool>,
sipflow_tx: Option<mpsc::Sender<(RecLeg, MediaSample, u64)>>,
cname: Option<String>,
rtp_timeout: Option<std::time::Duration>,
rtp_timeout_tx: Option<mpsc::Sender<String>>,
}
impl BridgePeerBuilder {
pub fn new(bridge_id: String) -> Self {
Self {
bridge_id,
caller_config: None,
callee_config: None,
rtp_port_range: (20000, 30000),
enable_latching: false,
probation_max_packets: None,
external_ip: None,
bind_ip: None,
caller_audio_capabilities: None,
callee_audio_capabilities: None,
caller_video_capabilities: None,
callee_video_capabilities: None,
caller_sender_codec: None,
callee_sender_codec: None,
rtp_sdp_compatibility: rustrtc::config::SdpCompatibilityMode::LegacySip,
ice_servers: Vec::new(),
recorder: None,
recording_paused: Arc::new(AtomicBool::new(false)),
sipflow_tx: None,
cname: None,
rtp_timeout: None,
rtp_timeout_tx: None,
}
}
pub fn with_caller_config(mut self, config: rustrtc::RtcConfiguration) -> Self {
self.caller_config = Some(config);
self
}
pub fn with_callee_config(mut self, config: rustrtc::RtcConfiguration) -> Self {
self.callee_config = Some(config);
self
}
pub fn with_rtp_port_range(mut self, start: u16, end: u16) -> Self {
self.rtp_port_range = (start, end);
self
}
pub fn with_enable_latching(mut self, enable: bool) -> Self {
self.enable_latching = enable;
self
}
pub fn with_probation_max_packets(mut self, max: Option<u8>) -> Self {
self.probation_max_packets = max;
self
}
pub fn with_external_ip(mut self, ip: String) -> Self {
self.external_ip = Some(ip);
self
}
pub fn with_bind_ip(mut self, ip: String) -> Self {
self.bind_ip = Some(ip);
self
}
pub fn with_ice_servers(mut self, servers: Vec<IceServer>) -> Self {
self.ice_servers = servers;
self
}
pub fn with_caller_audio_capabilities(
mut self,
caps: Vec<rustrtc::config::AudioCapability>,
) -> Self {
self.caller_audio_capabilities = Some(caps);
self
}
pub fn with_callee_audio_capabilities(
mut self,
caps: Vec<rustrtc::config::AudioCapability>,
) -> Self {
self.callee_audio_capabilities = Some(caps);
self
}
pub fn with_caller_video_capabilities(
mut self,
caps: Vec<rustrtc::config::VideoCapability>,
) -> Self {
self.caller_video_capabilities = Some(caps);
self
}
pub fn with_callee_video_capabilities(
mut self,
caps: Vec<rustrtc::config::VideoCapability>,
) -> Self {
self.callee_video_capabilities = Some(caps);
self
}
pub fn with_sender_codecs(
mut self,
caller: RtpCodecParameters,
callee: RtpCodecParameters,
) -> Self {
self.caller_sender_codec = Some(caller);
self.callee_sender_codec = Some(callee);
self
}
pub fn with_rtp_sdp_compatibility(
mut self,
mode: rustrtc::config::SdpCompatibilityMode,
) -> Self {
self.rtp_sdp_compatibility = mode;
self
}
pub fn with_recorder(
mut self,
recorder: Arc<parking_lot::RwLock<Option<Recorder>>>,
recording_paused: Arc<AtomicBool>,
) -> Self {
self.recorder = Some(recorder);
self.recording_paused = recording_paused;
self
}
pub fn with_sipflow_capture(
mut self,
sipflow_tx: mpsc::Sender<(RecLeg, MediaSample, u64)>,
) -> Self {
self.sipflow_tx = Some(sipflow_tx);
self
}
pub fn with_cname(mut self, cname: String) -> Self {
self.cname = Some(cname);
self
}
pub fn with_rtp_timeout_notify(
mut self,
tx: mpsc::Sender<String>,
timeout: std::time::Duration,
) -> Self {
self.rtp_timeout_tx = Some(tx);
self.rtp_timeout = Some(timeout);
self
}
fn default_video_capabilities() -> Vec<rustrtc::config::VideoCapability> {
vec![
rustrtc::config::VideoCapability {
payload_type: 96,
codec_name: "H264".to_string(),
clock_rate: 90000,
fmtp: Some("packetization-mode=1".to_string()),
rtcp_fbs: vec![],
},
rustrtc::config::VideoCapability {
payload_type: 97,
codec_name: "VP8".to_string(),
clock_rate: 90000,
fmtp: None,
rtcp_fbs: vec![],
},
]
}
fn resolve_video_caps(
configured: &Option<Vec<rustrtc::config::VideoCapability>>,
) -> Vec<rustrtc::config::VideoCapability> {
configured
.as_ref()
.filter(|c| !c.is_empty())
.cloned()
.unwrap_or_else(Self::default_video_capabilities)
}
pub fn build(self) -> Arc<BridgePeer> {
let caller_video = Self::resolve_video_caps(&self.caller_video_capabilities);
let callee_video = Self::resolve_video_caps(&self.callee_video_capabilities);
let caller_media_caps =
self.caller_audio_capabilities
.map(|audio| rustrtc::config::MediaCapabilities {
audio,
video: caller_video,
application: None,
image: vec![],
});
let caller_config = self
.caller_config
.unwrap_or_else(|| rustrtc::RtcConfiguration {
transport_mode: TransportMode::WebRtc,
external_ip: self.external_ip.clone(),
media_capabilities: caller_media_caps,
ssrc_start: rand::random::<u32>(),
sdp_compatibility: rustrtc::config::SdpCompatibilityMode::Standard,
ice_servers: self.ice_servers,
cname: self.cname.clone(),
..Default::default()
});
let callee_media_caps =
self.callee_audio_capabilities
.map(|audio| rustrtc::config::MediaCapabilities {
audio,
video: callee_video,
application: None,
image: vec![],
});
let callee_config = self
.callee_config
.unwrap_or_else(|| rustrtc::RtcConfiguration {
transport_mode: TransportMode::Rtp,
rtp_start_port: Some(self.rtp_port_range.0),
rtp_end_port: Some(self.rtp_port_range.1),
enable_latching: self.enable_latching,
probation_max_packets: self.probation_max_packets,
external_ip: self.external_ip,
bind_ip: self.bind_ip,
media_capabilities: callee_media_caps,
ssrc_start: rand::random::<u32>(),
sdp_compatibility: self.rtp_sdp_compatibility,
cname: self.cname.clone(),
..Default::default()
});
let mut caller_config = caller_config;
caller_config.label = Some(format!("{}-caller", self.bridge_id));
let mut callee_config = callee_config;
callee_config.label = Some(format!("{}-callee", self.bridge_id));
let caller_pc = PeerConnection::new(caller_config);
let callee_pc = PeerConnection::new(callee_config);
let mut bridge = BridgePeer::new(self.bridge_id, caller_pc, callee_pc);
bridge.caller_sender_codec = self.caller_sender_codec;
bridge.callee_sender_codec = self.callee_sender_codec;
bridge.recorder = self.recorder;
bridge.recording_paused = self.recording_paused;
bridge.sipflow_tx = self.sipflow_tx;
bridge.rtp_timeout = self.rtp_timeout;
bridge.rtp_timeout_tx = self.rtp_timeout_tx;
bridge.caller_video_codec = self
.caller_video_capabilities
.as_ref()
.and_then(|caps| caps.first())
.map(|cap| RtpCodecParameters {
payload_type: cap.payload_type,
clock_rate: cap.clock_rate,
channels: 0,
});
bridge.callee_video_codec = self
.callee_video_capabilities
.as_ref()
.and_then(|caps| caps.first())
.map(|cap| RtpCodecParameters {
payload_type: cap.payload_type,
clock_rate: cap.clock_rate,
channels: 0,
});
Arc::new(bridge)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::media::{CodecType, FileTrack, RtpTrackBuilder};
use rustrtc::{
TransportMode,
sdp::{SdpType, SessionDescription},
};
fn create_test_wav_file(path: &str, num_samples: usize) -> Result<()> {
let spec = crate::media::wav_reader::WavSpec {
channels: 1,
sample_rate: 8000,
bits_per_sample: 16,
sample_format: crate::media::wav_reader::SampleFormat::Int,
};
let mut writer = crate::media::wav_reader::WavWriter::create(path, spec)
.map_err(|e| anyhow::anyhow!("WavWriter: {e}"))?;
for i in 0..num_samples {
let sample = ((i as f32 / 8.0).sin() * 1000.0) as i16;
writer
.write_sample(sample)
.map_err(|e| anyhow::anyhow!("write_sample: {e}"))?;
}
writer
.finalize()
.map_err(|e| anyhow::anyhow!("finalize: {e}"))?;
Ok(())
}
#[test]
fn test_video_payload_map_matches_codec_identity_not_payload_number() {
let rtp_caps = vec![
rustrtc::config::VideoCapability {
payload_type: 96,
codec_name: "H264".to_string(),
clock_rate: 90000,
fmtp: Some("profile-level-id=42801F".to_string()),
rtcp_fbs: vec![],
},
rustrtc::config::VideoCapability {
payload_type: 97,
codec_name: "VP8".to_string(),
clock_rate: 90000,
fmtp: None,
rtcp_fbs: vec![],
},
];
let webrtc_caps = vec![
rustrtc::config::VideoCapability {
payload_type: 96,
codec_name: "VP8".to_string(),
clock_rate: 90000,
fmtp: None,
rtcp_fbs: vec![],
},
rustrtc::config::VideoCapability {
payload_type: 103,
codec_name: "H264".to_string(),
clock_rate: 90000,
fmtp: Some(
"level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f"
.to_string(),
),
rtcp_fbs: vec![],
},
];
let rtp_to_webrtc = BridgePeer::video_payload_map_by_codec(&rtp_caps, &webrtc_caps);
let webrtc_to_rtp = BridgePeer::video_payload_map_by_codec(&webrtc_caps, &rtp_caps);
assert_eq!(rtp_to_webrtc.get(&96), Some(&103));
assert_eq!(rtp_to_webrtc.get(&97), Some(&96));
assert_eq!(webrtc_to_rtp.get(&103), Some(&96));
assert_eq!(webrtc_to_rtp.get(&96), Some(&97));
}
#[tokio::test]
async fn test_bridge_peer_creation() {
let bridge = BridgePeerBuilder::new("test-bridge".to_string())
.with_rtp_port_range(25000, 25100)
.build();
bridge.setup_bridge().await.unwrap();
let webrtc_offer = bridge.caller_pc().create_offer().await;
let rtp_offer = bridge.callee_pc().create_offer().await;
assert!(webrtc_offer.is_ok(), "WebRTC should create offer");
assert!(rtp_offer.is_ok(), "RTP should create offer");
let _ = bridge
.caller_pc()
.set_local_description(webrtc_offer.unwrap());
let _ = bridge.callee_pc().set_local_description(rtp_offer.unwrap());
let webrtc_local = bridge.caller_pc().local_description();
let rtp_local = bridge.callee_pc().local_description();
assert!(
webrtc_local.is_some(),
"WebRTC PC should have local description"
);
assert!(rtp_local.is_some(), "RTP PC should have local description");
let webrtc_sdp = webrtc_local.unwrap().to_sdp_string();
assert!(
webrtc_sdp.contains("UDP/TLS/RTP/SAVPF"),
"WebRTC should use SAVPF"
);
assert!(
webrtc_sdp.contains("fingerprint"),
"WebRTC should have DTLS fingerprint"
);
let rtp_sdp = rtp_local.unwrap().to_sdp_string();
assert!(rtp_sdp.contains("RTP/AVP"), "RTP should use AVP");
assert!(
!rtp_sdp.contains("fingerprint"),
"RTP should not have DTLS fingerprint"
);
}
use crate::media::Track;
#[tokio::test]
async fn test_bridge_setup_with_external_tracks() {
let bridge = BridgePeerBuilder::new("test-bridge-integration".to_string())
.with_rtp_port_range(26000, 26100)
.build();
bridge.setup_bridge().await.unwrap();
let bridge_rtp_offer = bridge.callee_pc().create_offer().await.unwrap();
bridge
.callee_pc()
.set_local_description(bridge_rtp_offer)
.unwrap();
let rtp_callee = RtpTrackBuilder::new("rtp-callee".to_string())
.with_mode(TransportMode::Rtp)
.with_rtp_range(26100, 26200)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let bridge_rtp_sdp = bridge
.callee_pc()
.local_description()
.unwrap()
.to_sdp_string();
let callee_answer = rtp_callee.handshake(bridge_rtp_sdp).await.unwrap();
let rtp_leg_desc = SessionDescription::parse(SdpType::Answer, &callee_answer).unwrap();
bridge
.callee_pc()
.set_remote_description(rtp_leg_desc)
.await
.unwrap();
let bridge_webrtc_offer = bridge.caller_pc().create_offer().await.unwrap();
bridge
.caller_pc()
.set_local_description(bridge_webrtc_offer)
.unwrap();
bridge.start_bridge().await;
let caller_pc = bridge.caller_pc();
let callee_pc = bridge.callee_pc();
assert!(
caller_pc.local_description().is_some(),
"Caller should have local description"
);
assert!(
callee_pc.local_description().is_some(),
"Callee should have local description"
);
assert!(
callee_pc.remote_description().is_some(),
"Callee should have remote description"
);
bridge.stop().await;
}
#[tokio::test]
async fn test_bridge_file_output_polls_file_track_source() {
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_bridge_file_output.wav");
create_test_wav_file(test_file.to_str().unwrap(), 800).unwrap();
let bridge = BridgePeerBuilder::new("test-bridge-file-output".to_string())
.with_rtp_port_range(25100, 25200)
.build();
bridge.setup_bridge().await.unwrap();
let track = FileTrack::new("bridge-file-output".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(false)
.with_codec_info(crate::media::negotiate::CodecInfo {
payload_type: 0,
codec: CodecType::PCMU,
clock_rate: 8000,
channels: 1,
});
bridge
.replace_output_with_file(BridgeEndpoint::Callee, &track)
.await
.unwrap();
drop(track);
let rtp_track = bridge
.get_callee_track()
.await
.expect("bridge RTP output track should exist");
let mut frame_count = 0;
let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(400);
while tokio::time::Instant::now() < deadline && frame_count < 3 {
match tokio::time::timeout(tokio::time::Duration::from_millis(100), rtp_track.recv())
.await
{
Ok(Ok(MediaSample::Audio(_))) => frame_count += 1,
Ok(Ok(_)) => {}
Ok(Err(_)) => break,
Err(_) => {}
}
}
bridge.stop().await;
let _ = std::fs::remove_file(&test_file);
assert!(
frame_count >= 3,
"bridge should poll the FileTrack source and send audio without a FileTrack-owned task"
);
}
#[tokio::test]
async fn test_bridge_sdp_format_differences() {
let bridge = BridgePeerBuilder::new("test-bridge-sdp".to_string())
.with_rtp_port_range(27000, 27100)
.build();
bridge.setup_bridge().await.unwrap();
let webrtc_offer = bridge.caller_pc().create_offer().await.unwrap();
let rtp_offer = bridge.callee_pc().create_offer().await.unwrap();
bridge
.caller_pc()
.set_local_description(webrtc_offer)
.unwrap();
bridge.callee_pc().set_local_description(rtp_offer).unwrap();
let webrtc_sdp = bridge
.caller_pc()
.local_description()
.unwrap()
.to_sdp_string();
let rtp_sdp = bridge
.callee_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(
webrtc_sdp.contains("ice-ufrag"),
"WebRTC should have ICE ufrag"
);
assert!(webrtc_sdp.contains("ice-pwd"), "WebRTC should have ICE pwd");
assert!(
webrtc_sdp.contains("setup:"),
"WebRTC should have DTLS setup"
);
assert!(
!rtp_sdp.contains("ice-ufrag"),
"RTP should NOT have ICE ufrag"
);
assert!(
!rtp_sdp.contains("fingerprint:"),
"RTP should NOT have DTLS fingerprint"
);
assert!(webrtc_sdp.contains("SAVPF"), "WebRTC should use SAVPF");
assert!(rtp_sdp.contains("RTP/AVP"), "RTP should use AVP");
}
#[tokio::test]
async fn test_p2p_webrtc_to_rtp_bridge() {
let bridge = BridgePeerBuilder::new("p2p-bridge".to_string())
.with_rtp_port_range(28000, 28100)
.build();
let rtp_callee = RtpTrackBuilder::new("rtp-callee".to_string())
.with_mode(TransportMode::Rtp)
.with_rtp_range(28200, 28300)
.with_codec_preference(vec![CodecType::PCMU])
.build();
bridge.setup_bridge().await.unwrap();
let bridge_webrtc_offer = bridge.caller_pc().create_offer().await.unwrap();
let bridge_rtp_offer = bridge.callee_pc().create_offer().await.unwrap();
bridge
.caller_pc()
.set_local_description(bridge_webrtc_offer.clone())
.unwrap();
bridge
.callee_pc()
.set_local_description(bridge_rtp_offer.clone())
.unwrap();
let webrtc_sdp = bridge
.caller_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(webrtc_sdp.contains("SAVPF"), "WebRTC side should use SAVPF");
assert!(
webrtc_sdp.contains("fingerprint"),
"WebRTC side should have DTLS fingerprint"
);
let rtp_sdp = bridge
.callee_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(rtp_sdp.contains("RTP/AVP"), "RTP side should use AVP");
assert!(
!rtp_sdp.contains("fingerprint"),
"RTP side should NOT have DTLS fingerprint"
);
let rtp_callee_answer = rtp_callee.handshake(rtp_sdp).await.unwrap();
let callee_desc = SessionDescription::parse(SdpType::Answer, &rtp_callee_answer).unwrap();
bridge
.callee_pc()
.set_remote_description(callee_desc)
.await
.unwrap();
bridge.start_bridge().await;
assert!(
bridge.caller_pc().local_description().is_some(),
"WebRTC should have local description"
);
assert!(
bridge.callee_pc().local_description().is_some(),
"RTP should have local description"
);
assert!(
bridge.callee_pc().remote_description().is_some(),
"RTP should have remote description from callee"
);
bridge.stop().await;
}
#[tokio::test]
async fn test_bridge_as_webrtc_answerer() {
let bridge = BridgePeerBuilder::new("answerer-test".to_string())
.with_rtp_port_range(29000, 29100)
.build();
bridge.setup_bridge().await.unwrap();
let rtp_offer = bridge.callee_pc().create_offer().await.unwrap();
bridge.callee_pc().set_local_description(rtp_offer).unwrap();
let webrtc_caller = RtpTrackBuilder::new("webrtc-caller".to_string())
.with_mode(TransportMode::WebRtc)
.with_rtp_range(29100, 29200)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let caller_offer = webrtc_caller.local_description().await.unwrap();
assert!(
caller_offer.contains("SAVPF"),
"Caller offer should use SAVPF"
);
assert!(
caller_offer.contains("setup:actpass"),
"Caller should offer actpass"
);
let caller_desc = SessionDescription::parse(SdpType::Offer, &caller_offer).unwrap();
bridge
.caller_pc()
.set_remote_description(caller_desc)
.await
.unwrap();
let answer = bridge.caller_pc().create_answer().await.unwrap();
bridge.caller_pc().set_local_description(answer).unwrap();
let answer_sdp = bridge
.caller_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(answer_sdp.contains("SAVPF"), "Answer should use SAVPF");
assert!(
answer_sdp.contains("fingerprint:sha-256"),
"Answer must have real DTLS fingerprint"
);
assert!(
answer_sdp.contains("ice-ufrag:"),
"Answer must have real ICE ufrag"
);
assert!(
answer_sdp.contains("ice-pwd:"),
"Answer must have real ICE password"
);
assert!(
answer_sdp.contains("rtcp-mux"),
"Answer should have rtcp-mux"
);
assert!(
!answer_sdp.contains("setup:actpass"),
"Answer MUST NOT use actpass"
);
assert!(
answer_sdp.contains("setup:passive") || answer_sdp.contains("setup:active"),
"Answer must choose a DTLS role (passive or active)"
);
bridge.stop().await;
}
#[tokio::test]
async fn test_full_webrtc_to_rtp_bridge_flow() {
let bridge = BridgePeerBuilder::new("full-webrtc-rtp".to_string())
.with_rtp_port_range(30000, 30100)
.build();
bridge.setup_bridge().await.unwrap();
let rtp_offer = bridge.callee_pc().create_offer().await.unwrap();
bridge.callee_pc().set_local_description(rtp_offer).unwrap();
bridge.start_bridge().await;
let webrtc_caller = RtpTrackBuilder::new("caller".to_string())
.with_mode(TransportMode::WebRtc)
.with_rtp_range(30100, 30200)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let caller_offer = webrtc_caller.local_description().await.unwrap();
let rtp_callee = RtpTrackBuilder::new("callee".to_string())
.with_mode(TransportMode::Rtp)
.with_rtp_range(30200, 30300)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let bridge_rtp_sdp = bridge
.callee_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(
bridge_rtp_sdp.contains("RTP/AVP"),
"Bridge RTP offer should be plain RTP"
);
let callee_answer = rtp_callee.handshake(bridge_rtp_sdp).await.unwrap();
let callee_desc = SessionDescription::parse(SdpType::Answer, &callee_answer).unwrap();
bridge
.callee_pc()
.set_remote_description(callee_desc)
.await
.unwrap();
let caller_desc = SessionDescription::parse(SdpType::Offer, &caller_offer).unwrap();
bridge
.caller_pc()
.set_remote_description(caller_desc)
.await
.unwrap();
let answer = bridge.caller_pc().create_answer().await.unwrap();
bridge.caller_pc().set_local_description(answer).unwrap();
let answer_sdp = bridge
.caller_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(answer_sdp.contains("SAVPF"), "Answer must use SAVPF");
assert!(
answer_sdp.contains("fingerprint:sha-256"),
"Must have real DTLS fingerprint"
);
assert!(
answer_sdp.contains("ice-ufrag:"),
"Must have real ICE ufrag"
);
assert!(
answer_sdp.contains("ice-pwd:"),
"Must have real ICE password"
);
assert!(
!answer_sdp.contains("setup:actpass"),
"Must NOT use actpass in answer"
);
assert!(
answer_sdp.contains("setup:passive") || answer_sdp.contains("setup:active"),
"Must choose DTLS role"
);
webrtc_caller
.set_remote_description(&answer_sdp)
.await
.unwrap();
assert!(
bridge.callee_pc().remote_description().is_some(),
"RTP side should have remote"
);
assert!(
bridge.caller_pc().local_description().is_some(),
"WebRTC side should have local"
);
assert!(
bridge.caller_pc().remote_description().is_some(),
"WebRTC side should have remote"
);
bridge.stop().await;
}
#[tokio::test]
async fn test_full_rtp_to_webrtc_bridge_flow() {
let bridge = BridgePeerBuilder::new("full-rtp-webrtc".to_string())
.with_rtp_port_range(31000, 31100)
.build();
bridge.setup_bridge().await.unwrap();
let webrtc_offer = bridge.caller_pc().create_offer().await.unwrap();
bridge
.caller_pc()
.set_local_description(webrtc_offer)
.unwrap();
bridge.start_bridge().await;
let rtp_caller = RtpTrackBuilder::new("rtp-caller".to_string())
.with_mode(TransportMode::Rtp)
.with_rtp_range(31100, 31200)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let caller_offer = rtp_caller.local_description().await.unwrap();
let webrtc_callee = RtpTrackBuilder::new("webrtc-callee".to_string())
.with_mode(TransportMode::WebRtc)
.with_rtp_range(31200, 31300)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let bridge_webrtc_sdp = bridge
.caller_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(
bridge_webrtc_sdp.contains("SAVPF"),
"Bridge WebRTC offer should use SAVPF"
);
let callee_answer = webrtc_callee.handshake(bridge_webrtc_sdp).await.unwrap();
let callee_desc = SessionDescription::parse(SdpType::Answer, &callee_answer).unwrap();
bridge
.caller_pc()
.set_remote_description(callee_desc)
.await
.unwrap();
let caller_desc = SessionDescription::parse(SdpType::Offer, &caller_offer).unwrap();
bridge
.callee_pc()
.set_remote_description(caller_desc)
.await
.unwrap();
let answer = bridge.callee_pc().create_answer().await.unwrap();
bridge.callee_pc().set_local_description(answer).unwrap();
let answer_sdp = bridge
.callee_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(answer_sdp.contains("RTP/AVP"), "Answer must be plain RTP");
assert!(
!answer_sdp.contains("fingerprint"),
"RTP answer must NOT have fingerprint"
);
assert!(
!answer_sdp.contains("ice-ufrag"),
"RTP answer must NOT have ICE"
);
rtp_caller
.set_remote_description(&answer_sdp)
.await
.unwrap();
assert!(
bridge.callee_pc().remote_description().is_some(),
"RTP side should have remote"
);
assert!(
bridge.callee_pc().local_description().is_some(),
"RTP side should have local"
);
assert!(
bridge.caller_pc().remote_description().is_some(),
"WebRTC side should have remote"
);
bridge.stop().await;
}
#[tokio::test]
async fn test_bridge_rtp_side_respects_configured_audio_capabilities() {
use rustrtc::config::AudioCapability;
let bridge = BridgePeerBuilder::new("codec-test-rtp".to_string())
.with_rtp_port_range(32000, 32100)
.with_callee_audio_capabilities(vec![
AudioCapability::pcmu(),
AudioCapability::pcma(),
AudioCapability::telephone_event(),
])
.build();
bridge.setup_bridge().await.unwrap();
let rtp_offer = bridge.callee_pc().create_offer().await.unwrap();
bridge.callee_pc().set_local_description(rtp_offer).unwrap();
let rtp_sdp = bridge
.callee_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(rtp_sdp.contains("PCMU/8000"), "RTP SDP must contain PCMU");
assert!(rtp_sdp.contains("PCMA/8000"), "RTP SDP must contain PCMA");
assert!(
rtp_sdp.contains("telephone-event"),
"RTP SDP must contain telephone-event"
);
assert!(rtp_sdp.contains("RTP/AVP"), "Must use RTP/AVP");
bridge.stop().await;
}
#[tokio::test]
async fn test_bridge_webrtc_side_respects_configured_audio_capabilities() {
use rustrtc::config::AudioCapability;
let bridge = BridgePeerBuilder::new("codec-test-webrtc".to_string())
.with_rtp_port_range(33000, 33100)
.with_caller_audio_capabilities(vec![
AudioCapability::opus(),
AudioCapability::pcmu(),
AudioCapability::telephone_event(),
])
.build();
bridge.setup_bridge().await.unwrap();
let webrtc_offer = bridge.caller_pc().create_offer().await.unwrap();
bridge
.caller_pc()
.set_local_description(webrtc_offer)
.unwrap();
let webrtc_sdp = bridge
.caller_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(
webrtc_sdp.contains("opus/48000"),
"WebRTC SDP must contain Opus"
);
assert!(
webrtc_sdp.contains("PCMU/8000"),
"WebRTC SDP must contain PCMU"
);
assert!(webrtc_sdp.contains("UDP/TLS/RTP/SAVPF"), "Must use SAVPF");
assert!(
webrtc_sdp.contains("fingerprint"),
"Must have DTLS fingerprint"
);
bridge.stop().await;
}
#[tokio::test]
async fn test_bridge_with_custom_sender_codecs() {
use rustrtc::config::AudioCapability;
let bridge = BridgePeerBuilder::new("sender-codec-test".to_string())
.with_rtp_port_range(34000, 34100)
.with_callee_audio_capabilities(vec![
AudioCapability::pcmu(),
AudioCapability::pcma(),
AudioCapability::telephone_event(),
])
.with_sender_codecs(
RtpCodecParameters {
payload_type: 111,
clock_rate: 48000,
channels: 2,
},
RtpCodecParameters {
payload_type: 8,
clock_rate: 8000,
channels: 1,
}, )
.build();
bridge.setup_bridge().await.unwrap();
let rtp_offer = bridge.callee_pc().create_offer().await.unwrap();
bridge.callee_pc().set_local_description(rtp_offer).unwrap();
let rtp_sdp = bridge
.callee_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(rtp_sdp.contains("RTP/AVP"), "RTP SDP must be plain RTP");
assert!(
rtp_sdp.contains("PCMU/8000") || rtp_sdp.contains("PCMA/8000"),
"RTP SDP must have audio codecs"
);
bridge.stop().await;
}
#[tokio::test]
async fn test_bridge_e2e_webrtc_to_rtp_pcmu_only() {
use crate::media::negotiate::MediaNegotiator;
let caller = RtpTrackBuilder::new("webrtc-caller-pcmu".to_string())
.with_mode(TransportMode::WebRtc)
.with_rtp_range(35000, 35100)
.with_codec_preference(vec![CodecType::Opus, CodecType::PCMU])
.build();
let caller_offer = caller.local_description().await.unwrap();
assert!(caller_offer.contains("opus"), "Caller must offer Opus");
let caller_side_codecs = MediaNegotiator::build_codec_list_from_offer(
&caller_offer,
&[CodecType::PCMU, CodecType::TelephoneEvent],
);
let callee_side_codecs = MediaNegotiator::build_callee_codec_offer_with_allow(
&caller_offer,
&[CodecType::PCMU, CodecType::TelephoneEvent],
);
let webrtc_caps: Vec<_> = caller_side_codecs
.iter()
.filter_map(|c| c.to_audio_capability())
.collect();
let rtp_caps: Vec<_> = callee_side_codecs
.iter()
.filter_map(|c| c.to_audio_capability())
.collect();
let caller_sender = caller_side_codecs
.iter()
.find(|c| !c.is_dtmf())
.map(|c| c.to_params())
.unwrap();
let callee_sender = callee_side_codecs
.iter()
.find(|c| !c.is_dtmf())
.map(|c| c.to_params())
.unwrap();
let bridge = BridgePeerBuilder::new("e2e-pcmu-only".to_string())
.with_rtp_port_range(35100, 35200)
.with_caller_audio_capabilities(webrtc_caps)
.with_callee_audio_capabilities(rtp_caps)
.with_sender_codecs(caller_sender, callee_sender)
.build();
bridge.setup_bridge().await.unwrap();
let rtp_offer = bridge.callee_pc().create_offer().await.unwrap();
bridge.callee_pc().set_local_description(rtp_offer).unwrap();
let bridge_rtp_sdp = bridge
.callee_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(
bridge_rtp_sdp.contains("PCMU/8000"),
"RTP side must offer PCMU"
);
assert!(
!bridge_rtp_sdp.contains("opus"),
"RTP side should NOT offer Opus (not in allow_codecs)"
);
let callee = RtpTrackBuilder::new("rtp-callee-pcmu".to_string())
.with_mode(TransportMode::Rtp)
.with_rtp_range(35200, 35300)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let callee_answer = callee.handshake(bridge_rtp_sdp).await.unwrap();
let callee_desc = SessionDescription::parse(SdpType::Answer, &callee_answer).unwrap();
bridge
.callee_pc()
.set_remote_description(callee_desc)
.await
.unwrap();
let caller_desc = SessionDescription::parse(SdpType::Offer, &caller_offer).unwrap();
bridge
.caller_pc()
.set_remote_description(caller_desc)
.await
.unwrap();
let answer = bridge.caller_pc().create_answer().await.unwrap();
bridge.caller_pc().set_local_description(answer).unwrap();
let answer_sdp = bridge
.caller_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(answer_sdp.contains("SAVPF"), "Answer must be WebRTC");
assert!(
answer_sdp.contains("fingerprint:sha-256"),
"Must have real DTLS fingerprint"
);
assert!(
!answer_sdp.contains("setup:actpass"),
"Must NOT use actpass"
);
caller.set_remote_description(&answer_sdp).await.unwrap();
bridge.stop().await;
}
#[tokio::test]
async fn test_bridge_e2e_rtp_to_webrtc_filters_g729_offer() {
use crate::media::negotiate::MediaNegotiator;
let caller = RtpTrackBuilder::new("rtp-caller-g729".to_string())
.with_mode(TransportMode::Rtp)
.with_rtp_range(36000, 36100)
.with_codec_preference(vec![CodecType::G729, CodecType::PCMU])
.build();
let caller_offer = caller.local_description().await.unwrap();
let caller_side_codecs = MediaNegotiator::build_codec_list_from_offer(
&caller_offer,
&[CodecType::G729, CodecType::PCMU, CodecType::TelephoneEvent],
);
let callee_side_codecs = MediaNegotiator::build_callee_codec_offer_with_allow(
&caller_offer,
&[CodecType::G729, CodecType::PCMU, CodecType::TelephoneEvent],
);
let callee_side_codecs =
MediaNegotiator::filter_webrtc_offer_codecs(&caller_offer, callee_side_codecs);
assert!(
caller_side_codecs
.iter()
.any(|c| c.codec == CodecType::G729),
"G729 on RTP caller side"
);
assert!(
!callee_side_codecs
.iter()
.any(|c| c.codec == CodecType::G729),
"G729 must be removed from WebRTC offer codecs"
);
assert!(
callee_side_codecs
.iter()
.any(|c| c.codec == CodecType::PCMU)
);
let webrtc_caps: Vec<_> = callee_side_codecs
.iter()
.filter_map(|c| c.to_audio_capability())
.collect();
let rtp_caps: Vec<_> = caller_side_codecs
.iter()
.filter_map(|c| c.to_audio_capability())
.collect();
let callee_sender = caller_side_codecs
.iter()
.find(|c| !c.is_dtmf())
.map(|c| c.to_params())
.unwrap();
let caller_sender = callee_side_codecs
.iter()
.find(|c| !c.is_dtmf())
.map(|c| c.to_params())
.unwrap();
let bridge = BridgePeerBuilder::new("e2e-g729-drop".to_string())
.with_rtp_port_range(36100, 36200)
.with_caller_audio_capabilities(webrtc_caps)
.with_callee_audio_capabilities(rtp_caps)
.with_sender_codecs(caller_sender, callee_sender)
.build();
bridge.setup_bridge().await.unwrap();
let webrtc_offer = bridge.caller_pc().create_offer().await.unwrap();
bridge
.caller_pc()
.set_local_description(webrtc_offer)
.unwrap();
let bridge_webrtc_sdp = bridge
.caller_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(
bridge_webrtc_sdp.contains("SAVPF"),
"WebRTC side must use SAVPF"
);
assert!(
!bridge_webrtc_sdp.contains("G729"),
"WebRTC-side SDP must not offer G729"
);
assert!(bridge_webrtc_sdp.contains("PCMU"));
let callee = RtpTrackBuilder::new("webrtc-callee".to_string())
.with_mode(TransportMode::WebRtc)
.with_rtp_range(36200, 36300)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let callee_answer = callee.handshake(bridge_webrtc_sdp).await.unwrap();
let callee_desc = SessionDescription::parse(SdpType::Answer, &callee_answer).unwrap();
bridge
.caller_pc()
.set_remote_description(callee_desc)
.await
.unwrap();
let caller_desc = SessionDescription::parse(SdpType::Offer, &caller_offer).unwrap();
bridge
.callee_pc()
.set_remote_description(caller_desc)
.await
.unwrap();
let rtp_answer = bridge.callee_pc().create_answer().await.unwrap();
bridge
.callee_pc()
.set_local_description(rtp_answer)
.unwrap();
let rtp_answer_sdp = bridge
.callee_pc()
.local_description()
.unwrap()
.to_sdp_string();
assert!(
rtp_answer_sdp.contains("RTP/AVP"),
"RTP answer must be plain RTP"
);
caller
.set_remote_description(&rtp_answer_sdp)
.await
.unwrap();
bridge.stop().await;
}
use rustrtc::media::{MediaResult, TrackState};
struct OneShotAudioTrack {
sample: tokio::sync::Mutex<Option<MediaSample>>,
}
#[async_trait::async_trait]
impl MediaStreamTrack for OneShotAudioTrack {
fn id(&self) -> &str {
"one-shot-audio"
}
fn kind(&self) -> MediaKind {
MediaKind::Audio
}
fn state(&self) -> TrackState {
TrackState::Live
}
async fn recv(&self) -> MediaResult<MediaSample> {
let mut guard = self.sample.lock().await;
if let Some(s) = guard.take() {
Ok(s)
} else {
loop {
tokio::time::sleep(std::time::Duration::from_secs(999)).await;
}
}
}
async fn request_key_frame(&self) -> MediaResult<()> {
Ok(())
}
}
#[tokio::test]
async fn test_bridge_set_transcoder() {
let bridge = BridgePeerBuilder::new("transcoder-test".to_string())
.with_rtp_port_range(38000, 38100)
.build();
assert!(bridge.callee_to_caller_transcoder.lock().is_none());
assert!(bridge.caller_to_callee_transcoder.lock().is_none());
bridge.set_transcoder(BridgeEndpoint::Callee, CodecType::G729, CodecType::PCMU, 0);
assert!(
bridge.callee_to_caller_transcoder.lock().is_some(),
"RTP→WebRTC transcoder should be set"
);
bridge.set_transcoder(BridgeEndpoint::Caller, CodecType::PCMU, CodecType::G729, 18);
assert!(
bridge.caller_to_callee_transcoder.lock().is_some(),
"WebRTC→RTP transcoder should be set"
);
bridge.clear_transcoder(BridgeEndpoint::Callee);
assert!(
bridge.callee_to_caller_transcoder.lock().is_none(),
"RTP→WebRTC should be cleared"
);
assert!(
bridge.caller_to_callee_transcoder.lock().is_some(),
"WebRTC→RTP should still be set"
);
bridge.clear_transcoder(BridgeEndpoint::Caller);
assert!(bridge.caller_to_callee_transcoder.lock().is_none());
}
#[tokio::test]
async fn test_bridge_transcoding_in_forwarding_loop() {
use audio_codec::create_encoder;
use rustrtc::media::track::sample_track;
let pcm = vec![0i16; 160];
let mut g729_enc = create_encoder(CodecType::G729);
let g729_data = g729_enc.encode(&pcm);
let input_frame = AudioFrame {
rtp_timestamp: 100,
clock_rate: 8000,
data: g729_data.into(),
sequence_number: Some(10),
payload_type: Some(18), ..Default::default()
};
let mock_track: Arc<dyn MediaStreamTrack> = Arc::new(OneShotAudioTrack {
sample: tokio::sync::Mutex::new(Some(MediaSample::Audio(input_frame))),
});
let (output_tx, output_track, _) = sample_track(MediaKind::Audio, 10);
let sender_arc = Arc::new(AsyncMutex::new(Some(output_tx)));
let sender_weak = Arc::downgrade(&sender_arc);
let transcoder = Arc::new(parking_lot::Mutex::new(Some(Transcoder::new(
CodecType::G729,
CodecType::PCMU,
0, ))));
let timing: Arc<parking_lot::Mutex<Option<RtpTiming>>> =
Arc::new(parking_lot::Mutex::new(None));
let cancel = CancellationToken::new();
let stats: Arc<LegStats> = LegStats::new();
let dtmf: Arc<parking_lot::RwLock<Option<BridgeDtmfSink>>> =
Arc::new(parking_lot::RwLock::new(None));
let task_handle = {
let c = cancel.clone();
let mt = mock_track.clone();
let sw = sender_weak.clone();
let tr = Some(transcoder);
let ti = Some(timing);
let st = stats;
let ds = dtmf;
crate::utils::spawn(async move {
BridgePeer::run_forward_loop(
"test".to_string(),
mt,
sw,
Arc::new(AtomicU8::new(BRIDGE_OUTPUT_PEER)),
c,
ForwardPath::new(LegTransport::Callee, LegTransport::Caller),
st,
None, None, None, ReceiveTimestampClock::new(),
Arc::new(AtomicBool::new(false)), ds,
tr,
ti,
None, None, )
.await;
})
};
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
cancel.cancel();
let _ = task_handle.await;
let captured =
tokio::time::timeout(std::time::Duration::from_secs(1), output_track.recv()).await;
match captured {
Ok(Ok(MediaSample::Audio(frame))) => {
assert_eq!(
frame.payload_type,
Some(0),
"Output should be PCMU (PT=0), got {:?}",
frame.payload_type
);
assert_eq!(frame.clock_rate, 8000, "PCMU clock rate should be 8000");
assert_eq!(
frame.data.len(),
160,
"PCMU 20 ms frame is 160 bytes, got {}",
frame.data.len()
);
assert_ne!(
frame.data.len(),
20,
"Must NOT be G.729 (20 bytes) – transcoding should have expanded"
);
}
other => panic!("Expected a transcoded PCMU Audio frame, got {:?}", other),
}
}
#[tokio::test]
async fn test_bridge_dtmf_mapping_rewrites_payload_type_and_clock_rate() {
use bytes::Bytes;
use rustrtc::media::track::sample_track;
let input_frame = AudioFrame {
rtp_timestamp: 48_000,
clock_rate: 48_000,
data: Bytes::from_static(&[0x05, 0x0A, 0x12, 0xC0]), sequence_number: Some(10),
payload_type: Some(110),
marker: true,
..Default::default()
};
let mock_track: Arc<dyn MediaStreamTrack> = Arc::new(OneShotAudioTrack {
sample: tokio::sync::Mutex::new(Some(MediaSample::Audio(input_frame))),
});
let (output_tx, output_track, _) = sample_track(MediaKind::Audio, 10);
let sender_arc = Arc::new(AsyncMutex::new(Some(output_tx)));
let sender_weak = Arc::downgrade(&sender_arc);
let transcoder = Arc::new(parking_lot::Mutex::new(Some(Transcoder::new(
CodecType::PCMU,
CodecType::PCMA,
8,
))));
let dtmf_mapping = Arc::new(parking_lot::RwLock::new(Some(BridgePayloadMapping {
source_pt: 110,
target_pt: 126,
source_clock_rate: 48_000,
target_clock_rate: 8_000,
})));
let cancel = CancellationToken::new();
let stats = LegStats::new();
let dtmf_sink = Arc::new(parking_lot::RwLock::new(None));
let task_handle = {
let c = cancel.clone();
let mt = mock_track.clone();
let sw = sender_weak.clone();
let st = stats;
let ds = dtmf_sink;
let tr = Some(transcoder);
let dm = Some(dtmf_mapping);
crate::utils::spawn(async move {
BridgePeer::run_forward_loop(
"test-dtmf-mapping".to_string(),
mt,
sw,
Arc::new(AtomicU8::new(BRIDGE_OUTPUT_PEER)),
c,
ForwardPath::new(LegTransport::Caller, LegTransport::Callee),
st,
None,
None,
None,
ReceiveTimestampClock::new(),
Arc::new(AtomicBool::new(false)),
ds,
tr,
None,
dm,
None,
)
.await;
})
};
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
cancel.cancel();
let _ = task_handle.await;
let captured =
tokio::time::timeout(std::time::Duration::from_secs(1), output_track.recv()).await;
match captured {
Ok(Ok(MediaSample::Audio(frame))) => {
assert_eq!(frame.payload_type, Some(126));
assert_eq!(frame.clock_rate, 8_000);
assert_eq!(frame.sequence_number, Some(10));
assert_eq!(frame.rtp_timestamp, 8_000);
assert_eq!(frame.data.as_ref(), &[0x05, 0x0A, 0x03, 0x20]);
}
other => panic!("Expected mapped DTMF Audio frame, got {:?}", other),
}
}
#[test]
fn test_bridge_dtmf_mapping_uses_shared_timing_when_available() {
use bytes::Bytes;
let dtmf_mapping = parking_lot::RwLock::new(Some(BridgePayloadMapping {
source_pt: 110,
target_pt: 126,
source_clock_rate: 48_000,
target_clock_rate: 8_000,
}));
let timing = parking_lot::Mutex::new(Some(RtpTiming::default()));
let mut first = AudioFrame {
rtp_timestamp: 48_000,
clock_rate: 48_000,
data: Bytes::from_static(&[0x05, 0x0A, 0x12, 0xC0]),
sequence_number: Some(10),
payload_type: Some(110),
..Default::default()
};
let mut second = AudioFrame {
rtp_timestamp: 52_800,
clock_rate: 48_000,
data: Bytes::from_static(&[0x05, 0x0A, 0x12, 0xC0]),
sequence_number: Some(11),
payload_type: Some(110),
..Default::default()
};
assert!(BridgePeer::rewrite_dtmf_sample(
&mut first,
Some(&dtmf_mapping),
Some(&timing),
));
assert!(BridgePeer::rewrite_dtmf_sample(
&mut second,
Some(&dtmf_mapping),
Some(&timing),
));
assert_eq!(first.payload_type, Some(126));
assert_eq!(first.clock_rate, 8_000);
assert_eq!(first.data.as_ref(), &[0x05, 0x0A, 0x03, 0x20]);
assert_eq!(second.payload_type, Some(126));
assert_eq!(second.clock_rate, 8_000);
assert_eq!(second.data.as_ref(), &[0x05, 0x0A, 0x03, 0x20]);
assert_eq!(second.rtp_timestamp.wrapping_sub(first.rtp_timestamp), 800);
assert_eq!(
second
.sequence_number
.expect("second sequence")
.wrapping_sub(first.sequence_number.expect("first sequence")),
1
);
}
#[tokio::test]
#[cfg(feature = "opus")]
async fn test_transcoder_opus_to_pcmu() {
use audio_codec::create_encoder;
let pcm_48k: Vec<i16> = (0..960).map(|i| ((i * 100) % 32767) as i16).collect();
let mut opus_enc = create_encoder(CodecType::Opus);
let opus_data = opus_enc.encode(&pcm_48k);
assert!(!opus_data.is_empty(), "Opus encoder should produce output");
let is_stereo = opus_data[0] & 0x04 != 0;
assert!(
is_stereo,
"Opus encoder should produce stereo packet (TOC bit 2 set)"
);
let mut standalone_dec = audio_codec::create_decoder(CodecType::Opus);
let decoded_pcm = standalone_dec.decode(&opus_data);
assert_eq!(
decoded_pcm.len(),
960,
"Opus decoder should output 960 mono samples (20ms), got {}",
decoded_pcm.len()
);
let mut transcoder = Transcoder::new(CodecType::Opus, CodecType::PCMU, 0);
let input_frame = AudioFrame {
rtp_timestamp: 100,
clock_rate: 48000,
data: opus_data.clone().into(),
sequence_number: Some(10),
payload_type: Some(111),
..Default::default()
};
let output = transcoder.transcode(&input_frame);
assert_eq!(
output.data.len(),
160,
"Opus→PCMU should produce 160 bytes (20ms), got {}",
output.data.len()
);
assert_eq!(output.payload_type, Some(0));
let input_frame2 = AudioFrame {
rtp_timestamp: 100,
clock_rate: 48000,
data: opus_data.into(),
sequence_number: Some(11),
payload_type: Some(111),
..Default::default()
};
let output2 = transcoder.transcode(&input_frame2);
assert_eq!(
output2.data.len(),
160,
"Second call should also produce 160 bytes"
);
assert_eq!(output2.payload_type, Some(0));
let mut pcmu_dec = audio_codec::create_decoder(CodecType::PCMU);
let decoded = pcmu_dec.decode(&output.data);
assert_eq!(
decoded.len(),
160,
"PCMU decode should yield 160 samples (20ms at 8kHz), got {}",
decoded.len()
);
}
#[tokio::test]
#[cfg(feature = "opus")]
async fn test_transcoder_opus_to_g722() {
use audio_codec::create_encoder;
let pcm_48k: Vec<i16> = (0..960).map(|i| ((i * 100) % 32767) as i16).collect();
let mut opus_enc = create_encoder(CodecType::Opus);
let opus_data = opus_enc.encode(&pcm_48k);
assert!(!opus_data.is_empty(), "Opus encoder should produce output");
let mut transcoder = Transcoder::new(CodecType::Opus, CodecType::G722, 9);
let input_frame = AudioFrame {
rtp_timestamp: 100,
clock_rate: 48000,
data: opus_data.into(),
sequence_number: Some(10),
payload_type: Some(111),
..Default::default()
};
let output = transcoder.transcode(&input_frame);
assert!(
!output.data.is_empty(),
"Opus→G.722 should produce non-empty output"
);
assert_eq!(output.payload_type, Some(9));
assert_eq!(output.clock_rate, 8000, "G.722 clock_rate should be 8000");
}
#[tokio::test]
#[cfg(feature = "opus")]
async fn test_transcoder_opus_to_pcmu_roundtrip_quality() {
use audio_codec::{create_decoder, create_encoder};
let freq = 440.0; let sample_rate = 48000.0;
let pcm_48k: Vec<i16> = (0..960)
.map(|i| {
let t = i as f64 / sample_rate;
(16384.0 * (2.0 * std::f64::consts::PI * freq * t).sin()) as i16
})
.collect();
let mut opus_enc = create_encoder(CodecType::Opus);
let opus_data = opus_enc.encode(&pcm_48k);
let mut transcoder = Transcoder::new(CodecType::Opus, CodecType::PCMU, 0);
let mut all_pcmu = Vec::new();
for i in 0..5 {
let frame = AudioFrame {
rtp_timestamp: 100 + i * 960,
clock_rate: 48000,
data: opus_data.clone().into(),
sequence_number: Some(10 + i as u16),
payload_type: Some(111),
..Default::default()
};
let output = transcoder.transcode(&frame);
assert_eq!(
output.data.len(),
160,
"Frame {}: Opus→PCMU should produce 160 bytes",
i
);
let mut pcmu_dec = create_decoder(CodecType::PCMU);
let decoded = pcmu_dec.decode(&output.data);
assert_eq!(decoded.len(), 160);
all_pcmu.push(decoded);
}
for (i, frame) in all_pcmu.iter().enumerate() {
let energy: f64 =
frame.iter().map(|&s| (s as f64).powi(2)).sum::<f64>() / frame.len() as f64;
let rms = energy.sqrt();
assert!(
rms > 100.0 && rms < 20000.0,
"Frame {}: RMS {} is outside expected range for 440Hz sine",
i,
rms
);
}
}
#[tokio::test]
async fn test_bridge_forwarding_passthrough_without_transcoder() {
use audio_codec::create_encoder;
use rustrtc::media::track::sample_track;
let pcm = vec![0i16; 160];
let mut pcmu_enc = create_encoder(CodecType::PCMU);
let pcmu_data = pcmu_enc.encode(&pcm);
let input_frame = AudioFrame {
rtp_timestamp: 200,
clock_rate: 8000,
data: pcmu_data.into(),
sequence_number: Some(20),
payload_type: Some(0), ..Default::default()
};
let mock_track: Arc<dyn MediaStreamTrack> = Arc::new(OneShotAudioTrack {
sample: tokio::sync::Mutex::new(Some(MediaSample::Audio(input_frame))),
});
let (output_tx, output_track, _) = sample_track(MediaKind::Audio, 10);
let sender_arc = Arc::new(AsyncMutex::new(Some(output_tx)));
let sender_weak = Arc::downgrade(&sender_arc);
let cancel = CancellationToken::new();
let stats = LegStats::new();
let dtmf = Arc::new(parking_lot::RwLock::new(None));
let task_handle = {
let c = cancel.clone();
let mt = mock_track.clone();
let sw = sender_weak.clone();
let st = stats;
let ds = dtmf;
crate::utils::spawn(async move {
BridgePeer::run_forward_loop(
"test-passthrough".to_string(),
mt,
sw,
Arc::new(AtomicU8::new(BRIDGE_OUTPUT_PEER)),
c,
ForwardPath::new(LegTransport::Callee, LegTransport::Caller),
st,
None,
None,
None,
ReceiveTimestampClock::new(),
Arc::new(AtomicBool::new(false)),
ds,
None, None, None, None, )
.await;
})
};
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
cancel.cancel();
let _ = task_handle.await;
let captured =
tokio::time::timeout(std::time::Duration::from_secs(1), output_track.recv()).await;
match captured {
Ok(Ok(MediaSample::Audio(frame))) => {
assert_eq!(
frame.payload_type,
Some(0),
"Passthrough PCMU should keep PT=0"
);
assert_eq!(frame.data.len(), 160, "Passthrough should keep 160 bytes");
}
other => panic!("Expected passthrough PCMU Audio frame, got {:?}", other),
}
}
}