use std::net::Ipv4Addr;
use std::time::Instant;
use str0m::media::{Direction, MediaKind};
use str0m::rtp::{ExtensionValues, Ssrc};
use str0m::{Event, Rtc, RtcError};
mod common;
use common::{Peer, TestRtc, init_log, negotiate, progress};
use common::{connect_l_r_with_rtc, init_crypto_default};
#[test]
fn rtp_sequence_number_near_boundary() -> Result<(), RtcError> {
init_log();
init_crypto_default();
let now = Instant::now();
let rtc1 = Rtc::builder().set_rtp_mode(true).build(now);
let rtc2 = Rtc::builder().build(now);
let (mut l, mut r) = connect_l_r_with_rtc(rtc1, rtc2);
let mid = "audio".into();
let ssrc_tx: Ssrc = 1337.into();
l.direct_api().declare_media(mid, MediaKind::Audio);
l.direct_api().declare_stream_tx(ssrc_tx, None, mid, None);
r.direct_api().declare_media(mid, MediaKind::Audio);
let max = l.last.max(r.last);
l.last = max;
r.last = max;
let params = l.params_opus();
let pt = params.pt();
let ssrc = l.direct_api().stream_tx_by_mid(mid, None).unwrap().ssrc();
let start_seq: u64 = 65530;
let packet_count = 12;
for i in 0..packet_count {
let seq_no = start_seq + i;
let time = (i * 960) as u32; let wallclock = l.start + l.duration();
let exts = ExtensionValues::default();
let mut direct = l.direct_api();
let stream = direct.stream_tx(&ssrc).unwrap();
stream
.write_rtp(
pt,
seq_no.into(),
time,
wallclock,
false,
exts,
false,
vec![1_u8; 80],
)
.expect("write_rtp should succeed");
drop(direct);
progress(&mut l, &mut r)?;
}
for _ in 0..50 {
progress(&mut l, &mut r)?;
}
let received_seqs: Vec<u64> = r
.events
.iter()
.filter_map(|(_, e)| {
if let Event::MediaData(data) = e {
Some(**data.seq_range.start())
} else {
None
}
})
.collect();
assert!(
received_seqs.len() >= 10,
"Should receive most packets, got {}",
received_seqs.len()
);
let before_boundary: Vec<_> = received_seqs
.iter()
.filter(|&&s| s >= 65530 && s <= 65535)
.collect();
assert!(
!before_boundary.is_empty(),
"Should receive packets before boundary (65530-65535), got seq_nos: {:?}",
received_seqs
);
let after_boundary: Vec<_> = received_seqs.iter().filter(|&&s| s >= 65536).collect();
assert!(
!after_boundary.is_empty(),
"Should receive packets after boundary (65536+ = wire 0+), got seq_nos: {:?}",
received_seqs
);
for i in 1..received_seqs.len() {
assert!(
received_seqs[i] > received_seqs[i - 1],
"Sequence numbers should be monotonically increasing across boundary: {} should be > {}",
received_seqs[i],
received_seqs[i - 1]
);
}
Ok(())
}
#[test]
fn rtp_reordering_buffer_audio() -> Result<(), RtcError> {
init_log();
init_crypto_default();
let now = Instant::now();
let rtc1 = Rtc::builder().set_rtp_mode(true).build(now);
let rtc2 = Rtc::builder().set_reordering_size_audio(15).build(now);
let (mut l, mut r) = connect_l_r_with_rtc(rtc1, rtc2);
let mid = "audio".into();
let ssrc_tx: Ssrc = 1337.into();
l.direct_api().declare_media(mid, MediaKind::Audio);
l.direct_api().declare_stream_tx(ssrc_tx, None, mid, None);
r.direct_api().declare_media(mid, MediaKind::Audio);
let max = l.last.max(r.last);
l.last = max;
r.last = max;
let params = l.params_opus();
let pt = params.pt();
let ssrc = l.direct_api().stream_tx_by_mid(mid, None).unwrap().ssrc();
let send_order = [1u64, 3, 2, 5, 4, 7, 6, 9, 8, 10];
for &seq in &send_order {
let time = (seq * 960) as u32; let wallclock = l.start + l.duration();
let exts = ExtensionValues::default();
let mut direct = l.direct_api();
let stream = direct.stream_tx(&ssrc).unwrap();
stream
.write_rtp(
pt,
seq.into(),
time,
wallclock,
false,
exts,
false,
vec![seq as u8; 80], )
.expect("write_rtp should succeed");
drop(direct);
progress(&mut l, &mut r)?;
}
for _ in 0..50 {
progress(&mut l, &mut r)?;
}
let received_ranges: Vec<(u64, u64)> = r
.events
.iter()
.filter_map(|(_, e)| {
if let Event::MediaData(data) = e {
Some((**data.seq_range.start(), **data.seq_range.end()))
} else {
None
}
})
.collect();
assert!(!received_ranges.is_empty(), "Should receive some packets");
for i in 1..received_ranges.len() {
assert!(
received_ranges[i].0 >= received_ranges[i - 1].0,
"Packets should be reordered: range {:?} should come after {:?}, all ranges: {:?}",
received_ranges[i],
received_ranges[i - 1],
received_ranges
);
}
let min_received = received_ranges.iter().map(|r| r.0).min().unwrap();
let max_received = received_ranges.iter().map(|r| r.1).max().unwrap();
assert!(
min_received <= 2,
"Should receive early packets, min was {}",
min_received
);
assert!(
max_received >= 9,
"Should receive late packets, max was {}",
max_received
);
Ok(())
}
#[test]
fn rtp_reordering_buffer_video() -> Result<(), RtcError> {
init_log();
init_crypto_default();
let now = Instant::now();
let rtc1 = Rtc::builder().set_rtp_mode(true).build(now);
let rtc2 = Rtc::builder().set_reordering_size_video(30).build(now);
let (mut l, mut r) = connect_l_r_with_rtc(rtc1, rtc2);
let mid = "video".into();
let ssrc_tx: Ssrc = 1337.into();
l.direct_api().declare_media(mid, MediaKind::Video);
l.direct_api().declare_stream_tx(ssrc_tx, None, mid, None);
r.direct_api().declare_media(mid, MediaKind::Video);
let max = l.last.max(r.last);
l.last = max;
r.last = max;
let params = l.params_vp8();
let pt = params.pt();
let ssrc = l.direct_api().stream_tx_by_mid(mid, None).unwrap().ssrc();
let send_order = [1u64, 3, 2, 5, 4, 7, 6, 9, 8, 10];
for &seq in &send_order {
let time = (seq * 3000) as u32; let wallclock = l.start + l.duration();
let exts = ExtensionValues::default();
let mut direct = l.direct_api();
let stream = direct.stream_tx(&ssrc).unwrap();
stream
.write_rtp(
pt,
seq.into(),
time,
wallclock,
true, exts,
false,
vec![0x10, 0x00, 0x00, seq as u8],
)
.expect("write_rtp should succeed");
drop(direct);
progress(&mut l, &mut r)?;
}
for _ in 0..50 {
progress(&mut l, &mut r)?;
}
let received_ranges: Vec<(u64, u64)> = r
.events
.iter()
.filter_map(|(_, e)| {
if let Event::MediaData(data) = e {
Some((**data.seq_range.start(), **data.seq_range.end()))
} else {
None
}
})
.collect();
assert!(
!received_ranges.is_empty(),
"Should receive some video packets"
);
for i in 1..received_ranges.len() {
assert!(
received_ranges[i].0 >= received_ranges[i - 1].0,
"Video packets should be reordered: {:?} should come after {:?}",
received_ranges[i],
received_ranges[i - 1]
);
}
Ok(())
}
#[test]
fn rtp_reordering_buffer_custom_size() -> Result<(), RtcError> {
init_log();
init_crypto_default();
let now = Instant::now();
let rtc1 = Rtc::builder().set_rtp_mode(true).build(now);
let rtc2 = Rtc::builder().set_reordering_size_audio(5).build(now);
let (mut l, mut r) = connect_l_r_with_rtc(rtc1, rtc2);
let mid = "audio".into();
let ssrc_tx: Ssrc = 1337.into();
l.direct_api().declare_media(mid, MediaKind::Audio);
l.direct_api().declare_stream_tx(ssrc_tx, None, mid, None);
r.direct_api().declare_media(mid, MediaKind::Audio);
let max = l.last.max(r.last);
l.last = max;
r.last = max;
let params = l.params_opus();
let pt = params.pt();
let ssrc = l.direct_api().stream_tx_by_mid(mid, None).unwrap().ssrc();
let send_order = [1u64, 3, 2, 4, 6, 5, 7, 9, 8, 10];
for &seq in &send_order {
let time = (seq * 960) as u32;
let wallclock = l.start + l.duration();
let exts = ExtensionValues::default();
let mut direct = l.direct_api();
let stream = direct.stream_tx(&ssrc).unwrap();
stream
.write_rtp(
pt,
seq.into(),
time,
wallclock,
false,
exts,
false,
vec![seq as u8; 80],
)
.expect("write_rtp should succeed");
drop(direct);
progress(&mut l, &mut r)?;
}
for _ in 0..50 {
progress(&mut l, &mut r)?;
}
let received_ranges: Vec<(u64, u64)> = r
.events
.iter()
.filter_map(|(_, e)| {
if let Event::MediaData(data) = e {
Some((**data.seq_range.start(), **data.seq_range.end()))
} else {
None
}
})
.collect();
assert!(
!received_ranges.is_empty(),
"Should receive packets with small reordering buffer"
);
for i in 1..received_ranges.len() {
assert!(
received_ranges[i].0 >= received_ranges[i - 1].0,
"Packets should be reordered with small buffer"
);
}
Ok(())
}
#[test]
fn rtp_media_time_increasing() -> Result<(), RtcError> {
init_log();
init_crypto_default();
let mut l = TestRtc::new(Peer::Left);
let mut r = TestRtc::new(Peer::Right);
l.add_host_candidate((Ipv4Addr::new(1, 1, 1, 1), 1000).into());
r.add_host_candidate((Ipv4Addr::new(2, 2, 2, 2), 2000).into());
let mid = negotiate(&mut l, &mut r, |change| {
change.add_media(MediaKind::Audio, Direction::SendRecv, None, None, None)
});
loop {
if l.is_connected() && r.is_connected() {
break;
}
progress(&mut l, &mut r)?;
}
let max = l.last.max(r.last);
l.last = max;
r.last = max;
let params = l.params_opus();
let pt = params.pt();
let data = vec![1_u8; 80];
for _ in 0..50 {
let wallclock = l.start + l.duration();
let time = l.duration().into();
l.writer(mid)
.unwrap()
.write(pt, wallclock, time, data.clone())?;
progress(&mut l, &mut r)?;
}
let received_count = r
.events
.iter()
.filter(|(_, e)| matches!(e, Event::MediaData(_)))
.count();
assert!(
received_count > 20,
"Should receive packets with increasing timestamps, got {}",
received_count
);
Ok(())
}
#[test]
fn rtp_reordering_buffer_large() -> Result<(), RtcError> {
init_log();
init_crypto_default();
let now = Instant::now();
let rtc1 = Rtc::builder().set_rtp_mode(true).build(now);
let rtc2 = Rtc::builder().set_reordering_size_audio(50).build(now);
let (mut l, mut r) = connect_l_r_with_rtc(rtc1, rtc2);
let mid = "audio".into();
let ssrc_tx: Ssrc = 1337.into();
l.direct_api().declare_media(mid, MediaKind::Audio);
l.direct_api().declare_stream_tx(ssrc_tx, None, mid, None);
r.direct_api().declare_media(mid, MediaKind::Audio);
let max = l.last.max(r.last);
l.last = max;
r.last = max;
let params = l.params_opus();
let pt = params.pt();
let ssrc = l.direct_api().stream_tx_by_mid(mid, None).unwrap().ssrc();
let mut send_order = Vec::new();
for i in 0..10 {
send_order.push(1 + i); send_order.push(11 + i); }
let mut interleaved = Vec::new();
for i in 0..10 {
interleaved.push(1 + i);
interleaved.push(11 + i);
}
for seq in interleaved {
let time = (seq * 960) as u32;
let wallclock = l.start + l.duration();
let exts = ExtensionValues::default();
let mut direct = l.direct_api();
let stream = direct.stream_tx(&ssrc).unwrap();
stream
.write_rtp(
pt,
seq.into(),
time,
wallclock,
false,
exts,
false,
vec![seq as u8; 80],
)
.expect("write_rtp should succeed");
drop(direct);
progress(&mut l, &mut r)?;
}
for _ in 0..50 {
progress(&mut l, &mut r)?;
}
let received_ranges: Vec<(u64, u64)> = r
.events
.iter()
.filter_map(|(_, e)| {
if let Event::MediaData(data) = e {
Some((**data.seq_range.start(), **data.seq_range.end()))
} else {
None
}
})
.collect();
assert!(
!received_ranges.is_empty(),
"Should receive packets with large reordering buffer"
);
for i in 1..received_ranges.len() {
assert!(
received_ranges[i].0 >= received_ranges[i - 1].0,
"Packets should be reordered with large buffer: {:?} vs {:?}",
received_ranges[i],
received_ranges[i - 1]
);
}
Ok(())
}