msf-rtp 0.2.0

Real-time Transport Protocol (RTP) for Rust.
Documentation
//! Helpers.

use std::collections::VecDeque;

use crate::rtp::RtpPacket;

/// Reordering error.
pub enum ReorderingError {
    BufferFull(RtpPacket),
    DuplicatePacket(RtpPacket),
}

impl ReorderingError {
    /// Check if the reordering buffer rejected the packet because it would
    /// exceed its capacity.
    #[inline]
    pub fn is_full(&self) -> bool {
        matches!(self, Self::BufferFull(_))
    }

    /// Check if the packet was rejected because of being a duplicate.
    #[inline]
    pub fn is_duplicate(&self) -> bool {
        matches!(self, Self::DuplicatePacket(_))
    }
}

/// Reordering buffer for RTP packets.
///
/// The reordering buffer internally uses `u64` packet indices. These indices
/// are estimated from packet sequence numbers according to the algorithm
/// presented in RFC 3711, section 3.3.1.
///
/// This simplifies the original algorithm presented in RFC 3550.
///
/// Note: RFC 3711 uses 32-bit ROC. This implementation uses 64-bit indices, so
/// the actual bit width of the ROC is 48 bits here (48-bit ROC + 16-bit
/// sequence number gives 64-bit packet index). The 32-bit ROC value can be
/// extracted from packet indices simply by cutting off the upper 16 bits, e.g.
/// `(index >> 16) as u32`.
pub struct ReorderingBuffer {
    start: Option<u64>,
    window: VecDeque<Option<RtpPacket>>,
    capacity: usize,
}

impl ReorderingBuffer {
    /// Create a new reordering buffer with a given depth.
    #[inline]
    pub fn new(depth: usize) -> Self {
        Self {
            start: None,
            window: VecDeque::with_capacity(depth.min(32)),
            capacity: depth,
        }
    }

    /// Estimate packet index from a given sequence number.
    pub fn estimate_index(&self, sequence_nr: u16) -> u64 {
        let start_index = self.start.unwrap_or(sequence_nr as u64);
        let last_index = start_index + (self.window.len() as u64);
        let last_seq_nr = (last_index & 0xffff) as u16;
        let last_roc = last_index >> 16;

        let new_seq_nr = sequence_nr;

        let new_roc = if new_seq_nr > last_seq_nr {
            if (new_seq_nr - last_seq_nr) < 0x8000 {
                last_roc
            } else {
                last_roc.wrapping_sub(1)
            }
        } else if (last_seq_nr - new_seq_nr) < 0x8000 {
            last_roc
        } else {
            last_roc.wrapping_add(1)
        };

        (new_roc << 16) | (new_seq_nr as u64)
    }

    /// Check if a packet with a given index would be a duplicate.
    pub fn is_duplicate(&self, index: u64) -> bool {
        let start = self.start.unwrap_or(index);

        if index < start {
            return true;
        }

        let offset = index - start;

        if offset > (usize::MAX as u64) {
            return false;
        }

        let offset = offset as usize;

        self.window
            .get(offset)
            .map(|entry| entry.is_some())
            .unwrap_or(false)
    }

    /// Push a given packet into the buffer and return index of the inserted
    /// packet.
    ///
    /// The method returns an error if the packet cannot be inserted into the
    /// buffer because it is either a duplicate or the buffer would exceed its
    /// capacity.
    pub fn push(&mut self, packet: RtpPacket) -> Result<u64, ReorderingError> {
        let index = self.estimate_index(packet.sequence_number());

        if self.start.is_none() {
            self.start = Some(index);
        }

        let start = self.start.unwrap();

        if index < start {
            return Err(ReorderingError::DuplicatePacket(packet));
        }

        let offset = index - start;

        if offset > (usize::MAX as u64) {
            return Err(ReorderingError::BufferFull(packet));
        }

        let offset = offset as usize;

        if offset < self.capacity {
            while offset >= self.window.len() {
                self.window.push_back(None);
            }

            let entry = &mut self.window[offset];

            if entry.is_some() {
                return Err(ReorderingError::DuplicatePacket(packet));
            }

            *entry = Some(packet);

            Ok(index)
        } else {
            Err(ReorderingError::BufferFull(packet))
        }
    }

    /// Take the next packet from the buffer.
    ///
    /// This method will return a packet only if there is a packet in the front
    /// slot of the buffer. In other words, the index of the returned packet
    /// will always be equal to the start index.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<RtpPacket> {
        if let Some(entry) = self.window.front() {
            if entry.is_some() {
                return self.take();
            }
        }

        None
    }

    /// Remove the front packet in the buffer and advance the window start
    /// position by one.
    ///
    /// The method will always advance the window start position even if the
    /// front slot is empty or if the underlying buffer itself is empty.
    pub fn take(&mut self) -> Option<RtpPacket> {
        if let Some(start) = self.start.as_mut() {
            *start += 1;
        }

        self.window.pop_front()?
    }

    /// Check if the underlying buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.window.is_empty()
    }
}