use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{info, trace, warn};
use crate::call::domain::LegId;
use crate::call::runtime::ConferenceManager;
use crate::media::conference_mixer::AudioFrame;
/// Sink for media samples produced by the conference bridge.
///
/// Implementors must be `Send + Sync`, and the future returned by `send`
/// must be `Send` as well.
pub trait AudioSender: Send + Sync {
/// Hands one media sample to the sink.
///
/// The returned future resolves to `Ok(())` on success or to a
/// `SendError` wrapping a `MediaSample` on failure.
fn send(
&self,
sample: rustrtc::media::MediaSample,
) -> impl std::future::Future<
Output = Result<(), mpsc::error::SendError<rustrtc::media::MediaSample>>,
> + Send;
}
/// Allows a tokio mpsc sender of `MediaSample`s to be used directly as an
/// [`AudioSender`].
impl AudioSender for tokio::sync::mpsc::Sender<rustrtc::media::MediaSample> {
    /// Forwards `sample` into the channel and reports the channel's own
    /// `SendError` on failure.
    async fn send(
        &self,
        sample: rustrtc::media::MediaSample,
    ) -> Result<(), mpsc::error::SendError<rustrtc::media::MediaSample>> {
        // Name the channel's inherent method by path so it is obvious that
        // this delegates to the channel rather than to the trait method of
        // the same name.
        mpsc::Sender::send(self, sample).await
    }
}
/// Source of PCM frames that the bridge reads from.
///
/// The future is boxed, which keeps the trait usable as `Box<dyn AudioReceiver>`.
pub trait AudioReceiver: Send + Sync {
/// Returns a boxed future that resolves to the next [`PcmAudioFrame`],
/// or to `None` when the implementation has no frame to deliver.
fn recv(
&mut self,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<PcmAudioFrame>> + Send + '_>>;
}
/// A block of signed 16-bit PCM audio together with its sample rate.
#[derive(Debug, Clone)]
pub struct PcmAudioFrame {
    /// PCM samples.
    pub samples: Vec<i16>,
    /// Sample rate of `samples` in Hz (e.g. 8000 or 48000).
    pub sample_rate: u32,
    /// Frame timestamp; [`PcmAudioFrame::new`] sets it to zero.
    pub timestamp: u64,
}

impl PcmAudioFrame {
    /// Creates a frame from `samples` at `sample_rate` with a zero timestamp.
    pub fn new(samples: Vec<i16>, sample_rate: u32) -> Self {
        let timestamp = 0;
        Self {
            samples,
            sample_rate,
            timestamp,
        }
    }
}
/// Connects a call leg's media to a conference managed by a shared
/// [`ConferenceManager`].
pub struct ConferenceMediaBridge {
// Shared manager used to add participants and to take their output receivers.
conference_manager: Arc<ConferenceManager>,
}
impl ConferenceMediaBridge {
pub fn new(conference_manager: Arc<ConferenceManager>) -> Self {
Self { conference_manager }
}
/// Starts an output-only bridge: conference audio for `leg_id` is encoded
/// with `codec` and pushed to `audio_sender` by a spawned forward loop.
///
/// # Errors
///
/// Returns an error when the conference manager has no output receiver to
/// hand out for `leg_id`.
pub async fn start_bridge<S>(
    &self,
    conf_id: &str,
    leg_id: &LegId,
    audio_sender: S,
    codec: audio_codec::CodecType,
) -> anyhow::Result<ConferenceBridgeHandle>
where
    S: AudioSender + Send + Sync + 'static,
{
    // The converted id is not used afterwards; it is kept so that the
    // conversion still happens exactly as before.
    let _conf_id_obj = crate::call::runtime::ConferenceId::from(conf_id);

    let Some(output_rx) = self
        .conference_manager
        .take_participant_output_rx(leg_id)
        .await
    else {
        return Err(anyhow::anyhow!(
            "No output_rx found for leg {} in conference {}",
            leg_id,
            conf_id
        ));
    };

    info!(
        conf_id = %conf_id,
        leg_id = %leg_id,
        "Starting conference media bridge (output only)"
    );
    crate::metrics::conference::created();

    let cancel_token = tokio_util::sync::CancellationToken::new();
    let forward_task = crate::utils::spawn({
        // Owned copies for the spawned task.
        let task_cancel = cancel_token.clone();
        let task_leg = leg_id.clone();
        let task_conf = conf_id.to_string();
        async move {
            Self::forward_loop(
                output_rx,
                audio_sender,
                task_leg,
                task_conf,
                task_cancel,
                codec,
            )
            .await;
        }
    });

    Ok(ConferenceBridgeHandle {
        _tasks: vec![forward_task],
        cancel_token,
    })
}
/// Starts a two-way bridge for `leg_id`.
///
/// The leg is added to the conference as a participant, then two tasks are
/// spawned: a forward loop (conference output → `audio_sender`, encoded
/// with `codec`) and a reverse loop (`audio_receiver` → conference input).
/// Both tasks run on child tokens of the returned handle's cancel token.
///
/// # Errors
///
/// Returns an error when the participant cannot be added or when no output
/// receiver is available for `leg_id`.
pub async fn start_bridge_full_duplex<S>(
    &self,
    conf_id: &str,
    leg_id: &LegId,
    audio_sender: S,
    audio_receiver: Box<dyn AudioReceiver>,
    codec: audio_codec::CodecType,
) -> anyhow::Result<ConferenceBridgeHandle>
where
    S: AudioSender + Send + Sync + 'static,
{
    let conf_id_obj = crate::call::runtime::ConferenceId::from(conf_id);
    let channels = self
        .conference_manager
        .add_participant(&conf_id_obj, leg_id.clone())
        .await
        .map_err(|e| anyhow::anyhow!("Failed to add participant to conference: {}", e))?;
    let input_tx = channels.input_tx;

    // NOTE(review): on this error path the participant added above is not
    // removed here — confirm whether the caller is expected to clean up.
    let Some(output_rx) = self
        .conference_manager
        .take_participant_output_rx(leg_id)
        .await
    else {
        return Err(anyhow::anyhow!(
            "No output_rx found for leg {} in conference {}",
            leg_id,
            conf_id
        ));
    };

    info!(
        conf_id = %conf_id,
        leg_id = %leg_id,
        "Starting full-duplex conference media bridge"
    );
    crate::metrics::conference::created();

    let cancel_token = tokio_util::sync::CancellationToken::new();

    // Conference → participant direction.
    let forward_handle = crate::utils::spawn({
        let task_cancel = cancel_token.child_token();
        let task_leg = leg_id.clone();
        let task_conf = conf_id.to_string();
        async move {
            Self::forward_loop(
                output_rx,
                audio_sender,
                task_leg,
                task_conf,
                task_cancel,
                codec,
            )
            .await;
        }
    });

    // Participant → conference direction.
    let reverse_handle = crate::utils::spawn({
        let task_cancel = cancel_token.child_token();
        let task_leg = leg_id.clone();
        let task_conf = conf_id.to_string();
        async move {
            Self::reverse_loop(
                audio_receiver,
                input_tx,
                task_leg,
                task_conf,
                task_cancel,
                // Mixer sample rate handed to the reverse loop is fixed here.
                // NOTE(review): presumably matches the mixer's configured rate — confirm.
                8000,
            )
            .await;
        }
    });

    Ok(ConferenceBridgeHandle {
        _tasks: vec![forward_handle, reverse_handle],
        cancel_token,
    })
}
pub async fn forward_loop<S>(
mut output_rx: mpsc::Receiver<AudioFrame>,
audio_sender: S,
leg_id: LegId,
conf_id: String,
cancel_token: tokio_util::sync::CancellationToken,
codec: audio_codec::CodecType,
) where
S: AudioSender + Send + Sync + 'static,
{
use audio_codec::create_encoder;
use rustrtc::media::{AudioFrame as RtcAudioFrame, MediaSample};
info!(
leg_id = %leg_id,
conf_id = %conf_id,
codec = ?codec,
"Conference media bridge forward loop started"
);
let mut encoder = create_encoder(codec);
let payload_type = codec.payload_type();
let clock_rate = codec.clock_rate() as u32;
let sample_rate = encoder.sample_rate();
let mut rtp_timestamp: u32 = rand::random();
let mut sequence_number: u16 = rand::random();
let interval_ms = 20u64;
let samples_per_frame = (sample_rate * interval_ms as u32 / 1000) as usize;
let rtp_ticks_per_frame = clock_rate * interval_ms as u32 / 1000;
loop {
tokio::select! {
biased;
_ = cancel_token.cancelled() => {
info!(
leg_id = %leg_id,
conf_id = %conf_id,
"Conference media bridge forward loop cancelled"
);
break;
}
Some(frame) = output_rx.recv() => {
let pcm_samples = if frame.sample_rate == sample_rate {
frame.samples
} else {
resample_linear(
&frame.samples,
frame.sample_rate,
sample_rate,
)
};
for chunk in pcm_samples.chunks(samples_per_frame) {
let chunk_to_encode = if chunk.len() < samples_per_frame {
let mut padded = vec![0i16; samples_per_frame];
padded[..chunk.len()].copy_from_slice(chunk);
padded
} else {
chunk.to_vec()
};
let encoded = encoder.encode(&chunk_to_encode);
let rtc_frame = RtcAudioFrame {
rtp_timestamp,
clock_rate: clock_rate,
data: encoded.into(),
sequence_number: Some(sequence_number),
payload_type: Some(payload_type),
marker: false,
header_extension: None,
raw_packet: None,
source_addr: None,
};
let bytes_sent = chunk_to_encode.len() * 2; if let Err(e) = audio_sender.send(MediaSample::Audio(rtc_frame)).await {
warn!(
leg_id = %leg_id,
error = %e,
"Failed to send conference audio to media track"
);
return;
}
crate::metrics::conference::media_injected_bytes(&conf_id, bytes_sent as u64);
rtp_timestamp = rtp_timestamp.wrapping_add(rtp_ticks_per_frame);
sequence_number = sequence_number.wrapping_add(1);
}
trace!(
leg_id = %leg_id,
samples = pcm_samples.len(),
original_sample_rate = frame.sample_rate,
"Encoded and sent mixed audio frame from conference"
);
}
else => {
warn!(
leg_id = %leg_id,
conf_id = %conf_id,
"Conference output_rx closed"
);
break;
}
}
}
info!(
leg_id = %leg_id,
conf_id = %conf_id,
"Conference media bridge forward loop ended"
);
}
pub async fn reverse_loop(
mut audio_receiver: Box<dyn AudioReceiver>,
input_tx: mpsc::Sender<AudioFrame>,
leg_id: LegId,
conf_id: String,
cancel_token: tokio_util::sync::CancellationToken,
mixer_sample_rate: u32,
) {
info!(
leg_id = %leg_id,
conf_id = %conf_id,
"Conference media bridge reverse loop started"
);
loop {
tokio::select! {
biased;
_ = cancel_token.cancelled() => {
info!(
leg_id = %leg_id,
conf_id = %conf_id,
"Conference media bridge reverse loop cancelled"
);
break;
}
Some(pcm_frame) = audio_receiver.recv() => {
let sample_count = pcm_frame.samples.len();
let sample_rate = pcm_frame.sample_rate;
let samples = if sample_rate == mixer_sample_rate {
pcm_frame.samples
} else {
resample_linear(&pcm_frame.samples, sample_rate, mixer_sample_rate)
};
let audio_frame = AudioFrame::new(samples, mixer_sample_rate);
if let Err(e) = input_tx.send(audio_frame).await {
warn!(
leg_id = %leg_id,
conf_id = %conf_id,
error = %e,
"Failed to send audio to conference mixer input"
);
break;
}
trace!(
leg_id = %leg_id,
samples = sample_count,
sample_rate = sample_rate,
"Sent participant audio to conference mixer"
);
}
else => {
info!(
leg_id = %leg_id,
conf_id = %conf_id,
"Audio receiver closed, stopping reverse loop"
);
break;
}
}
}
info!(
leg_id = %leg_id,
conf_id = %conf_id,
"Conference media bridge reverse loop ended"
);
}
}
/// Resamples 16-bit PCM from `src_rate` to `dst_rate` by linear interpolation.
///
/// The output holds `samples.len() * dst_rate / src_rate` samples (integer
/// division). Output sample `i` is taken at source position
/// `i * src_rate / dst_rate`; fractional positions are interpolated between
/// the two neighbouring source samples, with the last sample repeated past
/// the end. Interpolated values are truncated toward zero.
///
/// Positions are computed with exact integer arithmetic, so the result does
/// not suffer from floating-point rounding in the length or the indices.
/// No low-pass filtering is applied when downsampling.
///
/// Returns a copy of `samples` when the rates are equal, and an empty vector
/// when `samples` is empty or either rate is zero.
pub(crate) fn resample_linear(samples: &[i16], src_rate: u32, dst_rate: u32) -> Vec<i16> {
    if src_rate == dst_rate {
        return samples.to_vec();
    }
    // A zero rate has no meaningful ratio; bail out rather than dividing by
    // zero or requesting an absurd allocation.
    if samples.is_empty() || src_rate == 0 || dst_rate == 0 {
        return Vec::new();
    }

    let src = u64::from(src_rate);
    let dst = u64::from(dst_rate);
    let dst_signed = i64::from(dst_rate);
    let last = samples.len() - 1;
    let new_len = samples.len() as u64 * dst / src;

    (0..new_len)
        .map(|i| {
            // Source position of output sample `i` is `pos / dst`, split into
            // its integer part and the remainder that forms the fraction.
            // `i < new_len` guarantees `idx < samples.len()`.
            let pos = i * src;
            let idx = (pos / dst) as usize;
            let rem = (pos % dst) as i64;
            if rem == 0 {
                return samples[idx];
            }
            let s0 = i64::from(samples[idx]);
            let s1 = i64::from(samples[(idx + 1).min(last)]);
            // s0 + (s1 - s0) * rem / dst, evaluated over a common denominator
            // so the single division truncates the exact value toward zero.
            // The result lies between s0 and s1 and therefore fits in i16.
            ((s0 * dst_signed + (s1 - s0) * rem) / dst_signed) as i16
        })
        .collect()
}
/// Handle returned when a conference bridge is started.
pub struct ConferenceBridgeHandle {
    /// Join handles of the spawned loop tasks. They are stored but not
    /// awaited anywhere in this type.
    pub(crate) _tasks: Vec<tokio::task::JoinHandle<()>>,
    /// Token whose cancellation asks the bridge loops to exit.
    pub cancel_token: tokio_util::sync::CancellationToken,
}

impl ConferenceBridgeHandle {
    /// Requests shutdown of the bridge by cancelling its token.
    ///
    /// This does not wait for the spawned tasks to finish.
    pub fn stop(&self) {
        let Self { cancel_token, .. } = self;
        cancel_token.cancel();
    }
}
/// Per-session record of an optional running conference bridge and the id
/// of the conference it belongs to.
///
/// The default value has neither a bridge handle nor a conference id.
#[derive(Default)]
pub struct SessionConferenceBridge {
    /// Handle of the running bridge, if any.
    pub bridge_handle: Option<ConferenceBridgeHandle>,
    /// Id of the associated conference, if any.
    pub conf_id: Option<String>,
}

impl SessionConferenceBridge {
    /// Creates an inactive record without a bridge handle or conference id.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` while a bridge handle is stored.
    pub fn is_active(&self) -> bool {
        self.bridge_handle.is_some()
    }

    /// Stops the stored bridge, if any, and clears both the handle and the
    /// conference id.
    pub fn stop_bridge(&mut self) {
        if let Some(handle) = self.bridge_handle.take() {
            handle.stop();
        }
        self.conf_id = None;
    }
}
// Unit tests for the conference media bridge. They drive the forward and
// reverse loops with in-memory mock sender/receiver implementations.
#[cfg(test)]
mod tests {
use super::*;
use crate::call::domain::LegId;
use crate::media::conference_mixer::AudioFrame;
use rustrtc::media::MediaSample;
// Test double that records every sample passed to `send` in a shared,
// mutex-protected vector.
struct MockAudioSender {
samples: std::sync::Arc<tokio::sync::Mutex<Vec<MediaSample>>>,
}
impl MockAudioSender {
fn new() -> Self {
Self {
samples: std::sync::Arc::new(tokio::sync::Mutex::new(Vec::new())),
}
}
// Returns a clone of everything recorded so far.
async fn get_samples(&self) -> Vec<MediaSample> {
self.samples.lock().await.clone()
}
}
impl AudioSender for MockAudioSender {
// Records the sample and always reports success.
async fn send(
&self,
sample: rustrtc::media::MediaSample,
) -> Result<(), mpsc::error::SendError<rustrtc::media::MediaSample>> {
self.samples.lock().await.push(sample);
Ok(())
}
}
// Test double that yields a fixed list of frames in order and `None` once
// the list is exhausted.
struct MockAudioReceiver {
frames: Vec<PcmAudioFrame>,
index: usize,
}
impl MockAudioReceiver {
fn new(frames: Vec<PcmAudioFrame>) -> Self {
Self { frames, index: 0 }
}
}
impl AudioReceiver for MockAudioReceiver {
fn recv(
&mut self,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<PcmAudioFrame>> + Send + '_>>
{
Box::pin(async move {
if self.index < self.frames.len() {
let frame = self.frames[self.index].clone();
self.index += 1;
Some(frame)
} else {
None
}
})
}
}
// Smoke test: a bridge can be constructed from a fresh manager.
#[tokio::test]
async fn test_conference_bridge_creation() {
let conf_mgr = Arc::new(ConferenceManager::new());
let _bridge = ConferenceMediaBridge::new(conf_mgr);
}
// With a fresh manager and no participant set up, `start_bridge` is
// expected to return an error.
#[tokio::test]
async fn test_start_bridge_requires_output_rx() {
let conf_mgr = Arc::new(ConferenceManager::new());
let bridge = ConferenceMediaBridge::new(conf_mgr);
let leg_id = LegId::new("test-leg");
let (tx, _rx) = tokio::sync::mpsc::channel(100);
let result = bridge
.start_bridge("conf-1", &leg_id, tx, audio_codec::CodecType::PCMU)
.await;
assert!(result.is_err());
}
// Calls `stop()` on the handle only if `start_bridge` succeeds.
// NOTE(review): the setup is the same as in the test above, which asserts
// an error, so the `if let Ok` body presumably never runs — confirm that
// this test actually exercises `stop()`.
#[tokio::test]
async fn test_bridge_handle_stop() {
let conf_mgr = Arc::new(ConferenceManager::new());
let bridge = ConferenceMediaBridge::new(conf_mgr);
let leg_id = LegId::new("test-leg");
let (tx, _rx) = tokio::sync::mpsc::channel(100);
if let Ok(handle) = bridge
.start_bridge("conf-1", &leg_id, tx, audio_codec::CodecType::PCMU)
.await
{
handle.stop();
}
}
// Sends one 160-sample 8 kHz frame through a PCMU forward loop and checks
// the first emitted sample: clock rate 8000, payload type 0, non-empty data.
#[tokio::test]
async fn test_forward_loop_audio_encoding() {
let (tx, rx) = tokio::sync::mpsc::channel(10);
let sender = MockAudioSender::new();
// Second mock sharing the same recording vector, moved into the task.
let sender_clone = MockAudioSender {
samples: sender.samples.clone(),
};
let cancel = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = crate::utils::spawn(async move {
ConferenceMediaBridge::forward_loop(
rx,
sender_clone,
LegId::new("test-leg"),
"conf-1".to_string(),
cancel_clone,
audio_codec::CodecType::PCMU,
)
.await;
});
let samples: Vec<i16> = (0..160).map(|i| (i as i16 * 100) % 32767).collect();
tx.send(AudioFrame {
sample_rate: 8000,
samples: samples.clone(),
timestamp: 0,
})
.await
.unwrap();
// Give the loop time to process the frame before cancelling it.
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
cancel.cancel();
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await;
let sent = sender.get_samples().await;
assert!(
!sent.is_empty(),
"Expected audio samples to be sent after encoding"
);
match &sent[0] {
MediaSample::Audio(frame) => {
assert_eq!(frame.clock_rate, 8000);
assert_eq!(frame.payload_type, Some(0)); assert!(!frame.data.is_empty());
}
_ => panic!("Expected Audio sample, got {:?}", sent[0]),
}
}
// Sends a 320-sample 16 kHz frame through a PCMU forward loop and checks
// that at least one sample is emitted.
#[tokio::test]
async fn test_forward_loop_resampling() {
let (tx, rx) = tokio::sync::mpsc::channel(10);
let sender = MockAudioSender::new();
let sender_clone = MockAudioSender {
samples: sender.samples.clone(),
};
let cancel = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = crate::utils::spawn(async move {
ConferenceMediaBridge::forward_loop(
rx,
sender_clone,
LegId::new("test-leg"),
"conf-1".to_string(),
cancel_clone,
audio_codec::CodecType::PCMU,
)
.await;
});
let samples: Vec<i16> = (0..320).map(|i| (i as i16 * 50) % 32767).collect();
tx.send(AudioFrame {
sample_rate: 16000,
samples,
timestamp: 0,
})
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
cancel.cancel();
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await;
let sent = sender.get_samples().await;
assert!(!sent.is_empty(), "Expected resampled audio to be sent");
}
// Sends two frames and checks that the sequence numbers of the first two
// emitted samples differ by exactly one (with wrap-around).
#[tokio::test]
async fn test_forward_loop_sequence_increment() {
let (tx, rx) = tokio::sync::mpsc::channel(10);
let sender = MockAudioSender::new();
let sender_clone = MockAudioSender {
samples: sender.samples.clone(),
};
let cancel = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = crate::utils::spawn(async move {
ConferenceMediaBridge::forward_loop(
rx,
sender_clone,
LegId::new("test-leg"),
"conf-1".to_string(),
cancel_clone,
audio_codec::CodecType::PCMU,
)
.await;
});
for _ in 0..2 {
let samples: Vec<i16> = (0..160).map(|i| (i as i16 * 100) % 32767).collect();
tx.send(AudioFrame {
sample_rate: 8000,
samples,
timestamp: 0,
})
.await
.unwrap();
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
cancel.cancel();
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await;
let sent = sender.get_samples().await;
assert!(sent.len() >= 2, "Expected at least 2 audio packets");
let seq1 = match &sent[0] {
MediaSample::Audio(f) => f.sequence_number.unwrap(),
_ => panic!("Expected Audio"),
};
let seq2 = match &sent[1] {
MediaSample::Audio(f) => f.sequence_number.unwrap(),
_ => panic!("Expected Audio"),
};
assert_eq!(
seq2,
seq1.wrapping_add(1),
"Sequence numbers should increment by 1"
);
}
// Feeds two 8 kHz frames through the reverse loop and checks that both
// reach the mixer input channel with length and rate unchanged.
#[tokio::test]
async fn test_reverse_loop_sends_to_mixer() {
let (input_tx, mut input_rx) = tokio::sync::mpsc::channel(10);
let pcm_frames = vec![
PcmAudioFrame::new(vec![1000i16; 160], 8000),
PcmAudioFrame::new(vec![2000i16; 160], 8000),
];
let receiver = MockAudioReceiver::new(pcm_frames);
let cancel = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = crate::utils::spawn(async move {
ConferenceMediaBridge::reverse_loop(
Box::new(receiver),
input_tx,
LegId::new("test-leg"),
"conf-1".to_string(),
cancel_clone,
8000,
)
.await;
});
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
cancel.cancel();
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await;
// Drain whatever the loop managed to forward.
let mut received_count = 0;
while let Ok(frame) = input_rx.try_recv() {
received_count += 1;
assert_eq!(frame.samples.len(), 160);
assert_eq!(frame.sample_rate, 8000);
}
assert_eq!(
received_count, 2,
"Expected 2 frames to be sent to mixer input"
);
}
// Creates a conference, starts a full-duplex bridge with the mocks and
// checks that starting succeeds; the handle is then stopped.
#[tokio::test]
async fn test_full_duplex_bridge() {
let conf_mgr = Arc::new(ConferenceManager::new());
let bridge = ConferenceMediaBridge::new(conf_mgr.clone());
conf_mgr
.create_conference("conf-1".into(), None)
.await
.unwrap();
let leg_id = LegId::new("test-leg");
let sender = MockAudioSender::new();
let receiver = MockAudioReceiver::new(vec![PcmAudioFrame::new(vec![1000i16; 160], 8000)]);
let handle = bridge
.start_bridge_full_duplex(
"conf-1",
&leg_id,
sender,
Box::new(receiver),
audio_codec::CodecType::PCMU,
)
.await;
assert!(
handle.is_ok(),
"Full-duplex bridge should start successfully"
);
let handle = handle.unwrap();
handle.stop();
}
// `stop_bridge` on a record without a handle leaves it inactive and clears
// the conference id.
#[tokio::test]
async fn test_session_conference_bridge_lifecycle() {
let mut session_bridge = SessionConferenceBridge::new();
assert!(!session_bridge.is_active());
session_bridge.conf_id = Some("conf-1".to_string());
session_bridge.stop_bridge();
assert!(!session_bridge.is_active());
assert!(session_bridge.conf_id.is_none());
}
// Cancels right after spawning and checks that the forward loop task
// finishes within two seconds.
#[tokio::test]
async fn test_forward_loop_cancel_immediately() {
// `_tx` is kept alive so the channel stays open for the whole test.
let (_tx, rx) = tokio::sync::mpsc::channel::<AudioFrame>(10);
let sender = MockAudioSender::new();
let cancel = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = crate::utils::spawn(async move {
ConferenceMediaBridge::forward_loop(
rx,
sender,
LegId::new("test-leg"),
"conf-1".to_string(),
cancel_clone,
audio_codec::CodecType::PCMU,
)
.await;
});
cancel.cancel();
let result = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await;
assert!(result.is_ok(), "Forward loop should exit cleanly on cancel");
}
// A 960-sample 48 kHz frame must reach the mixer as 160 samples at 8 kHz.
#[tokio::test]
async fn test_reverse_loop_resamples_opus_48khz_to_mixer_8khz() {
let (input_tx, mut input_rx) = tokio::sync::mpsc::channel(10);
let opus_frame = PcmAudioFrame::new(vec![1000i16; 960], 48000);
let receiver = MockAudioReceiver::new(vec![opus_frame]);
let cancel = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = crate::utils::spawn(async move {
ConferenceMediaBridge::reverse_loop(
Box::new(receiver),
input_tx,
LegId::new("test-leg"),
"conf-1".to_string(),
cancel_clone,
8000,
)
.await;
});
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
cancel.cancel();
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await;
let frame = input_rx
.try_recv()
.expect("Should receive resampled frame from reverse loop");
assert_eq!(
frame.sample_rate, 8000,
"Opus 48kHz PCM should be resampled to mixer rate 8000"
);
assert_eq!(
frame.samples.len(),
160,
"960 samples at 48kHz should become 160 at 8kHz (20ms)"
);
}
// A frame already at the mixer rate must pass through with its samples
// unchanged.
#[tokio::test]
async fn test_reverse_loop_passthrough_when_rate_matches() {
let (input_tx, mut input_rx) = tokio::sync::mpsc::channel(10);
let pcm_frame = PcmAudioFrame::new(vec![500i16; 160], 8000);
let receiver = MockAudioReceiver::new(vec![pcm_frame]);
let cancel = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = crate::utils::spawn(async move {
ConferenceMediaBridge::reverse_loop(
Box::new(receiver),
input_tx,
LegId::new("test-leg"),
"conf-1".to_string(),
cancel_clone,
8000,
)
.await;
});
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
cancel.cancel();
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await;
let frame = input_rx
.try_recv()
.expect("Should receive passthrough frame");
assert_eq!(frame.sample_rate, 8000);
assert_eq!(frame.samples.len(), 160);
assert_eq!(frame.samples[0], 500);
}
// Runs the forward loop with G.722 and checks the first emitted sample:
// clock rate 8000, payload type 9, non-empty payload.
#[tokio::test]
async fn test_forward_loop_g722_uses_encoder_sample_rate() {
let (tx, rx) = tokio::sync::mpsc::channel(10);
let sender = MockAudioSender::new();
let sender_for_loop = MockAudioSender {
samples: sender.samples.clone(),
};
let cancel = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = crate::utils::spawn(async move {
ConferenceMediaBridge::forward_loop(
rx,
sender_for_loop,
LegId::new("test-leg"),
"conf-1".to_string(),
cancel_clone,
audio_codec::CodecType::G722,
)
.await;
});
let samples: Vec<i16> = (0..160).map(|i| (i as i16 * 100) % 32767).collect();
tx.send(AudioFrame {
sample_rate: 8000,
samples: samples.clone(),
timestamp: 0,
})
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
cancel.cancel();
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await;
let sent = sender.get_samples().await;
assert!(!sent.is_empty(), "Expected G.722 audio to be sent");
match &sent[0] {
MediaSample::Audio(frame) => {
assert_eq!(
frame.clock_rate, 8000,
"G.722 RTP clock rate should be 8000"
);
assert_eq!(frame.payload_type, Some(9)); assert!(!frame.data.is_empty(), "G.722 payload should not be empty");
}
_ => panic!("Expected Audio sample"),
}
}
// Sends an 8 kHz frame into a G.722 forward loop and checks only that a
// non-empty payload is produced.
#[tokio::test]
async fn test_forward_loop_g722_resamples_from_mixer_8khz() {
let (tx, rx) = tokio::sync::mpsc::channel(10);
let sender = MockAudioSender::new();
let sender_for_loop = MockAudioSender {
samples: sender.samples.clone(),
};
let cancel = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel.clone();
let handle = crate::utils::spawn(async move {
ConferenceMediaBridge::forward_loop(
rx,
sender_for_loop,
LegId::new("test-leg"),
"conf-1".to_string(),
cancel_clone,
audio_codec::CodecType::G722,
)
.await;
});
let samples: Vec<i16> = vec![1000i16; 160];
tx.send(AudioFrame {
sample_rate: 8000,
samples,
timestamp: 0,
})
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
cancel.cancel();
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await;
let sent = sender.get_samples().await;
assert!(!sent.is_empty(), "Expected G.722 encoded audio");
match &sent[0] {
MediaSample::Audio(frame) => {
assert!(!frame.data.is_empty(), "G.722 payload should not be empty");
}
_ => panic!("Expected Audio sample"),
}
}
}