use rtc_interceptor::{
Interceptor, NackGeneratorBuilder, NackResponderBuilder, Packet, RTCPFeedback, Registry,
StreamInfo, TaggedPacket,
};
use sansio::Protocol;
use shared::TransportContext;
use std::time::{Duration, Instant};
fn create_rtp_packet(ssrc: u32, seq: u16, timestamp: u32, payload_len: usize) -> TaggedPacket {
let mut payload = vec![0u8; payload_len];
for (i, byte) in payload.iter_mut().enumerate() {
*byte = (i % 256) as u8;
}
TaggedPacket {
now: Instant::now(),
transport: TransportContext::default(),
message: Packet::Rtp(rtp::Packet {
header: rtp::header::Header {
ssrc,
sequence_number: seq,
timestamp,
payload_type: 96,
..Default::default()
},
payload: payload.into(),
}),
}
}
fn create_rtp_packet_with_time(
now: Instant,
ssrc: u32,
seq: u16,
timestamp: u32,
payload_len: usize,
) -> TaggedPacket {
let mut pkt = create_rtp_packet(ssrc, seq, timestamp, payload_len);
pkt.now = now;
pkt
}
fn nack_stream_info(ssrc: u32) -> StreamInfo {
StreamInfo {
ssrc,
clock_rate: 90000,
mime_type: "video/VP8".to_string(),
payload_type: 96,
rtcp_feedback: vec![RTCPFeedback {
typ: "nack".to_string(),
parameter: String::new(),
}],
..Default::default()
}
}
fn nack_rtx_stream_info(ssrc: u32, rtx_ssrc: u32, rtx_pt: u8) -> StreamInfo {
StreamInfo {
ssrc,
ssrc_rtx: Some(rtx_ssrc),
clock_rate: 90000,
mime_type: "video/VP8".to_string(),
payload_type: 96,
payload_type_rtx: Some(rtx_pt),
rtcp_feedback: vec![RTCPFeedback {
typ: "nack".to_string(),
parameter: String::new(),
}],
..Default::default()
}
}
fn no_nack_stream_info(ssrc: u32) -> StreamInfo {
StreamInfo {
ssrc,
clock_rate: 90000,
mime_type: "video/VP8".to_string(),
payload_type: 96,
rtcp_feedback: vec![],
..Default::default()
}
}
fn create_nack_packet(
now: Instant,
sender_ssrc: u32,
media_ssrc: u32,
nacks: Vec<rtcp::transport_feedbacks::transport_layer_nack::NackPair>,
) -> TaggedPacket {
let nack = rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack {
sender_ssrc,
media_ssrc,
nacks,
};
TaggedPacket {
now,
transport: TransportContext::default(),
message: Packet::Rtcp(vec![Box::new(nack)]),
}
}
#[test]
fn test_nack_generator_detects_packet_loss() {
let mut chain = Registry::new()
.with(
NackGeneratorBuilder::new()
.with_size(512)
.with_interval(Duration::from_millis(50))
.build(),
)
.build();
let ssrc = 0x12345678;
chain.bind_remote_stream(&nack_stream_info(ssrc));
let base_time = Instant::now();
for seq in [0u16, 1, 2, 6, 7] {
let pkt = create_rtp_packet_with_time(base_time, ssrc, seq, seq as u32 * 3000, 500);
chain.handle_read(pkt).unwrap();
}
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(100))
.unwrap();
let mut nack_found = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if let Some(nack) = rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack>(
)
{
assert_eq!(nack.media_ssrc, ssrc);
nack_found = true;
}
}
}
}
assert!(nack_found, "NACK should be generated for missing packets");
}
#[test]
fn test_nack_generator_no_nack_for_sequential_packets() {
let mut chain = Registry::new()
.with(
NackGeneratorBuilder::new()
.with_size(512)
.with_interval(Duration::from_millis(50))
.build(),
)
.build();
let ssrc = 0x12345678;
chain.bind_remote_stream(&nack_stream_info(ssrc));
let base_time = Instant::now();
for seq in 0..10u16 {
let pkt = create_rtp_packet_with_time(base_time, ssrc, seq, seq as u32 * 3000, 500);
chain.handle_read(pkt).unwrap();
}
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(100))
.unwrap();
let mut nack_found = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack>(
)
.is_some()
{
nack_found = true;
}
}
}
}
assert!(
!nack_found,
"No NACK should be generated for sequential packets"
);
}
#[test]
fn test_nack_generator_ignores_streams_without_nack_support() {
let mut chain = Registry::new()
.with(
NackGeneratorBuilder::new()
.with_size(512)
.with_interval(Duration::from_millis(50))
.build(),
)
.build();
let ssrc = 0x12345678;
chain.bind_remote_stream(&no_nack_stream_info(ssrc));
let base_time = Instant::now();
for seq in [0u16, 1, 2, 10, 11] {
let pkt = create_rtp_packet_with_time(base_time, ssrc, seq, seq as u32 * 3000, 500);
chain.handle_read(pkt).unwrap();
}
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(100))
.unwrap();
let mut nack_found = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack>(
)
.is_some()
{
nack_found = true;
}
}
}
}
assert!(
!nack_found,
"No NACK should be generated for streams without NACK support"
);
}
#[test]
fn test_nack_responder_retransmits_packet() {
let mut chain = Registry::new()
.with(NackResponderBuilder::new().with_size(512).build())
.build();
let ssrc = 0xABCDEF00;
chain.bind_local_stream(&nack_stream_info(ssrc));
let base_time = Instant::now();
for seq in 0..10u16 {
let pkt = create_rtp_packet_with_time(base_time, ssrc, seq, seq as u32 * 3000, 500);
chain.handle_write(pkt).unwrap();
}
let mut sent_packets = Vec::new();
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message {
sent_packets.push(rtp.header.sequence_number);
}
}
assert_eq!(sent_packets.len(), 10);
let nack = create_nack_packet(
base_time,
0x11111111,
ssrc,
vec![rtcp::transport_feedbacks::transport_layer_nack::NackPair {
packet_id: 3,
lost_packets: 0b0000_0000_0000_0010, }],
);
chain.handle_read(nack).unwrap();
while chain.poll_read().is_some() {}
let mut retransmitted = Vec::new();
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message {
retransmitted.push(rtp.header.sequence_number);
}
}
assert!(
retransmitted.contains(&3),
"Packet 3 should be retransmitted, got {:?}",
retransmitted
);
assert!(
retransmitted.contains(&5),
"Packet 5 should be retransmitted, got {:?}",
retransmitted
);
}
#[test]
fn test_nack_responder_rtx_retransmission() {
let mut chain = Registry::new()
.with(NackResponderBuilder::new().with_size(512).build())
.build();
let ssrc = 0xABCDEF00;
let rtx_ssrc = 0xABCDEF01;
let rtx_pt = 97;
chain.bind_local_stream(&nack_rtx_stream_info(ssrc, rtx_ssrc, rtx_pt));
let base_time = Instant::now();
for seq in 0..5u16 {
let pkt = create_rtp_packet_with_time(base_time, ssrc, seq, seq as u32 * 3000, 500);
chain.handle_write(pkt).unwrap();
}
while chain.poll_write().is_some() {}
let nack = create_nack_packet(
base_time,
0x11111111,
ssrc,
vec![rtcp::transport_feedbacks::transport_layer_nack::NackPair {
packet_id: 2,
lost_packets: 0,
}],
);
chain.handle_read(nack).unwrap();
while chain.poll_read().is_some() {}
let mut rtx_found = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message
&& rtp.header.ssrc == rtx_ssrc
&& rtp.header.payload_type == rtx_pt
{
rtx_found = true;
assert!(rtp.payload.len() >= 2);
let original_seq = u16::from_be_bytes([rtp.payload[0], rtp.payload[1]]);
assert_eq!(original_seq, 2, "RTX payload should contain original seq");
}
}
assert!(rtx_found, "RTX retransmission should be sent");
}
#[test]
fn test_nack_responder_ignores_expired_packets() {
let mut chain = Registry::new()
.with(NackResponderBuilder::new().with_size(4).build()) .build();
let ssrc = 0xABCDEF00;
chain.bind_local_stream(&nack_stream_info(ssrc));
let base_time = Instant::now();
for seq in 0..10u16 {
let pkt = create_rtp_packet_with_time(base_time, ssrc, seq, seq as u32 * 3000, 500);
chain.handle_write(pkt).unwrap();
}
while chain.poll_write().is_some() {}
let nack = create_nack_packet(
base_time,
0x11111111,
ssrc,
vec![rtcp::transport_feedbacks::transport_layer_nack::NackPair {
packet_id: 0,
lost_packets: 0,
}],
);
chain.handle_read(nack).unwrap();
while chain.poll_read().is_some() {}
let mut retransmit_count = 0;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message
&& rtp.header.sequence_number == 0
{
retransmit_count += 1;
}
}
assert_eq!(
retransmit_count, 0,
"Expired packets should not be retransmitted"
);
}
#[test]
fn test_combined_nack_generator_and_responder() {
let mut chain = Registry::new()
.with(
NackGeneratorBuilder::new()
.with_size(512)
.with_interval(Duration::from_millis(50))
.build(),
)
.with(NackResponderBuilder::new().with_size(512).build())
.build();
let local_ssrc = 0x11111111;
let remote_ssrc = 0x22222222;
chain.bind_local_stream(&nack_stream_info(local_ssrc));
chain.bind_remote_stream(&nack_stream_info(remote_ssrc));
let base_time = Instant::now();
for seq in 0..5u16 {
let pkt = create_rtp_packet_with_time(base_time, local_ssrc, seq, seq as u32 * 3000, 500);
chain.handle_write(pkt).unwrap();
}
for seq in [0u16, 1, 2, 5, 6] {
let pkt = create_rtp_packet_with_time(base_time, remote_ssrc, seq, seq as u32 * 3000, 500);
chain.handle_read(pkt).unwrap();
}
while chain.poll_write().is_some() {}
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(100))
.unwrap();
let mut nack_generated = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if let Some(nack) = rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack>(
)
&& nack.media_ssrc == remote_ssrc
{
nack_generated = true;
}
}
}
}
assert!(
nack_generated,
"NACK should be generated for remote stream packet loss"
);
let nack = create_nack_packet(
base_time,
remote_ssrc,
local_ssrc,
vec![rtcp::transport_feedbacks::transport_layer_nack::NackPair {
packet_id: 2,
lost_packets: 0,
}],
);
chain.handle_read(nack).unwrap();
while chain.poll_read().is_some() {}
let mut retransmitted = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message
&& rtp.header.ssrc == local_ssrc
&& rtp.header.sequence_number == 2
{
retransmitted = true;
}
}
assert!(
retransmitted,
"Local packet should be retransmitted on NACK"
);
}
#[test]
fn test_nack_unbind_stops_processing() {
let mut chain = Registry::new()
.with(
NackGeneratorBuilder::new()
.with_size(512)
.with_interval(Duration::from_millis(50))
.build(),
)
.with(NackResponderBuilder::new().with_size(512).build())
.build();
let local_ssrc = 0x11111111;
let remote_ssrc = 0x22222222;
let local_info = nack_stream_info(local_ssrc);
let remote_info = nack_stream_info(remote_ssrc);
chain.bind_local_stream(&local_info);
chain.bind_remote_stream(&remote_info);
let base_time = Instant::now();
let pkt = create_rtp_packet_with_time(base_time, local_ssrc, 0, 0, 500);
chain.handle_write(pkt).unwrap();
let pkt = create_rtp_packet_with_time(base_time, remote_ssrc, 0, 0, 500);
chain.handle_read(pkt).unwrap();
chain.unbind_local_stream(&local_info);
chain.unbind_remote_stream(&remote_info);
while chain.poll_write().is_some() {}
while chain.poll_read().is_some() {}
let pkt = create_rtp_packet_with_time(base_time, remote_ssrc, 10, 30000, 500);
chain.handle_read(pkt).unwrap();
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(100))
.unwrap();
let mut nack_found = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack>(
)
.is_some()
{
nack_found = true;
}
}
}
}
assert!(
!nack_found,
"No NACK should be generated for unbound streams"
);
}
#[test]
fn test_nack_multiple_streams() {
let mut chain = Registry::new()
.with(
NackGeneratorBuilder::new()
.with_size(512)
.with_interval(Duration::from_millis(50))
.build(),
)
.build();
let video_ssrc = 0x11111111;
let audio_ssrc = 0x22222222;
chain.bind_remote_stream(&nack_stream_info(video_ssrc));
chain.bind_remote_stream(&nack_stream_info(audio_ssrc));
let base_time = Instant::now();
for seq in [0u16, 1, 5, 6] {
let pkt = create_rtp_packet_with_time(base_time, video_ssrc, seq, seq as u32 * 3000, 500);
chain.handle_read(pkt).unwrap();
}
for seq in [0u16, 1, 10, 11] {
let pkt = create_rtp_packet_with_time(base_time, audio_ssrc, seq, seq as u32 * 960, 160);
chain.handle_read(pkt).unwrap();
}
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(100))
.unwrap();
let mut video_nack = false;
let mut audio_nack = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if let Some(nack) = rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack>(
)
{
if nack.media_ssrc == video_ssrc {
video_nack = true;
}
if nack.media_ssrc == audio_ssrc {
audio_nack = true;
}
}
}
}
}
assert!(video_nack, "NACK should be generated for video stream");
assert!(audio_nack, "NACK should be generated for audio stream");
}
#[test]
fn test_nack_example_simulation() {
const SSRC: u32 = 5000;
let mut sender = Registry::new()
.with(NackResponderBuilder::new().with_size(512).build())
.build();
sender.bind_local_stream(&nack_stream_info(SSRC));
let mut receiver = Registry::new()
.with(
NackGeneratorBuilder::new()
.with_size(512)
.with_interval(Duration::from_millis(50))
.build(),
)
.build();
receiver.bind_remote_stream(&nack_stream_info(SSRC));
let base_time = Instant::now();
let mut sent_seqs = Vec::new();
for seq in 0..10u16 {
let pkt = create_rtp_packet_with_time(base_time, SSRC, seq, seq as u32 * 3000, 3);
sender.handle_write(pkt).unwrap();
while let Some(out_pkt) = sender.poll_write() {
if let Packet::Rtp(rtp) = &out_pkt.message {
sent_seqs.push(rtp.header.sequence_number);
}
}
}
assert_eq!(sent_seqs.len(), 10, "Sender should have sent 10 packets");
let lost_packets: Vec<u16> = vec![3, 5, 7];
for seq in 0..10u16 {
if !lost_packets.contains(&seq) {
let recv_pkt = create_rtp_packet_with_time(base_time, SSRC, seq, seq as u32 * 3000, 3);
receiver.handle_read(recv_pkt).unwrap();
}
}
while receiver.poll_read().is_some() {}
receiver
.handle_timeout(base_time + Duration::from_millis(100))
.unwrap();
let mut nack_packets = Vec::new();
while let Some(pkt) = receiver.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if let Some(nack) = rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack>(
)
{
nack_packets.push(nack.clone());
}
}
}
}
assert!(
!nack_packets.is_empty(),
"Receiver should generate NACK for lost packets"
);
let mut nacked_seqs = Vec::new();
for nack in &nack_packets {
assert_eq!(nack.media_ssrc, SSRC);
for nack_pair in &nack.nacks {
nacked_seqs.push(nack_pair.packet_id);
for i in 0..16 {
if nack_pair.lost_packets & (1 << i) != 0 {
nacked_seqs.push(nack_pair.packet_id.wrapping_add(i + 1));
}
}
}
}
for lost_seq in &lost_packets {
assert!(
nacked_seqs.contains(lost_seq),
"NACK should request retransmission of seq {}",
lost_seq
);
}
for nack in &nack_packets {
let nack_pkt = TaggedPacket {
now: base_time,
transport: TransportContext::default(),
message: Packet::Rtcp(vec![Box::new(nack.clone())]),
};
sender.handle_read(nack_pkt).unwrap();
}
while sender.poll_read().is_some() {}
let mut retransmitted_seqs = Vec::new();
while let Some(pkt) = sender.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message {
retransmitted_seqs.push(rtp.header.sequence_number);
}
}
for lost_seq in &lost_packets {
assert!(
retransmitted_seqs.contains(lost_seq),
"Sender should retransmit seq {}",
lost_seq
);
}
println!(
"NACK simulation successful: lost {:?}, retransmitted {:?}",
lost_packets, retransmitted_seqs
);
}
#[test]
fn test_nack_example_with_rtx() {
const SSRC: u32 = 5000;
const RTX_SSRC: u32 = 5001;
const RTX_PT: u8 = 97;
let mut sender = Registry::new()
.with(NackResponderBuilder::new().with_size(512).build())
.build();
sender.bind_local_stream(&nack_rtx_stream_info(SSRC, RTX_SSRC, RTX_PT));
let mut receiver = Registry::new()
.with(
NackGeneratorBuilder::new()
.with_size(512)
.with_interval(Duration::from_millis(50))
.build(),
)
.build();
receiver.bind_remote_stream(&nack_stream_info(SSRC));
let base_time = Instant::now();
for seq in 0..10u16 {
let pkt = create_rtp_packet_with_time(base_time, SSRC, seq, seq as u32 * 3000, 100);
sender.handle_write(pkt).unwrap();
while sender.poll_write().is_some() {}
}
let lost_packets: Vec<u16> = vec![2, 6];
for seq in 0..10u16 {
if !lost_packets.contains(&seq) {
let recv_pkt =
create_rtp_packet_with_time(base_time, SSRC, seq, seq as u32 * 3000, 100);
receiver.handle_read(recv_pkt).unwrap();
}
}
while receiver.poll_read().is_some() {}
receiver
.handle_timeout(base_time + Duration::from_millis(100))
.unwrap();
let mut nack_packets = Vec::new();
while let Some(pkt) = receiver.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if let Some(nack) = rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack>(
)
{
nack_packets.push(nack.clone());
}
}
}
}
for nack in &nack_packets {
let nack_pkt = TaggedPacket {
now: base_time,
transport: TransportContext::default(),
message: Packet::Rtcp(vec![Box::new(nack.clone())]),
};
sender.handle_read(nack_pkt).unwrap();
}
while sender.poll_read().is_some() {}
let mut rtx_packets = Vec::new();
while let Some(pkt) = sender.poll_write() {
if let Packet::Rtp(rtp) = pkt.message {
rtx_packets.push(rtp);
}
}
assert_eq!(
rtx_packets.len(),
lost_packets.len(),
"Should retransmit all lost packets"
);
for rtx in &rtx_packets {
assert_eq!(rtx.header.ssrc, RTX_SSRC, "RTX should use RTX SSRC");
assert_eq!(
rtx.header.payload_type, RTX_PT,
"RTX should use RTX payload type"
);
assert!(rtx.payload.len() >= 2, "RTX payload should have seq header");
let original_seq = u16::from_be_bytes([rtx.payload[0], rtx.payload[1]]);
assert!(
lost_packets.contains(&original_seq),
"RTX should contain original seq {} in payload",
original_seq
);
}
println!(
"RTX NACK simulation successful: lost {:?}, retransmitted {} RTX packets",
lost_packets,
rtx_packets.len()
);
}
#[test]
fn test_continuous_stream_with_nack_recovery() {
const SSRC: u32 = 5000;
const TOTAL_PACKETS: u16 = 100;
let mut sender = Registry::new()
.with(NackResponderBuilder::new().with_size(512).build())
.build();
sender.bind_local_stream(&nack_stream_info(SSRC));
let mut receiver = Registry::new()
.with(
NackGeneratorBuilder::new()
.with_size(512)
.with_interval(Duration::from_millis(20))
.build(),
)
.build();
receiver.bind_remote_stream(&nack_stream_info(SSRC));
let base_time = Instant::now();
let packet_interval = Duration::from_millis(20);
let mut received_seqs: std::collections::HashSet<u16> = std::collections::HashSet::new();
let mut lost_seqs: Vec<u16> = Vec::new();
for seq in 0..TOTAL_PACKETS {
if seq % 10 == 5 {
lost_seqs.push(seq);
}
}
for seq in 0..TOTAL_PACKETS {
let pkt_time = base_time + packet_interval * seq as u32;
let pkt = create_rtp_packet_with_time(pkt_time, SSRC, seq, seq as u32 * 3000, 100);
sender.handle_write(pkt).unwrap();
while sender.poll_write().is_some() {}
if !lost_seqs.contains(&seq) {
let recv_pkt = create_rtp_packet_with_time(pkt_time, SSRC, seq, seq as u32 * 3000, 100);
receiver.handle_read(recv_pkt).unwrap();
while receiver.poll_read().is_some() {}
received_seqs.insert(seq);
}
if seq % 10 == 9 {
receiver.handle_timeout(pkt_time).unwrap();
while let Some(nack_pkt) = receiver.poll_write() {
if let Packet::Rtcp(rtcp_packets) = nack_pkt.message {
let sender_nack = TaggedPacket {
now: pkt_time,
transport: TransportContext::default(),
message: Packet::Rtcp(rtcp_packets),
};
sender.handle_read(sender_nack).unwrap();
while sender.poll_read().is_some() {}
while let Some(retrans_pkt) = sender.poll_write() {
if let Packet::Rtp(rtp) = &retrans_pkt.message {
received_seqs.insert(rtp.header.sequence_number);
}
}
}
}
}
}
let final_time = base_time + packet_interval * TOTAL_PACKETS as u32;
receiver.handle_timeout(final_time).unwrap();
while let Some(nack_pkt) = receiver.poll_write() {
if let Packet::Rtcp(rtcp_packets) = nack_pkt.message {
let sender_nack = TaggedPacket {
now: final_time,
transport: TransportContext::default(),
message: Packet::Rtcp(rtcp_packets),
};
sender.handle_read(sender_nack).unwrap();
while sender.poll_read().is_some() {}
while let Some(retrans_pkt) = sender.poll_write() {
if let Packet::Rtp(rtp) = &retrans_pkt.message {
received_seqs.insert(rtp.header.sequence_number);
}
}
}
}
let recovery_count = lost_seqs
.iter()
.filter(|seq| received_seqs.contains(seq))
.count();
println!(
"Continuous stream test: {} packets sent, {} initially lost, {} recovered via NACK",
TOTAL_PACKETS,
lost_seqs.len(),
recovery_count
);
assert_eq!(
recovery_count,
lost_seqs.len(),
"All lost packets should be recovered via NACK"
);
assert_eq!(
received_seqs.len(),
TOTAL_PACKETS as usize,
"All packets should be received (original + retransmitted)"
);
}
#[test]
fn test_nack_sequence_wraparound() {
const SSRC: u32 = 5000;
let mut sender = Registry::new()
.with(NackResponderBuilder::new().with_size(512).build())
.build();
sender.bind_local_stream(&nack_stream_info(SSRC));
let mut receiver = Registry::new()
.with(
NackGeneratorBuilder::new()
.with_size(512)
.with_interval(Duration::from_millis(50))
.build(),
)
.build();
receiver.bind_remote_stream(&nack_stream_info(SSRC));
let base_time = Instant::now();
let sequences: Vec<u16> = (65530..=65535).chain(0..=3).map(|s| s as u16).collect();
let lost_seqs: Vec<u16> = vec![65533, 0, 2];
for &seq in &sequences {
let pkt = create_rtp_packet_with_time(base_time, SSRC, seq, seq as u32 * 3000, 100);
sender.handle_write(pkt).unwrap();
while sender.poll_write().is_some() {}
}
for &seq in &sequences {
if !lost_seqs.contains(&seq) {
let recv_pkt =
create_rtp_packet_with_time(base_time, SSRC, seq, seq as u32 * 3000, 100);
receiver.handle_read(recv_pkt).unwrap();
}
}
while receiver.poll_read().is_some() {}
receiver
.handle_timeout(base_time + Duration::from_millis(100))
.unwrap();
let mut nacked_seqs = Vec::new();
while let Some(pkt) = receiver.poll_write() {
if let Packet::Rtcp(rtcp_packets) = pkt.message {
for rtcp_pkt in &rtcp_packets {
if let Some(nack) = rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_nack::TransportLayerNack>(
)
{
for nack_pair in &nack.nacks {
nacked_seqs.push(nack_pair.packet_id);
for i in 0..16 {
if nack_pair.lost_packets & (1 << i) != 0 {
nacked_seqs.push(nack_pair.packet_id.wrapping_add(i + 1));
}
}
}
}
}
let nack_pkt = TaggedPacket {
now: base_time,
transport: TransportContext::default(),
message: Packet::Rtcp(rtcp_packets),
};
sender.handle_read(nack_pkt).unwrap();
}
}
while sender.poll_read().is_some() {}
let mut retransmitted = Vec::new();
while let Some(pkt) = sender.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message {
retransmitted.push(rtp.header.sequence_number);
}
}
for &lost_seq in &lost_seqs {
assert!(
nacked_seqs.contains(&lost_seq),
"NACK should request seq {} (wraparound)",
lost_seq
);
assert!(
retransmitted.contains(&lost_seq),
"Should retransmit seq {} (wraparound)",
lost_seq
);
}
println!(
"Wraparound test successful: lost {:?}, nacked {:?}, retransmitted {:?}",
lost_seqs, nacked_seqs, retransmitted
);
}