use log::warn;
use shared::time::SystemInstant;
use std::time::Instant;
pub(crate) struct SenderStream {
ssrc: u32,
clock_rate: f64,
use_latest_packet: bool,
started: bool,
last_rtp_sn: u16,
last_rtp_time_rtp: u32,
last_rtp_time_time: Instant,
time_baseline: SystemInstant,
counters: Counters,
}
impl SenderStream {
pub(crate) fn new(ssrc: u32, clock_rate: u32, use_latest_packet: bool) -> Self {
SenderStream {
ssrc,
clock_rate: clock_rate as f64,
use_latest_packet,
started: false,
last_rtp_sn: 0,
last_rtp_time_rtp: 0,
last_rtp_time_time: Instant::now(),
time_baseline: SystemInstant::now(),
counters: Default::default(),
}
}
pub(crate) fn process_rtp(&mut self, now: Instant, pkt: &rtp::packet::Packet) {
let seq = pkt.header.sequence_number;
let diff = seq.wrapping_sub(self.last_rtp_sn);
let is_in_order = !self.started || (diff > 0 && diff < (1 << 15));
if self.use_latest_packet || is_in_order {
self.started = true;
self.last_rtp_sn = seq;
if pkt.header.timestamp != self.last_rtp_time_rtp {
self.last_rtp_time_rtp = pkt.header.timestamp;
self.last_rtp_time_time = now;
}
}
self.counters.increment_packets();
self.counters.count_octets(pkt.payload.len());
}
pub(crate) fn generate_report(&mut self, now: Instant) -> rtcp::sender_report::SenderReport {
rtcp::sender_report::SenderReport {
ssrc: self.ssrc,
ntp_time: self.time_baseline.ntp(now),
rtp_time: self.last_rtp_time_rtp.wrapping_add(
(now.duration_since(self.last_rtp_time_time).as_secs_f64() * self.clock_rate)
as u32,
),
packet_count: self.counters.packet_count(),
octet_count: self.counters.octet_count(),
..Default::default()
}
}
}
#[derive(Default)]
pub(crate) struct Counters {
packets: u32,
octets: u32,
}
impl Counters {
pub(crate) fn increment_packets(&mut self) {
self.packets = self.packets.wrapping_add(1);
}
pub(crate) fn count_octets(&mut self, octets: usize) {
self.octets = self
.octets
.wrapping_add(octets.try_into().unwrap_or_else(|_| {
warn!("packet payload larger than 32 bits");
u32::MAX
}));
}
pub(crate) fn packet_count(&self) -> u32 {
self.packets
}
pub(crate) fn octet_count(&self) -> u32 {
self.octets
}
#[cfg(test)]
pub(crate) fn mock(packets: u32, octets: u32) -> Self {
Self { packets, octets }
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_rtp_packet(seq: u16, timestamp: u32, payload_len: usize) -> rtp::packet::Packet {
rtp::packet::Packet {
header: rtp::header::Header {
sequence_number: seq,
timestamp,
..Default::default()
},
payload: vec![0u8; payload_len].into(),
..Default::default()
}
}
#[test]
fn test_sender_stream_before_any_packet() {
let stream = SenderStream::new(123456, 90000, false);
let now = Instant::now();
let mut stream = stream;
let sr = stream.generate_report(now);
assert_eq!(sr.ssrc, 123456);
assert_eq!(sr.packet_count, 0);
assert_eq!(sr.octet_count, 0);
}
#[test]
fn test_sender_stream_after_rtp_packets() {
let mut stream = SenderStream::new(123456, 90000, false);
let now = Instant::now();
for i in 0..10 {
let pkt = make_rtp_packet(i, 0, 2);
stream.process_rtp(now, &pkt);
}
let sr = stream.generate_report(now);
assert_eq!(sr.ssrc, 123456);
assert_eq!(sr.packet_count, 10);
assert_eq!(sr.octet_count, 20);
}
#[test]
fn test_sender_stream_out_of_order_packets() {
let mut stream = SenderStream::new(123456, 90000, false);
let now = Instant::now();
for i in 0..10u16 {
let pkt = make_rtp_packet(i, i as u32, 2);
stream.process_rtp(now, &pkt);
}
let pkt = make_rtp_packet(12, 12, 2);
stream.process_rtp(now, &pkt);
let pkt = make_rtp_packet(11, 11, 2);
stream.process_rtp(now, &pkt);
let sr = stream.generate_report(now);
assert_eq!(sr.ssrc, 123456);
assert_eq!(sr.packet_count, 12);
assert_eq!(sr.octet_count, 24);
assert_eq!(sr.rtp_time, 12);
}
#[test]
fn test_sender_stream_out_of_order_with_use_latest_packet() {
let mut stream = SenderStream::new(123456, 90000, true); let now = Instant::now();
for i in 0..10u16 {
let pkt = make_rtp_packet(i, i as u32, 2);
stream.process_rtp(now, &pkt);
}
let pkt = make_rtp_packet(12, 12, 2);
stream.process_rtp(now, &pkt);
let pkt = make_rtp_packet(11, 11, 2);
stream.process_rtp(now, &pkt);
let sr = stream.generate_report(now);
assert_eq!(sr.ssrc, 123456);
assert_eq!(sr.packet_count, 12);
assert_eq!(sr.octet_count, 24);
assert_eq!(sr.rtp_time, 11);
}
#[test]
fn test_sender_stream_frame_first_packet_optimization() {
let mut stream = SenderStream::new(123456, 90000, false);
let base_time = Instant::now();
let pkt1 = make_rtp_packet(0, 1000, 100);
stream.process_rtp(base_time, &pkt1);
let later_time = base_time + std::time::Duration::from_millis(10);
let pkt2 = make_rtp_packet(1, 1000, 100);
stream.process_rtp(later_time, &pkt2);
let sr = stream.generate_report(base_time);
assert_eq!(sr.rtp_time, 1000);
}
#[test]
fn test_sender_stream_sequence_wrap() {
let mut stream = SenderStream::new(123456, 90000, false);
let now = Instant::now();
let pkt = make_rtp_packet(65534, 100, 10);
stream.process_rtp(now, &pkt);
let pkt = make_rtp_packet(65535, 200, 10);
stream.process_rtp(now, &pkt);
let pkt = make_rtp_packet(0, 300, 10);
stream.process_rtp(now, &pkt);
let pkt = make_rtp_packet(1, 400, 10);
stream.process_rtp(now, &pkt);
let sr = stream.generate_report(now);
assert_eq!(sr.packet_count, 4);
assert_eq!(sr.octet_count, 40);
assert_eq!(sr.rtp_time, 400);
}
#[test]
fn test_counters_wrapping() {
let mut counters = Counters::default();
counters.packets = u32::MAX - 1;
counters.octets = u32::MAX - 1;
counters.increment_packets();
assert_eq!(counters.packet_count(), u32::MAX);
counters.increment_packets();
assert_eq!(counters.packet_count(), 0);
counters.count_octets(1);
assert_eq!(counters.octet_count(), u32::MAX);
counters.count_octets(1);
assert_eq!(counters.octet_count(), 0);
}
}