Skip to main content

dvb_si/
demux.rs

1//! [`SiDemux`] — PID-filtered, version-gated SI section pump.
2//!
3//! Feed 188-byte MPEG-TS packets in with [`SiDemux::feed`]; get back an
4//! iterator of [`SectionEvent`]s — one per **changed** complete section.
5//! The demux reassembles sections per PID (via
6//! [`crate::ts::SectionReassembler`]), validates the CRC of CRC-bearing
7//! sections, and suppresses repeats through a version gate so that a steady
8//! carousel of unchanging tables produces no events after the first.
9//!
10//! Events own their bytes ([`bytes::Bytes`]) and are therefore `'static` and
11//! cheap to clone; typed views ([`SectionEvent::table_section`],
12//! [`SectionEvent::parse`]) borrow the event lazily.
13//!
14//! ```
15//! use dvb_common::Serialize;
16//! use dvb_si::demux::SiDemux;
17//! use dvb_si::tables::AnyTableSection;
18//! use dvb_si::tables::pat::{PatSection, PatEntry};
19//!
20//! const PMT_PID: u16 = 0x0100;
21//! const TS_SYNC_BYTE: u8 = 0x47;
22//! const PAYLOAD_UNIT_START_INDICATOR: u8 = 0x40;
23//! const PAT_PID_LOW_BYTE: u8 = 0x00;
24//! const PAYLOAD_ONLY: u8 = 0x10;
25//! const POINTER_FIELD_START: u8 = 0x00;
26//! const STUFFING_BYTE: u8 = 0xFF;
27//!
28//! // Build one PAT section and wrap it in a single 188-byte TS packet so the
29//! // example is self-contained. In real code `packet` comes from your source.
30//! let pat = PatSection {
31//!     transport_stream_id: 1, version_number: 0, current_next_indicator: true,
32//!     section_number: 0, last_section_number: 0,
33//!     entries: vec![PatEntry { program_number: 1, pid: PMT_PID }],
34//! };
35//! let mut section = vec![0u8; pat.serialized_len()];
36//! pat.serialize_into(&mut section).unwrap();
37//! let mut packet = [STUFFING_BYTE; 188];
38//! packet[0] = TS_SYNC_BYTE;
39//! packet[1] = PAYLOAD_UNIT_START_INDICATOR;
40//! packet[2] = PAT_PID_LOW_BYTE;
41//! packet[3] = PAYLOAD_ONLY;
42//! packet[4] = POINTER_FIELD_START;
43//! packet[5..5 + section.len()].copy_from_slice(&section);
44//!
45//! let mut demux = SiDemux::builder().build();
46//! let events: Vec<_> = demux.feed(&packet).collect();
47//! assert_eq!(events.len(), 1);
48//! match events[0].table_section() {
49//!     Ok(AnyTableSection::PatSection(pat)) => {
50//!         println!("PAT v{} on {}", events[0].version().unwrap_or(0), events[0].pid());
51//!         assert_eq!(pat.entries[0].pid, PMT_PID);
52//!     }
53//!     other => panic!("expected PAT, got {other:?}"),
54//! }
55//! ```
56//!
57//! # Version gate
58//!
59//! Each `(pid, table_id, table_id_extension, section_number)` tuple is packed
60//! into a `u64` key. The stored value is a change detector:
61//!
62//! - **Long-form** sections (`section_syntax_indicator == 1`, plus the TOT
63//!   exception) carry a 5-bit `version_number` and a trailing CRC-32 — the
64//!   gate stores `(version, crc32)`. A repeat with the same version *and* CRC
65//!   is suppressed.
66//! - **Short-form** sections without a CRC (TDT/RST/ST/DIT) have no version;
67//!   the gate stores a CRC-32 *computed over the whole section* purely as a
68//!   change hash. `table_id_extension` and `section_number` collapse to 0 in
69//!   the key.
70//!
71//! # CRC policy
72//!
73//! CRC-bearing sections (every long-form section, plus the short-form TOT
74//! which uniquely carries a CRC — ETSI EN 300 468 §5.2.6) are validated
75//! before gating. Failures are dropped and counted in
76//! [`Stats::crc_failures`]; they are never emitted and never update the gate.
77//! TDT carries no CRC and is therefore never dropped for CRC reasons.
78
79use std::collections::{HashMap, VecDeque};
80
81use bytes::Bytes;
82
83use crate::pid::Pid;
84use crate::ts::{SectionReassembler, TsPacket};
85
86/// table_id of the Program Association Table (PAT) — followed for PMT PIDs.
87const PAT_TABLE_ID: u8 = 0x00;
88/// table_id of the Time Offset Table — short-form (SSI=0) yet CRC-bearing.
89const TOT_TABLE_ID: u8 = 0x73;
90/// Minimum bytes required to read a section header (table_id + length field).
91const MIN_SECTION_LEN: usize = 3;
92/// Long-form extension header bytes (after the 3-byte common header).
93const LONG_FORM_EXTRA: usize = 5;
94/// Trailing CRC-32 length.
95const CRC_LEN: usize = 4;
96
97/// One complete, changed SI section. Owns its bytes — `'static`, cheap clone.
98///
99/// A `SectionEvent` is only ever constructed for a section that
100/// (a) is at least 3 bytes long, and (b) if it carries a CRC, passed CRC
101/// validation. So [`SectionEvent::crc_ok`] is always `true` and
102/// [`SectionEvent::table_id`] never panics.
103#[derive(Debug, Clone)]
104pub struct SectionEvent {
105    pid: Pid,
106    bytes: Bytes,
107}
108
109impl SectionEvent {
110    /// PID this section was carried on.
111    #[must_use]
112    pub fn pid(&self) -> Pid {
113        self.pid
114    }
115
116    /// The full section bytes (header included, CRC included if present).
117    #[must_use]
118    pub fn bytes(&self) -> &Bytes {
119        &self.bytes
120    }
121
122    /// The `table_id` (byte 0). Never panics — events are only built for
123    /// sections of at least 3 bytes.
124    #[must_use]
125    pub fn table_id(&self) -> u8 {
126        self.bytes[0]
127    }
128
129    /// True when this section uses the long-form syntax
130    /// (`section_syntax_indicator == 1`).
131    #[must_use]
132    fn is_long_form(&self) -> bool {
133        (self.bytes[1] & 0x80) != 0
134    }
135
136    /// 5-bit `version_number`, or `None` for short-form sections (which carry
137    /// no version field). Note the TOT, despite being short-form, has no
138    /// version field either, so this is `None` for it.
139    #[must_use]
140    pub fn version(&self) -> Option<u8> {
141        if self.is_long_form() && self.bytes.len() > 5 {
142            Some((self.bytes[5] >> 1) & 0x1F)
143        } else {
144            None
145        }
146    }
147
148    /// 16-bit `table_id_extension`, or `None` for short-form sections.
149    #[must_use]
150    pub fn table_id_extension(&self) -> Option<u16> {
151        if self.is_long_form() && self.bytes.len() > 4 {
152            Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
153        } else {
154            None
155        }
156    }
157
158    /// `section_number`, or `None` for short-form sections.
159    #[must_use]
160    pub fn section_number(&self) -> Option<u8> {
161        if self.is_long_form() && self.bytes.len() > 6 {
162            Some(self.bytes[6])
163        } else {
164            None
165        }
166    }
167
168    /// Always `true`: events are emitted only after CRC validation (or for
169    /// CRC-less short-form sections, where there is nothing to validate).
170    #[must_use]
171    pub fn crc_ok(&self) -> bool {
172        true
173    }
174
175    /// Typed table-section view (lazy, borrows this event's bytes).
176    ///
177    /// # Errors
178    /// Propagates the parse error from the dispatched table-section type.
179    pub fn table_section(&self) -> crate::Result<crate::tables::AnyTableSection<'_>> {
180        crate::tables::AnyTableSection::parse(&self.bytes)
181    }
182
183    /// Type-keyed view: `event.parse::<EitSection>()`.
184    ///
185    /// # Errors
186    /// Propagates `T::parse` errors.
187    pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
188        <T as dvb_common::Parse>::parse(&self.bytes)
189    }
190}
191
192/// Section statistics, monotonically accumulated across `feed` calls.
193#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
194pub struct Stats {
195    /// TS packets fed (every `feed` call increments this).
196    pub packets: u64,
197    /// Complete sections produced by the reassemblers (pre-gate, pre-CRC).
198    pub sections_completed: u64,
199    /// Sections emitted as events (changed, valid).
200    pub emitted: u64,
201    /// Sections suppressed by the version gate (unchanged repeats).
202    pub suppressed: u64,
203    /// Structurally invalid (sub-3-byte; cannot occur from the in-crate
204    /// reassembler) and CRC-failed sections share this counter. Sections are
205    /// dropped before emission; the gate is never updated for them.
206    pub crc_failures: u64,
207    /// TS packets that failed to parse (bad sync byte, too short).
208    pub malformed_packets: u64,
209    /// Gate entries evicted because the gate was at capacity.
210    pub gate_evictions: u64,
211}
212
213/// What the gate remembers for one key, to decide "changed?".
214#[derive(Clone, Copy, PartialEq, Eq)]
215struct GateEntry {
216    /// Long-form version_number, or 0 for short-form (unused there).
217    version: u8,
218    /// CRC-32 over the whole section — the change hash. For long-form this is
219    /// the trailing CRC; for short-form it is computed over all bytes.
220    crc: u32,
221}
222
223/// Configuration captured by [`SiDemuxBuilder`].
224struct Config {
225    follow_pat: bool,
226    emit_repeats: bool,
227    gate_capacity: usize,
228}
229
230/// Builder for [`SiDemux`].
231///
232/// Defaults: `follow_pat = true`, `dvb_si_pids = true`,
233/// `emit_repeats = false`, `gate_capacity = 65_536`.
234pub struct SiDemuxBuilder {
235    follow_pat: bool,
236    dvb_si_pids: bool,
237    emit_repeats: bool,
238    gate_capacity: usize,
239    extra_pids: Vec<Pid>,
240}
241
242impl Default for SiDemuxBuilder {
243    fn default() -> Self {
244        Self {
245            follow_pat: true,
246            dvb_si_pids: true,
247            emit_repeats: false,
248            gate_capacity: 65_536,
249            extra_pids: Vec::new(),
250        }
251    }
252}
253
254impl SiDemuxBuilder {
255    /// When `true` (default), an emitted (changed) PAT auto-adds each
256    /// programme's PMT PID to the watch set.
257    #[must_use]
258    pub fn follow_pat(mut self, on: bool) -> Self {
259        self.follow_pat = on;
260        self
261    }
262
263    /// When `true` (default), pre-populate the watch set with the well-known
264    /// DVB/MPEG-2 SI PIDs (PAT, CAT, NIT, SDT/BAT, EIT, RST, TDT/TOT, SAT).
265    #[must_use]
266    pub fn dvb_si_pids(mut self, on: bool) -> Self {
267        self.dvb_si_pids = on;
268        self
269    }
270
271    /// Add a PID to the watch set (additive; may be called repeatedly).
272    #[must_use]
273    pub fn pid(mut self, pid: Pid) -> Self {
274        self.extra_pids.push(pid);
275        self
276    }
277
278    /// When `true`, emit every complete valid section, bypassing the version
279    /// gate's suppression (the gate is still updated). Default `false`.
280    #[must_use]
281    pub fn emit_repeats(mut self, on: bool) -> Self {
282        self.emit_repeats = on;
283        self
284    }
285
286    /// Maximum number of distinct gate keys retained. At capacity the gate
287    /// FIFO-evicts the oldest key. Default 65 536.
288    #[must_use]
289    pub fn gate_capacity(mut self, cap: usize) -> Self {
290        self.gate_capacity = cap;
291        self
292    }
293
294    /// Build the [`SiDemux`].
295    #[must_use]
296    pub fn build(self) -> SiDemux {
297        let mut pids: HashMap<Pid, SectionReassembler> = HashMap::new();
298        if self.dvb_si_pids {
299            use crate::pid::well_known as wk;
300            for pid in [
301                wk::PAT,
302                wk::CAT,
303                wk::NIT,
304                wk::SDT_BAT,
305                wk::EIT,
306                wk::RST,
307                wk::TDT_TOT,
308                wk::SAT,
309            ] {
310                pids.entry(pid).or_default();
311            }
312        }
313        for p in self.extra_pids {
314            pids.entry(p).or_default();
315        }
316        SiDemux {
317            pids,
318            gate: HashMap::new(),
319            gate_order: VecDeque::new(),
320            cfg: Config {
321                follow_pat: self.follow_pat,
322                emit_repeats: self.emit_repeats,
323                gate_capacity: self.gate_capacity,
324            },
325            stats: Stats::default(),
326            scratch: Vec::new(),
327        }
328    }
329}
330
331/// PID-filtered, version-gated SI section demultiplexer.
332///
333/// See the [module docs](crate::demux) for the gate and CRC policies.
334pub struct SiDemux {
335    pids: HashMap<Pid, SectionReassembler>,
336    // TODO(perf): keys are uniform internal u64s — a non-SipHash hasher (e.g.
337    // FxHash) would shave cycles at high section rates; revisit if profiling
338    // shows it.
339    gate: HashMap<u64, GateEntry>,
340    gate_order: VecDeque<u64>,
341    cfg: Config,
342    stats: Stats,
343    scratch: Vec<SectionEvent>,
344}
345
346impl SiDemux {
347    /// Start building a demux. See [`SiDemuxBuilder`] for defaults.
348    #[must_use]
349    pub fn builder() -> SiDemuxBuilder {
350        SiDemuxBuilder::default()
351    }
352
353    /// Accumulated statistics.
354    #[must_use]
355    pub fn stats(&self) -> Stats {
356        self.stats
357    }
358
359    /// Feed one 188-byte TS packet. Infallible: malformed packets are counted
360    /// in [`Stats::malformed_packets`], not raised. Returns an iterator over
361    /// the changed sections this packet completed.
362    pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
363        self.scratch.clear();
364        self.stats.packets += 1;
365
366        match TsPacket::parse(packet) {
367            Err(_) => self.stats.malformed_packets += 1,
368            Ok(ts) => {
369                let pid = Pid::new(ts.header.pid);
370                // Cheap miss: one map lookup for non-watched PIDs.
371                if self.pids.contains_key(&pid) {
372                    let payload = ts.payload.unwrap_or(&[]);
373                    // Feed the reassembler; the borrow is released before
374                    // `consider` (which may insert new PMT PIDs into the map).
375                    self.pids
376                        .get_mut(&pid)
377                        .expect("checked above")
378                        .feed(payload, ts.header.pusi);
379                    while let Some(section) = self
380                        .pids
381                        .get_mut(&pid)
382                        .and_then(SectionReassembler::pop_section)
383                    {
384                        self.stats.sections_completed += 1;
385                        self.consider(pid, section);
386                    }
387                }
388            }
389        }
390
391        self.scratch.drain(..)
392    }
393
394    /// Gate + CRC + (maybe) push to scratch. Handles PAT-follow on emit.
395    fn consider(&mut self, pid: Pid, section: Bytes) {
396        // Guard: sub-3-byte sections cannot carry a header. The reassembler
397        // should never emit one (it needs >= 3 bytes to know `expected`), but
398        // guard defensively and count it as a CRC failure bucket — it is a
399        // structurally invalid section, dropped without emission.
400        if section.len() < MIN_SECTION_LEN {
401            self.stats.crc_failures += 1;
402            return;
403        }
404
405        let table_id = section[0];
406        let long_form = (section[1] & 0x80) != 0;
407        // The TOT is short-form by its SSI bit but uniquely carries a CRC.
408        let has_crc = long_form || table_id == TOT_TABLE_ID;
409
410        // CRC policy: validate CRC-bearing sections before gating.
411        if has_crc {
412            if section.len() < CRC_LEN {
413                self.stats.crc_failures += 1;
414                return;
415            }
416            let covered = &section[..section.len() - CRC_LEN];
417            let declared = u32::from_be_bytes([
418                section[section.len() - 4],
419                section[section.len() - 3],
420                section[section.len() - 2],
421                section[section.len() - 1],
422            ]);
423            let computed = dvb_common::crc32_mpeg2::compute(covered);
424            if computed != declared {
425                self.stats.crc_failures += 1;
426                return;
427            }
428        }
429
430        // Build the gate key and change detector.
431        let (ext, section_number, version, change_crc) =
432            if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
433                let ext = ((section[3] as u16) << 8) | section[4] as u16;
434                let version = (section[5] >> 1) & 0x1F;
435                let section_number = section[6];
436                // For long-form the trailing CRC already uniquely fingerprints the
437                // payload; reuse it as the change hash.
438                let crc = u32::from_be_bytes([
439                    section[section.len() - 4],
440                    section[section.len() - 3],
441                    section[section.len() - 2],
442                    section[section.len() - 1],
443                ]);
444                (ext, section_number, version, crc)
445            } else {
446                // Short-form (incl. TOT and any malformed long-form that slipped
447                // the size check above): no version, ext/section_number = 0,
448                // change detector is a CRC over all the section bytes.
449                (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(&section))
450            };
451
452        let key = (pid.value() as u64)
453            | ((table_id as u64) << 13)
454            | ((ext as u64) << 21)
455            | ((section_number as u64) << 37);
456
457        let entry = GateEntry {
458            version,
459            crc: change_crc,
460        };
461
462        let changed = match self.gate.get(&key) {
463            Some(prev) => *prev != entry,
464            None => true,
465        };
466
467        // Update the gate (FIFO-evict at capacity for newly-seen keys).
468        if !self.gate.contains_key(&key) {
469            if self.gate.len() >= self.cfg.gate_capacity {
470                if let Some(old) = self.gate_order.pop_front() {
471                    self.gate.remove(&old);
472                    self.stats.gate_evictions += 1;
473                }
474            }
475            self.gate_order.push_back(key);
476        }
477        self.gate.insert(key, entry);
478
479        if changed || self.cfg.emit_repeats {
480            let event = SectionEvent {
481                pid,
482                bytes: section,
483            };
484            // PAT-follow happens on an emitted (changed) PAT only.
485            if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
486                self.follow_pat(&event);
487            }
488            self.stats.emitted += 1;
489            self.scratch.push(event);
490        } else {
491            self.stats.suppressed += 1;
492        }
493    }
494
495    /// Parse the PAT and register each programme's PMT PID with a fresh
496    /// reassembler. Parse failures are silently ignored — a malformed PAT that
497    /// nonetheless passed CRC is implausible, and we never panic.
498    fn follow_pat(&mut self, event: &SectionEvent) {
499        use crate::tables::pat::PatSection;
500        use dvb_common::Parse;
501        if let Ok(pat) = PatSection::parse(&event.bytes) {
502            for entry in &pat.entries {
503                if entry.program_number != 0 {
504                    self.pids.entry(Pid::new(entry.pid)).or_default();
505                }
506            }
507        }
508    }
509}
510
511#[cfg(test)]
512mod tests {
513    use super::*;
514    use crate::ts::{TsHeader, TS_PACKET_SIZE};
515
516    /// Wrap section bytes in a single PUSI TS packet on `pid`, with a
517    /// pointer_field of 0 and 0xFF stuffing tail. Section must fit one packet.
518    fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
519        let mut pkt = [0xFFu8; TS_PACKET_SIZE];
520        let header = TsHeader {
521            tei: false,
522            pusi: true,
523            pid,
524            scrambling: 0,
525            has_adaptation: false,
526            has_payload: true,
527            continuity_counter: 0,
528        };
529        header.serialize_into(&mut pkt);
530        pkt[4] = 0x00; // pointer_field
531        let start = 5;
532        assert!(start + section.len() <= TS_PACKET_SIZE, "section too big");
533        pkt[start..start + section.len()].copy_from_slice(section);
534        pkt
535    }
536
537    /// Build a long-form section with a correct trailing CRC-32.
538    fn long_section(
539        table_id: u8,
540        ext: u16,
541        version: u8,
542        section_number: u8,
543        payload: &[u8],
544    ) -> Vec<u8> {
545        let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
546        let mut v = vec![
547            table_id,
548            0x80 | 0x30 | ((section_length >> 8) as u8 & 0x0F),
549            (section_length & 0xFF) as u8,
550            (ext >> 8) as u8,
551            (ext & 0xFF) as u8,
552            0xC0 | ((version & 0x1F) << 1) | 0x01,
553            section_number,
554            section_number, // last_section_number
555        ];
556        v.extend_from_slice(payload);
557        let crc = dvb_common::crc32_mpeg2::compute(&v);
558        v.extend_from_slice(&crc.to_be_bytes());
559        v
560    }
561
562    /// Build a PAT section (real CRC) mapping (program_number, pmt_pid) pairs.
563    fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
564        let mut body = Vec::new();
565        for &(pn, pid) in entries {
566            body.extend_from_slice(&pn.to_be_bytes());
567            body.push(0xE0 | ((pid >> 8) as u8 & 0x1F));
568            body.push((pid & 0xFF) as u8);
569        }
570        long_section(0x00, tsid, version, 0, &body)
571    }
572
573    /// Build a PMT section (real CRC). One stream entry.
574    fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
575        // pcr_pid(2) + program_info_length(2)=0 + one stream(5):
576        // stream type 0x02 (video), elementary_pid = pcr_pid+1, es_info_len 0.
577        let body = [
578            0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
579            (pcr_pid & 0xFF) as u8,
580            0xF0,
581            0x00,
582            0x02,
583            0xE0 | (((pcr_pid + 1) >> 8) as u8 & 0x1F),
584            ((pcr_pid + 1) & 0xFF) as u8,
585            0xF0,
586            0x00,
587        ];
588        long_section(0x02, program_number, version, 0, &body)
589    }
590
591    #[test]
592    fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
593        let mut demux = SiDemux::builder().build();
594
595        let pat_v0 = pat_section(0x0001, 0, &[(1, 0x0100)]);
596        let pat_v1 = pat_section(0x0001, 1, &[(1, 0x0100)]);
597
598        let pkt_v0 = ts_packet(0x0000, &pat_v0);
599        let pkt_v1 = ts_packet(0x0000, &pat_v1);
600
601        let n0: Vec<_> = demux.feed(&pkt_v0).collect();
602        assert_eq!(n0.len(), 1, "PAT v0 should emit one event");
603        assert_eq!(n0[0].table_id(), 0x00);
604        assert_eq!(n0[0].version(), Some(0));
605
606        let n1: Vec<_> = demux.feed(&pkt_v0).collect();
607        assert_eq!(n1.len(), 0, "repeat PAT should be suppressed");
608
609        let n2: Vec<_> = demux.feed(&pkt_v1).collect();
610        assert_eq!(n2.len(), 1, "PAT v1 should re-emit");
611        assert_eq!(n2[0].version(), Some(1));
612
613        let s = demux.stats();
614        assert_eq!(s.sections_completed, 3);
615        assert_eq!(s.emitted, 2);
616        assert_eq!(s.suppressed, 1);
617        assert_eq!(s.crc_failures, 0);
618    }
619
620    #[test]
621    fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
622        use crate::tables::AnyTableSection;
623        let mut demux = SiDemux::builder().build();
624
625        // PAT maps programme 1 -> PMT on PID 0x0100.
626        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
627        let pat_evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
628        assert_eq!(pat_evts.len(), 1);
629
630        // Before follow, a PMT packet on 0x0100 would be ignored. After the
631        // PAT was emitted, 0x0100 is watched.
632        let pmt = pmt_section(1, 0, 0x0100);
633        let pmt_evts: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt)).collect();
634        assert_eq!(pmt_evts.len(), 1, "PMT on the followed PID should emit");
635        assert_eq!(pmt_evts[0].pid(), Pid::new(0x0100));
636        match pmt_evts[0].table_section().unwrap() {
637            AnyTableSection::PmtSection(p) => assert_eq!(p.program_number, 1),
638            other => panic!("expected PmtSection, got {other:?}"),
639        }
640    }
641
642    #[test]
643    fn corrupted_crc_sdt_dropped_and_counted() {
644        let mut demux = SiDemux::builder().build();
645        // SDT actual = table_id 0x42, carried on SDT_BAT pid 0x0011.
646        let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
647        // Corrupt a payload byte AFTER the CRC was computed.
648        sdt[8] ^= 0xFF;
649        let evts: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
650        assert_eq!(evts.len(), 0, "corrupted SDT must not emit");
651        let s = demux.stats();
652        assert_eq!(s.crc_failures, 1);
653        assert_eq!(s.emitted, 0);
654        assert_eq!(s.sections_completed, 1);
655    }
656
657    #[test]
658    fn gate_capacity_evicts_fifo_and_reemits() {
659        let mut demux = SiDemux::builder().gate_capacity(2).build();
660
661        // Three distinct EIT sections (table_id 0x4E) by table_id_extension,
662        // all on the EIT pid 0x0012.
663        let a = long_section(0x4E, 0x0001, 0, 0, &[0x01]);
664        let b = long_section(0x4E, 0x0002, 0, 0, &[0x02]);
665        let c = long_section(0x4E, 0x0003, 0, 0, &[0x03]);
666
667        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
668        assert_eq!(demux.feed(&ts_packet(0x0012, &b)).count(), 1);
669        // Inserting c evicts a (the oldest).
670        assert_eq!(demux.feed(&ts_packet(0x0012, &c)).count(), 1);
671        assert_eq!(demux.stats().gate_evictions, 1);
672
673        // a was evicted -> re-feeding it re-emits (treated as newly seen).
674        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
675    }
676
677    #[test]
678    fn garbage_packet_counted_no_panic() {
679        let mut demux = SiDemux::builder().build();
680        let garbage = [0x00u8; TS_PACKET_SIZE]; // bad sync byte
681        let evts: Vec<_> = demux.feed(&garbage).collect();
682        assert_eq!(evts.len(), 0);
683        assert_eq!(demux.stats().malformed_packets, 1);
684        assert_eq!(demux.stats().packets, 1);
685    }
686
687    #[test]
688    fn emit_repeats_bypasses_suppression() {
689        let mut demux = SiDemux::builder().emit_repeats(true).build();
690        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
691        let pkt = ts_packet(0x0000, &pat);
692        assert_eq!(demux.feed(&pkt).count(), 1);
693        assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");
694        assert_eq!(demux.stats().suppressed, 0);
695        assert_eq!(demux.stats().emitted, 2);
696    }
697}