use super::*;
use rustrtc::media::MediaStreamTrack as _;
use rustrtc::TransportMode;
use tokio::fs;
/// Writes a WAV fixture containing no audio samples to `path`.
///
/// Convenience wrapper around [`create_test_wav_file_with_samples`] for tests
/// that only need a file to exist at the given location.
async fn create_test_wav_file(path: &str) -> Result<()> {
    const NO_SAMPLES: usize = 0;
    create_test_wav_file_with_samples(path, NO_SAMPLES).await
}
/// Writes a mono, 8 kHz, 16-bit integer PCM WAV file to `path`.
///
/// The file holds `num_samples` samples of a sine wave computed as
/// `sin(i / 8) * 1000`. Any `hound` failure is converted into an `anyhow`
/// error whose message is prefixed with the step that failed
/// (`WavWriter`, `write_sample` or `finalize`).
async fn create_test_wav_file_with_samples(path: &str, num_samples: usize) -> Result<()> {
    let format = hound::WavSpec {
        channels: 1,
        sample_rate: 8000,
        bits_per_sample: 16,
        sample_format: hound::SampleFormat::Int,
    };

    let mut wav = hound::WavWriter::create(path, format)
        .map_err(|e| anyhow::anyhow!("WavWriter: {e}"))?;

    // Stops at the first sample that fails to write, like the original loop.
    (0..num_samples).try_for_each(|index| {
        let phase = index as f32 / 8.0;
        let amplitude = (phase.sin() * 1000.0) as i16;
        wav.write_sample(amplitude)
            .map_err(|e| anyhow::anyhow!("write_sample: {e}"))
    })?;

    wav.finalize()
        .map_err(|e| anyhow::anyhow!("finalize: {e}"))?;
    Ok(())
}
/// The builder setters must store the id, path, loop flag and codec order
/// exactly as they were supplied.
#[tokio::test]
async fn test_file_track_creation() {
    let wav_path = "/tmp/test.wav".to_string();
    // Built twice (once for the builder, once for the expectation) so no
    // `Clone` bound on `CodecType` is assumed.
    let codecs = || vec![CodecType::PCMU, CodecType::PCMA];

    let track = FileTrack::new("test-track".to_string())
        .with_path(wav_path.clone())
        .with_loop(false)
        .with_codec_preference(codecs());

    assert_eq!(track.id(), "test-track");
    assert_eq!(track.file_path, Some(wav_path));
    assert!(!track.loop_playback);
    assert_eq!(track.codec_preference, codecs());
}
/// `with_ssrc` must remain callable on the builder and leave the track id
/// untouched.
#[tokio::test]
async fn test_file_track_with_ssrc_compatibility() {
    let builder = FileTrack::new("test".to_string()).with_path("/tmp/test.wav".to_string());
    let track = builder.with_ssrc(12345);

    assert_eq!(track.id(), "test");
}
/// The local description of a PCMU-only track must contain the
/// `a=rtpmap:0 PCMU/8000` line.
///
/// The result is unwrapped with `expect` rather than `assert!(is_ok())` +
/// `unwrap()`, so a failure reports the underlying error alongside the
/// message instead of discarding it.
#[tokio::test]
async fn test_file_track_generates_sdp() {
    let track = FileTrack::new("test-sdp".to_string())
        .with_path("/tmp/test.wav".to_string())
        .with_codec_preference(vec![CodecType::PCMU]);

    let sdp_string = track
        .local_description()
        .await
        .expect("Should generate SDP successfully");

    assert!(
        sdp_string.contains("a=rtpmap:0 PCMU/8000"),
        "SDP should contain PCMU codec, got: {}",
        sdp_string
    );
}
/// An Opus-only track must advertise both the Opus rtpmap line and its fmtp
/// parameters in the local description.
#[tokio::test]
async fn test_file_track_opus_codec_sdp() {
    let opus_track = FileTrack::new("test-opus".to_string())
        .with_path("/tmp/test.wav".to_string())
        .with_codec_preference(vec![CodecType::Opus]);

    let offer = opus_track.local_description().await.unwrap();
    let has_rtpmap = offer.contains("a=rtpmap:111 opus/48000/2");
    let has_fmtp = offer.contains("a=fmtp:111 minptime=10;useinbandfec=1");

    assert!(has_rtpmap, "SDP should contain Opus codec");
    assert!(has_fmtp, "SDP should contain Opus fmtp parameters");
}
/// For G722 with an 8000 Hz clock argument, `audio_frame_timing` must report
/// a 16 kHz PCM rate (320 samples per frame) while advancing RTP timestamps
/// by 160 ticks per frame.
#[test]
fn test_audio_frame_timing_g722_splits_pcm_rate_from_rtp_clock() {
    let frame = audio_frame_timing(CodecType::G722, 8000);

    let pcm_rate = frame.pcm_sample_rate;
    let pcm_samples = frame.pcm_samples_per_frame;
    let rtp_ticks = frame.rtp_ticks_per_frame;

    assert_eq!(
        pcm_rate, 16000,
        "G722 audio processing should use 16 kHz PCM"
    );
    assert_eq!(
        pcm_samples, 320,
        "20 ms of 16 kHz PCM should read 320 samples"
    );
    assert_eq!(
        rtp_ticks, 160,
        "20 ms of G722 RTP should advance timestamps by 160 ticks at 8 kHz"
    );
}
/// Every codec in the preference list must show up in the local description.
#[tokio::test]
async fn test_file_track_multiple_codecs() {
    let preference = vec![CodecType::PCMU, CodecType::PCMA, CodecType::Opus];
    let track = FileTrack::new("test-multi".to_string())
        .with_path("/tmp/test.wav".to_string())
        .with_codec_preference(preference);

    let sdp = track.local_description().await.unwrap();

    // Checked in the same order as the codec preference above.
    let expectations = [
        ("PCMU/8000", "Should contain PCMU"),
        ("PCMA/8000", "Should contain PCMA"),
        ("opus/48000/2", "Should contain Opus"),
    ];
    for (needle, message) in expectations {
        assert!(sdp.contains(needle), "{}", message);
    }
}
/// `start_playback()` must succeed for an existing (empty) WAV fixture.
///
/// The fixture is removed before the result is checked, so a failing run
/// does not leave the file in the temp directory. The result is checked with
/// `expect`, so the underlying error is shown on failure.
#[tokio::test]
async fn test_file_track_playback_starts() {
    let test_file = std::env::temp_dir().join("test_playback.wav");
    create_test_wav_file(test_file.to_str().unwrap())
        .await
        .unwrap();

    let track = FileTrack::new("playback-test".to_string())
        .with_path(test_file.to_string_lossy().to_string())
        .with_loop(false);

    // The description is generated on a clone, as before; presumably clones
    // share the underlying track state — TODO confirm.
    let track_mut = track.clone();
    let _ = track_mut.local_description().await;

    let result = track.start_playback().await;

    // Best-effort cleanup; errors are deliberately ignored.
    let _ = fs::remove_file(&test_file).await;

    result.expect("Playback should start successfully");
}
/// A non-looping PCMU track must signal completion once its file is
/// exhausted.
///
/// Plays a 160-sample fixture (20 ms at the fixture's 8 kHz rate) and waits
/// up to 2 s for `wait_for_completion()`. The fixture is removed before the
/// assertion, so a failing run does not leave it in the temp directory.
#[tokio::test]
async fn test_file_track_playback_completion_accurate() {
    let test_file = std::env::temp_dir().join("test_completion_accurate.wav");
    create_test_wav_file_with_samples(test_file.to_str().unwrap(), 160)
        .await
        .unwrap();

    let track = FileTrack::new("completion-accurate".to_string())
        .with_path(test_file.to_string_lossy().to_string())
        .with_loop(false)
        .with_codec_preference(vec![CodecType::PCMU]);
    let _ = track.local_description().await;
    track.start_playback().await.unwrap();

    let result = tokio::time::timeout(
        tokio::time::Duration::from_secs(2),
        track.wait_for_completion(),
    )
    .await;

    // Best-effort cleanup; errors are deliberately ignored.
    let _ = fs::remove_file(&test_file).await;

    assert!(
        result.is_ok(),
        "Playback completion notify must fire when the file is exhausted (not after a fixed sleep)"
    );
}
/// Playback of an empty, non-looping fixture must complete within 500 ms.
///
/// The fixture is removed before the assertion, so a failing run does not
/// leave it in the temp directory.
#[tokio::test]
async fn test_file_track_playback_completion() {
    let test_file = std::env::temp_dir().join("test_completion.wav");
    create_test_wav_file(test_file.to_str().unwrap())
        .await
        .unwrap();

    let track = FileTrack::new("completion-test".to_string())
        .with_path(test_file.to_string_lossy().to_string())
        .with_loop(false);
    let _ = track.local_description().await;
    track.start_playback().await.unwrap();

    let completion = tokio::time::timeout(
        tokio::time::Duration::from_millis(500),
        track.wait_for_completion(),
    )
    .await;

    // Best-effort cleanup; errors are deliberately ignored.
    let _ = fs::remove_file(&test_file).await;

    assert!(completion.is_ok(), "Playback should complete");
}
/// `stop()` on a looping track must cause `wait_for_completion()` to resolve
/// within 500 ms.
///
/// The fixture holds 8000 samples (one second at the fixture's 8 kHz rate)
/// and the track loops, so completion can only come from `stop()`. The
/// fixture is removed before the assertion, so a failing run does not leave
/// it in the temp directory.
#[tokio::test]
async fn test_file_track_cancel_stops_rtp() {
    let test_file = std::env::temp_dir().join("test_cancel_rtp.wav");
    create_test_wav_file_with_samples(test_file.to_str().unwrap(), 8000)
        .await
        .unwrap();

    let cancel_token = CancellationToken::new();
    let track = FileTrack::new("cancel-rtp".to_string())
        .with_path(test_file.to_string_lossy().to_string())
        .with_loop(true)
        .with_cancel_token(cancel_token.clone())
        .with_codec_preference(vec![CodecType::PCMU]);
    let _ = track.local_description().await;
    track.start_playback().await.unwrap();

    // Give the playback task a moment to run before stopping it.
    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    track.stop().await;

    let result = tokio::time::timeout(
        tokio::time::Duration::from_millis(500),
        track.wait_for_completion(),
    )
    .await;

    // Best-effort cleanup; errors are deliberately ignored.
    let _ = fs::remove_file(&test_file).await;

    assert!(
        result.is_ok(),
        "completion_notify must fire after stop() cancels the playback task"
    );
}
/// Smoke test: a looping track can be started and stopped again.
///
/// There are no assertions beyond `start_playback()` succeeding.
#[tokio::test]
async fn test_file_track_looping_playback() {
    use tokio::time::{sleep, Duration};

    let wav_path = std::env::temp_dir().join("test_loop.wav");
    create_test_wav_file(wav_path.to_str().unwrap())
        .await
        .unwrap();

    let token = CancellationToken::new();
    let track = FileTrack::new("loop-test".to_string())
        .with_path(wav_path.to_string_lossy().to_string())
        .with_loop(true)
        .with_cancel_token(token.clone());

    // The description is generated on a clone of the track.
    let negotiator = track.clone();
    let _ = negotiator.local_description().await;
    track.start_playback().await.unwrap();

    sleep(Duration::from_millis(50)).await;
    track.stop().await;

    // Best-effort cleanup; errors are deliberately ignored.
    let _ = fs::remove_file(&wav_path).await;
}
/// Starting playback for a path that does not exist must fail with an error
/// whose message contains "not found".
#[tokio::test]
async fn test_file_track_missing_file_error() {
    let missing_path = "/nonexistent/path/to/file.wav".to_string();
    let track = FileTrack::new("missing-file".to_string()).with_path(missing_path);

    // The description is generated on a clone of the track.
    let negotiator = track.clone();
    let _ = negotiator.local_description().await;

    let outcome = track.start_playback().await;
    assert!(outcome.is_err(), "Should error on missing file");

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("not found"));
}
/// After a local description has been generated, a minimal PCMU answer must
/// be accepted by `set_remote_description`.
#[tokio::test]
async fn test_file_track_set_remote_description() {
    // Minimal answer offering only PCMU (payload type 0).
    const ANSWER_SDP: &str = r#"v=0
o=- 0 0 IN IP4 127.0.0.1
s=-
t=0 0
m=audio 9 RTP/AVP 0
a=rtpmap:0 PCMU/8000
"#;

    let track = FileTrack::new("remote-desc-test".to_string())
        .with_path("/tmp/test.wav".to_string());
    let _ = track.local_description().await.unwrap();

    let outcome = track.set_remote_description(ANSWER_SDP).await;
    assert!(outcome.is_ok(), "Should set remote description successfully");
}
/// File exhaustion must be reported to the controller as a non-interrupted
/// `AudioComplete` event.
///
/// A watcher task maps `completion_notify` to `interrupted: false` and
/// cancellation of `cancel` to `interrupted: true`. The test plays a
/// 160-sample fixture and expects the non-interrupted event within 2 s.
///
/// The watcher is cancelled and the fixture removed before the outcome is
/// checked, so a failing run still cleans up after itself.
#[tokio::test]
async fn test_file_track_completion_drives_audio_complete_event() {
    use crate::call::app::ControllerEvent;
    use tokio::sync::mpsc;

    let test_file = std::env::temp_dir().join("test_e2e_audio_complete.wav");
    create_test_wav_file_with_samples(test_file.to_str().unwrap(), 160)
        .await
        .unwrap();

    let track = FileTrack::new("e2e-audio-complete".to_string())
        .with_path(test_file.to_string_lossy().to_string())
        .with_loop(false)
        .with_codec_preference(vec![CodecType::PCMU]);
    let _ = track.local_description().await;
    track.start_playback().await.unwrap();

    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<ControllerEvent>();
    let completion_notify = track.completion_notify.clone();
    let cancel = CancellationToken::new();
    let cancel_child = cancel.child_token();

    // NOTE(review): the watcher is spawned after playback has started; this
    // relies on the completion notification not being lost if it fires
    // before `notified()` is first polled — confirm against how FileTrack
    // signals `completion_notify`.
    crate::utils::spawn(async move {
        tokio::select! {
            _ = completion_notify.notified() => {
                let _ = event_tx.send(ControllerEvent::AudioComplete {
                    track_id: "default".to_string(),
                    interrupted: false,
                });
            }
            _ = cancel_child.cancelled() => {
                let _ = event_tx.send(ControllerEvent::AudioComplete {
                    track_id: "default".to_string(),
                    interrupted: true,
                });
            }
        }
    });

    let received = tokio::time::timeout(
        tokio::time::Duration::from_secs(2),
        event_rx.recv(),
    )
    .await;

    // Release the watcher and remove the fixture before any assertion can
    // panic. Cleanup is best-effort; errors are deliberately ignored.
    cancel.cancel();
    let _ = fs::remove_file(&test_file).await;

    let event = received
        .expect("AudioComplete must arrive within 2 s")
        .expect("channel must not be closed");
    match event {
        ControllerEvent::AudioComplete { interrupted, track_id } => {
            assert!(!interrupted, "File-exhausted playback must not be interrupted");
            assert_eq!(track_id, "default");
        }
        other => panic!("unexpected event: {:?}", other),
    }
}
/// Stopping a looping track must still surface an `AudioComplete` event.
///
/// A watcher task maps `completion_notify` to `interrupted: false` and
/// cancellation of `session_cancel` to `interrupted: true`. After `stop()`
/// the test expects some `AudioComplete` within 500 ms.
///
/// NOTE(review): the assertion only checks the event variant, not the value
/// of `interrupted`, and `session_cancel` is cancelled only after the event
/// has been received — confirm which flag value is intended here.
///
/// The watcher is cancelled and the fixture removed before the outcome is
/// checked, so a failing run still cleans up after itself.
#[tokio::test]
async fn test_file_track_stop_drives_interrupted_audio_complete() {
    use crate::call::app::ControllerEvent;
    use tokio::sync::mpsc;

    let test_file = std::env::temp_dir().join("test_e2e_interrupted.wav");
    create_test_wav_file_with_samples(test_file.to_str().unwrap(), 8000)
        .await
        .unwrap();

    let cancel_token = CancellationToken::new();
    let track = FileTrack::new("e2e-interrupted".to_string())
        .with_path(test_file.to_string_lossy().to_string())
        .with_loop(true)
        .with_cancel_token(cancel_token.clone())
        .with_codec_preference(vec![CodecType::PCMU]);
    let _ = track.local_description().await;
    track.start_playback().await.unwrap();

    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<ControllerEvent>();
    let completion_notify = track.completion_notify.clone();
    let session_cancel = CancellationToken::new();
    let session_child = session_cancel.child_token();
    crate::utils::spawn(async move {
        tokio::select! {
            _ = completion_notify.notified() => {
                let _ = event_tx.send(ControllerEvent::AudioComplete {
                    track_id: "default".to_string(),
                    interrupted: false,
                });
            }
            _ = session_child.cancelled() => {
                let _ = event_tx.send(ControllerEvent::AudioComplete {
                    track_id: "default".to_string(),
                    interrupted: true,
                });
            }
        }
    });

    // Let playback and the watcher run briefly before stopping the track.
    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    track.stop().await;

    let received = tokio::time::timeout(
        tokio::time::Duration::from_millis(500),
        event_rx.recv(),
    )
    .await;

    // Release the watcher and remove the fixture before any assertion can
    // panic. Cleanup is best-effort; errors are deliberately ignored.
    session_cancel.cancel();
    let _ = fs::remove_file(&test_file).await;

    let event = received
        .expect("AudioComplete must arrive within 500 ms after stop()")
        .expect("channel must not be closed");
    assert!(
        matches!(event, ControllerEvent::AudioComplete { .. }),
        "Expected AudioComplete after stop(), got: {:?}",
        event
    );
}
/// Checks that `start_playback_on(Some(pc))` sends audio through an
/// externally negotiated peer connection.
///
/// Builds a caller and a receiver `RtpTrack` in RTP mode, exchanges
/// offer/answer between them, plays a WAV fixture on the caller's peer
/// connection and counts `MediaSample::Audio` frames seen on the receiver's
/// peer connection. At least one frame must arrive within the overall 2 s
/// budget.
#[tokio::test]
async fn test_start_playback_on_external_pc_delivers_rtp() {
use rustrtc::media::MediaSample;
let temp_dir = std::env::temp_dir();
let test_file = temp_dir.join("test_external_pc_rtp.wav");
// 800 samples at the fixture's 8 kHz rate = 100 ms of audio.
create_test_wav_file_with_samples(test_file.to_str().unwrap(), 800)
.await
.unwrap();
// Caller and receiver use adjacent RTP port ranges and both prefer PCMU.
let caller_track = RtpTrackBuilder::new("caller".to_string())
.with_mode(TransportMode::Rtp)
.with_rtp_range(17000, 17100)
.with_codec_preference(vec![CodecType::PCMU])
.build();
let receiver_track = RtpTrackBuilder::new("receiver".to_string())
.with_mode(TransportMode::Rtp)
.with_rtp_range(17100, 17200)
.with_codec_preference(vec![CodecType::PCMU])
.build();
// Offer/answer exchange: caller offers, receiver answers, caller applies the answer.
let caller_offer = caller_track.local_description().await.unwrap();
let receiver_answer = receiver_track.handshake(caller_offer).await.unwrap();
caller_track
.set_remote_description(&receiver_answer)
.await
.unwrap();
let caller_pc = caller_track.get_peer_connection().await.unwrap();
// The file track gets no description of its own; it plays on the caller's peer connection.
let file_track = FileTrack::new("rtp-delivery-test".to_string())
.with_path(test_file.to_string_lossy().to_string())
.with_loop(false)
.with_codec_preference(vec![CodecType::PCMU]);
file_track
.start_playback_on(Some(caller_pc))
.await
.unwrap();
let receiver_pc = receiver_track.get_peer_connection().await.unwrap();
// Overall budget of 2 s for both passes below.
let received = tokio::time::timeout(tokio::time::Duration::from_secs(2), async {
let mut frame_count: u64 = 0;
// Pass 1: poll the transceivers the receiver PC already has.
for transceiver in receiver_pc.get_transceivers() {
if let Some(receiver) = transceiver.receiver() {
let track = receiver.track();
// Up to 10 reads per track, each bounded by a 200 ms timeout.
for _ in 0..10 {
match tokio::time::timeout(
tokio::time::Duration::from_millis(200),
track.recv(),
)
.await
{
Ok(Ok(MediaSample::Audio(frame))) => {
frame_count += 1;
debug!(
rtp_ts = frame.rtp_timestamp,
data_len = frame.data.len(),
"Receiver got RTP frame"
);
// Three audio frames are enough; return early.
if frame_count >= 3 {
return frame_count;
}
}
// Non-audio samples are skipped, a recv error ends this track's loop,
// and a per-read timeout just tries again.
Ok(Ok(_)) => continue,
Ok(Err(_)) => break,
Err(_) => continue, }
}
}
}
// Pass 2: wait for `Track` events on the receiver PC and read from each announced track.
let mut pc_recv = Box::pin(receiver_pc.recv());
loop {
match tokio::time::timeout(tokio::time::Duration::from_millis(500), &mut pc_recv)
.await
{
Ok(Some(rustrtc::PeerConnectionEvent::Track(transceiver))) => {
if let Some(receiver) = transceiver.receiver() {
let track = receiver.track();
// Same bounded read loop as in pass 1.
for _ in 0..10 {
match tokio::time::timeout(
tokio::time::Duration::from_millis(200),
track.recv(),
)
.await
{
Ok(Ok(MediaSample::Audio(frame))) => {
frame_count += 1;
debug!(
rtp_ts = frame.rtp_timestamp,
data_len = frame.data.len(),
"Receiver got RTP frame (from Track event)"
);
if frame_count >= 3 {
return frame_count;
}
}
Ok(Ok(_)) => continue,
Ok(Err(_)) => break,
Err(_) => continue,
}
}
}
// The previous recv future has completed; create a fresh one for the next event.
pc_recv = Box::pin(receiver_pc.recv());
}
// Any other event: ignore it and wait for the next one.
Ok(Some(_)) => {
pc_recv = Box::pin(receiver_pc.recv());
}
// `None` from recv, or no event within 500 ms: stop waiting.
Ok(None) => break,
Err(_) => break, }
}
frame_count
})
.await;
// The `expect` fails if the outer 2 s timeout elapsed. Otherwise one frame is
// enough to pass, even though the loops above stop early at three.
let frame_count = received.expect("Timed out waiting for RTP frames on receiver PC");
assert!(
frame_count >= 1,
"Expected at least 1 RTP frame on the receiver PC, got {}. \
This means start_playback_on(caller_pc) did not send frames \
through the caller's negotiated transport.",
frame_count
);
debug!(frame_count, "test_start_playback_on_external_pc_delivers_rtp passed");
// Teardown: stop all three tracks and remove the fixture (best effort).
file_track.stop().await;
caller_track.stop().await;
receiver_track.stop().await;
let _ = fs::remove_file(&test_file).await;
}
/// `start_playback_on(None)` must signal completion just as
/// `start_playback()` does.
///
/// Plays a 160-sample PCMU fixture and waits up to 2 s for
/// `wait_for_completion()`. The fixture is removed before the assertion, so
/// a failing run does not leave it in the temp directory.
#[tokio::test]
async fn test_start_playback_on_none_backward_compatible() {
    let test_file = std::env::temp_dir().join("test_playback_on_none.wav");
    create_test_wav_file_with_samples(test_file.to_str().unwrap(), 160)
        .await
        .unwrap();

    let track = FileTrack::new("compat-test".to_string())
        .with_path(test_file.to_string_lossy().to_string())
        .with_loop(false)
        .with_codec_preference(vec![CodecType::PCMU]);
    let _ = track.local_description().await;
    track.start_playback_on(None).await.unwrap();

    let result = tokio::time::timeout(
        tokio::time::Duration::from_secs(2),
        track.wait_for_completion(),
    )
    .await;

    // Best-effort cleanup; errors are deliberately ignored.
    let _ = fs::remove_file(&test_file).await;

    assert!(
        result.is_ok(),
        "start_playback_on(None) must fire completion_notify just like start_playback()"
    );
}