arcly-stream 0.1.3

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! Shared RTP/RTCP parsing and H.264 depacketization (RFC 3550 + RFC 6184).
//!
//! Gated behind the internal `_rtp` marker, pulled in by both [`rtsp`] and
//! [`webrtc`]. The two transports differ only in how RTP packets reach the
//! process (TCP-interleaved / UDP for RTSP, DTLS-SRTP for WebRTC); once a packet
//! is in hand, reassembling NAL units into an Annex-B access unit is identical,
//! so it lives here once.
//!
//! [`rtsp`]: crate::protocol::rtsp
//! [`webrtc`]: crate::protocol::webrtc
//!
//! # What it does
//!
//! - [`RtpHeader::parse`] decodes the fixed RTP header (RFC 3550 §5.1), honoring
//!   the CSRC count and the extension-header flag to locate the payload.
//! - [`H264Depacketizer`] turns a sequence of RTP payloads into complete H.264
//!   access units in Annex-B form, handling the three NALU packetization modes
//!   defined by RFC 6184: single NAL units, STAP-A aggregation (type 24), and
//!   FU-A fragmentation (type 28). An access unit is emitted when the RTP marker
//!   bit is set or the RTP timestamp advances.
//!
//! # What it does not do
//!
//! Jitter-buffer reordering and loss concealment are the caller's concern — the
//! depacketizer assumes in-order delivery (true for TCP-interleaved RTSP; for
//! UDP/SRTP a small reorder buffer should sit in front of it). When it detects
//! loss inside a fragmented (FU-A) NAL unit it returns
//! [`DepacketizeError::OutOfOrder`], so the handler can request a keyframe
//! (PLI/FIR) instead of decoding a corrupt access unit.

use bytes::Bytes;

/// The four-byte Annex-B start code (`00 00 00 01`) written in front of each
/// NAL unit when an access unit is reassembled.
const ANNEXB_START: [u8; 4] = [0x00, 0x00, 0x00, 0x01];

/// A parsed RTP fixed header (RFC 3550 §5.1).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Payload type (7 bits) — identifies the codec/format binding from SDP.
    pub payload_type: u8,
    /// Marker bit. For H.264 it flags the last packet of an access unit.
    pub marker: bool,
    /// 16-bit sequence number, increments by one per packet (wraps).
    pub sequence: u16,
    /// 32-bit media timestamp in the payload's clock (90 kHz for H.264 video).
    pub timestamp: u32,
    /// Synchronization source identifier.
    pub ssrc: u32,
    /// Byte offset at which the payload begins (past CSRCs and any extension).
    ///
    /// The payload does not necessarily run to the end of the packet: when the
    /// padding (P) bit is set, padding octets follow it. Use
    /// [`RtpHeader::payload`] to get exactly the payload bytes.
    pub payload_offset: usize,
}

impl RtpHeader {
    /// Parse the fixed header from the front of `buf`.
    ///
    /// The CSRC count and the header-extension flag are honored when locating
    /// the payload (see [`payload_offset`](Self::payload_offset)). Returns
    /// `None` if the version field is not 2, or if `buf` is too short for the
    /// 12-byte fixed header, the CSRC list, or the extension it declares.
    pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
        if buf.len() < 12 || buf[0] >> 6 != 2 {
            return None;
        }
        let has_extension = buf[0] & 0x10 != 0;
        let csrc_count = usize::from(buf[0] & 0x0F);
        let marker = buf[1] & 0x80 != 0;
        let payload_type = buf[1] & 0x7F;
        let sequence = u16::from_be_bytes([buf[2], buf[3]]);
        let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
        let ssrc = u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]);

        let mut offset = 12 + csrc_count * 4;
        if has_extension {
            // Extension header: 2-byte profile id, then the extension length in
            // 32-bit words (not counting this 4-byte extension header).
            let ext = buf.get(offset..offset + 4)?;
            let ext_words = usize::from(u16::from_be_bytes([ext[2], ext[3]]));
            offset += 4 + ext_words * 4;
        }
        if buf.len() < offset {
            return None;
        }
        Some(RtpHeader {
            payload_type,
            marker,
            sequence,
            timestamp,
            ssrc,
            payload_offset: offset,
        })
    }

    /// The payload bytes of `packet`, excluding any trailing RTP padding.
    ///
    /// `packet` must be the same buffer this header was parsed from. When the
    /// padding (P) bit is set, the packet's last octet counts the padding
    /// octets, itself included (RFC 3550 §5.1). A padding-only packet yields
    /// an empty slice.
    ///
    /// Returns `None` when the padding count is zero or would reach back into
    /// the header, both of which mark the packet invalid.
    pub fn payload<'a>(&self, packet: &'a [u8]) -> Option<&'a [u8]> {
        let flags = *packet.first()?;
        let end = if flags & 0x20 != 0 {
            let padding = usize::from(*packet.last()?);
            if padding == 0 {
                return None;
            }
            packet.len().checked_sub(padding)?
        } else {
            packet.len()
        };
        // `get` also rejects a padding count that overlaps the header.
        packet.get(self.payload_offset..end)
    }
}

/// Errors surfaced while depacketizing an RTP stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DepacketizeError {
    /// The packet was shorter than the format requires: an empty payload, an
    /// FU-A payload without its FU header byte, or a STAP-A aggregation unit
    /// whose size field runs past the end of the packet.
    Truncated,
    /// Packet loss was detected while reassembling a fragmented (FU-A) NAL
    /// unit. For example, a continuation fragment arrived without its Start
    /// fragment, or the sequence number jumped mid-fragment. The incomplete
    /// NAL unit was discarded; the handler should request a keyframe (PLI/FIR).
    OutOfOrder,
    /// An unsupported NAL/aggregation type was encountered. Carries the 5-bit
    /// NAL unit type taken from the payload's first byte.
    Unsupported(u8),
}

/// Reassembles RFC 6184 H.264 RTP payloads into Annex-B access units.
///
/// Feed each packet's payload (the bytes after [`RtpHeader::payload_offset`],
/// without any trailing RTP padding) with its marker bit, timestamp, and
/// sequence number to [`push`](Self::push). When a complete access unit is
/// ready the method returns `Ok(Some(au))`. `au` is the concatenated NAL units,
/// each prefixed with a 4-byte Annex-B start code — exactly the shape the codec
/// parsers and `annexb_to_avcc` expect.
///
/// Packets are assumed to arrive in sequence order. When loss is detected
/// inside a fragmented (FU-A) NAL unit, the damaged access unit is discarded
/// rather than emitted. Later packets carrying the same timestamp are dropped
/// too. [`DepacketizeError::OutOfOrder`] is returned so the caller can request
/// a keyframe.
#[derive(Debug, Default)]
pub struct H264Depacketizer {
    /// Bytes accumulated for the current access unit (Annex-B framed).
    au: Vec<u8>,
    /// Whether `au` already contains an IDR (type 5) NAL unit.
    au_keyframe: bool,
    /// RTP timestamp of the access unit being assembled; `None` when idle.
    current_ts: Option<u32>,
    /// FU-A reassembly buffer: reconstructed NAL header, then fragment bodies.
    fua: Vec<u8>,
    /// `true` while an FU-A NAL unit is between its Start and End fragments.
    in_fragment: bool,
    /// Timestamp of an access unit known to be damaged. Its remaining packets
    /// are dropped instead of being assembled into a corrupt access unit.
    damaged_ts: Option<u32>,
    /// Sequence number of the last structurally valid packet (gap detection).
    last_seq: Option<u16>,
}

/// FU header Start bit (RFC 6184 §5.8).
const FU_START: u8 = 0x80;
/// FU header End bit (RFC 6184 §5.8).
const FU_END: u8 = 0x40;

/// A structurally validated RFC 6184 payload, borrowing from the packet.
enum H264Payload<'a> {
    /// Single NAL unit packet (types 1–23): the payload *is* the NAL unit.
    Single(&'a [u8]),
    /// STAP-A (type 24): the aggregated NAL units, in order.
    StapA(Vec<&'a [u8]>),
    /// FU-A (type 28): one fragment of a larger NAL unit.
    FuA {
        indicator: u8,
        header: u8,
        body: &'a [u8],
    },
}

impl<'a> H264Payload<'a> {
    /// Classify and validate `payload` without touching depacketizer state.
    fn parse(payload: &'a [u8]) -> Result<Self, DepacketizeError> {
        let (&first, rest) = payload
            .split_first()
            .ok_or(DepacketizeError::Truncated)?;
        match first & 0x1F {
            1..=23 => Ok(Self::Single(payload)),
            24 => {
                // STAP-A: after the type byte, a run of [u16 size][NAL unit]
                // aggregation units. Every size is checked before anything is
                // assembled, so a truncated aggregate contributes nothing. A
                // trailing lone byte is ignored.
                let mut units = Vec::new();
                let mut rest = rest;
                while let [hi, lo, tail @ ..] = rest {
                    let size = usize::from(u16::from_be_bytes([*hi, *lo]));
                    if size > tail.len() {
                        return Err(DepacketizeError::Truncated);
                    }
                    let (nal, next) = tail.split_at(size);
                    units.push(nal);
                    rest = next;
                }
                Ok(Self::StapA(units))
            }
            // FU-A: byte0 = FU indicator, byte1 = FU header (S|E|R|type).
            28 => match rest {
                [header, body @ ..] => Ok(Self::FuA {
                    indicator: first,
                    header: *header,
                    body,
                }),
                [] => Err(DepacketizeError::Truncated),
            },
            other => Err(DepacketizeError::Unsupported(other)),
        }
    }
}

impl H264Depacketizer {
    /// A fresh depacketizer with no in-progress access unit.
    pub fn new() -> Self {
        Self::default()
    }

    /// Append one NAL unit (Annex-B framed) to the current access unit,
    /// noting whether it is an IDR slice. Empty units (possible in a STAP-A)
    /// are skipped: a bare start code is not valid Annex-B.
    fn append_nal(&mut self, nal: &[u8]) {
        if let Some(&header) = nal.first() {
            self.au_keyframe |= (header & 0x1F) == 5;
            self.au.extend_from_slice(&ANNEXB_START);
            self.au.extend_from_slice(nal);
        }
    }

    /// Forget the pending access unit and any fragment in progress.
    fn drop_pending(&mut self) {
        self.au.clear();
        self.au_keyframe = false;
        self.current_ts = None;
        self.in_fragment = false;
        self.fua.clear();
    }

    /// Close the pending access unit.
    ///
    /// Returns `Ok(None)` when nothing (or only empty NAL units) is pending.
    /// If an FU-A NAL unit is still waiting for its End fragment, the access
    /// unit is incomplete: it is dropped and `Err(OutOfOrder)` is returned.
    fn close_au(&mut self) -> Result<Option<AccessUnit>, DepacketizeError> {
        if self.in_fragment {
            self.drop_pending();
            return Err(DepacketizeError::OutOfOrder);
        }
        let keyframe = std::mem::take(&mut self.au_keyframe);
        let data = std::mem::take(&mut self.au);
        match self.current_ts.take() {
            Some(timestamp) if !data.is_empty() => Ok(Some(AccessUnit {
                data: Bytes::from(data),
                timestamp,
                keyframe,
            })),
            _ => Ok(None),
        }
    }

    /// Push one RTP H.264 payload.
    ///
    /// `payload` is the RTP payload, `marker` the RTP marker bit, and
    /// `timestamp`/`sequence` the packet's RTP timestamp and sequence number.
    ///
    /// Returns a completed [`AccessUnit`] when this packet's marker bit closes
    /// the frame, or when its timestamp differs from the pending access unit's
    /// (some encoders omit the marker bit). At most one access unit is returned
    /// per call. If a timestamp change completes the previous access unit *and*
    /// this packet carries the marker, the new access unit is returned by a
    /// later call.
    ///
    /// # Errors
    ///
    /// - [`DepacketizeError::Truncated`] — empty payload, FU-A without an FU
    ///   header, or a STAP-A unit that overruns the packet.
    /// - [`DepacketizeError::Unsupported`] — NAL types 0, 25–27, 29–31
    ///   (reserved, STAP-B, MTAPs, FU-B).
    /// - [`DepacketizeError::OutOfOrder`] — loss inside a fragmented NAL unit;
    ///   the damaged access unit was discarded.
    ///
    /// `Truncated` and `Unsupported` leave the depacketizer untouched, so an
    /// access unit that was already complete is never lost to a malformed
    /// packet. When `OutOfOrder` reports a previous access unit whose last
    /// fragment never arrived, the current packet is still consumed as the
    /// start of the next access unit.
    pub fn push(
        &mut self,
        payload: &[u8],
        marker: bool,
        timestamp: u32,
        sequence: u16,
    ) -> Result<Option<AccessUnit>, DepacketizeError> {
        // Structural validation first: it touches no state.
        let packet = H264Payload::parse(payload)?;

        let gap = self.seq_gap(sequence);
        self.last_seq = Some(sequence);

        // The rest of a frame already known to be damaged is dropped. Its
        // timestamp differs from any pending access unit, which is therefore
        // complete.
        if self.damaged_ts == Some(timestamp) {
            return self.close_au();
        }

        let same_au = self.current_ts == Some(timestamp);
        let is_continuation =
            matches!(packet, H264Payload::FuA { header, .. } if header & FU_START == 0);

        // Loss checks run before any flush, so an error never costs an access
        // unit that was already complete.
        if is_continuation && !(self.in_fragment && same_au && !gap) {
            // A middle/end fragment whose Start (or a preceding fragment) was
            // lost. If the pending access unit is this damaged frame, or it has
            // a fragment that can now never finish, drop it too.
            if same_au || self.in_fragment {
                self.drop_pending();
            }
            self.damaged_ts = Some(timestamp);
            return Err(DepacketizeError::OutOfOrder);
        }
        if self.in_fragment && same_au && !is_continuation {
            // RFC 6184 §5.8: the FUs of a NAL unit are sent back to back, so any
            // other packet here means the End fragment was lost.
            self.drop_pending();
            self.damaged_ts = Some(timestamp);
            return Err(DepacketizeError::OutOfOrder);
        }

        // A new timestamp closes the previous access unit. Closing fails (and
        // drops it) if it still has an unfinished fragment, but this packet
        // still opens the next access unit.
        self.damaged_ts = None;
        let closed = if same_au { Ok(None) } else { self.close_au() };
        self.current_ts = Some(timestamp);

        match packet {
            H264Payload::Single(nal) => self.append_nal(nal),
            H264Payload::StapA(units) => {
                for nal in units {
                    self.append_nal(nal);
                }
            }
            H264Payload::FuA {
                indicator,
                header,
                body,
            } => {
                if header & FU_START != 0 {
                    // Reconstruct the original NAL header: F|NRI from the
                    // indicator, type from the FU header.
                    self.fua.clear();
                    self.fua.push((indicator & 0xE0) | (header & 0x1F));
                    self.in_fragment = true;
                }
                self.fua.extend_from_slice(body);
                if header & FU_END != 0 {
                    let nal = std::mem::take(&mut self.fua);
                    self.append_nal(&nal);
                    self.in_fragment = false;
                }
            }
        }

        match closed {
            Ok(None) if marker => self.close_au(),
            other => other,
        }
    }

    /// Whether `sequence` fails to follow the previous packet directly. The
    /// 16-bit counter wraps, so 0 follows 65535. Always `false` for the first
    /// packet.
    fn seq_gap(&self, sequence: u16) -> bool {
        match self.last_seq {
            Some(prev) => sequence.wrapping_sub(prev) != 1,
            None => false,
        }
    }
}

/// A reassembled H.264 access unit in Annex-B form, as produced by
/// [`H264Depacketizer::push`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccessUnit {
    /// Concatenated NAL units, each prefixed with a 4-byte start code
    /// (`00 00 00 01`).
    pub data: Bytes,
    /// RTP media timestamp (90 kHz) of the access unit.
    pub timestamp: u32,
    /// Whether the access unit contains an IDR (NAL type 5) NAL unit, i.e.
    /// whether it is a keyframe.
    pub keyframe: bool,
}

#[cfg(test)]
mod tests {
    //! Unit tests for RTP header parsing and H.264 depacketization.
    use super::*;

    /// Build a minimal 12-byte RTP packet with the given fields and payload.
    fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let mut p = vec![0x80, if marker { 0x80 | 96 } else { 96 }];
        p.extend_from_slice(&seq.to_be_bytes());
        p.extend_from_slice(&ts.to_be_bytes());
        p.extend_from_slice(&[0, 0, 0, 1]); // ssrc
        p.extend_from_slice(payload);
        p
    }

    #[test]
    fn parses_fixed_header_and_payload_offset() {
        let pkt = rtp(7, 9000, true, &[0x65, 0xAA]);
        let h = RtpHeader::parse(&pkt).unwrap();
        assert_eq!(h.sequence, 7);
        assert_eq!(h.timestamp, 9000);
        assert!(h.marker);
        assert_eq!(h.payload_type, 96);
        assert_eq!(h.payload_offset, 12);
        assert_eq!(&pkt[h.payload_offset..], &[0x65, 0xAA]);
    }

    #[test]
    fn rejects_wrong_version_and_short_buffers() {
        assert!(RtpHeader::parse(&[0x00; 12]).is_none()); // version 0
        assert!(RtpHeader::parse(&[0x80; 4]).is_none()); // too short
    }

    #[test]
    fn honors_csrc_count_in_payload_offset() {
        let mut pkt = rtp(1, 0, false, &[0x41]);
        pkt[0] = 0x82; // version 2, CSRC count = 2
        let mut with_csrc = pkt[..12].to_vec();
        with_csrc.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]); // 2 CSRCs
        with_csrc.push(0x41);
        let h = RtpHeader::parse(&with_csrc).unwrap();
        assert_eq!(h.payload_offset, 20);
    }

    #[test]
    fn honors_extension_header_in_payload_offset() {
        let mut pkt = rtp(1, 0, false, &[]);
        pkt[0] = 0x90; // version 2, X = 1
        // Profile 0xBEDE, length = 1 word, then that word.
        pkt.extend_from_slice(&[0xBE, 0xDE, 0x00, 0x01, 1, 2, 3, 4]);
        pkt.push(0x41);
        let h = RtpHeader::parse(&pkt).unwrap();
        assert_eq!(h.payload_offset, 20);
        assert_eq!(&pkt[h.payload_offset..], &[0x41]);
    }

    #[test]
    fn rejects_extension_longer_than_packet() {
        let mut pkt = rtp(1, 0, false, &[]);
        pkt[0] = 0x90;
        // Claims 2 extension words but carries only 1.
        pkt.extend_from_slice(&[0xBE, 0xDE, 0x00, 0x02, 1, 2, 3, 4]);
        assert!(RtpHeader::parse(&pkt).is_none());
    }

    #[test]
    fn single_nal_packet_emits_annexb_on_marker() {
        let mut d = H264Depacketizer::new();
        // Type 1 (non-IDR slice), marker set → one access unit.
        let out = d.push(&[0x41, 0x9A, 0xBC], true, 3000, 1).unwrap().unwrap();
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
        assert!(!out.keyframe);
        assert_eq!(out.timestamp, 3000);
    }

    #[test]
    fn idr_single_nal_is_flagged_keyframe() {
        let mut d = H264Depacketizer::new();
        let out = d.push(&[0x65, 0x01], true, 0, 1).unwrap().unwrap();
        assert!(out.keyframe);
    }

    #[test]
    fn rejects_empty_and_unsupported_payloads() {
        let mut d = H264Depacketizer::new();
        assert_eq!(d.push(&[], true, 0, 1), Err(DepacketizeError::Truncated));
        // Type 29 is FU-B, which is not supported.
        assert_eq!(
            d.push(&[29, 0x80], true, 0, 2),
            Err(DepacketizeError::Unsupported(29))
        );
    }

    #[test]
    fn stap_a_splits_aggregated_nals() {
        // STAP-A (24): [24][size=2][AA BB][size=3][CC DD EE]
        let payload = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
        let mut d = H264Depacketizer::new();
        let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
        assert_eq!(
            &out.data[..],
            &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
        );
    }

    #[test]
    fn stap_a_overrun_is_truncated() {
        // The size field claims 5 bytes but only 1 follows.
        let mut d = H264Depacketizer::new();
        assert_eq!(
            d.push(&[24, 0, 5, 0xAA], true, 0, 1),
            Err(DepacketizeError::Truncated)
        );
    }

    #[test]
    fn fu_a_reassembles_fragmented_nal() {
        let mut d = H264Depacketizer::new();
        // FU indicator 0x7C (F=0,NRI=3,type=28), FU header start 0x85 (S=1,type=5).
        assert!(d
            .push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1)
            .unwrap()
            .is_none());
        // Middle fragment (S=0,E=0).
        assert!(d.push(&[0x7C, 0x05, 0x33], false, 0, 2).unwrap().is_none());
        // End fragment (E=1), marker closes the AU.
        let out = d.push(&[0x7C, 0x45, 0x44], true, 0, 3).unwrap().unwrap();
        // Reconstructed NAL header: NRI 0x60 | type 5 = 0x65, then payload bytes.
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
        assert!(out.keyframe);
    }

    #[test]
    fn fu_a_continues_across_sequence_wrap() {
        let mut d = H264Depacketizer::new();
        assert!(d
            .push(&[0x7C, 0x85, 0x11], false, 0, u16::MAX)
            .unwrap()
            .is_none());
        // 0 directly follows 65535, so this is not a gap.
        let out = d.push(&[0x7C, 0x45, 0x22], true, 0, 0).unwrap().unwrap();
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22]);
    }

    #[test]
    fn fu_a_sequence_gap_reports_out_of_order() {
        let mut d = H264Depacketizer::new();
        d.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();
        // Jump from seq 1 to seq 5 mid-fragment.
        assert_eq!(
            d.push(&[0x7C, 0x05, 0x22], false, 0, 5),
            Err(DepacketizeError::OutOfOrder)
        );
    }

    #[test]
    fn timestamp_change_flushes_previous_au_without_marker() {
        let mut d = H264Depacketizer::new();
        // First AU, no marker.
        assert!(d.push(&[0x41, 0x01], false, 1000, 1).unwrap().is_none());
        // New timestamp flushes the first AU.
        let out = d.push(&[0x41, 0x02], false, 2000, 2).unwrap().unwrap();
        assert_eq!(out.timestamp, 1000);
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
    }
}