use std::{collections::VecDeque, ops::Deref};
use lru::LruCache;
use crate::rtp::{IncomingRtpPacket, OrderedRtpPacket};
pub enum ReorderingError {
BufferFull(IncomingRtpPacket),
DuplicatePacket(IncomingRtpPacket),
}
impl ReorderingError {
#[inline]
pub fn is_full(&self) -> bool {
matches!(self, Self::BufferFull(_))
}
#[inline]
pub fn is_duplicate(&self) -> bool {
matches!(self, Self::DuplicatePacket(_))
}
}
pub struct ReorderingBuffer {
inner: InternalBuffer,
}
impl ReorderingBuffer {
#[inline]
pub fn new(depth: usize) -> Self {
Self {
inner: InternalBuffer::new(depth),
}
}
pub fn estimate_index(&self, sequence_nr: u16) -> u64 {
self.inner.estimate_index(sequence_nr)
}
pub fn is_duplicate(&self, index: u64) -> bool {
self.inner.is_duplicate(index)
}
#[allow(clippy::result_large_err)]
pub fn push(&mut self, packet: IncomingRtpPacket) -> Result<u64, ReorderingError> {
self.inner
.push(InputPacket::new(packet, 0))
.map_err(ReorderingError::from)
}
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Option<OrderedRtpPacket> {
self.inner.next().map(OrderedRtpPacket::from)
}
pub fn take(&mut self) -> Option<OrderedRtpPacket> {
self.inner.take().map(OrderedRtpPacket::from)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
pub struct ReorderingMultiBuffer {
input_index_to_ssrc: VecDeque<Option<u32>>,
first_input_index: usize,
sources: LruCache<u32, InternalBuffer>,
output: VecDeque<OutputPacket>,
capacity: usize,
max_ssrcs: Option<usize>,
}
impl ReorderingMultiBuffer {
pub fn new(depth: usize, max_ssrcs: Option<usize>) -> Self {
Self {
input_index_to_ssrc: VecDeque::new(),
first_input_index: 0,
sources: LruCache::unbounded(),
output: VecDeque::with_capacity(depth.min(8)),
capacity: depth,
max_ssrcs,
}
}
pub fn estimate_index(&self, ssrc: u32, sequence_nr: u16) -> u64 {
self.sources
.peek(&ssrc)
.map(|source| source.estimate_index(sequence_nr))
.unwrap_or(sequence_nr as u64)
}
pub fn is_duplicate(&self, ssrc: u32, index: u64) -> bool {
self.sources
.peek(&ssrc)
.map(|source| source.is_duplicate(index))
.unwrap_or(false)
}
#[allow(clippy::result_large_err)]
pub fn push(&mut self, packet: IncomingRtpPacket) -> Result<u64, ReorderingError> {
if self.input_index_to_ssrc.len() >= self.capacity {
return Err(ReorderingError::BufferFull(packet));
}
let ssrc = packet.ssrc();
let source = self
.sources
.get_or_insert_mut(ssrc, || InternalBuffer::new(self.capacity));
let input_index = self
.first_input_index
.wrapping_add(self.input_index_to_ssrc.len());
let output_index = source.push(InputPacket::new(packet, input_index))?;
self.input_index_to_ssrc.push_back(Some(ssrc));
while let Some(packet) = source.next() {
self.output.push_back(packet);
}
if let Some(max_ssrcs) = self.max_ssrcs {
while self.sources.len() > max_ssrcs {
if let Some((_, mut source)) = self.sources.pop_lru() {
while !source.is_empty() {
if let Some(packet) = source.take() {
self.output.push_back(packet);
}
}
}
}
}
Ok(output_index)
}
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Option<OrderedRtpPacket> {
let packet = self.output.pop_front()?;
self.remove_input_index(packet.input_index);
Some(packet.into())
}
pub fn take(&mut self) -> Option<OrderedRtpPacket> {
let packet = if let Some(p) = self.output.pop_front() {
p
} else {
self.poll_oldest_source()?
};
self.remove_input_index(packet.input_index);
Some(packet.into())
}
#[inline]
pub fn is_empty(&self) -> bool {
self.input_index_to_ssrc.is_empty()
}
fn remove_input_index(&mut self, input_index: usize) {
let offset = input_index.wrapping_sub(self.first_input_index);
self.input_index_to_ssrc[offset] = None;
while let Some(None) = self.input_index_to_ssrc.front() {
self.input_index_to_ssrc.pop_front();
self.first_input_index = self.first_input_index.wrapping_add(1);
}
}
fn poll_oldest_source(&mut self) -> Option<OutputPacket> {
if let Some(ssrc) = self.input_index_to_ssrc.front()? {
if let Some(source) = self.sources.peek_mut(ssrc) {
if !source.is_empty() {
let res = source.take();
while let Some(packet) = source.next() {
self.output.push_back(packet);
}
return res;
}
}
}
panic!("inconsistent state")
}
}
struct InternalBuffer {
start: Option<u64>,
window: VecDeque<Option<OutputPacket>>,
capacity: usize,
}
impl InternalBuffer {
#[inline]
fn new(depth: usize) -> Self {
Self {
start: None,
window: VecDeque::with_capacity(depth.min(8)),
capacity: depth,
}
}
fn estimate_index(&self, sequence_nr: u16) -> u64 {
let start_index = self.start.unwrap_or(sequence_nr as u64);
let last_index = start_index.wrapping_add(self.window.len() as u64);
let last_seq_nr = last_index as u16;
let last_roc = last_index & !0xffff;
let new_seq_nr = sequence_nr;
let new_roc = if last_seq_nr < 0x8000 {
if new_seq_nr > (last_seq_nr + 0x8000) {
last_roc.wrapping_sub(0x10000)
} else {
last_roc
}
} else if (last_seq_nr - 0x8000) > new_seq_nr {
last_roc.wrapping_add(0x10000)
} else {
last_roc
};
new_roc | (new_seq_nr as u64)
}
fn is_duplicate(&self, index: u64) -> bool {
let start = self.start.unwrap_or(index);
let offset = index.wrapping_sub(start);
if offset > (u64::MAX >> 1) {
return true;
}
let Ok(offset) = usize::try_from(offset) else {
return false;
};
self.window
.get(offset)
.map(|entry| entry.is_some())
.unwrap_or(false)
}
#[allow(clippy::result_large_err)]
fn push(&mut self, packet: InputPacket) -> Result<u64, InternalError> {
let index = self.estimate_index(packet.sequence_number());
if self.start.is_none() {
self.start = Some(index);
}
let start = self.start.unwrap_or(index);
let offset = index.wrapping_sub(start);
if offset > (u64::MAX >> 1) {
return Err(InternalError::DuplicatePacket(packet));
}
let Ok(offset) = usize::try_from(offset) else {
return Err(InternalError::BufferFull(packet));
};
if offset < self.capacity {
while offset >= self.window.len() {
self.window.push_back(None);
}
let entry = &mut self.window[offset];
if entry.is_some() {
return Err(InternalError::DuplicatePacket(packet));
}
*entry = Some(OutputPacket::new(packet, index));
Ok(index)
} else {
Err(InternalError::BufferFull(packet))
}
}
fn next(&mut self) -> Option<OutputPacket> {
if let Some(entry) = self.window.front() {
if entry.is_some() {
return self.take();
}
}
None
}
fn take(&mut self) -> Option<OutputPacket> {
if let Some(start) = self.start.as_mut() {
*start = start.wrapping_add(1);
}
self.window.pop_front()?
}
#[inline]
fn is_empty(&self) -> bool {
self.window.is_empty()
}
}
struct InputPacket {
input_index: usize,
packet: IncomingRtpPacket,
}
impl InputPacket {
fn new(packet: IncomingRtpPacket, input_index: usize) -> Self {
Self {
input_index,
packet,
}
}
}
impl Deref for InputPacket {
type Target = IncomingRtpPacket;
fn deref(&self) -> &Self::Target {
&self.packet
}
}
struct OutputPacket {
input_index: usize,
output_index: u64,
packet: IncomingRtpPacket,
}
impl OutputPacket {
fn new(packet: InputPacket, output_index: u64) -> Self {
Self {
input_index: packet.input_index,
output_index,
packet: packet.packet,
}
}
}
impl From<OutputPacket> for OrderedRtpPacket {
fn from(packet: OutputPacket) -> Self {
OrderedRtpPacket::new(packet.packet, packet.output_index)
}
}
enum InternalError {
BufferFull(InputPacket),
DuplicatePacket(InputPacket),
}
impl From<InternalError> for ReorderingError {
fn from(err: InternalError) -> Self {
match err {
InternalError::BufferFull(packet) => Self::BufferFull(packet.packet),
InternalError::DuplicatePacket(packet) => Self::DuplicatePacket(packet.packet),
}
}
}
#[cfg(test)]
mod tests {
use std::time::Instant;
use super::{ReorderingBuffer, ReorderingError, ReorderingMultiBuffer};
use crate::rtp::{IncomingRtpPacket, RtpPacket};
fn make_packet(seq: u16, ssrc: u32) -> IncomingRtpPacket {
let packet = RtpPacket::new().with_sequence_number(seq).with_ssrc(ssrc);
IncomingRtpPacket::new(packet, Instant::now())
}
#[test]
fn test_wrapping_index_arithmetic() {
let mut buffer = ReorderingBuffer::new(4);
assert!(matches!(buffer.push(make_packet(0x1000, 1)), Ok(0x1000)));
assert_eq!(buffer.estimate_index(0x0000), 0x0000_0000_0000_0000);
assert_eq!(buffer.estimate_index(0x2000), 0x0000_0000_0000_2000);
assert_eq!(buffer.estimate_index(0xf000), 0xffff_ffff_ffff_f000);
assert!(matches!(
buffer.push(make_packet(0xf000, 1)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(
buffer.push(make_packet(0x2000, 1)),
Err(ReorderingError::BufferFull(_))
));
buffer = ReorderingBuffer::new(4);
assert!(matches!(buffer.push(make_packet(0xe000, 1)), Ok(0xe000)));
assert_eq!(buffer.estimate_index(0xd000), 0x0000_0000_0000_d000);
assert_eq!(buffer.estimate_index(0xf000), 0x0000_0000_0000_f000);
assert_eq!(buffer.estimate_index(0x1000), 0x0000_0000_0001_1000);
assert!(matches!(
buffer.push(make_packet(0xd000, 1)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(
buffer.push(make_packet(0x1000, 1)),
Err(ReorderingError::BufferFull(_))
));
buffer = ReorderingBuffer::new(4);
buffer.inner.start = Some(u64::MAX);
assert!(matches!(buffer.push(make_packet(0xffff, 1)), Ok(u64::MAX)));
assert!(matches!(buffer.push(make_packet(0x0000, 1)), Ok(0)));
assert_eq!(buffer.estimate_index(0xf000), 0xffff_ffff_ffff_f000);
assert_eq!(buffer.estimate_index(0x1000), 0x0000_0000_0000_1000);
assert!(matches!(
buffer.push(make_packet(0xf000, 1)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(
buffer.push(make_packet(0x1000, 1)),
Err(ReorderingError::BufferFull(_))
));
}
#[test]
fn test_reordering_buffer() {
let mut buffer = ReorderingBuffer::new(5);
assert!(buffer.is_empty());
assert!(matches!(buffer.push(make_packet(2, 1)), Ok(2)));
assert!(matches!(
buffer.push(make_packet(0, 1)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(
buffer.push(make_packet(1, 1)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(buffer.push(make_packet(4, 1)), Ok(4)));
assert!(matches!(buffer.push(make_packet(3, 1)), Ok(3)));
assert!(matches!(
buffer.push(make_packet(3, 1)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(buffer.push(make_packet(6, 1)), Ok(6)));
assert!(matches!(
buffer.push(make_packet(7, 1)),
Err(ReorderingError::BufferFull(_))
));
assert!(!buffer.is_empty());
assert_eq!(buffer.next().unwrap().index(), 2);
assert_eq!(buffer.next().unwrap().index(), 3);
assert_eq!(buffer.next().unwrap().index(), 4);
assert!(matches!(buffer.next(), None));
assert!(!buffer.is_empty());
assert!(matches!(buffer.take(), None));
assert!(!buffer.is_empty());
assert_eq!(buffer.next().unwrap().index(), 6);
assert!(buffer.is_empty());
}
#[test]
fn test_reordering_multi_buffer() {
let mut buffer = ReorderingMultiBuffer::new(8, Some(2));
assert!(buffer.is_empty());
assert!(matches!(buffer.push(make_packet(2, 1)), Ok(2)));
assert!(matches!(
buffer.push(make_packet(0, 1)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(
buffer.push(make_packet(1, 1)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(buffer.push(make_packet(4, 1)), Ok(4)));
assert!(matches!(buffer.push(make_packet(3, 1)), Ok(3)));
assert!(matches!(
buffer.push(make_packet(3, 1)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(buffer.push(make_packet(6, 1)), Ok(6)));
assert!(matches!(
buffer.push(make_packet(13, 1)),
Err(ReorderingError::BufferFull(_))
));
assert!(matches!(buffer.push(make_packet(10, 2)), Ok(10)));
assert!(matches!(
buffer.push(make_packet(9, 2)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(
buffer.push(make_packet(8, 2)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(buffer.push(make_packet(12, 2)), Ok(12)));
assert!(matches!(buffer.push(make_packet(11, 2)), Ok(11)));
assert!(matches!(
buffer.push(make_packet(11, 2)),
Err(ReorderingError::DuplicatePacket(_))
));
assert!(matches!(
buffer.push(make_packet(21, 2)),
Err(ReorderingError::BufferFull(_))
));
assert!(matches!(buffer.push(make_packet(14, 2)), Ok(14)));
assert!(matches!(
buffer.push(make_packet(15, 2)),
Err(ReorderingError::BufferFull(_))
));
assert!(!buffer.is_empty());
assert_eq!(buffer.next().unwrap().index(), 2);
assert_eq!(buffer.next().unwrap().index(), 3);
assert_eq!(buffer.next().unwrap().index(), 4);
assert_eq!(buffer.next().unwrap().index(), 10);
assert_eq!(buffer.next().unwrap().index(), 11);
assert_eq!(buffer.next().unwrap().index(), 12);
assert!(matches!(buffer.next(), None));
assert!(!buffer.is_empty());
assert!(matches!(buffer.take(), None));
assert!(!buffer.is_empty());
assert_eq!(buffer.next().unwrap().index(), 6);
assert!(matches!(buffer.next(), None));
assert!(!buffer.is_empty());
assert!(matches!(buffer.take(), None));
assert!(!buffer.is_empty());
assert_eq!(buffer.next().unwrap().index(), 14);
assert!(buffer.is_empty());
}
}