arcly-stream 0.1.3

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! A focused MPEG-TS demuxer for SRT ingest.
//!
//! SRT carries an MPEG-TS bytestream in its data packets. This demuxer consumes
//! 188-byte TS packets, follows the PAT → PMT → elementary-PID chain, reassembles
//! PES packets on the video PID, and emits one [`TsPayload`] per access unit in
//! Annex-B form with a decoded PTS.
//!
//! It is deliberately small: single program, first video elementary stream,
//! H.264 (`stream_type` 0x1B) and H.265 (0x24). Audio and any other elementary
//! streams listed in the PMT are skipped rather than forwarded (the SRT ingest
//! path publishes the video elementary stream;
//! audio passthrough is a follow-up). Continuity-counter gaps are tolerated — a
//! lost packet simply truncates the in-progress PES, which is dropped.

use crate::CodecId;
use bytes::Bytes;

/// One reassembled access unit demuxed from the TS stream.
///
/// Produced by [`TsDemuxer::push`]; each value carries the payload of one PES packet
/// on the selected video PID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TsPayload {
    /// Elementary-stream access unit (Annex-B framed for H.264/H.265): the PES payload
    /// bytes concatenated across the TS packets that carried them.
    pub data: Bytes,
    /// Codec identified from the PMT `stream_type`.
    pub codec: CodecId,
    /// Presentation timestamp in milliseconds: the 90 kHz PES PTS divided by 90 with
    /// integer truncation, or 0 when the PES header carries no PTS.
    pub pts_ms: i64,
    /// Whether the access unit holds a random-access NAL unit: an IDR slice
    /// (`nal_unit_type` 5) for H.264, or an IRAP picture (types 16..=21) for H.265.
    pub keyframe: bool,
}

/// Size in bytes of one MPEG-TS packet (4-byte header followed by the payload area).
const TS_PACKET_LEN: usize = 188;
/// Sync byte that must begin every TS packet; `push` uses it to locate packet boundaries.
const TS_SYNC: u8 = 0x47;

/// Stateful MPEG-TS demuxer. Feed it TS bytes with [`push`](Self::push).
///
/// Access units are emitted one PES late: a PES is only known to be complete when the
/// next PES starts on the video PID (PUSI), so the most recent access unit stays
/// buffered until more data arrives.
#[derive(Debug)]
pub struct TsDemuxer {
    /// PID carrying the PMT, learned from the PAT (PID 0).
    pmt_pid: Option<u16>,
    /// PID carrying the chosen video elementary stream, learned from the PMT.
    video_pid: Option<u16>,
    /// Codec of the video elementary stream.
    codec: CodecId,
    /// PES reassembly buffer for the video PID.
    pes: Vec<u8>,
    /// PTS (90 kHz) of the PES currently being reassembled; 0 when its header had none.
    pes_pts: i64,
    /// Whether a PES is currently open (between two PUSI markers).
    pes_open: bool,
    /// Continuity counter of the last payload-bearing packet seen on the video PID.
    video_cc: Option<u8>,
    /// Carry for TS bytes that span `push` calls but don't fill a packet.
    carry: Vec<u8>,
}

impl Default for TsDemuxer {
    fn default() -> Self {
        Self::new()
    }
}

impl TsDemuxer {
    /// A fresh demuxer that has not yet seen a PAT.
    pub fn new() -> Self {
        Self {
            pmt_pid: None,
            video_pid: None,
            codec: CodecId::Unknown,
            pes: Vec::new(),
            pes_pts: 0,
            pes_open: false,
            video_cc: None,
            carry: Vec::new(),
        }
    }

    /// Push a chunk of the TS bytestream, returning any access units completed.
    ///
    /// `bytes` need not be packet-aligned: a trailing partial packet is carried into the
    /// next call, and bytes that do not begin with the 0x47 sync byte are skipped one at
    /// a time until sync is regained.
    pub fn push(&mut self, bytes: &[u8]) -> Vec<TsPayload> {
        let mut out = Vec::new();
        // Prepend any carried partial packet.
        let mut data = std::mem::take(&mut self.carry);
        data.extend_from_slice(bytes);

        let mut i = 0;
        while i + TS_PACKET_LEN <= data.len() {
            let pkt = &data[i..i + TS_PACKET_LEN];
            if pkt[0] == TS_SYNC {
                self.handle_packet(pkt, &mut out);
                i += TS_PACKET_LEN;
            } else {
                // Resync: skip a byte and look for the next sync.
                i += 1;
            }
        }
        // Carry the remainder (a partial packet) to the next call.
        self.carry = data[i..].to_vec();
        out
    }

    /// Process one 188-byte TS packet.
    fn handle_packet(&mut self, pkt: &[u8], out: &mut Vec<TsPayload>) {
        // transport_error_indicator: the packet is known to be corrupt, so treat it as
        // lost. On the video PID its unrecorded continuity counter then shows up as a gap.
        if pkt[1] & 0x80 != 0 {
            return;
        }
        let pusi = pkt[1] & 0x40 != 0;
        let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
        let adaptation = (pkt[3] >> 4) & 0x03;
        let continuity_counter = pkt[3] & 0x0F;
        // Only payload-bearing packets (adaptation_field_control 01 or 11) are of interest.
        if adaptation != 1 && adaptation != 3 {
            return;
        }
        // Skip the adaptation field if present, noting its discontinuity_indicator.
        let (payload_start, discontinuity) = if adaptation == 3 {
            let af_len = pkt[4] as usize;
            (5 + af_len, af_len > 0 && pkt[5] & 0x80 != 0)
        } else {
            (4, false)
        };
        if payload_start >= TS_PACKET_LEN {
            return;
        }
        let payload = &pkt[payload_start..];

        if pid == 0 {
            self.parse_pat(payload, pusi);
        } else if Some(pid) == self.pmt_pid {
            self.parse_pmt(payload, pusi);
        } else if Some(pid) == self.video_pid {
            self.check_video_continuity(continuity_counter, discontinuity);
            self.parse_video(payload, pusi, out);
        }
    }

    /// Track the video PID's 4-bit continuity counter and drop the in-progress PES when
    /// a packet has been lost, so an access unit with a hole in it is never emitted.
    ///
    /// A repeated counter value is not treated as a gap (its payload is still processed),
    /// and a set `discontinuity_indicator` re-bases tracking instead of dropping data.
    fn check_video_continuity(&mut self, cc: u8, discontinuity: bool) {
        if let Some(last) = self.video_cc {
            let expected = (last + 1) & 0x0F;
            if !discontinuity && cc != expected && cc != last {
                self.pes.clear();
                self.pes_open = false;
            }
        }
        self.video_cc = Some(cc);
    }

    /// Narrow a PSI section (starting at its `table_id`) to its declared extent minus
    /// the trailing CRC_32, so packet stuffing and the CRC are never read as entries.
    ///
    /// Returns `None` if the section is too short for its 3-byte prefix or its
    /// `table_id` differs from `expected_table_id`. If the declared extent runs past the
    /// end of `section` (a section continued in later packets), the available bytes are
    /// returned.
    fn psi_section(section: &[u8], expected_table_id: u8) -> Option<&[u8]> {
        if *section.first()? != expected_table_id {
            return None;
        }
        let section_length =
            (((*section.get(1)? & 0x0F) as usize) << 8) | *section.get(2)? as usize;
        // section_length counts the bytes after the 3-byte prefix, including the CRC_32.
        let end = (3 + section_length).saturating_sub(4).min(section.len());
        Some(&section[..end])
    }

    /// Parse the PAT to learn the PMT PID (first program).
    ///
    /// Only sections that start in this packet (PUSI) are parsed; continuation packets of
    /// a multi-packet section are ignored rather than misread as a section header.
    fn parse_pat(&mut self, payload: &[u8], pusi: bool) {
        if !pusi {
            return;
        }
        let Some(section) = section_body(payload, pusi).and_then(|s| Self::psi_section(s, 0x00))
        else {
            return;
        };
        // PAT entries start after the 8-byte section header, 4 bytes each.
        let entries = section.get(8..).unwrap_or(&[]);
        for entry in entries.chunks_exact(4) {
            let program = u16::from_be_bytes([entry[0], entry[1]]);
            let pid = (((entry[2] & 0x1F) as u16) << 8) | entry[3] as u16;
            // program_number 0 points at the network PID, not a PMT.
            if program != 0 {
                self.pmt_pid = Some(pid);
                return;
            }
        }
    }

    /// Parse the PMT to learn the first video elementary PID and codec.
    ///
    /// Like the PAT, only PUSI packets are parsed, and entries are read only within the
    /// section's declared `section_length`.
    fn parse_pmt(&mut self, payload: &[u8], pusi: bool) {
        if !pusi {
            return;
        }
        let Some(section) = section_body(payload, pusi).and_then(|s| Self::psi_section(s, 0x02))
        else {
            return;
        };
        if section.len() < 12 {
            return;
        }
        let program_info_len = (((section[10] & 0x0F) as usize) << 8) | section[11] as usize;
        let mut i = 12 + program_info_len;
        while i + 5 <= section.len() {
            let stream_type = section[i];
            let pid = (((section[i + 1] & 0x1F) as u16) << 8) | section[i + 2] as u16;
            let es_info_len = (((section[i + 3] & 0x0F) as usize) << 8) | section[i + 4] as usize;
            let codec = match stream_type {
                0x1B => CodecId::H264,
                0x24 => CodecId::H265,
                _ => CodecId::Unknown,
            };
            if codec != CodecId::Unknown && self.video_pid.is_none() {
                self.video_pid = Some(pid);
                self.codec = codec;
            }
            i += 5 + es_info_len;
        }
    }

    /// Reassemble PES on the video PID, flushing the previous access unit on each
    /// new PES start (PUSI).
    fn parse_video(&mut self, payload: &[u8], pusi: bool, out: &mut Vec<TsPayload>) {
        if pusi {
            // Flush the access unit that just ended.
            self.flush_pes(out);
            // A new PES starts: parse its header to find PTS and the ES offset. A header
            // whose declared length runs past this packet is unusable, so the PES stays
            // closed and its continuation packets are ignored until the next PUSI.
            let header = parse_pes_header(payload)
                .and_then(|(pts, es_offset)| Some((pts, payload.get(es_offset..)?)));
            if let Some((pts, es)) = header {
                self.pes_pts = pts;
                self.pes_open = true;
                self.pes.extend_from_slice(es);
            }
        } else if self.pes_open {
            self.pes.extend_from_slice(payload);
        }
    }

    /// Emit the buffered PES as an access unit, if any, and close the PES.
    fn flush_pes(&mut self, out: &mut Vec<TsPayload>) {
        if !self.pes_open || self.pes.is_empty() {
            self.pes.clear();
            self.pes_open = false;
            return;
        }
        let es = std::mem::take(&mut self.pes);
        let keyframe = is_keyframe(&es, self.codec);
        out.push(TsPayload {
            data: Bytes::from(es),
            codec: self.codec,
            pts_ms: self.pes_pts / 90,
            keyframe,
        });
        self.pes_open = false;
    }
}

/// Locate the PSI section inside a TS packet payload.
///
/// A payload flagged with `pusi` opens with a one-byte `pointer_field` counting the bytes
/// to skip before the section begins; `None` means the payload was empty or the pointer
/// overshoots its end. Without `pusi` the payload is handed back untouched.
fn section_body(payload: &[u8], pusi: bool) -> Option<&[u8]> {
    if !pusi {
        return Some(payload);
    }
    let (&pointer_field, after_pointer) = payload.split_first()?;
    after_pointer.get(usize::from(pointer_field)..)
}

/// Parse a PES header, returning `(pts_90khz, es_payload_offset)`.
///
/// `p` must start at the PES `packet_start_code_prefix` (00 00 01). Returns `None` if the
/// prefix is missing, the fixed 9-byte header is incomplete, or the declared
/// `PES_header_data_length` runs past the end of `p`; a returned offset is therefore
/// always a valid start index for `p[offset..]`. The PTS is 0 when the header carries
/// none, or when `PTS_DTS_flags` claim one but the optional header is too short to hold it.
fn parse_pes_header(p: &[u8]) -> Option<(i64, usize)> {
    // Start code 00 00 01, stream_id, 2-byte length, then the optional header.
    if p.len() < 9 || p[0] != 0 || p[1] != 0 || p[2] != 1 {
        return None;
    }
    let header_data_len = p[8] as usize;
    let es_offset = 9 + header_data_len;
    if es_offset > p.len() {
        // Truncated or malformed header: the ES payload offset would be out of bounds.
        return None;
    }
    let pts_dts_flags = p[7] >> 6;
    // The 5-byte PTS is the first optional field; it must lie inside the declared header,
    // otherwise we would be decoding ES bytes as a timestamp.
    let pts = if pts_dts_flags & 0x02 != 0 && header_data_len >= 5 {
        // 33-bit PTS spread across 5 bytes with marker bits.
        let b = &p[9..14];
        (((b[0] as i64 >> 1) & 0x07) << 30)
            | ((b[1] as i64) << 22)
            | (((b[2] as i64 >> 1) & 0x7F) << 15)
            | ((b[3] as i64) << 7)
            | ((b[4] as i64 >> 1) & 0x7F)
    } else {
        0
    };
    Some((pts, es_offset))
}

/// Detect a keyframe in an Annex-B elementary access unit.
///
/// Walks every 3-byte (`00 00 01`) and 4-byte (`00 00 00 01`) start code and inspects the
/// NAL unit header byte that follows it:
/// - H.264: `nal_unit_type` 5 (IDR slice);
/// - H.265: `nal_unit_type` 16..=21 (BLA / IDR / CRA, i.e. IRAP pictures).
///
/// Any other codec yields `false`. Start codes are recognised right up to the end of the
/// buffer, so an AU consisting of a start code plus a single NAL header byte is detected.
fn is_keyframe(es: &[u8], codec: CodecId) -> bool {
    if !matches!(codec, CodecId::H264 | CodecId::H265) {
        return false;
    }
    let mut i = 0;
    // `i + 3 < len` leaves room for a 3-byte start code plus one header byte and keeps
    // the 4-byte probe `es[i + 3]` in bounds.
    while i + 3 < es.len() {
        // Match 3- or 4-byte start codes.
        let sc3 = es[i] == 0 && es[i + 1] == 0 && es[i + 2] == 1;
        let sc4 = es[i] == 0 && es[i + 1] == 0 && es[i + 2] == 0 && es[i + 3] == 1;
        if sc3 || sc4 {
            let nal_off = if sc4 { i + 4 } else { i + 3 };
            if let Some(&hdr) = es.get(nal_off) {
                match codec {
                    CodecId::H264 if hdr & 0x1F == 5 => return true,
                    CodecId::H265 if (16..=21).contains(&((hdr >> 1) & 0x3F)) => return true,
                    _ => {}
                }
            }
            i = nal_off;
        } else {
            i += 1;
        }
    }
    false
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 188-byte TS packet for `pid` with `pusi` and the given payload.
    ///
    /// The header always uses adaptation_field_control `01` (payload only) and
    /// continuity counter 0. Payloads longer than 184 bytes are truncated; shorter ones
    /// are zero-padded to the full packet length.
    fn ts_packet(pid: u16, pusi: bool, payload: &[u8]) -> Vec<u8> {
        let mut pkt = vec![0u8; TS_PACKET_LEN];
        pkt[0] = TS_SYNC;
        pkt[1] = if pusi { 0x40 } else { 0 } | ((pid >> 8) as u8 & 0x1F);
        pkt[2] = (pid & 0xFF) as u8;
        pkt[3] = 0x10; // payload only, cc 0
        let n = payload.len().min(TS_PACKET_LEN - 4);
        pkt[4..4 + n].copy_from_slice(&payload[..n]);
        pkt
    }

    /// A PAT pointing program 1 at PMT PID 0x1000.
    fn pat() -> Vec<u8> {
        let mut sec = vec![0u8]; // pointer_field = 0
        // table_id 0x00, section_length 13, transport_stream_id 0, version/current_next,
        // section_number, last_section_number: 8 bytes from table_id, then one program.
        sec.extend_from_slice(&[0x00, 0xB0, 0x0D, 0, 0, 0xC1, 0, 0]);
        sec.extend_from_slice(&[0x00, 0x01]); // program_number 1
        sec.extend_from_slice(&[0xE0 | 0x10, 0x00]); // PMT PID 0x1000
        sec.extend_from_slice(&[0, 0, 0, 0]); // CRC placeholder (CRC_32 is not verified)
        ts_packet(0, true, &sec)
    }

    /// A PMT declaring an H.264 video elementary stream on PID 0x0100.
    fn pmt() -> Vec<u8> {
        let mut sec = vec![0u8]; // pointer_field
        // table_id 0x02, section_length 18, program_number 1, version/current_next,
        // section_number, last_section_number; PCR PID and program_info_length follow.
        sec.extend_from_slice(&[0x02, 0xB0, 0x12, 0, 0x01, 0xC1, 0, 0]);
        sec.extend_from_slice(&[0xE1, 0x00]); // PCR PID 0x0100
        sec.extend_from_slice(&[0xF0, 0x00]); // program_info_length 0
        sec.extend_from_slice(&[0x1B, 0xE1, 0x00, 0xF0, 0x00]); // H.264 on PID 0x100
        sec.extend_from_slice(&[0, 0, 0, 0]); // CRC placeholder (CRC_32 is not verified)
        ts_packet(0x1000, true, &sec)
    }

    /// A PES packet on PID 0x100 wrapping `es` with a PTS.
    fn video_pes(es: &[u8], pts: i64) -> Vec<u8> {
        // Start code, stream_id 0xE0, PES_packet_length 0, '10' marker byte,
        // PTS_DTS_flags '10' (PTS only), PES_header_data_length 5.
        let mut p = vec![0x00, 0x00, 0x01, 0xE0, 0x00, 0x00, 0x80, 0x80, 0x05];
        // 5-byte PTS with marker bits.
        let pts = pts as u64;
        p.push((0x21 | (((pts >> 30) & 0x07) << 1)) as u8);
        p.push(((pts >> 22) & 0xFF) as u8);
        p.push((0x01 | (((pts >> 15) & 0x7F) << 1)) as u8);
        p.push(((pts >> 7) & 0xFF) as u8);
        p.push((0x01 | ((pts & 0x7F) << 1)) as u8);
        p.extend_from_slice(es);
        ts_packet(0x0100, true, &p)
    }

    #[test]
    fn pes_header_decodes_pts() {
        // Reuse the builder's PES bytes (strip the 4-byte TS header).
        let pes = video_pes(&[], 90_000);
        let (pts, _off) = parse_pes_header(&pes[4..]).unwrap();
        assert_eq!(pts, 90_000);
    }

    #[test]
    fn keyframe_detection_h264_idr() {
        // NAL header 0x65 -> nal_unit_type 5 (IDR); 0x41 -> type 1 (non-IDR slice).
        let idr = [0, 0, 0, 1, 0x65, 0xAA];
        assert!(is_keyframe(&idr, CodecId::H264));
        let non_idr = [0, 0, 0, 1, 0x41, 0xAA];
        assert!(!is_keyframe(&non_idr, CodecId::H264));
    }

    #[test]
    fn full_chain_pat_pmt_pes_emits_access_unit() {
        let mut d = TsDemuxer::new();
        assert!(d.push(&pat()).is_empty());
        assert!(d.push(&pmt()).is_empty());
        assert_eq!(d.video_pid, Some(0x0100));
        assert_eq!(d.codec, CodecId::H264);

        // First PES opens; its AU is emitted when the next PES (PUSI) arrives.
        let idr = [0, 0, 0, 1, 0x65, 0x11, 0x22];
        assert!(d.push(&video_pes(&idr, 9000)).is_empty());
        let delta = [0, 0, 0, 1, 0x41, 0x33];
        let out = d.push(&video_pes(&delta, 12000));
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].codec, CodecId::H264);
        assert_eq!(out[0].pts_ms, 100); // 9000 / 90
        assert!(out[0].keyframe);
        // The access unit begins with the IDR NAL (the fixed-size test packet
        // zero-pads past the payload; valid TS uses adaptation-field stuffing).
        assert!(out[0].data.starts_with(&idr));
    }

    #[test]
    fn carries_partial_packet_across_pushes() {
        let mut d = TsDemuxer::new();
        let p = pat();
        // Feed the PAT split mid-packet; the demuxer must carry the remainder.
        assert!(d.push(&p[..100]).is_empty());
        assert!(d.push(&p[100..]).is_empty());
        // Then a PMT in one shot resolves the video PID.
        d.push(&pmt());
        assert_eq!(d.video_pid, Some(0x0100));
    }
}