use super::*;
use bytes::Bytes;
use std::collections::VecDeque;
use tracing::trace;
/// A raw packet held by [`PlayoutBuffer`] until it is due for playout.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StoredPacket {
/// Packet bytes. [`PlayoutBuffer`] re-parses these with `RtpPacket::new` both when
/// storing and when fetching, and panics if they do not parse.
pub packet: Bytes,
/// NOTE(review): presumably records whether `packet` has already been decrypted.
/// Nothing in the visible code reads this flag; confirm against the producer/consumer.
pub decrypted: bool,
}
/// Whether [`PlayoutBuffer`] is currently accumulating packets or releasing them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PlayoutMode {
/// Packets are only accumulated: `fetch_packet` returns [`PacketLookup::Filling`]
/// without touching the queue. This is the initial mode, and the mode re-entered
/// whenever the queue runs empty or a packet is withheld.
Fill,
/// Entered by `store_packet` once the queue holds at least
/// `config.playout_buffer_length` slots; `fetch_packet` then pops one slot per call.
Drain,
}
/// Outcome of a single [`PlayoutBuffer::fetch_packet`] call.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PacketLookup {
/// The front slot held a packet, which has been removed from the buffer and released.
Packet(StoredPacket),
/// The front slot was empty: no packet was stored for that sequence number
/// before its turn came up.
MissedPacket,
/// Nothing was released: the buffer is in fill mode, the queue was empty, or the
/// front packet was withheld because its timestamp is ahead of the playout point.
Filling,
}
/// Reordering buffer which slots incoming packets by RTP sequence number and
/// releases them one per `fetch_packet` call once enough have accumulated.
#[derive(Debug)]
pub struct PlayoutBuffer {
/// Slots ordered by sequence number; index `i` corresponds to `next_seq + i`.
/// `None` marks a slot for which no packet has been stored (yet).
buffer: VecDeque<Option<StoredPacket>>,
/// Whether `fetch_packet` currently releases slots (`Drain`) or not (`Fill`).
playout_mode: PlayoutMode,
/// Sequence number matching the front slot of `buffer`, i.e. the next one due out.
next_seq: RtpSequence,
/// Playout position in RTP timestamp units. Initialised from the first packet stored
/// while unset, advanced by `MONO_FRAME_SIZE` per `fetch_packet` call that gets past
/// the fill-mode check, and cleared when the queue runs empty.
current_timestamp: Option<RtpTimestamp>,
/// Number of packets in a row rejected for landing too far ahead of `next_seq`;
/// reset to zero whenever a packet is stored successfully.
consecutive_store_fails: usize,
}
impl PlayoutBuffer {
    /// First slot index (relative to `next_seq`) that is considered too far ahead to
    /// store outside of desync recovery.
    const MAX_SLOT_LOOKAHEAD: i16 = 64;

    /// Creates an empty buffer in fill mode.
    ///
    /// `capacity` is only a pre-allocation hint for the slot queue; `next_seq` is the
    /// sequence number that the first slot corresponds to.
    pub fn new(capacity: usize, next_seq: RtpSequence) -> Self {
        Self {
            buffer: VecDeque::with_capacity(capacity),
            playout_mode: PlayoutMode::Fill,
            next_seq,
            current_timestamp: None,
            consecutive_store_fails: 0,
        }
    }

    /// Places `packet` into the slot matching its RTP sequence number.
    ///
    /// * Packets older than `next_seq` are discarded.
    /// * Packets `MAX_SLOT_LOOKAHEAD` or more slots ahead are rejected and counted in
    ///   `consecutive_store_fails`, unless desync recovery applies.
    /// * Desync recovery: if the packet is at least `5 * playout_buffer_length` slots
    ///   ahead and either the queue is empty or that many stores in a row have been
    ///   rejected, the queue is cleared and restarted at this packet's sequence number.
    ///
    /// Once the queue holds at least `playout_buffer_length` slots the buffer switches
    /// to drain mode.
    ///
    /// # Panics
    ///
    /// Panics if `packet.packet` does not parse as an RTP packet.
    pub fn store_packet(&mut self, packet: StoredPacket, config: &Config) {
        let rtp = RtpPacket::new(&packet.packet)
            .expect("FATAL: earlier valid packet now invalid (store)");

        // First packet since the buffer was (re)started: establish the playout clock.
        if self.current_timestamp.is_none() {
            self.current_timestamp = Some(reset_timeout(&rtp, config));
        }

        // Slot index is the wrapping distance from the next sequence number due out.
        // Reinterpreting it as signed makes packets from the past come out negative.
        let pkt_seq = rtp.get_sequence().0;
        let desired_index = (pkt_seq - self.next_seq).0 as i16;

        // Widen *before* multiplying: the product can exceed the range of the narrow
        // configured length type, which previously overflowed for lengths >= 52.
        let playout_len = config.playout_buffer_length.get();
        let err_threshold = i16::from(playout_len) * 5;
        let max_store_fails = usize::from(playout_len) * 5;

        let handling_desync = (self.buffer.is_empty()
            || self.consecutive_store_fails >= max_store_fails)
            && desired_index >= err_threshold;

        if desired_index < 0 {
            trace!("Missed packet arrived late, discarding from playout.");
        } else if !handling_desync && desired_index >= Self::MAX_SLOT_LOOKAHEAD {
            trace!(
                "Packet arrived beyond playout max length (ssrc {}): wanted slot {desired_index}. \
                 ts {}, seq {}, next_out_seq {}",
                rtp.get_ssrc(),
                rtp.get_timestamp().0,
                rtp.get_sequence().0,
                self.next_seq,
            );
            self.consecutive_store_fails += 1;
        } else {
            let index = if handling_desync {
                // Give up on the old sequence space and restart at this packet.
                self.buffer.clear();
                self.next_seq = pkt_seq;
                0
            } else {
                desired_index as usize
            };

            // Pad with empty slots so that `index` is addressable.
            while self.buffer.len() <= index {
                self.buffer.push_back(None);
            }
            self.buffer[index] = Some(packet);
            self.consecutive_store_fails = 0;
        }

        if self.buffer.len() >= usize::from(playout_len) {
            self.playout_mode = PlayoutMode::Drain;
        }
    }

    /// Releases the front slot of the buffer, if playout is currently permitted.
    ///
    /// Returns [`PacketLookup::Filling`] while in fill mode, when the queue is empty,
    /// or when the front packet's timestamp is ahead of the playout point by less than
    /// the resync threshold (the packet is put back and the buffer returns to fill
    /// mode). Returns [`PacketLookup::MissedPacket`] for an empty slot and
    /// [`PacketLookup::Packet`] otherwise.
    ///
    /// Every call that gets past the fill-mode check advances the playout timestamp by
    /// `MONO_FRAME_SIZE`, unless the queue ran empty, in which case the timestamp is
    /// cleared and the buffer returns to fill mode.
    ///
    /// # Panics
    ///
    /// Panics if a stored packet no longer parses as an RTP packet.
    pub fn fetch_packet(&mut self, config: &Config) -> PacketLookup {
        if self.playout_mode == PlayoutMode::Fill {
            return PacketLookup::Filling;
        }

        let out = match self.buffer.pop_front() {
            Some(Some(pkt)) => {
                let rtp = RtpPacket::new(&pkt.packet)
                    .expect("FATAL: earlier valid packet now invalid (fetch)");

                // Drain mode is only ever entered from `store_packet`, after it has
                // set the timestamp, and the timestamp is only cleared together with
                // a switch back to fill mode.
                let curr_ts = self
                    .current_timestamp
                    .as_mut()
                    .expect("playout timestamp must be set while draining");
                let pkt_ts = rtp.get_timestamp().0;
                // Signed wrapping distance: positive when the packet is behind the
                // playout point, negative when it is ahead of it.
                let ts_diff = (*curr_ts - pkt_ts).0 as i32;

                // How far ahead a packet must be before we jump the playout point to
                // it rather than waiting for the clock to catch up.
                let skip_after = i32::try_from(
                    usize::from(config.playout_buffer_length.get()) * 5 * MONO_FRAME_SIZE,
                )
                .unwrap_or((AUDIO_FRAME_RATE * 2 * MONO_FRAME_SIZE) as i32);

                if ts_diff >= 0 {
                    // Packet is due (or overdue): release it.
                    self.next_seq = (rtp.get_sequence() + 1).0;
                    PacketLookup::Packet(pkt)
                } else if ts_diff <= -skip_after {
                    // Packet is far ahead: resync the playout point to it and release.
                    self.next_seq = (rtp.get_sequence() + 1).0;
                    *curr_ts = pkt_ts;
                    PacketLookup::Packet(pkt)
                } else {
                    // Packet is slightly ahead: keep it and wait for the clock.
                    trace!("Witholding packet: ts_diff is {ts_diff}");
                    self.buffer.push_front(Some(pkt));
                    self.playout_mode = PlayoutMode::Fill;
                    PacketLookup::Filling
                }
            },
            Some(None) => {
                self.next_seq += 1;
                PacketLookup::MissedPacket
            },
            None => PacketLookup::Filling,
        };

        if self.buffer.is_empty() {
            self.playout_mode = PlayoutMode::Fill;
            self.current_timestamp = None;
        }

        if let Some(ts) = self.current_timestamp.as_mut() {
            *ts += &(MONO_FRAME_SIZE as u32);
        }

        out
    }

    /// Returns the sequence number corresponding to the front slot of the buffer.
    pub fn next_seq(&self) -> RtpSequence {
        self.next_seq
    }
}
/// Computes the initial playout timestamp for a buffer whose first packet is `packet`:
/// the packet's own RTP timestamp shifted forward by `playout_buffer_length` frames of
/// `MONO_FRAME_SIZE` timestamp units each.
#[inline]
fn reset_timeout(packet: &RtpPacket<'_>, config: &Config) -> RtpTimestamp {
    let frames = usize::from(config.playout_buffer_length.get());
    let shift = (frames * MONO_FRAME_SIZE) as u32;
    let shifted = packet.get_timestamp() + shift;
    shifted.0
}