arcly-stream 0.1.5

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! Panic-free big-endian reads over a byte slice — the shared primitive under
//! every network parser (RTP, SRT, MPEG-TS, RTMP).
//!
//! Gated by `ingest` (which all four protocols require). Network parsers consume
//! untrusted bytes, so the crate invariant — enforced by the fuzz targets and
//! [`tests/parser_panic_sweep`] — is that **no input may cause a panic**. Reading
//! through [`ByteReader`] makes that structural: it never indexes the slice
//! directly, so an out-of-range field yields `None`, not an abort.
//!
//! [`tests/parser_panic_sweep`]: https://gitlab.com/arcly/arcly-stream
//!
//! This is deliberately a primitive *reader*, not a protocol abstraction: it
//! unifies the bounds-checked integer/slice reads the parsers all share, while
//! each parser keeps its own structure (RTP/SRT headers, TS sections, RTMP
//! chunk assembly) where the logic genuinely differs.

/// A forward cursor over a byte slice with bounds-checked big-endian reads.
///
/// Every reading accessor returns `Option`: on success it advances the cursor,
/// on failure it returns `None` and leaves the cursor where it was, so a
/// truncated or hostile buffer can never panic. Construct with
/// [`new`](Self::new); query [`remaining`](Self::remaining) /
/// [`position`](Self::position) as needed.
///
/// Invariant: `pos <= buf.len()`. The cursor starts at zero and is only moved
/// after a successful bounds check.
#[derive(Debug, Clone)]
pub(crate) struct ByteReader<'a> {
    /// The complete input; it is never re-sliced, so `pos` is an absolute offset.
    buf: &'a [u8],
    /// Offset of the next unread byte.
    pos: usize,
}

impl<'a> ByteReader<'a> {
    /// A reader positioned at the start of `buf`.
    pub(crate) fn new(buf: &'a [u8]) -> Self {
        Self { buf, pos: 0 }
    }

    /// The current absolute offset into the original buffer.
    pub(crate) fn position(&self) -> usize {
        self.pos
    }

    /// The number of unread bytes between the cursor and the end of the buffer.
    pub(crate) fn remaining(&self) -> usize {
        // `pos <= buf.len()` always holds, so this never actually saturates;
        // the saturating form just keeps the accessor free of any panic path.
        self.buf.len().saturating_sub(self.pos)
    }

    /// Read one byte, or `None` at end of input.
    pub(crate) fn u8(&mut self) -> Option<u8> {
        let byte = *self.buf.get(self.pos)?;
        self.pos += 1;
        Some(byte)
    }

    /// Read a big-endian `u16`, or `None` if fewer than two bytes remain.
    pub(crate) fn u16_be(&mut self) -> Option<u16> {
        // Destructure instead of indexing: `take(2)` always yields exactly two
        // bytes, so the fallback arm is unreachable, but no `[]` is needed.
        match self.take(2)? {
            &[b0, b1] => Some(u16::from_be_bytes([b0, b1])),
            _ => None,
        }
    }

    /// Read a big-endian `u32`, or `None` if fewer than four bytes remain.
    pub(crate) fn u32_be(&mut self) -> Option<u32> {
        // Same index-free decoding as `u16_be`.
        match self.take(4)? {
            &[b0, b1, b2, b3] => Some(u32::from_be_bytes([b0, b1, b2, b3])),
            _ => None,
        }
    }

    /// Borrow the next `n` bytes and advance, or `None` if fewer remain.
    ///
    /// The returned slice borrows from the original buffer (`'a`), not from the
    /// reader, so it stays usable while the reader keeps advancing.
    pub(crate) fn take(&mut self, n: usize) -> Option<&'a [u8]> {
        // `checked_add` rejects an `n` so large that `pos + n` would wrap.
        let end = self.pos.checked_add(n)?;
        let bytes = self.buf.get(self.pos..end)?;
        self.pos = end;
        Some(bytes)
    }

    /// Skip `n` bytes; returns `None` (without advancing) if fewer remain.
    pub(crate) fn skip(&mut self, n: usize) -> Option<()> {
        if n > self.remaining() {
            return None;
        }
        // `n <= buf.len() - pos`, so the sum stays within the buffer length.
        self.pos += n;
        Some(())
    }
}

#[cfg(test)]
mod tests {
    use super::ByteReader;

    /// Sequential reads move the cursor; a read that does not fit is refused
    /// and consumes nothing.
    #[test]
    fn reads_advance_and_bound_check() {
        let bytes = [0x12u8, 0x34, 0x56, 0x78, 0x9A];
        let mut reader = ByteReader::new(&bytes);

        assert_eq!(reader.u8(), Some(0x12));
        assert_eq!(reader.u16_be(), Some(0x3456));
        assert_eq!(reader.position(), 3);

        // Only two bytes are left, so a four-byte read must fail...
        assert!(reader.u32_be().is_none());
        // ...and those two bytes are still available afterwards.
        let tail: &[u8] = &[0x78, 0x9A];
        assert_eq!(reader.take(2), Some(tail));
        assert!(reader.u8().is_none());
    }

    /// `skip` advances when the bytes exist and is a no-op when they do not.
    #[test]
    fn skip_bounds_checks() {
        let bytes = [0x00u8, 0x01, 0x02, 0xFF];
        let mut reader = ByteReader::new(&bytes);

        assert!(reader.skip(3).is_some());
        assert_eq!(reader.position(), 3);

        // One byte left: skipping two fails and the cursor stays put.
        assert!(reader.skip(2).is_none());
        assert_eq!(reader.position(), 3);
        assert_eq!(reader.u8(), Some(0xFF));
    }

    /// Every accessor reports `None` on an empty buffer instead of panicking.
    #[test]
    fn empty_buffer_never_panics() {
        let empty: &[u8] = &[];
        let mut reader = ByteReader::new(empty);

        assert!(reader.u8().is_none());
        assert!(reader.u16_be().is_none());
        assert!(reader.u32_be().is_none());
        assert!(reader.take(5).is_none());
        assert!(reader.skip(3).is_none());
    }
}