Skip to main content

lvqr_codec/
ts.rs

1//! Focused MPEG-TS demuxer for SRT and file-based ingest.
2//!
3//! Parses a byte stream of 188-byte TS packets, extracts PAT and
4//! PMT tables to discover elementary stream PIDs and types, and
5//! reassembles PES packets across TS packet boundaries. The
6//! caller feeds arbitrary byte chunks via [`TsDemuxer::feed`];
7//! the demuxer handles sync-byte recovery internally.
8//!
9//! Scope: PAT, single-program PMT, PES reassembly with PTS/DTS
10//! extraction for H.264 (0x1B), HEVC (0x24), and AAC (0x0F).
11//! Session 152 added private-section reassembly for SCTE-35
12//! (stream_type 0x86), surfaced via
13//! [`TsDemuxer::take_scte35_sections`] alongside the existing
14//! `feed`-returns-PES interface. Multi-program TS, DVB
15//! descriptors, and PCR recovery are still out of scope; the
16//! SRT ingest path only needs single-program demux from
17//! broadcast encoders.
18
19use std::collections::HashMap;
20
/// Size in bytes of one MPEG-TS packet; the demuxer slices its input
/// into units of exactly this length.
const TS_PACKET_SIZE: usize = 188;
/// Value of the first byte of every TS packet. The demuxer scans for it
/// to locate packet starts and to resynchronise after garbage.
const SYNC_BYTE: u8 = 0x47;
/// PID whose payload is routed to the PAT parser.
const PAT_PID: u16 = 0;
24
/// Elementary stream kinds the demuxer distinguishes, decoded from the
/// `stream_type` byte of a PMT entry (ISO/IEC 13818-1).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamType {
    H264,
    H265,
    Aac,
    /// SCTE-35 cue messages per ANSI/SCTE 35-2024 section 7
    /// (stream_type 0x86 in the PMT). Not a media stream;
    /// payload is private-section data, not PES.
    Scte35,
    /// Any `stream_type` value without a dedicated variant; the raw
    /// byte is preserved.
    Unknown(u8),
}

impl StreamType {
    /// Map a raw PMT `stream_type` byte to a [`StreamType`].
    ///
    /// Both 0x0F and 0x11 map to [`StreamType::Aac`]; bytes that are not
    /// in the table come back as [`StreamType::Unknown`] carrying the
    /// original value.
    fn from_byte(b: u8) -> Self {
        // (stream_type byte, variant) pairs for every recognised code.
        const KNOWN: [(u8, StreamType); 5] = [
            (0x1B, StreamType::H264),
            (0x24, StreamType::H265),
            (0x0F, StreamType::Aac),
            (0x11, StreamType::Aac),
            (0x86, StreamType::Scte35),
        ];
        KNOWN
            .iter()
            .find(|entry| entry.0 == b)
            .map_or(Self::Unknown(b), |entry| entry.1)
    }
}
49
/// One reassembled SCTE-35 splice_info_section yielded by
/// [`TsDemuxer::take_scte35_sections`]. The raw bytes are the
/// full section from `table_id` (0xFC) through `CRC_32`,
/// suitable for direct passthrough into
/// [`crate::scte35::parse_splice_info_section`].
///
/// The demuxer only delimits the section using its `section_length`
/// field; it does not verify the CRC or inspect the section body.
#[derive(Debug, Clone)]
pub struct Scte35Section {
    /// PMT-discovered PID this section arrived on. Egress
    /// renderers that multiplex multiple SCTE-35 PIDs into one
    /// event stream may key on it; v1 LVQR ignores it.
    pub pid: u16,
    /// Raw splice_info_section bytes (table_id .. CRC_32).
    pub raw: Vec<u8>,
}
64
/// One reassembled PES packet yielded by [`TsDemuxer::feed`].
#[derive(Debug, Clone)]
pub struct PesPacket {
    /// PID of the elementary stream this packet was reassembled from.
    pub pid: u16,
    /// Stream type recorded for `pid` when this PES packet started.
    pub stream_type: StreamType,
    /// Presentation timestamp in 90 kHz ticks. `None` when the
    /// PES header does not carry a PTS (uncommon for video/audio).
    pub pts: Option<u64>,
    /// Decode timestamp in 90 kHz ticks. `None` when the PES header
    /// does not carry a DTS (typically because PTS == DTS: most audio,
    /// non-B-frame video).
    pub dts: Option<u64>,
    /// Raw elementary stream bytes: everything that follows the PES
    /// header, copied verbatim (Annex B for video). The demuxer does
    /// not strip ADTS or any other framing from audio payloads.
    pub payload: Vec<u8>,
}
80
/// Per-PID reassembly buffer.
#[derive(Debug)]
struct PesBuffer {
    /// Stream type recorded for the PID when the current PES started.
    stream_type: StreamType,
    /// Bytes of the in-progress PES packet, PES header included.
    buf: Vec<u8>,
    /// Set once a PUSI=1 packet has started a PES on this PID;
    /// continuation payloads are ignored until then.
    started: bool,
}
88
/// Per-PID SCTE-35 section reassembly buffer.
///
/// Sections are MPEG-2 private sections (table_id 0xFC) which
/// can span multiple TS packets. PUSI=1 packets carry a
/// `pointer_field` byte then start a new section; PUSI=0
/// packets continue the in-progress section. Completion is
/// detected when `section_length` (read from the first 3 bytes)
/// + 3 prefix bytes are present.
#[derive(Debug, Default)]
struct SectionBuffer {
    /// Bytes accumulated so far for the in-progress section, starting
    /// at its `table_id` byte.
    buf: Vec<u8>,
    /// Total section size (`3 + section_length`) once the first three
    /// bytes have arrived; `None` until then.
    expected_len: Option<usize>,
}
102
/// MPEG-TS demuxer with sync recovery and PES reassembly.
///
/// Feed bytes with [`TsDemuxer::feed`], which returns completed PES
/// packets, and drain SCTE-35 sections separately with
/// [`TsDemuxer::take_scte35_sections`].
#[derive(Debug)]
pub struct TsDemuxer {
    /// Leftover bytes from the previous `feed` call that did not
    /// align to a 188-byte boundary.
    remainder: Vec<u8>,
    /// PMT PID discovered from the PAT.
    pmt_pid: Option<u16>,
    /// Elementary stream PID -> stream type, populated from PMT.
    /// Rebuilt from scratch each time a PMT section is parsed.
    streams: HashMap<u16, StreamType>,
    /// Per-PID PES reassembly buffers.
    pes_bufs: HashMap<u16, PesBuffer>,
    /// Per-PID SCTE-35 private-section reassembly buffers,
    /// populated when the PMT registers a stream as
    /// [`StreamType::Scte35`]. Sections drain via
    /// [`TsDemuxer::take_scte35_sections`].
    section_bufs: HashMap<u16, SectionBuffer>,
    /// Completed SCTE-35 sections awaiting drain. Bounded by
    /// the caller draining promptly between feed calls.
    pending_scte35: Vec<Scte35Section>,
}
124
125impl Default for TsDemuxer {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131impl TsDemuxer {
132    pub fn new() -> Self {
133        Self {
134            remainder: Vec::new(),
135            pmt_pid: None,
136            streams: HashMap::new(),
137            pes_bufs: HashMap::new(),
138            section_bufs: HashMap::new(),
139            pending_scte35: Vec::new(),
140        }
141    }
142
143    /// Drain any reassembled SCTE-35 sections accumulated since
144    /// the previous call. Sections appear in arrival order. The
145    /// returned vector is owned by the caller; the demuxer's
146    /// internal pending queue is cleared.
147    ///
148    /// Pair with [`TsDemuxer::feed`]: drain sections after each
149    /// feed so the per-PID section buffers stay bounded. Sections
150    /// are raw `splice_info_section` bytes (table_id 0xFC through
151    /// CRC_32); pass each one to
152    /// [`crate::scte35::parse_splice_info_section`] for the
153    /// SpliceInfo decode.
154    pub fn take_scte35_sections(&mut self) -> Vec<Scte35Section> {
155        std::mem::take(&mut self.pending_scte35)
156    }
157
158    /// Feed an arbitrary byte slice into the demuxer. Returns
159    /// zero or more fully reassembled PES packets. The demuxer
160    /// handles sync-byte recovery and cross-call buffering
161    /// internally; callers may pass any chunk size.
162    pub fn feed(&mut self, data: &[u8]) -> Vec<PesPacket> {
163        let mut out = Vec::new();
164
165        // Fast path: drain any buffered remainder first by
166        // completing one packet from remainder + new data, then
167        // process aligned packets directly from the input slice
168        // without copying into the remainder buffer. This avoids
169        // O(N^2) drain cost for large inputs.
170        let input = if self.remainder.is_empty() {
171            data
172        } else {
173            self.remainder.extend_from_slice(data);
174            // Process everything from remainder, then clear it and
175            // return an empty slice so the main loop is skipped.
176            self.process_buf(&mut out);
177            &[]
178        };
179
180        // Process aligned packets directly from the input slice.
181        let mut pos = 0;
182        while pos < input.len() {
183            let sync_off = match input[pos..].iter().position(|&b| b == SYNC_BYTE) {
184                Some(p) => p,
185                None => break,
186            };
187            pos += sync_off;
188            if pos + TS_PACKET_SIZE > input.len() {
189                break;
190            }
191            let pkt: &[u8; TS_PACKET_SIZE] = input[pos..pos + TS_PACKET_SIZE].try_into().unwrap();
192            self.process_packet(pkt, &mut out);
193            pos += TS_PACKET_SIZE;
194        }
195
196        // Stash any trailing bytes for the next call.
197        if pos < input.len() {
198            self.remainder.extend_from_slice(&input[pos..]);
199        }
200
201        out
202    }
203
204    /// Drain the remainder buffer, processing complete packets.
205    fn process_buf(&mut self, out: &mut Vec<PesPacket>) {
206        let mut pos = 0;
207        while pos < self.remainder.len() {
208            let sync_off = match self.remainder[pos..].iter().position(|&b| b == SYNC_BYTE) {
209                Some(p) => p,
210                None => {
211                    self.remainder.clear();
212                    return;
213                }
214            };
215            pos += sync_off;
216            if pos + TS_PACKET_SIZE > self.remainder.len() {
217                break;
218            }
219            let pkt: [u8; TS_PACKET_SIZE] = self.remainder[pos..pos + TS_PACKET_SIZE].try_into().unwrap();
220            self.process_packet(&pkt, out);
221            pos += TS_PACKET_SIZE;
222        }
223        // Keep only the unprocessed tail.
224        if pos > 0 {
225            self.remainder.drain(..pos);
226        }
227    }
228
229    fn process_packet(&mut self, pkt: &[u8; TS_PACKET_SIZE], out: &mut Vec<PesPacket>) {
230        let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
231        let pusi = pkt[1] & 0x40 != 0;
232        let afc = (pkt[3] >> 4) & 0x03;
233
234        let payload_offset = match afc {
235            0b01 => 4,
236            0b11 => {
237                let af_len = pkt[4] as usize;
238                5 + af_len
239            }
240            _ => return,
241        };
242        if payload_offset >= TS_PACKET_SIZE {
243            return;
244        }
245        let payload = &pkt[payload_offset..];
246
247        if pid == PAT_PID {
248            self.parse_pat(payload, pusi);
249        } else if Some(pid) == self.pmt_pid {
250            self.parse_pmt(payload, pusi);
251        } else if let Some(&st) = self.streams.get(&pid) {
252            if st == StreamType::Scte35 {
253                self.push_section(pid, payload, pusi);
254            } else {
255                self.push_pes(pid, payload, pusi, out);
256            }
257        }
258    }
259
260    /// Accumulate one TS packet's worth of payload into the SCTE-35
261    /// section buffer for `pid`, completing and queueing the section
262    /// when section_length-1 bytes have arrived after the
263    /// section_length field.
264    ///
265    /// Per ISO/IEC 13818-1 a private section starts on a PUSI=1
266    /// packet and the first byte of payload is `pointer_field`. Any
267    /// bytes before the pointer are stuffing; the section starts at
268    /// `payload[1 + pointer_field]`. Sections may span multiple TS
269    /// packets; PUSI=0 packets carry continuation bytes only.
270    fn push_section(&mut self, pid: u16, payload: &[u8], pusi: bool) {
271        let buf = self.section_bufs.entry(pid).or_default();
272        if pusi {
273            // Drop any in-progress section (incomplete prior frame
274            // is unrecoverable per spec; the new pointer_field marks
275            // a fresh section start).
276            buf.buf.clear();
277            buf.expected_len = None;
278            if payload.is_empty() {
279                return;
280            }
281            let pointer = payload[0] as usize;
282            let start = 1 + pointer;
283            if start >= payload.len() {
284                return;
285            }
286            buf.buf.extend_from_slice(&payload[start..]);
287        } else {
288            if buf.buf.is_empty() && buf.expected_len.is_none() {
289                // Continuation packet without a prior PUSI start;
290                // ignore (we missed the section header).
291                return;
292            }
293            buf.buf.extend_from_slice(payload);
294        }
295
296        // Determine expected length on first sufficient header read.
297        if buf.expected_len.is_none() && buf.buf.len() >= 3 {
298            let section_length = (((buf.buf[1] & 0x0F) as usize) << 8) | buf.buf[2] as usize;
299            buf.expected_len = Some(3 + section_length);
300        }
301
302        // Flush completed sections, accommodating the rare case where
303        // more than one section's bytes arrive in the same packet
304        // (only possible if the publisher concatenates sections
305        // back-to-back after the first pointer_field).
306        while let Some(expected) = buf.expected_len {
307            if buf.buf.len() < expected {
308                break;
309            }
310            let section_bytes = buf.buf.drain(..expected).collect::<Vec<_>>();
311            self.pending_scte35.push(Scte35Section {
312                pid,
313                raw: section_bytes,
314            });
315            buf.expected_len = None;
316            if buf.buf.len() >= 3 {
317                let section_length = (((buf.buf[1] & 0x0F) as usize) << 8) | buf.buf[2] as usize;
318                buf.expected_len = Some(3 + section_length);
319            } else if buf.buf.iter().all(|&b| b == 0xFF) {
320                // Trailing stuffing bytes: discard.
321                buf.buf.clear();
322                break;
323            }
324        }
325    }
326
327    fn parse_pat(&mut self, payload: &[u8], pusi: bool) {
328        let data = if pusi && !payload.is_empty() {
329            let pointer = payload[0] as usize;
330            if 1 + pointer >= payload.len() {
331                return;
332            }
333            &payload[1 + pointer..]
334        } else {
335            payload
336        };
337        // table_id(1) + flags/length(2) + ts_id(2) + version(1) +
338        // section/last(2) = 8 bytes header before the program loop.
339        if data.len() < 12 {
340            return;
341        }
342        let section_length = (((data[1] & 0x0F) as usize) << 8) | data[2] as usize;
343        let table_end = 3 + section_length;
344        if table_end > data.len() || section_length < 9 {
345            return;
346        }
347        // Program loop starts at byte 8, ends 4 bytes before CRC.
348        let loop_end = table_end.saturating_sub(4);
349        let mut i = 8;
350        while i + 4 <= loop_end {
351            let prog_num = ((data[i] as u16) << 8) | data[i + 1] as u16;
352            let map_pid = (((data[i + 2] & 0x1F) as u16) << 8) | data[i + 3] as u16;
353            if prog_num != 0 {
354                self.pmt_pid = Some(map_pid);
355                break;
356            }
357            i += 4;
358        }
359    }
360
361    fn parse_pmt(&mut self, payload: &[u8], pusi: bool) {
362        let data = if pusi && !payload.is_empty() {
363            let pointer = payload[0] as usize;
364            if 1 + pointer >= payload.len() {
365                return;
366            }
367            &payload[1 + pointer..]
368        } else {
369            payload
370        };
371        if data.len() < 16 {
372            return;
373        }
374        let section_length = (((data[1] & 0x0F) as usize) << 8) | data[2] as usize;
375        let table_end = 3 + section_length;
376        if table_end > data.len() || section_length < 13 {
377            return;
378        }
379        let prog_info_len = (((data[10] & 0x0F) as usize) << 8) | data[11] as usize;
380        let mut i = 12 + prog_info_len;
381        let loop_end = table_end.saturating_sub(4);
382        self.streams.clear();
383        while i + 5 <= loop_end {
384            let st = data[i];
385            let es_pid = (((data[i + 1] & 0x1F) as u16) << 8) | data[i + 2] as u16;
386            let es_info_len = (((data[i + 3] & 0x0F) as usize) << 8) | data[i + 4] as usize;
387            self.streams.insert(es_pid, StreamType::from_byte(st));
388            i += 5 + es_info_len;
389        }
390    }
391
392    fn push_pes(&mut self, pid: u16, payload: &[u8], pusi: bool, out: &mut Vec<PesPacket>) {
393        let stream_type = *self.streams.get(&pid).unwrap_or(&StreamType::Unknown(0));
394
395        if pusi {
396            if let Some(buf) = self.pes_bufs.get_mut(&pid) {
397                if buf.started && !buf.buf.is_empty() {
398                    if let Some(pkt) = Self::finish_pes(pid, buf) {
399                        out.push(pkt);
400                    }
401                }
402            }
403            let entry = self.pes_bufs.entry(pid).or_insert_with(|| PesBuffer {
404                stream_type,
405                buf: Vec::with_capacity(64 * 1024),
406                started: false,
407            });
408            entry.buf.clear();
409            entry.buf.extend_from_slice(payload);
410            entry.started = true;
411            entry.stream_type = stream_type;
412        } else if let Some(buf) = self.pes_bufs.get_mut(&pid) {
413            if buf.started {
414                buf.extend(payload);
415            }
416        }
417    }
418
419    fn finish_pes(pid: u16, buf: &mut PesBuffer) -> Option<PesPacket> {
420        let data = &buf.buf;
421        if data.len() < 9 || data[0] != 0 || data[1] != 0 || data[2] != 1 {
422            return None;
423        }
424        let pes_packet_length = ((data[4] as usize) << 8) | data[5] as usize;
425        let header_data_len = data[8] as usize;
426        let es_start = 9 + header_data_len;
427        if es_start > data.len() {
428            return None;
429        }
430        let flags = data[7];
431        let pts_flag = flags & 0x80 != 0;
432        let dts_flag = flags & 0x40 != 0;
433
434        let pts = if pts_flag && header_data_len >= 5 {
435            Some(parse_ts_timestamp(&data[9..14]))
436        } else {
437            None
438        };
439        let dts = if dts_flag && header_data_len >= 10 {
440            Some(parse_ts_timestamp(&data[14..19]))
441        } else {
442            None
443        };
444
445        // When PES_packet_length is non-zero, it specifies the
446        // exact number of bytes after the 6-byte PES header
447        // prefix. Use it to trim trailing TS padding. When zero
448        // (unbounded, common for video), take everything.
449        let es_end = if pes_packet_length > 0 {
450            (6 + pes_packet_length).min(data.len())
451        } else {
452            data.len()
453        };
454        let payload = data[es_start..es_end].to_vec();
455        if payload.is_empty() {
456            return None;
457        }
458
459        Some(PesPacket {
460            pid,
461            stream_type: buf.stream_type,
462            pts,
463            dts,
464            payload,
465        })
466    }
467}
468
469impl PesBuffer {
470    fn extend(&mut self, data: &[u8]) {
471        self.buf.extend_from_slice(data);
472    }
473}
474
/// Decode the 33-bit timestamp carried in a 5-byte PES PTS/DTS field.
///
/// The field interleaves timestamp bits with marker bits:
/// `0bXXXa_aaaY bbbb_bbbb bbbb_bbbY cccc_cccc cccc_cccY`, where `a` is
/// the top 3 bits, `b` the middle 15, `c` the low 15, and X/Y are
/// prefix/marker bits that are ignored.
///
/// Panics if `b` holds fewer than 5 bytes.
fn parse_ts_timestamp(b: &[u8]) -> u64 {
    let top3 = u64::from((b[0] >> 1) & 0x07);
    let mid15 = (u64::from(b[1]) << 7) | u64::from(b[2] >> 1);
    let low15 = (u64::from(b[3]) << 7) | u64::from(b[4] >> 1);
    (top3 << 30) | (mid15 << 15) | low15
}
485
486#[cfg(test)]
487mod tests {
488    use super::*;
489
490    fn make_ts_packet(pid: u16, pusi: bool, payload: &[u8]) -> [u8; 188] {
491        let mut pkt = [0xFFu8; 188];
492        pkt[0] = SYNC_BYTE;
493        pkt[1] = if pusi { 0x40 } else { 0x00 } | ((pid >> 8) as u8 & 0x1F);
494        pkt[2] = pid as u8;
495        pkt[3] = 0x10; // payload only, CC=0
496        let copy_len = payload.len().min(184);
497        pkt[4..4 + copy_len].copy_from_slice(&payload[..copy_len]);
498        // Stuff remaining bytes with 0xFF (already done by init).
499        pkt
500    }
501
502    fn minimal_pat(pmt_pid: u16) -> Vec<u8> {
503        // pointer_field(1) + table_id(1) + flags/length(2) +
504        // ts_id(2) + version(1) + section(1) + last_section(1)
505        // + program_number(2) + reserved/pmt_pid(2) + CRC(4)
506        let mut data = vec![
507            0x00, // pointer field
508            0x00, // table_id = PAT
509            0xB0, 0x0D, // section_syntax + length = 13
510            0x00, 0x01, // transport_stream_id
511            0xC1, // version=0, current
512            0x00, 0x00, // section 0 of 0
513            0x00, 0x01, // program_number = 1
514        ];
515        data.push(0xE0 | ((pmt_pid >> 8) as u8 & 0x1F));
516        data.push(pmt_pid as u8);
517        data.extend_from_slice(&[0x00; 4]); // CRC placeholder
518        data
519    }
520
521    fn minimal_pmt(video_pid: u16, audio_pid: u16) -> Vec<u8> {
522        // pointer_field + table_id + flags/length + program_number +
523        // version + section + pcr_pid + program_info_length +
524        // stream entries + CRC
525        let mut data = vec![
526            0x00, // pointer field
527            0x02, // table_id = PMT
528            0xB0, 0x17, // section_syntax + length = 23
529            0x00, 0x01, // program_number = 1
530            0xC1, // version=0, current
531            0x00, 0x00, // section 0 of 0
532            0xE1, 0x00, // PCR_PID = 0x100
533            0xF0, 0x00, // program_info_length = 0
534        ];
535        // Video stream entry: H.264
536        data.push(0x1B); // stream_type
537        data.push(0xE0 | ((video_pid >> 8) as u8 & 0x1F));
538        data.push(video_pid as u8);
539        data.push(0xF0);
540        data.push(0x00); // ES_info_length = 0
541        // Audio stream entry: AAC
542        data.push(0x0F); // stream_type
543        data.push(0xE0 | ((audio_pid >> 8) as u8 & 0x1F));
544        data.push(audio_pid as u8);
545        data.push(0xF0);
546        data.push(0x00); // ES_info_length = 0
547        data.extend_from_slice(&[0x00; 4]); // CRC placeholder
548        data
549    }
550
551    fn minimal_pes(pts_90k: u64, es_payload: &[u8]) -> Vec<u8> {
552        // PES_packet_length = header (3 bytes: flags + PTS flag +
553        // header_data_length) + PTS (5 bytes) + ES payload.
554        let pes_len = (3 + 5 + es_payload.len()) as u16;
555        let mut data = vec![
556            0x00,
557            0x00,
558            0x01, // start code
559            0xE0, // stream_id (video)
560            (pes_len >> 8) as u8,
561            pes_len as u8,
562            0x80, // marker bits
563            0x80, // PTS flag set, no DTS
564            0x05, // header_data_length = 5
565        ];
566        // Encode PTS into 5 bytes with marker bits.
567        let pts = pts_90k & 0x1_FFFF_FFFF;
568        data.push(0x21 | ((pts >> 29) as u8 & 0x0E));
569        data.push((pts >> 22) as u8);
570        data.push(0x01 | ((pts >> 14) as u8 & 0xFE));
571        data.push((pts >> 7) as u8);
572        data.push(0x01 | ((pts << 1) as u8 & 0xFE));
573        data.extend_from_slice(es_payload);
574        data
575    }
576
    /// End-to-end happy path: PAT -> PMT -> two video PES packets. The
    /// first PES is only returned once the second one starts on the
    /// same PID.
    #[test]
    fn demux_discovers_streams_and_yields_pes() {
        let mut demux = TsDemuxer::new();
        let video_pid = 0x100;
        let audio_pid = 0x101;
        let pmt_pid = 0x1000;

        // Feed PAT.
        let pat = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));
        assert!(demux.feed(&pat).is_empty());
        assert_eq!(demux.pmt_pid, Some(pmt_pid));

        // Feed PMT.
        let pmt = make_ts_packet(pmt_pid, true, &minimal_pmt(video_pid, audio_pid));
        assert!(demux.feed(&pmt).is_empty());
        assert_eq!(demux.streams.len(), 2);
        assert_eq!(demux.streams[&video_pid], StreamType::H264);
        assert_eq!(demux.streams[&audio_pid], StreamType::Aac);

        // Feed a PES packet for video.
        let pes = minimal_pes(90_000, b"nalunalunalu");
        let pkt = make_ts_packet(video_pid, true, &pes);
        // PES is not yielded until the next PUSI on the same PID.
        assert!(demux.feed(&pkt).is_empty());

        // Start a new PES on the same PID to flush the previous one.
        let pes2 = minimal_pes(180_000, b"nalu2");
        let pkt2 = make_ts_packet(video_pid, true, &pes2);
        let packets = demux.feed(&pkt2);
        assert_eq!(packets.len(), 1);
        assert_eq!(packets[0].pid, video_pid);
        assert_eq!(packets[0].stream_type, StreamType::H264);
        assert_eq!(packets[0].pts, Some(90_000));
        // The 0xFF stuffing after the PES is trimmed via PES_packet_length.
        assert_eq!(packets[0].payload, b"nalunalunalu");
    }
612
    /// Bytes ahead of the first sync byte are skipped and the packet
    /// that follows is still parsed.
    #[test]
    fn sync_recovery_skips_garbage() {
        let mut demux = TsDemuxer::new();
        let pmt_pid = 0x1000;

        // Feed garbage followed by a valid PAT packet.
        let mut data = vec![0xDE, 0xAD, 0xBE, 0xEF];
        data.extend_from_slice(&make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid)));
        demux.feed(&data);
        assert_eq!(demux.pmt_pid, Some(pmt_pid));
    }
624
    /// A packet split across two `feed` calls is buffered and parsed
    /// once its second half arrives.
    #[test]
    fn cross_call_buffering_handles_partial_packets() {
        let mut demux = TsDemuxer::new();
        let pmt_pid = 0x1000;
        let full = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));

        // Feed first half.
        demux.feed(&full[..100]);
        assert_eq!(demux.pmt_pid, None);

        // Feed second half.
        demux.feed(&full[100..]);
        assert_eq!(demux.pmt_pid, Some(pmt_pid));
    }
639
    /// A PID announced with stream_type 0x86 is reassembled as a private
    /// section and surfaced through `take_scte35_sections`, never as a
    /// PES packet.
    #[test]
    fn pmt_with_scte35_pid_routes_to_section_drain() {
        let mut demux = TsDemuxer::new();
        let pmt_pid = 0x1000;
        let scte35_pid = 0x1FFB;

        // PAT.
        let pat = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));
        demux.feed(&pat);

        // Custom PMT with one stream entry: stream_type=0x86 (SCTE-35).
        let mut pmt_payload = vec![
            0x00, // pointer
            0x02, // table_id PMT
            0xB0, 0x12, // section_syntax + length = 18
            0x00, 0x01, 0xC1, 0x00, 0x00, 0xE1, 0x00, 0xF0, 0x00,
        ];
        pmt_payload.push(0x86); // stream_type = SCTE-35
        pmt_payload.push(0xE0 | ((scte35_pid >> 8) as u8 & 0x1F));
        pmt_payload.push(scte35_pid as u8);
        pmt_payload.push(0xF0);
        pmt_payload.push(0x00);
        pmt_payload.extend_from_slice(&[0x00; 4]); // CRC placeholder
        let pmt = make_ts_packet(pmt_pid, true, &pmt_payload);
        demux.feed(&pmt);

        assert_eq!(demux.streams.get(&scte35_pid), Some(&StreamType::Scte35));

        // Build a fake SCTE-35 splice_info_section: table_id 0xFC + 2-byte
        // section_length + 17 bytes of body + padding. We do not validate
        // the CRC at the demux layer; the parser handles that.
        let section_body_len: usize = 17; // arbitrary; parser would CRC-check
        let mut section = vec![
            0xFCu8,
            0x30 | ((section_body_len >> 8) as u8 & 0x0F),
            section_body_len as u8,
        ];
        section.extend_from_slice(&vec![0x00u8; section_body_len]);

        // Wrap section in a TS packet: PUSI=1, payload starts with
        // pointer_field=0 then the section bytes.
        let mut payload = vec![0u8]; // pointer_field
        payload.extend_from_slice(&section);
        let pkt = make_ts_packet(scte35_pid, true, &payload);
        let pes = demux.feed(&pkt);
        assert!(pes.is_empty(), "SCTE-35 PIDs do not yield PES packets");

        // The drained bytes must equal the section exactly, without the
        // 0xFF stuffing that fills the rest of the TS packet.
        let drained = demux.take_scte35_sections();
        assert_eq!(drained.len(), 1, "one section drained");
        assert_eq!(drained[0].pid, scte35_pid);
        assert_eq!(&drained[0].raw[..], &section[..]);

        // Drain is one-shot: a second call returns empty.
        assert!(demux.take_scte35_sections().is_empty());
    }
695
    /// Encoding a timestamp into the 5-byte PTS layout and decoding it
    /// with `parse_ts_timestamp` returns the original value.
    #[test]
    fn parse_ts_timestamp_round_trips() {
        let pts: u64 = 123_456_789;
        // Same bit packing as `minimal_pes`: '0010' prefix plus marker bits.
        let mut buf = [0u8; 5];
        buf[0] = 0x21 | ((pts >> 29) as u8 & 0x0E);
        buf[1] = (pts >> 22) as u8;
        buf[2] = 0x01 | ((pts >> 14) as u8 & 0xFE);
        buf[3] = (pts >> 7) as u8;
        buf[4] = 0x01 | ((pts << 1) as u8 & 0xFE);
        assert_eq!(parse_ts_timestamp(&buf), pts);
    }
707}