use gst::prelude::*;
use gst_rtp::rtp_buffer::*;
use gst_rtp::RTPBuffer;
use rand::Rng;
#[must_use]
struct RaptorqTest {
protected_packets: usize,
repair_packets: usize,
repair_window: usize,
symbol_size: usize,
mtu: usize,
initial_seq: u16,
lost_buffers: Vec<usize>,
swapped_buffers: Vec<usize>,
input_buffers: usize,
expect_output_buffers: usize,
}
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstraptorq::plugin_register_static().expect("Failed to register raptorqenc plugin");
});
}
impl RaptorqTest {
fn new() -> Self {
init();
let enc = gst::ElementFactory::make("raptorqenc").build().unwrap();
let protected_packets = enc.property::<u32>("protected-packets") as usize;
let repair_packets = enc.property::<u32>("repair-packets") as usize;
let repair_window = enc.property::<u32>("repair-window") as usize;
let symbol_size = enc.property::<u32>("symbol-size") as usize;
let mtu = enc.property::<u32>("mtu") as usize;
Self {
protected_packets,
repair_packets,
repair_window,
symbol_size,
mtu,
initial_seq: 42,
lost_buffers: vec![0],
swapped_buffers: vec![],
input_buffers: protected_packets,
expect_output_buffers: protected_packets,
}
}
fn protected_packets(mut self, protected_packets: usize) -> Self {
self.protected_packets = protected_packets;
self
}
fn repair_packets(mut self, repair_packets: usize) -> Self {
self.repair_packets = repair_packets;
self
}
fn repair_window(mut self, repair_window: usize) -> Self {
self.repair_window = repair_window;
self
}
fn symbol_size(mut self, symbol_size: usize) -> Self {
self.symbol_size = symbol_size;
self
}
fn initial_seq(mut self, initial_seq: u16) -> Self {
self.initial_seq = initial_seq;
self
}
fn mtu(mut self, mtu: usize) -> Self {
self.mtu = mtu;
self
}
fn lost_buffers(mut self, lost_buffers: Vec<usize>) -> Self {
self.lost_buffers = lost_buffers;
self
}
fn swapped_buffers(mut self, swapped_buffers: Vec<usize>) -> Self {
self.swapped_buffers = swapped_buffers;
self
}
fn input_buffers(mut self, input_buffers: usize) -> Self {
self.input_buffers = input_buffers;
self
}
fn expect_output_buffers(mut self, expect_output_buffers: usize) -> Self {
self.expect_output_buffers = expect_output_buffers;
self
}
fn run(self) {
assert!(self.input_buffers >= self.protected_packets);
let enc = gst::ElementFactory::make("raptorqenc")
.property("protected-packets", self.protected_packets as u32)
.property("repair-packets", self.repair_packets as u32)
.property("repair-window", self.repair_window as u32)
.property("symbol-size", self.symbol_size as u32)
.property("mtu", self.mtu as u32)
.build()
.unwrap();
let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
let dec = gst::ElementFactory::make("raptorqdec").build().unwrap();
let mut h_dec = gst_check::Harness::with_element(&dec, Some("sink"), Some("src"));
let mut h_dec_fec = gst_check::Harness::with_element(&dec, Some("fec_0"), None);
let caps = gst::Caps::builder("application/x-rtp")
.field("raptor-scheme-id", "6")
.field("repair-window", "1000000")
.field("t", self.symbol_size.to_string())
.build();
h_dec.set_src_caps_str("application/x-rtp");
h_dec_fec.set_src_caps(caps);
let mut rng = rand::thread_rng();
let input_buffers = (0..self.input_buffers)
.map(|i| {
let size = rng.gen_range(1..self.mtu - 12 - 3);
let data = (0..size).map(|_| rng.gen()).collect::<Vec<u8>>();
let mut buf = gst::Buffer::new_rtp_with_sizes(size as u32, 0, 0).unwrap();
{
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(gst::ClockTime::ZERO);
buf_mut.set_dts(gst::ClockTime::ZERO);
let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
let payload = rtpbuf.payload_mut().unwrap();
payload.copy_from_slice(data.as_slice());
rtpbuf.set_seq(self.initial_seq.wrapping_add(i as u16));
rtpbuf.set_timestamp(0);
}
buf
})
.collect::<Vec<_>>();
for buf in &input_buffers {
let result = h_enc.push(buf.clone());
assert!(result.is_ok());
}
assert_eq!(h_enc.buffers_in_queue(), self.input_buffers as u32);
let mut media_packets = (0..self.input_buffers)
.map(|_| {
let result = h_enc.pull();
assert!(result.is_ok());
result.unwrap()
})
.collect::<Vec<_>>();
for x in self.swapped_buffers.chunks_exact(2) {
media_packets.swap(x[0], x[1])
}
let delay_step = ((self.repair_window / self.repair_packets) as u64).mseconds();
let mut delay = delay_step;
let repair_packets = (0..self.repair_packets)
.map(|_| {
h_enc_fec.set_time(delay - gst::ClockTime::NSECOND).unwrap();
assert_eq!(h_enc_fec.buffers_in_queue(), 0);
h_enc_fec.set_time(delay).unwrap();
h_enc_fec.crank_single_clock_wait().unwrap();
let result = h_enc_fec.pull();
assert!(result.is_ok());
let buf = result.unwrap();
assert_eq!(buf.pts().unwrap(), delay);
assert_eq!(buf.dts().unwrap(), delay);
let ts = RTPBuffer::from_buffer_readable(&buf).unwrap().timestamp();
let expected_ts =
*delay.mul_div_round(*gst::ClockTime::SECOND, 8000).unwrap() as u32;
assert_eq!(ts, expected_ts);
delay += delay_step;
buf
})
.collect::<Vec<_>>();
let media_packets = media_packets
.iter()
.cloned()
.enumerate()
.filter(|(i, _)| !self.lost_buffers.contains(i))
.map(|(_, x)| x)
.collect::<Vec<_>>();
for buf in media_packets {
assert!(h_dec.push(buf).is_ok());
}
for buf in repair_packets {
assert!(h_dec_fec.push(buf).is_ok());
}
let result = h_dec.push(input_buffers.iter().last().unwrap().clone());
assert!(result.is_ok());
let mut output_buffers = (0..self.expect_output_buffers)
.map(|_| {
let result = h_dec.pull();
assert!(result.is_ok());
result.unwrap()
})
.collect::<Vec<_>>();
output_buffers.sort_unstable_by(|a, b| {
let aa = RTPBuffer::from_buffer_readable(a).unwrap();
let bb = RTPBuffer::from_buffer_readable(b).unwrap();
match gst_rtp::compare_seqnum(bb.seq(), aa.seq()) {
x if x > 0 => std::cmp::Ordering::Greater,
x if x < 0 => std::cmp::Ordering::Less,
_ => std::cmp::Ordering::Equal,
}
});
assert_eq!(output_buffers.len(), self.expect_output_buffers);
if self.input_buffers == self.expect_output_buffers {
for (inbuf, outbuf) in Iterator::zip(input_buffers.iter(), output_buffers.iter()) {
let rtp1 = RTPBuffer::from_buffer_readable(inbuf).unwrap();
let rtp2 = RTPBuffer::from_buffer_readable(outbuf).unwrap();
assert_eq!(rtp1.seq(), rtp2.seq());
assert_eq!(rtp1.payload().unwrap(), rtp2.payload().unwrap());
}
}
}
}
#[test]
fn test_raptorq_all_default() {
RaptorqTest::new().run();
}
#[test]
fn test_raptorq_decoder_media_packets_out_of_sequence() {
RaptorqTest::new()
.swapped_buffers(vec![5, 10, 12, 15])
.run();
}
#[test]
fn test_raptorq_10_percent_overhead() {
RaptorqTest::new()
.protected_packets(100)
.repair_packets(10)
.lost_buffers(vec![4, 42, 43, 44, 45])
.input_buffers(100)
.expect_output_buffers(100)
.run();
}
#[test]
fn test_raptorq_5_percent_overhead() {
RaptorqTest::new()
.protected_packets(100)
.repair_packets(5)
.input_buffers(100)
.lost_buffers(vec![8, 11])
.expect_output_buffers(100)
.run();
}
#[test]
fn test_raptorq_symbol_size_128() {
RaptorqTest::new()
.protected_packets(20)
.repair_packets(4)
.symbol_size(128)
.mtu(400)
.input_buffers(20)
.lost_buffers(vec![9])
.expect_output_buffers(20)
.run();
}
#[test]
fn test_raptorq_symbol_size_192() {
RaptorqTest::new()
.protected_packets(20)
.repair_packets(4)
.symbol_size(192)
.mtu(999)
.input_buffers(20)
.lost_buffers(vec![16, 19])
.expect_output_buffers(20)
.run();
}
#[test]
fn test_raptorq_symbol_size_1024() {
RaptorqTest::new()
.protected_packets(20)
.repair_packets(8)
.symbol_size(192)
.mtu(100)
.input_buffers(20)
.lost_buffers(vec![0, 1, 2, 3, 4, 5])
.expect_output_buffers(20)
.run();
}
#[test]
fn test_raptorq_mtu_lt_symbol_size() {
RaptorqTest::new()
.protected_packets(20)
.repair_packets(8)
.symbol_size(1400)
.mtu(100)
.input_buffers(20)
.lost_buffers(vec![14, 15, 16, 17, 18, 19])
.expect_output_buffers(20)
.run();
}
#[test]
fn test_raptorq_heavy_loss() {
RaptorqTest::new()
.protected_packets(40)
.repair_packets(8)
.input_buffers(40)
.lost_buffers(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
.expect_output_buffers(30)
.run();
}
#[test]
fn test_raptorq_repair_window_100ms() {
RaptorqTest::new()
.protected_packets(10)
.repair_packets(10)
.repair_window(100)
.input_buffers(10)
.lost_buffers(vec![2, 6])
.expect_output_buffers(10)
.run();
}
#[test]
fn test_raptorq_repair_window_500ms() {
RaptorqTest::new()
.protected_packets(8)
.repair_packets(2)
.repair_window(500)
.input_buffers(8)
.lost_buffers(vec![])
.expect_output_buffers(8)
.run();
}
#[test]
fn test_raptorq_wrapping_sequence_number_1() {
RaptorqTest::new().initial_seq(u16::MAX - 5).run();
}
#[test]
fn test_raptorq_wrapping_sequence_number_2() {
RaptorqTest::new()
.initial_seq(u16::MAX - 5)
.swapped_buffers(vec![4, 5])
.run();
}
#[test]
fn test_raptorq_wrapping_sequence_number_3() {
RaptorqTest::new()
.initial_seq(u16::MAX - 3)
.lost_buffers(vec![0, 1, 2, 8])
.run();
}
#[test]
fn test_raptorq_encoder_flush_cancels_pending_timers() {
init();
let enc = gst::ElementFactory::make("raptorqenc")
.property("repair-window", 5000u32)
.property("protected-packets", 5u32)
.property("repair-packets", 5u32)
.build()
.unwrap();
let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
for i in 0u64..5 {
let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(gst::ClockTime::SECOND * i);
let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
rtpbuf.set_seq(i as u16);
drop(rtpbuf);
let result = h_enc.push(buf);
assert!(result.is_ok());
}
h_enc_fec.set_time(gst::ClockTime::SECOND * 6).unwrap();
h_enc_fec.crank_single_clock_wait().unwrap();
let result = h_enc_fec.pull();
assert!(result.is_ok());
h_enc.push_event(gst::event::FlushStart::new());
h_enc.push_event(gst::event::FlushStop::new(true));
h_enc_fec.set_time(gst::ClockTime::SECOND * 10).unwrap();
loop {
let event = h_enc.pull_event();
if let Ok(event) = event {
match event.view() {
gst::EventView::FlushStart(_) => {
continue;
}
gst::EventView::FlushStop(_) => {
break;
}
_ => (),
}
}
}
assert_eq!(h_enc_fec.buffers_in_queue(), 0);
assert_eq!(h_enc_fec.testclock().unwrap().peek_id_count(), 0);
}
#[test]
fn test_raptorq_repair_window_tolerance() {
init();
let enc = gst::ElementFactory::make("raptorqenc")
.property("repair-window", 1000u32)
.property("protected-packets", 5u32)
.property("repair-packets", 5u32)
.build()
.unwrap();
let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
for i in 0u64..5 {
let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(gst::ClockTime::SECOND * i);
let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
rtpbuf.set_seq(i as u16);
drop(rtpbuf);
let result = h_enc.push(buf);
assert!(result.is_ok());
}
let dec = gst::ElementFactory::make("raptorqdec")
.property("repair-window-tolerance", 1000u32)
.build()
.unwrap();
let mut h_dec = gst_check::Harness::with_element(&dec, Some("sink"), Some("src"));
let mut h_dec_fec = gst_check::Harness::with_element(&dec, Some("fec_0"), None);
let caps = loop {
let event = h_enc_fec.pull_event();
if let Ok(event) = event {
#[allow(clippy::single_match)]
match event.view() {
gst::EventView::Caps(c) => {
break c.caps_owned();
}
_ => (),
}
}
};
h_dec.set_src_caps_str("application/x-rtp");
h_dec_fec.set_src_caps(caps);
h_enc_fec.set_time(1.seconds()).unwrap();
let result = h_enc.pull();
assert!(result.is_ok());
let buf = result.unwrap();
let result = h_dec.push(buf);
assert!(result.is_ok());
for _ in 0..2 {
h_enc_fec.crank_single_clock_wait().unwrap();
let result = h_enc_fec.pull();
assert!(result.is_ok());
let buf = result.unwrap();
let result = h_dec_fec.push(buf);
assert!(result.is_ok());
}
let stats = h_dec.element().unwrap().property::<gst::Structure>("stats");
assert_eq!(
stats
.get::<u64>("buffered-media-packets")
.expect("type error"),
1
);
assert_eq!(
stats
.get::<u64>("buffered-repair-packets")
.expect("type error"),
2
);
let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(gst::ClockTime::SECOND * 10);
let result = h_dec.push(buf);
assert!(result.is_ok());
let stats = h_dec.element().unwrap().property::<gst::Structure>("stats");
assert_eq!(
stats
.get::<u64>("buffered-media-packets")
.expect("type error"),
0
);
assert_eq!(
stats
.get::<u64>("buffered-repair-packets")
.expect("type error"),
0
);
}