use super::*;
use rustrtc::TransportMode;
use rustrtc::media::MediaStreamTrack as _;
use rustrtc::{MediaKind, PeerConnection, RtcConfiguration, TransceiverDirection};
use tokio::fs;
async fn create_test_wav_file(path: &str) -> Result<()> {
create_test_wav_file_with_samples(path, 0).await
}
async fn create_test_wav_file_with_samples(path: &str, num_samples: usize) -> Result<()> {
let spec = crate::media::wav_reader::WavSpec {
channels: 1,
sample_rate: 8000,
bits_per_sample: 16,
sample_format: crate::media::wav_reader::SampleFormat::Int,
};
let mut writer =
crate::media::wav_reader::WavWriter::create(path, spec).map_err(|e| anyhow::anyhow!("WavWriter: {e}"))?;
for i in 0..num_samples {
let sample = ((i as f32 / 8.0).sin() * 1000.0) as i16;
writer
.write_sample(sample)
.map_err(|e| anyhow::anyhow!("write_sample: {e}"))?;
}
writer
.finalize()
.map_err(|e| anyhow::anyhow!("finalize: {e}"))?;
Ok(())
}
#[tokio::test]
async fn test_file_track_creation() {
let track = FileTrack::new("test-track".to_string())
.with_path("/tmp/test.wav".to_string())
.with_loop(false)
.with_codec_preference(vec![CodecType::PCMU, CodecType::PCMA]);
assert_eq!(track.id(), "test-track");
assert_eq!(track.file_path, Some("/tmp/test.wav".to_string()));
assert!(!track.loop_playback);
assert_eq!(
track.codec_preference,
vec![CodecType::PCMU, CodecType::PCMA]
);
}
#[tokio::test]
async fn test_file_track_with_ssrc_compatibility() {
let track = FileTrack::new("test".to_string())
.with_path("/tmp/test.wav".to_string())
.with_ssrc(12345);
assert_eq!(track.id(), "test");
}
#[tokio::test]
async fn test_file_track_generates_sdp() {
let track = FileTrack::new("test-sdp".to_string())
.with_path("/tmp/test.wav".to_string())
.with_codec_preference(vec![CodecType::PCMU]);
let sdp = track.local_description().await;
assert!(sdp.is_ok(), "Should generate SDP successfully");
let sdp_string = sdp.unwrap();
assert!(
sdp_string.contains("a=rtpmap:0 PCMU/8000"),
"SDP should contain PCMU codec, got: {}",
sdp_string
);
}
#[tokio::test]
async fn test_file_track_opus_codec_sdp() {
let track = FileTrack::new("test-opus".to_string())
.with_path("/tmp/test.wav".to_string())
.with_codec_preference(vec![CodecType::Opus]);
let sdp = track.local_description().await.unwrap();
assert!(
sdp.contains("a=rtpmap:111 opus/48000/2"),
"SDP should contain Opus codec"
);
assert!(
sdp.contains("a=fmtp:111 minptime=10;useinbandfec=1"),
"SDP should contain Opus fmtp parameters"
);
}
#[tokio::test]
async fn test_file_track_start_playback_uses_codec_info_payload_type() {
let path = "/tmp/rustpbx-file-track-dynamic-pt.wav";
create_test_wav_file_with_samples(path, 160).await.unwrap();
let pc = PeerConnection::new(RtcConfiguration::default());
pc.add_transceiver(MediaKind::Audio, TransceiverDirection::SendOnly);
let track = FileTrack::new("test-dynamic-pt".to_string())
.with_path(path.to_string())
.with_codec_info(negotiate::CodecInfo {
payload_type: 96,
codec: CodecType::PCMU,
clock_rate: 8000,
channels: 1,
});
track.start_playback_on(Some(pc.clone())).await.unwrap();
let sender = pc
.get_transceivers()
.into_iter()
.find(|t| t.kind() == MediaKind::Audio)
.and_then(|t| t.sender())
.expect("audio sender should be installed");
assert_eq!(sender.params().payload_type, 96);
let _ = fs::remove_file(path).await;
}
#[test]
fn test_audio_frame_timing_g722_splits_pcm_rate_from_rtp_clock() {
let timing = audio_frame_timing(CodecType::G722, 8000);
assert_eq!(
timing.pcm_sample_rate, 16000,
"G722 audio processing should use 16 kHz PCM"
);
assert_eq!(
timing.pcm_samples_per_frame, 320,
"20 ms of 16 kHz PCM should read 320 samples"
);
assert_eq!(
timing.rtp_ticks_per_frame, 160,
"20 ms of G722 RTP should advance timestamps by 160 ticks at 8 kHz"
);
}
#[tokio::test]
async fn test_file_track_multiple_codecs() {
let track = FileTrack::new("test-multi".to_string())
.with_path("/tmp/test.wav".to_string())
.with_codec_preference(vec![CodecType::PCMU, CodecType::PCMA, CodecType::Opus]);
let sdp = track.local_description().await.unwrap();
assert!(sdp.contains("PCMU/8000"), "Should contain PCMU");
assert!(sdp.contains("PCMA/8000"), "Should contain PCMA");
assert!(sdp.contains("opus/48000/2"), "Should contain Opus");
}
#[tokio::test]
async fn test_file_track_playback_starts() {
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_playback.wav");
create_test_wav_file(test_file.to_str().unwrap())
.await
.unwrap();
let track = FileTrack::new("playback-test".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(false);
let track_mut = track.clone();
let _ = track_mut.local_description().await;
let result = track.start_playback().await;
assert!(result.is_ok(), "Playback should start successfully");
let _ = fs::remove_file(&test_file).await;
}
#[tokio::test]
async fn test_file_track_playback_completion_accurate() {
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_completion_accurate.wav");
create_test_wav_file_with_samples(test_file.to_str().unwrap(), 160)
.await
.unwrap();
let (end_tx, mut end_rx) = tokio::sync::mpsc::unbounded_channel();
let track = FileTrack::new("completion-accurate".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(false)
.with_codec_preference(vec![CodecType::PCMU])
.with_on_end(std::sync::Arc::new(move |reason| {
let _ = end_tx.send(reason);
}));
let _ = track.local_description().await;
track.start_playback().await.unwrap();
let result = tokio::time::timeout(tokio::time::Duration::from_secs(2), end_rx.recv()).await;
assert!(
matches!(result, Ok(Some(PlaybackEndReason::Completed))),
"Playback on_end must fire Completed when the file is exhausted (not after a fixed sleep)"
);
let _ = fs::remove_file(&test_file).await;
}
#[tokio::test]
async fn test_file_track_playback_completion() {
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_completion.wav");
create_test_wav_file(test_file.to_str().unwrap())
.await
.unwrap();
let (end_tx, mut end_rx) = tokio::sync::mpsc::unbounded_channel();
let track = FileTrack::new("completion-test".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(false)
.with_on_end(std::sync::Arc::new(move |reason| {
let _ = end_tx.send(reason);
}));
let _ = track.local_description().await;
track.start_playback().await.unwrap();
let completion =
tokio::time::timeout(tokio::time::Duration::from_millis(500), end_rx.recv()).await;
assert!(
matches!(completion, Ok(Some(PlaybackEndReason::Completed))),
"Playback should complete"
);
let _ = fs::remove_file(&test_file).await;
}
#[tokio::test]
async fn test_file_track_cancel_stops_rtp() {
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_cancel_rtp.wav");
create_test_wav_file_with_samples(test_file.to_str().unwrap(), 8000)
.await
.unwrap();
let cancel_token = CancellationToken::new();
let (end_tx, mut end_rx) = tokio::sync::mpsc::unbounded_channel();
let track = FileTrack::new("cancel-rtp".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(true) .with_cancel_token(cancel_token.clone())
.with_codec_preference(vec![CodecType::PCMU])
.with_on_end(std::sync::Arc::new(move |reason| {
let _ = end_tx.send(reason);
}));
let _ = track.local_description().await;
track.start_playback().await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
track.stop().await;
let result = tokio::time::timeout(tokio::time::Duration::from_millis(500), end_rx.recv()).await;
assert!(
matches!(result, Ok(Some(PlaybackEndReason::Interrupted))),
"on_end must fire Interrupted after stop() cancels the playback task"
);
let _ = fs::remove_file(&test_file).await;
}
#[tokio::test]
async fn test_file_track_looping_playback() {
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_loop.wav");
create_test_wav_file(test_file.to_str().unwrap())
.await
.unwrap();
let cancel_token = CancellationToken::new();
let track = FileTrack::new("loop-test".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(true)
.with_cancel_token(cancel_token.clone());
let track_mut = track.clone();
let _ = track_mut.local_description().await;
track.start_playback().await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
track.stop().await;
let _ = fs::remove_file(&test_file).await;
}
#[tokio::test]
async fn test_file_track_missing_file_error() {
let track = FileTrack::new("missing-file".to_string())
.with_path("/nonexistent/path/to/file.wav".to_string());
let track_mut = track.clone();
let _ = track_mut.local_description().await;
let result = track.start_playback().await;
assert!(result.is_err(), "Should error on missing file");
assert!(result.unwrap_err().to_string().contains("not found"));
}
#[tokio::test]
async fn test_file_track_set_remote_description() {
let track =
FileTrack::new("remote-desc-test".to_string()).with_path("/tmp/test.wav".to_string());
let _ = track.local_description().await.unwrap();
let answer_sdp = r#"v=0
o=- 0 0 IN IP4 127.0.0.1
s=-
t=0 0
m=audio 9 RTP/AVP 0
a=rtpmap:0 PCMU/8000
"#;
let result = track.set_remote_description(answer_sdp).await;
assert!(result.is_ok(), "Should set remote description successfully");
}
#[tokio::test]
async fn test_file_track_completion_drives_audio_complete_event() {
use crate::call::app::ControllerEvent;
use tokio::sync::mpsc;
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_e2e_audio_complete.wav");
create_test_wav_file_with_samples(test_file.to_str().unwrap(), 160)
.await
.unwrap();
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<ControllerEvent>();
let track = FileTrack::new("e2e-audio-complete".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(false)
.with_codec_preference(vec![CodecType::PCMU])
.with_on_end(std::sync::Arc::new(move |reason| {
let _ = event_tx.send(ControllerEvent::AudioComplete {
track_id: "default".to_string(),
interrupted: matches!(reason, PlaybackEndReason::Interrupted),
});
}));
let _ = track.local_description().await;
track.start_playback().await.unwrap();
let event = tokio::time::timeout(tokio::time::Duration::from_secs(2), event_rx.recv())
.await
.expect("AudioComplete must arrive within 2 s")
.expect("channel must not be closed");
match event {
ControllerEvent::AudioComplete {
interrupted,
track_id,
} => {
assert!(
!interrupted,
"File-exhausted playback must not be interrupted"
);
assert_eq!(track_id, "default");
}
other => panic!("unexpected event: {:?}", other),
}
let _ = fs::remove_file(&test_file).await;
}
#[tokio::test]
async fn test_file_track_stop_drives_interrupted_audio_complete() {
use crate::call::app::ControllerEvent;
use tokio::sync::mpsc;
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_e2e_interrupted.wav");
create_test_wav_file_with_samples(test_file.to_str().unwrap(), 8000)
.await
.unwrap();
let (event_tx, mut event_rx) = mpsc::unbounded_channel::<ControllerEvent>();
let cancel_token = CancellationToken::new();
let track = FileTrack::new("e2e-interrupted".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(true)
.with_cancel_token(cancel_token.clone())
.with_codec_preference(vec![CodecType::PCMU])
.with_on_end(std::sync::Arc::new(move |reason| {
let _ = event_tx.send(ControllerEvent::AudioComplete {
track_id: "default".to_string(),
interrupted: matches!(reason, PlaybackEndReason::Interrupted),
});
}));
let _ = track.local_description().await;
track.start_playback().await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
track.stop().await;
let event = tokio::time::timeout(tokio::time::Duration::from_millis(500), event_rx.recv())
.await
.expect("AudioComplete must arrive within 500 ms after stop()")
.expect("channel must not be closed");
assert!(
matches!(
event,
ControllerEvent::AudioComplete {
interrupted: true,
..
}
),
"Expected interrupted AudioComplete after stop(), got: {:?}",
event
);
let _ = fs::remove_file(&test_file).await;
}
#[tokio::test]
async fn test_start_playback_on_external_pc_delivers_rtp() {
use rustrtc::media::MediaSample;
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_external_pc_rtp.wav");
create_test_wav_file_with_samples(test_file.to_str().unwrap(), 800)
.await
.unwrap();
let caller_track = RtpTrackBuilder::new("caller".to_string())
.with_mode(TransportMode::Rtp)
.with_rtp_range(17000, 17100)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let receiver_track = RtpTrackBuilder::new("receiver".to_string())
.with_mode(TransportMode::Rtp)
.with_rtp_range(17100, 17200)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let caller_offer = caller_track.local_description().await.unwrap();
let receiver_answer = receiver_track.handshake(caller_offer).await.unwrap();
caller_track
.set_remote_description(&receiver_answer)
.await
.unwrap();
let caller_pc = caller_track.get_peer_connection().await.unwrap();
let file_track = FileTrack::new("rtp-delivery-test".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(false)
.with_codec_preference(vec![CodecType::PCMU]);
file_track.start_playback_on(Some(caller_pc)).await.unwrap();
let receiver_pc = receiver_track.get_peer_connection().await.unwrap();
let received = tokio::time::timeout(tokio::time::Duration::from_secs(2), async {
let mut frame_count: u64 = 0;
for transceiver in receiver_pc.get_transceivers() {
if let Some(receiver) = transceiver.receiver() {
let track = receiver.track();
for _ in 0..10 {
match tokio::time::timeout(
tokio::time::Duration::from_millis(200),
track.recv(),
)
.await
{
Ok(Ok(MediaSample::Audio(frame))) => {
frame_count += 1;
debug!(
rtp_ts = frame.rtp_timestamp,
data_len = frame.data.len(),
"Receiver got RTP frame"
);
if frame_count >= 3 {
return frame_count;
}
}
Ok(Ok(_)) => continue,
Ok(Err(_)) => break,
Err(_) => continue, }
}
}
}
let mut pc_recv = Box::pin(receiver_pc.recv());
loop {
match tokio::time::timeout(tokio::time::Duration::from_millis(500), &mut pc_recv).await
{
Ok(Some(rustrtc::PeerConnectionEvent::Track(transceiver))) => {
if let Some(receiver) = transceiver.receiver() {
let track = receiver.track();
for _ in 0..10 {
match tokio::time::timeout(
tokio::time::Duration::from_millis(200),
track.recv(),
)
.await
{
Ok(Ok(MediaSample::Audio(frame))) => {
frame_count += 1;
debug!(
rtp_ts = frame.rtp_timestamp,
data_len = frame.data.len(),
"Receiver got RTP frame (from Track event)"
);
if frame_count >= 3 {
return frame_count;
}
}
Ok(Ok(_)) => continue,
Ok(Err(_)) => break,
Err(_) => continue,
}
}
}
pc_recv = Box::pin(receiver_pc.recv());
}
Ok(Some(_)) => {
pc_recv = Box::pin(receiver_pc.recv());
}
Ok(None) => break,
Err(_) => break, }
}
frame_count
})
.await;
let frame_count = received.expect("Timed out waiting for RTP frames on receiver PC");
assert!(
frame_count >= 1,
"Expected at least 1 RTP frame on the receiver PC, got {}. \
This means start_playback_on(caller_pc) did not send frames \
through the caller's negotiated transport.",
frame_count
);
debug!(
frame_count,
"test_start_playback_on_external_pc_delivers_rtp passed"
);
file_track.stop().await;
caller_track.stop().await;
receiver_track.stop().await;
let _ = fs::remove_file(&test_file).await;
}
#[tokio::test]
async fn test_start_playback_on_none_backward_compatible() {
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_playback_on_none.wav");
create_test_wav_file_with_samples(test_file.to_str().unwrap(), 160)
.await
.unwrap();
let (end_tx, mut end_rx) = tokio::sync::mpsc::unbounded_channel();
let track = FileTrack::new("compat-test".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(false)
.with_codec_preference(vec![CodecType::PCMU])
.with_on_end(std::sync::Arc::new(move |reason| {
let _ = end_tx.send(reason);
}));
let _ = track.local_description().await;
track.start_playback_on(None).await.unwrap();
let result = tokio::time::timeout(tokio::time::Duration::from_secs(2), end_rx.recv()).await;
assert!(
matches!(result, Ok(Some(PlaybackEndReason::Completed))),
"start_playback_on(None) must fire on_end just like start_playback()"
);
let _ = fs::remove_file(&test_file).await;
}