Skip to main content

lvqr_codec/
ts.rs

1//! Focused MPEG-TS demuxer for SRT and file-based ingest.
2//!
3//! Parses a byte stream of 188-byte TS packets, extracts PAT and
4//! PMT tables to discover elementary stream PIDs and types, and
5//! reassembles PES packets across TS packet boundaries. The
6//! caller feeds arbitrary byte chunks via [`TsDemuxer::feed`];
7//! the demuxer handles sync-byte recovery internally.
8//!
9//! Scope: PAT, single-program PMT, PES reassembly with PTS/DTS
10//! extraction for H.264 (0x1B), HEVC (0x24), and AAC (0x0F).
11//! Session 152 added private-section reassembly for SCTE-35
12//! (stream_type 0x86), surfaced via
13//! [`TsDemuxer::take_scte35_sections`] alongside the existing
14//! `feed`-returns-PES interface. Multi-program TS, DVB
15//! descriptors, and PCR recovery are still out of scope; the
16//! SRT ingest path only needs single-program demux from
17//! broadcast encoders.
18
19use std::collections::HashMap;
20
/// Size in bytes of one transport stream packet; the demuxer
/// only ever processes whole packets of this length.
const TS_PACKET_SIZE: usize = 188;
/// First byte of every transport packet. Scanning for it is how
/// the demuxer (re)acquires packet alignment.
const SYNC_BYTE: u8 = 0x47;
/// PID whose payload is routed to the PAT parser.
const PAT_PID: u16 = 0;
24
/// Kind of elementary stream announced by a PMT entry, decoded
/// from its ISO/IEC 13818-1 `stream_type` byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamType {
    /// H.264 video (`stream_type` 0x1B).
    H264,
    /// HEVC video (`stream_type` 0x24).
    H265,
    /// AAC audio (`stream_type` 0x0F or 0x11).
    Aac,
    /// SCTE-35 cue messages per ANSI/SCTE 35-2024 section 7
    /// (`stream_type` 0x86 in the PMT). This is not a media
    /// stream: its payload is private-section data rather than
    /// PES.
    Scte35,
    /// Any `stream_type` the demuxer has no dedicated variant
    /// for; the raw byte is kept so callers can inspect it.
    Unknown(u8),
}

impl StreamType {
    /// Translate a PMT `stream_type` byte into its variant.
    /// Unrecognised codes are preserved in [`StreamType::Unknown`].
    fn from_byte(code: u8) -> Self {
        match code {
            // NOTE(review): 0x11 is presumably LATM-framed AAC;
            // confirm downstream consumers handle that framing.
            0x0F | 0x11 => Self::Aac,
            0x1B => Self::H264,
            0x24 => Self::H265,
            0x86 => Self::Scte35,
            _ => Self::Unknown(code),
        }
    }
}
49
/// One reassembled SCTE-35 splice_info_section yielded by
/// [`TsDemuxer::take_scte35_sections`]. The raw bytes are the
/// full section from `table_id` (0xFC) through `CRC_32`,
/// suitable for direct passthrough into
/// [`crate::scte35::parse_splice_info_section`].
///
/// The demuxer only delimits the section using its
/// `section_length` field; it does not inspect the body.
#[derive(Debug, Clone)]
pub struct Scte35Section {
    /// PMT-discovered PID this section arrived on. Egress
    /// renderers that multiplex multiple SCTE-35 PIDs into one
    /// event stream may key on it; v1 LVQR ignores it.
    pub pid: u16,
    /// Raw splice_info_section bytes (table_id .. CRC_32),
    /// exactly `3 + section_length` bytes long.
    pub raw: Vec<u8>,
}
64
/// One reassembled PES packet yielded by [`TsDemuxer::feed`].
///
/// A packet is emitted when the next payload-unit start arrives
/// on the same PID, so it always trails the input by one PES.
#[derive(Debug, Clone)]
pub struct PesPacket {
    /// PID of the elementary stream the packet was carried on.
    pub pid: u16,
    /// Stream type registered for `pid` by the PMT at the time
    /// the PES started.
    pub stream_type: StreamType,
    /// Presentation timestamp in 90 kHz ticks. `None` when the
    /// PES header does not carry a PTS (uncommon for video/audio).
    pub pts: Option<u64>,
    /// Decode timestamp in 90 kHz ticks. `None` when the PES
    /// header carries no DTS field, which is the usual case when
    /// PTS == DTS (most audio, non-B-frame video).
    pub dts: Option<u64>,
    /// Elementary stream bytes that follow the PES header
    /// (Annex B for video).
    /// NOTE(review): earlier docs mention ADTS stripping for AAC;
    /// no stripping happens in this file, so confirm where (or
    /// whether) it is done downstream.
    pub payload: Vec<u8>,
}
80
/// Per-PID reassembly buffer for the PES packet currently being
/// collected from consecutive TS packet payloads.
#[derive(Debug)]
struct PesBuffer {
    /// Stream type recorded when the current PES started.
    stream_type: StreamType,
    /// PES bytes gathered so far, beginning with the PES start
    /// code of the packet that had PUSI set.
    buf: Vec<u8>,
    /// `true` once a payload-unit start has been seen on this
    /// PID; continuation payloads are ignored until then.
    started: bool,
}
88
/// Per-PID SCTE-35 section reassembly buffer.
///
/// Sections are MPEG-2 private sections (table_id 0xFC) which
/// can span multiple TS packets. PUSI=1 packets carry a
/// `pointer_field` byte then start a new section; PUSI=0
/// packets continue the in-progress section. Completion is
/// detected when `section_length` (read from the first 3 bytes)
/// + 3 prefix bytes are present.
#[derive(Debug, Default)]
struct SectionBuffer {
    /// Bytes of the section currently being assembled, starting
    /// at its `table_id`.
    buf: Vec<u8>,
    /// Total size of the in-progress section (`3 +
    /// section_length`), or `None` until the first three header
    /// bytes have been buffered.
    expected_len: Option<usize>,
}
102
/// MPEG-TS demuxer with sync recovery and PES reassembly.
///
/// Feed bytes with [`TsDemuxer::feed`] to obtain PES packets, and
/// drain SCTE-35 sections with
/// [`TsDemuxer::take_scte35_sections`].
#[derive(Debug)]
pub struct TsDemuxer {
    /// Leftover bytes from the previous `feed` call that did not
    /// align to a 188-byte boundary.
    remainder: Vec<u8>,
    /// PMT PID discovered from the PAT. `None` until a PAT with a
    /// non-zero program number has been parsed.
    pmt_pid: Option<u16>,
    /// Elementary stream PID -> stream type, populated from PMT.
    /// Rebuilt each time a PMT section is parsed.
    streams: HashMap<u16, StreamType>,
    /// Per-PID PES reassembly buffers.
    pes_bufs: HashMap<u16, PesBuffer>,
    /// Per-PID SCTE-35 private-section reassembly buffers,
    /// populated when the PMT registers a stream as
    /// [`StreamType::Scte35`]. Sections drain via
    /// [`TsDemuxer::take_scte35_sections`].
    section_bufs: HashMap<u16, SectionBuffer>,
    /// Completed SCTE-35 sections awaiting drain. Bounded by
    /// the caller draining promptly between feed calls.
    pending_scte35: Vec<Scte35Section>,
}
124
125impl Default for TsDemuxer {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131impl TsDemuxer {
132    pub fn new() -> Self {
133        Self {
134            remainder: Vec::new(),
135            pmt_pid: None,
136            streams: HashMap::new(),
137            pes_bufs: HashMap::new(),
138            section_bufs: HashMap::new(),
139            pending_scte35: Vec::new(),
140        }
141    }
142
143    /// Drain any reassembled SCTE-35 sections accumulated since
144    /// the previous call. Sections appear in arrival order. The
145    /// returned vector is owned by the caller; the demuxer's
146    /// internal pending queue is cleared.
147    ///
148    /// Pair with [`TsDemuxer::feed`]: drain sections after each
149    /// feed so the per-PID section buffers stay bounded. Sections
150    /// are raw `splice_info_section` bytes (table_id 0xFC through
151    /// CRC_32); pass each one to
152    /// [`crate::scte35::parse_splice_info_section`] for the
153    /// SpliceInfo decode.
154    pub fn take_scte35_sections(&mut self) -> Vec<Scte35Section> {
155        std::mem::take(&mut self.pending_scte35)
156    }
157
158    /// Feed an arbitrary byte slice into the demuxer. Returns
159    /// zero or more fully reassembled PES packets. The demuxer
160    /// handles sync-byte recovery and cross-call buffering
161    /// internally; callers may pass any chunk size.
162    pub fn feed(&mut self, data: &[u8]) -> Vec<PesPacket> {
163        let mut out = Vec::new();
164
165        // Fast path: drain any buffered remainder first by
166        // completing one packet from remainder + new data, then
167        // process aligned packets directly from the input slice
168        // without copying into the remainder buffer. This avoids
169        // O(N^2) drain cost for large inputs.
170        let input = if self.remainder.is_empty() {
171            data
172        } else {
173            self.remainder.extend_from_slice(data);
174            // Process everything from remainder, then clear it and
175            // return an empty slice so the main loop is skipped.
176            self.process_buf(&mut out);
177            &[]
178        };
179
180        // Process aligned packets directly from the input slice.
181        let mut pos = 0;
182        while pos < input.len() {
183            let sync_off = match input[pos..].iter().position(|&b| b == SYNC_BYTE) {
184                Some(p) => p,
185                None => break,
186            };
187            pos += sync_off;
188            if pos + TS_PACKET_SIZE > input.len() {
189                break;
190            }
191            let pkt: &[u8; TS_PACKET_SIZE] = input[pos..pos + TS_PACKET_SIZE].try_into().unwrap();
192            self.process_packet(pkt, &mut out);
193            pos += TS_PACKET_SIZE;
194        }
195
196        // Stash any trailing bytes for the next call.
197        if pos < input.len() {
198            self.remainder.extend_from_slice(&input[pos..]);
199        }
200
201        out
202    }
203
204    /// Drain the remainder buffer, processing complete packets.
205    fn process_buf(&mut self, out: &mut Vec<PesPacket>) {
206        let mut pos = 0;
207        while pos < self.remainder.len() {
208            let sync_off = match self.remainder[pos..].iter().position(|&b| b == SYNC_BYTE) {
209                Some(p) => p,
210                None => {
211                    self.remainder.clear();
212                    return;
213                }
214            };
215            pos += sync_off;
216            if pos + TS_PACKET_SIZE > self.remainder.len() {
217                break;
218            }
219            let pkt: [u8; TS_PACKET_SIZE] = self.remainder[pos..pos + TS_PACKET_SIZE].try_into().unwrap();
220            self.process_packet(&pkt, out);
221            pos += TS_PACKET_SIZE;
222        }
223        // Keep only the unprocessed tail.
224        if pos > 0 {
225            self.remainder.drain(..pos);
226        }
227    }
228
229    fn process_packet(&mut self, pkt: &[u8; TS_PACKET_SIZE], out: &mut Vec<PesPacket>) {
230        let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
231        let pusi = pkt[1] & 0x40 != 0;
232        let afc = (pkt[3] >> 4) & 0x03;
233
234        let payload_offset = match afc {
235            0b01 => 4,
236            0b11 => {
237                let af_len = pkt[4] as usize;
238                5 + af_len
239            }
240            _ => return,
241        };
242        if payload_offset >= TS_PACKET_SIZE {
243            return;
244        }
245        let payload = &pkt[payload_offset..];
246
247        if pid == PAT_PID {
248            self.parse_pat(payload, pusi);
249        } else if Some(pid) == self.pmt_pid {
250            self.parse_pmt(payload, pusi);
251        } else if let Some(&st) = self.streams.get(&pid) {
252            if st == StreamType::Scte35 {
253                self.push_section(pid, payload, pusi);
254            } else {
255                self.push_pes(pid, payload, pusi, out);
256            }
257        }
258    }
259
260    /// Accumulate one TS packet's worth of payload into the SCTE-35
261    /// section buffer for `pid`, completing and queueing the section
262    /// when section_length-1 bytes have arrived after the
263    /// section_length field.
264    ///
265    /// Per ISO/IEC 13818-1 a private section starts on a PUSI=1
266    /// packet and the first byte of payload is `pointer_field`. Any
267    /// bytes before the pointer are stuffing; the section starts at
268    /// `payload[1 + pointer_field]`. Sections may span multiple TS
269    /// packets; PUSI=0 packets carry continuation bytes only.
270    fn push_section(&mut self, pid: u16, payload: &[u8], pusi: bool) {
271        let buf = self.section_bufs.entry(pid).or_default();
272        if pusi {
273            // Drop any in-progress section (incomplete prior frame
274            // is unrecoverable per spec; the new pointer_field marks
275            // a fresh section start).
276            buf.buf.clear();
277            buf.expected_len = None;
278            if payload.is_empty() {
279                return;
280            }
281            let pointer = payload[0] as usize;
282            let start = 1 + pointer;
283            if start >= payload.len() {
284                return;
285            }
286            buf.buf.extend_from_slice(&payload[start..]);
287        } else {
288            if buf.buf.is_empty() && buf.expected_len.is_none() {
289                // Continuation packet without a prior PUSI start;
290                // ignore (we missed the section header).
291                return;
292            }
293            buf.buf.extend_from_slice(payload);
294        }
295
296        // Determine expected length on first sufficient header read.
297        if buf.expected_len.is_none() && buf.buf.len() >= 3 {
298            let section_length = (((buf.buf[1] & 0x0F) as usize) << 8) | buf.buf[2] as usize;
299            buf.expected_len = Some(3 + section_length);
300        }
301
302        // Flush completed sections, accommodating the rare case where
303        // more than one section's bytes arrive in the same packet
304        // (only possible if the publisher concatenates sections
305        // back-to-back after the first pointer_field).
306        while let Some(expected) = buf.expected_len {
307            if buf.buf.len() < expected {
308                break;
309            }
310            let section_bytes = buf.buf.drain(..expected).collect::<Vec<_>>();
311            self.pending_scte35.push(Scte35Section {
312                pid,
313                raw: section_bytes,
314            });
315            buf.expected_len = None;
316            if buf.buf.len() >= 3 {
317                let section_length = (((buf.buf[1] & 0x0F) as usize) << 8) | buf.buf[2] as usize;
318                buf.expected_len = Some(3 + section_length);
319            } else if buf.buf.iter().all(|&b| b == 0xFF) {
320                // Trailing stuffing bytes: discard.
321                buf.buf.clear();
322                break;
323            }
324        }
325    }
326
327    fn parse_pat(&mut self, payload: &[u8], pusi: bool) {
328        let data = if pusi && !payload.is_empty() {
329            let pointer = payload[0] as usize;
330            if 1 + pointer >= payload.len() {
331                return;
332            }
333            &payload[1 + pointer..]
334        } else {
335            payload
336        };
337        // table_id(1) + flags/length(2) + ts_id(2) + version(1) +
338        // section/last(2) = 8 bytes header before the program loop.
339        if data.len() < 12 {
340            return;
341        }
342        let section_length = (((data[1] & 0x0F) as usize) << 8) | data[2] as usize;
343        let table_end = 3 + section_length;
344        if table_end > data.len() || section_length < 9 {
345            return;
346        }
347        // Program loop starts at byte 8, ends 4 bytes before CRC.
348        let loop_end = table_end.saturating_sub(4);
349        let mut i = 8;
350        while i + 4 <= loop_end {
351            let prog_num = ((data[i] as u16) << 8) | data[i + 1] as u16;
352            let map_pid = (((data[i + 2] & 0x1F) as u16) << 8) | data[i + 3] as u16;
353            if prog_num != 0 {
354                self.pmt_pid = Some(map_pid);
355                break;
356            }
357            i += 4;
358        }
359    }
360
361    fn parse_pmt(&mut self, payload: &[u8], pusi: bool) {
362        let data = if pusi && !payload.is_empty() {
363            let pointer = payload[0] as usize;
364            if 1 + pointer >= payload.len() {
365                return;
366            }
367            &payload[1 + pointer..]
368        } else {
369            payload
370        };
371        if data.len() < 16 {
372            return;
373        }
374        let section_length = (((data[1] & 0x0F) as usize) << 8) | data[2] as usize;
375        let table_end = 3 + section_length;
376        if table_end > data.len() || section_length < 13 {
377            return;
378        }
379        let prog_info_len = (((data[10] & 0x0F) as usize) << 8) | data[11] as usize;
380        let mut i = 12 + prog_info_len;
381        let loop_end = table_end.saturating_sub(4);
382        self.streams.clear();
383        while i + 5 <= loop_end {
384            let st = data[i];
385            let es_pid = (((data[i + 1] & 0x1F) as u16) << 8) | data[i + 2] as u16;
386            let es_info_len = (((data[i + 3] & 0x0F) as usize) << 8) | data[i + 4] as usize;
387            self.streams.insert(es_pid, StreamType::from_byte(st));
388            i += 5 + es_info_len;
389        }
390    }
391
392    fn push_pes(&mut self, pid: u16, payload: &[u8], pusi: bool, out: &mut Vec<PesPacket>) {
393        let stream_type = *self.streams.get(&pid).unwrap_or(&StreamType::Unknown(0));
394
395        if pusi {
396            if let Some(buf) = self.pes_bufs.get_mut(&pid) {
397                if buf.started && !buf.buf.is_empty() {
398                    if let Some(pkt) = Self::finish_pes(pid, buf) {
399                        out.push(pkt);
400                    }
401                }
402            }
403            let entry = self.pes_bufs.entry(pid).or_insert_with(|| PesBuffer {
404                stream_type,
405                buf: Vec::with_capacity(64 * 1024),
406                started: false,
407            });
408            entry.buf.clear();
409            entry.buf.extend_from_slice(payload);
410            entry.started = true;
411            entry.stream_type = stream_type;
412        } else if let Some(buf) = self.pes_bufs.get_mut(&pid) {
413            if buf.started {
414                buf.extend(payload);
415            }
416        }
417    }
418
419    fn finish_pes(pid: u16, buf: &mut PesBuffer) -> Option<PesPacket> {
420        let data = &buf.buf;
421        if data.len() < 9 || data[0] != 0 || data[1] != 0 || data[2] != 1 {
422            return None;
423        }
424        let pes_packet_length = ((data[4] as usize) << 8) | data[5] as usize;
425        let header_data_len = data[8] as usize;
426        let es_start = 9 + header_data_len;
427        if es_start > data.len() {
428            return None;
429        }
430        let flags = data[7];
431        let pts_flag = flags & 0x80 != 0;
432        let dts_flag = flags & 0x40 != 0;
433
434        let pts = if pts_flag && header_data_len >= 5 {
435            Some(parse_ts_timestamp(&data[9..14]))
436        } else {
437            None
438        };
439        let dts = if dts_flag && header_data_len >= 10 {
440            Some(parse_ts_timestamp(&data[14..19]))
441        } else {
442            None
443        };
444
445        // When PES_packet_length is non-zero, it specifies the
446        // exact number of bytes after the 6-byte PES header
447        // prefix. Use it to trim trailing TS padding. When zero
448        // (unbounded, common for video), take everything.
449        let es_end = if pes_packet_length > 0 {
450            (6 + pes_packet_length).min(data.len())
451        } else {
452            data.len()
453        };
454        // A malicious PES_packet_length can declare a payload that ends
455        // before the variable-length PES header does (es_end < es_start),
456        // which would panic the `data[es_start..es_end]` slice. Treat
457        // that as a malformed packet and drop it.
458        if es_start > es_end {
459            return None;
460        }
461        let payload = data[es_start..es_end].to_vec();
462        if payload.is_empty() {
463            return None;
464        }
465
466        Some(PesPacket {
467            pid,
468            stream_type: buf.stream_type,
469            pts,
470            dts,
471            payload,
472        })
473    }
474}
475
476impl PesBuffer {
477    fn extend(&mut self, data: &[u8]) {
478        self.buf.extend_from_slice(data);
479    }
480}
481
/// Decode the 33-bit timestamp carried in a 5-byte PES PTS/DTS
/// field, skipping the prefix and marker bits:
///
/// - byte 0: bits 3..1 hold timestamp bits 32..30
/// - bytes 1-2: the upper 15 bits of the pair hold bits 29..15
/// - bytes 3-4: the upper 15 bits of the pair hold bits 14..0
///
/// Panics if `b` is shorter than 5 bytes.
fn parse_ts_timestamp(b: &[u8]) -> u64 {
    let byte = |i: usize| u64::from(b[i]);
    let top = (byte(0) >> 1) & 0x07;
    let mid = (byte(1) << 7) | (byte(2) >> 1);
    let low = (byte(3) << 7) | (byte(4) >> 1);
    (top << 30) | (mid << 15) | low
}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496
497    fn make_ts_packet(pid: u16, pusi: bool, payload: &[u8]) -> [u8; 188] {
498        let mut pkt = [0xFFu8; 188];
499        pkt[0] = SYNC_BYTE;
500        pkt[1] = if pusi { 0x40 } else { 0x00 } | ((pid >> 8) as u8 & 0x1F);
501        pkt[2] = pid as u8;
502        pkt[3] = 0x10; // payload only, CC=0
503        let copy_len = payload.len().min(184);
504        pkt[4..4 + copy_len].copy_from_slice(&payload[..copy_len]);
505        // Stuff remaining bytes with 0xFF (already done by init).
506        pkt
507    }
508
509    fn minimal_pat(pmt_pid: u16) -> Vec<u8> {
510        // pointer_field(1) + table_id(1) + flags/length(2) +
511        // ts_id(2) + version(1) + section(1) + last_section(1)
512        // + program_number(2) + reserved/pmt_pid(2) + CRC(4)
513        let mut data = vec![
514            0x00, // pointer field
515            0x00, // table_id = PAT
516            0xB0, 0x0D, // section_syntax + length = 13
517            0x00, 0x01, // transport_stream_id
518            0xC1, // version=0, current
519            0x00, 0x00, // section 0 of 0
520            0x00, 0x01, // program_number = 1
521        ];
522        data.push(0xE0 | ((pmt_pid >> 8) as u8 & 0x1F));
523        data.push(pmt_pid as u8);
524        data.extend_from_slice(&[0x00; 4]); // CRC placeholder
525        data
526    }
527
528    fn minimal_pmt(video_pid: u16, audio_pid: u16) -> Vec<u8> {
529        // pointer_field + table_id + flags/length + program_number +
530        // version + section + pcr_pid + program_info_length +
531        // stream entries + CRC
532        let mut data = vec![
533            0x00, // pointer field
534            0x02, // table_id = PMT
535            0xB0, 0x17, // section_syntax + length = 23
536            0x00, 0x01, // program_number = 1
537            0xC1, // version=0, current
538            0x00, 0x00, // section 0 of 0
539            0xE1, 0x00, // PCR_PID = 0x100
540            0xF0, 0x00, // program_info_length = 0
541        ];
542        // Video stream entry: H.264
543        data.push(0x1B); // stream_type
544        data.push(0xE0 | ((video_pid >> 8) as u8 & 0x1F));
545        data.push(video_pid as u8);
546        data.push(0xF0);
547        data.push(0x00); // ES_info_length = 0
548        // Audio stream entry: AAC
549        data.push(0x0F); // stream_type
550        data.push(0xE0 | ((audio_pid >> 8) as u8 & 0x1F));
551        data.push(audio_pid as u8);
552        data.push(0xF0);
553        data.push(0x00); // ES_info_length = 0
554        data.extend_from_slice(&[0x00; 4]); // CRC placeholder
555        data
556    }
557
558    fn minimal_pes(pts_90k: u64, es_payload: &[u8]) -> Vec<u8> {
559        // PES_packet_length = header (3 bytes: flags + PTS flag +
560        // header_data_length) + PTS (5 bytes) + ES payload.
561        let pes_len = (3 + 5 + es_payload.len()) as u16;
562        let mut data = vec![
563            0x00,
564            0x00,
565            0x01, // start code
566            0xE0, // stream_id (video)
567            (pes_len >> 8) as u8,
568            pes_len as u8,
569            0x80, // marker bits
570            0x80, // PTS flag set, no DTS
571            0x05, // header_data_length = 5
572        ];
573        // Encode PTS into 5 bytes with marker bits.
574        let pts = pts_90k & 0x1_FFFF_FFFF;
575        data.push(0x21 | ((pts >> 29) as u8 & 0x0E));
576        data.push((pts >> 22) as u8);
577        data.push(0x01 | ((pts >> 14) as u8 & 0xFE));
578        data.push((pts >> 7) as u8);
579        data.push(0x01 | ((pts << 1) as u8 & 0xFE));
580        data.extend_from_slice(es_payload);
581        data
582    }
583
    /// End-to-end happy path: the PAT reveals the PMT PID, the PMT
    /// registers one H.264 and one AAC stream, and a video PES is
    /// returned once the next payload-unit start arrives on the
    /// same PID.
    #[test]
    fn demux_discovers_streams_and_yields_pes() {
        let mut demux = TsDemuxer::new();
        let video_pid = 0x100;
        let audio_pid = 0x101;
        let pmt_pid = 0x1000;

        // Feed PAT: yields no PES, but records the PMT PID.
        let pat = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));
        assert!(demux.feed(&pat).is_empty());
        assert_eq!(demux.pmt_pid, Some(pmt_pid));

        // Feed PMT: yields no PES, but registers both streams.
        let pmt = make_ts_packet(pmt_pid, true, &minimal_pmt(video_pid, audio_pid));
        assert!(demux.feed(&pmt).is_empty());
        assert_eq!(demux.streams.len(), 2);
        assert_eq!(demux.streams[&video_pid], StreamType::H264);
        assert_eq!(demux.streams[&audio_pid], StreamType::Aac);

        // Feed a PES packet for video.
        let pes = minimal_pes(90_000, b"nalunalunalu");
        let pkt = make_ts_packet(video_pid, true, &pes);
        // PES is not yielded until the next PUSI on the same PID.
        assert!(demux.feed(&pkt).is_empty());

        // Start a new PES on the same PID to flush the previous one.
        let pes2 = minimal_pes(180_000, b"nalu2");
        let pkt2 = make_ts_packet(video_pid, true, &pes2);
        let packets = demux.feed(&pkt2);
        // Only the first PES comes out, with the 0xFF packet
        // padding trimmed off by PES_packet_length.
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].pid, video_pid);
        assert_eq!(packets[0].stream_type, StreamType::H264);
        assert_eq!(packets[0].pts, Some(90_000));
        assert_eq!(packets[0].payload, b"nalunalunalu");
    }
619
    /// A PES whose declared length ends before its own header
    /// does must be dropped silently rather than panic.
    #[test]
    fn malformed_pes_packet_length_does_not_panic() {
        // Regression (ts_demux fuzz): a PES whose PES_packet_length
        // declares a payload that ends before the variable-length PES
        // header does (es_end < es_start) must not panic the payload
        // slice. The fuzzer hit es_start=44, es_end=41.
        let mut demux = TsDemuxer::new();
        let video_pid = 0x100;
        let pmt_pid = 0x1000;
        // Register the video PID so its packets reach PES reassembly.
        demux.feed(&make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid)));
        demux.feed(&make_ts_packet(pmt_pid, true, &minimal_pmt(video_pid, 0x101)));

        // header_data_length = 35 -> es_start = 9 + 35 = 44.
        // PES_packet_length  = 35 -> es_end   = 6 + 35 = 41.
        let mut pes = vec![
            0x00, 0x00, 0x01, // start code
            0xE0, // stream_id (video)
            0x00, 0x23, // PES_packet_length = 35
            0x80, // marker bits
            0x00, // flags: no PTS/DTS
            0x23, // header_data_length = 35
        ];
        pes.resize(44, 0xFF); // data.len() == es_start == 44
        assert!(demux.feed(&make_ts_packet(video_pid, true, &pes)).is_empty());

        // A new PUSI flushes the malformed buffer through finish_pes,
        // which must return None instead of panicking on the slice.
        let flushed = demux.feed(&make_ts_packet(video_pid, true, &minimal_pes(90_000, b"ok")));
        assert!(flushed.is_empty(), "malformed PES must be dropped, not yielded");
    }
650
    /// Bytes in front of the first sync byte are skipped and the
    /// packet that follows is still parsed.
    #[test]
    fn sync_recovery_skips_garbage() {
        let mut demux = TsDemuxer::new();
        let pmt_pid = 0x1000;

        // Feed garbage (none of it 0x47) followed by a valid PAT
        // packet in a single call.
        let mut data = vec![0xDE, 0xAD, 0xBE, 0xEF];
        data.extend_from_slice(&make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid)));
        demux.feed(&data);
        assert_eq!(demux.pmt_pid, Some(pmt_pid));
    }
662
    /// A packet split across two `feed` calls is buffered and
    /// parsed once its remaining bytes arrive.
    #[test]
    fn cross_call_buffering_handles_partial_packets() {
        let mut demux = TsDemuxer::new();
        let pmt_pid = 0x1000;
        let full = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));

        // Feed first half: not a whole packet yet, nothing parsed.
        demux.feed(&full[..100]);
        assert_eq!(demux.pmt_pid, None);

        // Feed second half: the packet completes and the PAT is read.
        demux.feed(&full[100..]);
        assert_eq!(demux.pmt_pid, Some(pmt_pid));
    }
677
    /// A PID registered with stream_type 0x86 is routed to section
    /// reassembly: `feed` returns no PES for it and the section is
    /// delivered, byte for byte, by `take_scte35_sections`.
    #[test]
    fn pmt_with_scte35_pid_routes_to_section_drain() {
        let mut demux = TsDemuxer::new();
        let pmt_pid = 0x1000;
        let scte35_pid = 0x1FFB;

        // PAT.
        let pat = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));
        demux.feed(&pat);

        // Custom PMT with one stream entry: stream_type=0x86 (SCTE-35).
        // section_length 18 = 9 header bytes + one 5-byte entry + CRC.
        let mut pmt_payload = vec![
            0x00, // pointer
            0x02, // table_id PMT
            0xB0, 0x12, // section_syntax + length = 18
            0x00, 0x01, 0xC1, 0x00, 0x00, 0xE1, 0x00, 0xF0, 0x00,
        ];
        pmt_payload.push(0x86); // stream_type = SCTE-35
        pmt_payload.push(0xE0 | ((scte35_pid >> 8) as u8 & 0x1F));
        pmt_payload.push(scte35_pid as u8);
        pmt_payload.push(0xF0);
        pmt_payload.push(0x00);
        pmt_payload.extend_from_slice(&[0x00; 4]); // CRC placeholder
        let pmt = make_ts_packet(pmt_pid, true, &pmt_payload);
        demux.feed(&pmt);

        assert_eq!(demux.streams.get(&scte35_pid), Some(&StreamType::Scte35));

        // Build a fake SCTE-35 splice_info_section: table_id 0xFC,
        // 2-byte section_length, then 17 zeroed body bytes. We do not
        // validate the CRC at the demux layer; the parser handles that.
        let section_body_len: usize = 17; // arbitrary; parser would CRC-check
        let mut section = vec![
            0xFCu8,
            0x30 | ((section_body_len >> 8) as u8 & 0x0F),
            section_body_len as u8,
        ];
        section.extend_from_slice(&vec![0x00u8; section_body_len]);

        // Wrap section in a TS packet: PUSI=1, payload starts with
        // pointer_field=0 then the section bytes. The packet builder
        // pads the remainder of the packet with 0xFF.
        let mut payload = vec![0u8]; // pointer_field
        payload.extend_from_slice(&section);
        let pkt = make_ts_packet(scte35_pid, true, &payload);
        let pes = demux.feed(&pkt);
        assert!(pes.is_empty(), "SCTE-35 PIDs do not yield PES packets");

        let drained = demux.take_scte35_sections();
        assert_eq!(drained.len(), 1, "one section drained");
        assert_eq!(drained[0].pid, scte35_pid);
        assert_eq!(&drained[0].raw[..], &section[..]);

        // Drain is one-shot: a second call returns empty.
        assert!(demux.take_scte35_sections().is_empty());
    }
733
    /// Encoding a timestamp into the 5-byte PTS layout and decoding
    /// it with `parse_ts_timestamp` returns the original value.
    #[test]
    fn parse_ts_timestamp_round_trips() {
        let pts: u64 = 123_456_789;
        let mut buf = [0u8; 5];
        // Same encoding as `minimal_pes`: '0010' prefix plus marker
        // bits around the three timestamp groups.
        buf[0] = 0x21 | ((pts >> 29) as u8 & 0x0E);
        buf[1] = (pts >> 22) as u8;
        buf[2] = 0x01 | ((pts >> 14) as u8 & 0xFE);
        buf[3] = (pts >> 7) as u8;
        buf[4] = 0x01 | ((pts << 1) as u8 & 0xFE);
        assert_eq!(parse_ts_timestamp(&buf), pts);
    }
745}