use rtc_interceptor::{
Interceptor, Packet, RTPHeaderExtension, Registry, StreamInfo, TaggedPacket,
TwccReceiverBuilder, TwccSenderBuilder,
};
use sansio::Protocol;
use shared::TransportContext;
use shared::marshal::{Marshal, Unmarshal};
use std::time::{Duration, Instant};
const TRANSPORT_CC_URI: &str =
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
fn create_rtp_packet(ssrc: u32, seq: u16, timestamp: u32, payload_len: usize) -> TaggedPacket {
let mut payload = vec![0u8; payload_len];
for (i, byte) in payload.iter_mut().enumerate() {
*byte = (i % 256) as u8;
}
TaggedPacket {
now: Instant::now(),
transport: TransportContext::default(),
message: Packet::Rtp(rtp::Packet {
header: rtp::header::Header {
ssrc,
sequence_number: seq,
timestamp,
payload_type: 96,
..Default::default()
},
payload: payload.into(),
}),
}
}
fn create_rtp_packet_with_twcc(
now: Instant,
ssrc: u32,
seq: u16,
twcc_seq: u16,
hdr_ext_id: u8,
) -> TaggedPacket {
let mut pkt = rtp::Packet {
header: rtp::header::Header {
ssrc,
sequence_number: seq,
payload_type: 96,
..Default::default()
},
payload: vec![0u8; 100].into(),
};
let tcc_ext = rtp::extension::transport_cc_extension::TransportCcExtension {
transport_sequence: twcc_seq,
};
if let Ok(ext_data) = tcc_ext.marshal() {
let _ = pkt.header.set_extension(hdr_ext_id, ext_data.freeze());
}
TaggedPacket {
now,
transport: TransportContext::default(),
message: Packet::Rtp(pkt),
}
}
fn create_rtp_packet_with_time(
now: Instant,
ssrc: u32,
seq: u16,
timestamp: u32,
payload_len: usize,
) -> TaggedPacket {
let mut pkt = create_rtp_packet(ssrc, seq, timestamp, payload_len);
pkt.now = now;
pkt
}
fn twcc_stream_info(ssrc: u32, ext_id: u16) -> StreamInfo {
StreamInfo {
ssrc,
clock_rate: 90000,
mime_type: "video/VP8".to_string(),
payload_type: 96,
rtp_header_extensions: vec![RTPHeaderExtension {
uri: TRANSPORT_CC_URI.to_string(),
id: ext_id,
}],
..Default::default()
}
}
fn no_twcc_stream_info(ssrc: u32) -> StreamInfo {
StreamInfo {
ssrc,
clock_rate: 90000,
mime_type: "video/VP8".to_string(),
payload_type: 96,
rtp_header_extensions: vec![],
..Default::default()
}
}
fn extract_twcc_seq(rtp: &rtp::Packet, ext_id: u8) -> Option<u16> {
rtp.header.get_extension(ext_id).and_then(|ext_data| {
rtp::extension::transport_cc_extension::TransportCcExtension::unmarshal(
&mut ext_data.as_ref(),
)
.ok()
.map(|tcc| tcc.transport_sequence)
})
}
#[test]
fn test_twcc_sender_adds_sequence_numbers() {
let mut chain = Registry::new()
.with(TwccSenderBuilder::new().build())
.build();
let ssrc = 0x12345678;
let ext_id = 5u16;
chain.bind_local_stream(&twcc_stream_info(ssrc, ext_id));
let base_time = Instant::now();
for seq in 0..5u16 {
let pkt = create_rtp_packet_with_time(base_time, ssrc, seq, seq as u32 * 3000, 500);
chain.handle_write(pkt).unwrap();
}
let mut twcc_seqs = Vec::new();
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message
&& let Some(twcc_seq) = extract_twcc_seq(rtp, ext_id as u8)
{
twcc_seqs.push(twcc_seq);
}
}
assert_eq!(
twcc_seqs.len(),
5,
"All 5 packets should have TWCC extension"
);
assert_eq!(
twcc_seqs,
vec![0, 1, 2, 3, 4],
"TWCC sequence should increment"
);
}
#[test]
fn test_twcc_sender_multiple_streams_share_counter() {
let mut chain = Registry::new()
.with(TwccSenderBuilder::new().build())
.build();
let ssrc1 = 0x11111111;
let ssrc2 = 0x22222222;
let ext_id = 5u16;
chain.bind_local_stream(&twcc_stream_info(ssrc1, ext_id));
chain.bind_local_stream(&twcc_stream_info(ssrc2, ext_id));
let base_time = Instant::now();
let ssrcs = [ssrc1, ssrc2, ssrc1, ssrc2, ssrc1];
for (i, &ssrc) in ssrcs.iter().enumerate() {
let pkt = create_rtp_packet_with_time(base_time, ssrc, i as u16, 0, 500);
chain.handle_write(pkt).unwrap();
}
let mut twcc_seqs = Vec::new();
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message
&& let Some(twcc_seq) = extract_twcc_seq(rtp, ext_id as u8)
{
twcc_seqs.push(twcc_seq);
}
}
assert_eq!(twcc_seqs, vec![0, 1, 2, 3, 4]);
}
#[test]
fn test_twcc_sender_ignores_streams_without_twcc() {
let mut chain = Registry::new()
.with(TwccSenderBuilder::new().build())
.build();
let ssrc = 0x12345678;
chain.bind_local_stream(&no_twcc_stream_info(ssrc));
let base_time = Instant::now();
let pkt = create_rtp_packet_with_time(base_time, ssrc, 0, 0, 500);
chain.handle_write(pkt).unwrap();
if let Some(pkt) = chain.poll_write()
&& let Packet::Rtp(rtp) = &pkt.message
{
let has_ext = rtp.header.get_extension(5).is_some();
assert!(!has_ext, "Stream without TWCC should not have extension");
}
}
#[test]
fn test_twcc_sender_sequence_wraparound() {
let mut chain = Registry::new()
.with(TwccSenderBuilder::new().build())
.build();
let ssrc = 0x12345678;
let ext_id = 5u16;
chain.bind_local_stream(&twcc_stream_info(ssrc, ext_id));
let base_time = Instant::now();
for seq in 0..10u16 {
let pkt = create_rtp_packet_with_time(base_time, ssrc, seq, seq as u32 * 3000, 100);
chain.handle_write(pkt).unwrap();
}
let mut twcc_seqs = Vec::new();
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message
&& let Some(twcc_seq) = extract_twcc_seq(rtp, ext_id as u8)
{
twcc_seqs.push(twcc_seq);
}
}
for i in 1..twcc_seqs.len() {
assert_eq!(
twcc_seqs[i],
twcc_seqs[i - 1].wrapping_add(1),
"TWCC sequence should increment"
);
}
}
#[test]
fn test_twcc_receiver_generates_feedback_on_timeout() {
let mut chain = Registry::new()
.with(
TwccReceiverBuilder::new()
.with_interval(Duration::from_millis(100))
.build(),
)
.build();
let ssrc = 0x12345678;
let ext_id = 5u16;
chain.bind_remote_stream(&twcc_stream_info(ssrc, ext_id));
let base_time = Instant::now();
for i in 0..5u16 {
let pkt = create_rtp_packet_with_twcc(
base_time + Duration::from_millis(i as u64 * 10),
ssrc,
i,
i,
ext_id as u8,
);
chain.handle_read(pkt).unwrap();
}
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(150))
.unwrap();
let mut feedback_found = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_cc::TransportLayerCc>(
)
.is_some()
{
feedback_found = true;
}
}
}
}
assert!(
feedback_found,
"TransportLayerCC feedback should be generated on timeout"
);
}
#[test]
fn test_twcc_receiver_feedback_contains_packet_info() {
let mut chain = Registry::new()
.with(
TwccReceiverBuilder::new()
.with_interval(Duration::from_millis(100))
.build(),
)
.build();
let ssrc = 0x12345678;
let ext_id = 5u16;
chain.bind_remote_stream(&twcc_stream_info(ssrc, ext_id));
let base_time = Instant::now();
for i in 0..10u16 {
let pkt = create_rtp_packet_with_twcc(
base_time + Duration::from_millis(i as u64 * 20),
ssrc,
i,
i,
ext_id as u8,
);
chain.handle_read(pkt).unwrap();
}
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(150))
.unwrap();
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if let Some(tlcc) = rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_cc::TransportLayerCc>(
)
{
assert!(
!tlcc.packet_chunks.is_empty(),
"Feedback should have packet chunks"
);
assert!(
!tlcc.recv_deltas.is_empty(),
"Feedback should have receive deltas"
);
assert_eq!(
tlcc.base_sequence_number, 0,
"Base sequence should be first packet"
);
return;
}
}
}
}
panic!("TransportLayerCC feedback not found");
}
#[test]
fn test_twcc_receiver_ignores_streams_without_twcc() {
let mut chain = Registry::new()
.with(
TwccReceiverBuilder::new()
.with_interval(Duration::from_millis(100))
.build(),
)
.build();
let ssrc = 0x12345678;
chain.bind_remote_stream(&no_twcc_stream_info(ssrc));
let base_time = Instant::now();
for i in 0..5u16 {
let pkt = create_rtp_packet_with_time(base_time, ssrc, i, i as u32 * 3000, 500);
chain.handle_read(pkt).unwrap();
}
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(150))
.unwrap();
let mut feedback_found = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_cc::TransportLayerCc>(
)
.is_some()
{
feedback_found = true;
}
}
}
}
assert!(
!feedback_found,
"No feedback should be generated for streams without TWCC"
);
}
#[test]
fn test_twcc_receiver_configurable_interval() {
let interval = Duration::from_millis(50);
let mut chain = Registry::new()
.with(TwccReceiverBuilder::new().with_interval(interval).build())
.build();
let ssrc = 0x12345678;
let ext_id = 5u16;
chain.bind_remote_stream(&twcc_stream_info(ssrc, ext_id));
let base_time = Instant::now();
let pkt = create_rtp_packet_with_twcc(base_time, ssrc, 0, 0, ext_id as u8);
chain.handle_read(pkt).unwrap();
while chain.poll_read().is_some() {}
let timeout = chain.poll_timeout();
assert!(timeout.is_some(), "Timeout should be scheduled");
}
#[test]
fn test_combined_twcc_sender_and_receiver() {
let mut chain = Registry::new()
.with(TwccSenderBuilder::new().build())
.with(
TwccReceiverBuilder::new()
.with_interval(Duration::from_millis(100))
.build(),
)
.build();
let local_ssrc = 0x11111111;
let remote_ssrc = 0x22222222;
let ext_id = 5u16;
chain.bind_local_stream(&twcc_stream_info(local_ssrc, ext_id));
chain.bind_remote_stream(&twcc_stream_info(remote_ssrc, ext_id));
let base_time = Instant::now();
for seq in 0..5u16 {
let pkt = create_rtp_packet_with_time(base_time, local_ssrc, seq, seq as u32 * 3000, 500);
chain.handle_write(pkt).unwrap();
}
for i in 0..5u16 {
let pkt = create_rtp_packet_with_twcc(
base_time + Duration::from_millis(i as u64 * 10),
remote_ssrc,
i,
i,
ext_id as u8,
);
chain.handle_read(pkt).unwrap();
}
let mut outgoing_twcc_count = 0;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message
&& extract_twcc_seq(rtp, ext_id as u8).is_some()
{
outgoing_twcc_count += 1;
}
}
assert_eq!(
outgoing_twcc_count, 5,
"All outgoing packets should have TWCC"
);
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(150))
.unwrap();
let mut feedback_found = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_cc::TransportLayerCc>(
)
.is_some()
{
feedback_found = true;
}
}
}
}
assert!(
feedback_found,
"TWCC feedback should be generated for incoming stream"
);
}
#[test]
fn test_twcc_unbind_stops_processing() {
let mut chain = Registry::new()
.with(TwccSenderBuilder::new().build())
.with(
TwccReceiverBuilder::new()
.with_interval(Duration::from_millis(100))
.build(),
)
.build();
let local_ssrc = 0x11111111;
let remote_ssrc = 0x22222222;
let ext_id = 5u16;
let local_info = twcc_stream_info(local_ssrc, ext_id);
let remote_info = twcc_stream_info(remote_ssrc, ext_id);
chain.bind_local_stream(&local_info);
chain.bind_remote_stream(&remote_info);
let base_time = Instant::now();
let pkt = create_rtp_packet_with_time(base_time, local_ssrc, 0, 0, 500);
chain.handle_write(pkt).unwrap();
let pkt = create_rtp_packet_with_twcc(base_time, remote_ssrc, 0, 0, ext_id as u8);
chain.handle_read(pkt).unwrap();
chain.unbind_local_stream(&local_info);
chain.unbind_remote_stream(&remote_info);
while chain.poll_write().is_some() {}
while chain.poll_read().is_some() {}
let pkt = create_rtp_packet_with_time(base_time, local_ssrc, 1, 3000, 500);
chain.handle_write(pkt).unwrap();
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtp(rtp) = &pkt.message
&& rtp.header.ssrc == local_ssrc
{
let has_twcc = extract_twcc_seq(rtp, ext_id as u8).is_some();
assert!(
!has_twcc,
"Unbound stream should not get TWCC extension added"
);
}
}
}
#[test]
fn test_twcc_multiple_remote_streams() {
let mut chain = Registry::new()
.with(
TwccReceiverBuilder::new()
.with_interval(Duration::from_millis(100))
.build(),
)
.build();
let video_ssrc = 0x11111111;
let audio_ssrc = 0x22222222;
let ext_id = 5u16;
chain.bind_remote_stream(&twcc_stream_info(video_ssrc, ext_id));
chain.bind_remote_stream(&twcc_stream_info(audio_ssrc, ext_id));
let base_time = Instant::now();
for i in 0..3u16 {
let pkt = create_rtp_packet_with_twcc(
base_time + Duration::from_millis(i as u64 * 10),
video_ssrc,
i,
i, ext_id as u8,
);
chain.handle_read(pkt).unwrap();
}
for i in 0..3u16 {
let pkt = create_rtp_packet_with_twcc(
base_time + Duration::from_millis((i + 3) as u64 * 10),
audio_ssrc,
i,
i + 3, ext_id as u8,
);
chain.handle_read(pkt).unwrap();
}
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(150))
.unwrap();
let mut feedback_found = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if let Some(tlcc) = rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_cc::TransportLayerCc>(
)
{
feedback_found = true;
assert!(
tlcc.packet_status_count >= 6,
"Feedback should cover all packets, got {}",
tlcc.packet_status_count
);
}
}
}
}
assert!(feedback_found, "TWCC feedback should be generated");
}
#[test]
fn test_full_interceptor_chain_with_reports_and_twcc() {
use rtc_interceptor::{ReceiverReportBuilder, SenderReportBuilder};
let mut chain = Registry::new()
.with(
SenderReportBuilder::new()
.with_interval(Duration::from_millis(100))
.build(),
)
.with(
ReceiverReportBuilder::new()
.with_interval(Duration::from_millis(100))
.build(),
)
.with(TwccSenderBuilder::new().build())
.with(
TwccReceiverBuilder::new()
.with_interval(Duration::from_millis(100))
.build(),
)
.build();
let local_ssrc = 0x11111111;
let remote_ssrc = 0x22222222;
let ext_id = 5u16;
let mut local_info = twcc_stream_info(local_ssrc, ext_id);
local_info.clock_rate = 90000;
let mut remote_info = twcc_stream_info(remote_ssrc, ext_id);
remote_info.clock_rate = 90000;
chain.bind_local_stream(&local_info);
chain.bind_remote_stream(&remote_info);
let base_time = Instant::now();
for seq in 0..5u16 {
let pkt = create_rtp_packet_with_time(base_time, local_ssrc, seq, seq as u32 * 3000, 500);
chain.handle_write(pkt).unwrap();
}
for i in 0..5u16 {
let pkt = create_rtp_packet_with_twcc(
base_time + Duration::from_millis(i as u64 * 10),
remote_ssrc,
i,
i,
ext_id as u8,
);
chain.handle_read(pkt).unwrap();
}
while chain.poll_write().is_some() {}
while chain.poll_read().is_some() {}
chain
.handle_timeout(base_time + Duration::from_millis(150))
.unwrap();
let mut sr_found = false;
let mut rr_found = false;
let mut twcc_found = false;
while let Some(pkt) = chain.poll_write() {
if let Packet::Rtcp(rtcp_packets) = &pkt.message {
for rtcp_pkt in rtcp_packets {
if rtcp_pkt
.as_any()
.downcast_ref::<rtcp::sender_report::SenderReport>()
.is_some()
{
sr_found = true;
}
if rtcp_pkt
.as_any()
.downcast_ref::<rtcp::receiver_report::ReceiverReport>()
.is_some()
{
rr_found = true;
}
if rtcp_pkt
.as_any()
.downcast_ref::<rtcp::transport_feedbacks::transport_layer_cc::TransportLayerCc>(
)
.is_some()
{
twcc_found = true;
}
}
}
}
assert!(sr_found, "Sender Report should be generated");
assert!(rr_found, "Receiver Report should be generated");
assert!(twcc_found, "TWCC feedback should be generated");
}