Skip to main content

dvb_si/
demux.rs

1//! [`SiDemux`] — PID-filtered, version-gated SI section pump.
2//!
3//! Feed 188-byte MPEG-TS packets in with [`SiDemux::feed`]; get back an
4//! iterator of [`SectionEvent`]s — one per **changed** complete section.
5//! The demux reassembles sections per PID (via
6//! [`crate::ts::SectionReassembler`]), validates the CRC of CRC-bearing
7//! sections, and suppresses repeats through a version gate so that a steady
8//! carousel of unchanging tables produces no events after the first.
9//!
10//! Events own their bytes ([`bytes::Bytes`]) and are therefore `'static` and
11//! cheap to clone; typed views ([`SectionEvent::table`],
12//! [`SectionEvent::parse`]) borrow the event lazily.
13//!
14//! ```
15//! use dvb_common::Serialize;
16//! use dvb_si::demux::SiDemux;
17//! use dvb_si::tables::AnyTable;
18//! use dvb_si::tables::pat::{Pat, PatEntry};
19//!
20//! // Build one PAT section and wrap it in a single 188-byte TS packet so the
21//! // example is self-contained. In real code `packet` comes from your source.
22//! let pat = Pat {
23//!     transport_stream_id: 1, version_number: 0, current_next_indicator: true,
24//!     section_number: 0, last_section_number: 0,
25//!     entries: vec![PatEntry { program_number: 1, pid: 0x0100 }],
26//! };
27//! let mut section = vec![0u8; pat.serialized_len()];
28//! pat.serialize_into(&mut section).unwrap();
29//! let mut packet = [0xFFu8; 188];
30//! packet[0] = 0x47;  // sync
31//! packet[1] = 0x40;  // PUSI=1, PID hi=0
32//! packet[2] = 0x00;  // PID lo=0 (PAT)
33//! packet[3] = 0x10;  // payload only
34//! packet[4] = 0x00;  // pointer_field
35//! packet[5..5 + section.len()].copy_from_slice(&section);
36//!
37//! let mut demux = SiDemux::builder().build();
38//! let events: Vec<_> = demux.feed(&packet).collect();
39//! assert_eq!(events.len(), 1);
40//! match events[0].table() {
41//!     Ok(AnyTable::Pat(pat)) => {
42//!         println!("PAT v{} on {}", events[0].version().unwrap_or(0), events[0].pid());
43//!         assert_eq!(pat.entries[0].pid, 0x0100);
44//!     }
45//!     other => panic!("expected PAT, got {other:?}"),
46//! }
47//! ```
48//!
49//! # Version gate
50//!
51//! Each `(pid, table_id, table_id_extension, section_number)` tuple is packed
52//! into a `u64` key. The stored value is a change detector:
53//!
54//! - **Long-form** sections (`section_syntax_indicator == 1`, plus the TOT
55//!   exception) carry a 5-bit `version_number` and a trailing CRC-32 — the
56//!   gate stores `(version, crc32)`. A repeat with the same version *and* CRC
57//!   is suppressed.
58//! - **Short-form** sections without a CRC (TDT/RST/ST/DIT) have no version;
59//!   the gate stores a CRC-32 *computed over the whole section* purely as a
60//!   change hash. `table_id_extension` and `section_number` collapse to 0 in
61//!   the key.
62//!
63//! # CRC policy
64//!
65//! CRC-bearing sections (every long-form section, plus the short-form TOT
66//! which uniquely carries a CRC — ETSI EN 300 468 §5.2.6) are validated
67//! before gating. Failures are dropped and counted in
68//! [`Stats::crc_failures`]; they are never emitted and never update the gate.
69//! TDT carries no CRC and is therefore never dropped for CRC reasons.
70
71use std::collections::{HashMap, VecDeque};
72
73use bytes::Bytes;
74
75use crate::pid::Pid;
76use crate::ts::{SectionReassembler, TsPacket};
77
78/// table_id of the Program Association Table (PAT) — followed for PMT PIDs.
79const PAT_TABLE_ID: u8 = 0x00;
80/// table_id of the Time Offset Table — short-form (SSI=0) yet CRC-bearing.
81const TOT_TABLE_ID: u8 = 0x73;
82/// Minimum bytes required to read a section header (table_id + length field).
83const MIN_SECTION_LEN: usize = 3;
84/// Long-form extension header bytes (after the 3-byte common header).
85const LONG_FORM_EXTRA: usize = 5;
86/// Trailing CRC-32 length.
87const CRC_LEN: usize = 4;
88
89/// One complete, changed SI section. Owns its bytes — `'static`, cheap clone.
90///
91/// A `SectionEvent` is only ever constructed for a section that
92/// (a) is at least 3 bytes long, and (b) if it carries a CRC, passed CRC
93/// validation. So [`SectionEvent::crc_ok`] is always `true` and
94/// [`SectionEvent::table_id`] never panics.
95#[derive(Debug, Clone)]
96pub struct SectionEvent {
97    pid: Pid,
98    bytes: Bytes,
99}
100
101impl SectionEvent {
102    /// PID this section was carried on.
103    #[must_use]
104    pub fn pid(&self) -> Pid {
105        self.pid
106    }
107
108    /// The full section bytes (header included, CRC included if present).
109    #[must_use]
110    pub fn bytes(&self) -> &Bytes {
111        &self.bytes
112    }
113
114    /// The `table_id` (byte 0). Never panics — events are only built for
115    /// sections of at least 3 bytes.
116    #[must_use]
117    pub fn table_id(&self) -> u8 {
118        self.bytes[0]
119    }
120
121    /// True when this section uses the long-form syntax
122    /// (`section_syntax_indicator == 1`).
123    #[must_use]
124    fn is_long_form(&self) -> bool {
125        (self.bytes[1] & 0x80) != 0
126    }
127
128    /// 5-bit `version_number`, or `None` for short-form sections (which carry
129    /// no version field). Note the TOT, despite being short-form, has no
130    /// version field either, so this is `None` for it.
131    #[must_use]
132    pub fn version(&self) -> Option<u8> {
133        if self.is_long_form() && self.bytes.len() > 5 {
134            Some((self.bytes[5] >> 1) & 0x1F)
135        } else {
136            None
137        }
138    }
139
140    /// 16-bit `table_id_extension`, or `None` for short-form sections.
141    #[must_use]
142    pub fn table_id_extension(&self) -> Option<u16> {
143        if self.is_long_form() && self.bytes.len() > 4 {
144            Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
145        } else {
146            None
147        }
148    }
149
150    /// `section_number`, or `None` for short-form sections.
151    #[must_use]
152    pub fn section_number(&self) -> Option<u8> {
153        if self.is_long_form() && self.bytes.len() > 6 {
154            Some(self.bytes[6])
155        } else {
156            None
157        }
158    }
159
160    /// Always `true`: events are emitted only after CRC validation (or for
161    /// CRC-less short-form sections, where there is nothing to validate).
162    #[must_use]
163    pub fn crc_ok(&self) -> bool {
164        true
165    }
166
167    /// Typed view (lazy, borrows this event's bytes).
168    ///
169    /// # Errors
170    /// Propagates the parse error from the dispatched table type.
171    pub fn table(&self) -> crate::Result<crate::tables::AnyTable<'_>> {
172        crate::tables::AnyTable::parse(&self.bytes)
173    }
174
175    /// Type-keyed view: `event.parse::<Eit>()`.
176    ///
177    /// # Errors
178    /// Propagates `T::parse` errors.
179    pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
180        <T as dvb_common::Parse>::parse(&self.bytes)
181    }
182}
183
184/// Section statistics, monotonically accumulated across `feed` calls.
185#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
186pub struct Stats {
187    /// TS packets fed (every `feed` call increments this).
188    pub packets: u64,
189    /// Complete sections produced by the reassemblers (pre-gate, pre-CRC).
190    pub sections_completed: u64,
191    /// Sections emitted as events (changed, valid).
192    pub emitted: u64,
193    /// Sections suppressed by the version gate (unchanged repeats).
194    pub suppressed: u64,
195    /// Structurally invalid (sub-3-byte; cannot occur from the in-crate
196    /// reassembler) and CRC-failed sections share this counter. Sections are
197    /// dropped before emission; the gate is never updated for them.
198    pub crc_failures: u64,
199    /// TS packets that failed to parse (bad sync byte, too short).
200    pub malformed_packets: u64,
201    /// Gate entries evicted because the gate was at capacity.
202    pub gate_evictions: u64,
203}
204
205/// What the gate remembers for one key, to decide "changed?".
206#[derive(Clone, Copy, PartialEq, Eq)]
207struct GateEntry {
208    /// Long-form version_number, or 0 for short-form (unused there).
209    version: u8,
210    /// CRC-32 over the whole section — the change hash. For long-form this is
211    /// the trailing CRC; for short-form it is computed over all bytes.
212    crc: u32,
213}
214
215/// Configuration captured by [`SiDemuxBuilder`].
216struct Config {
217    follow_pat: bool,
218    emit_repeats: bool,
219    gate_capacity: usize,
220}
221
222/// Builder for [`SiDemux`].
223///
224/// Defaults: `follow_pat = true`, `dvb_si_pids = true`,
225/// `emit_repeats = false`, `gate_capacity = 65_536`.
226pub struct SiDemuxBuilder {
227    follow_pat: bool,
228    dvb_si_pids: bool,
229    emit_repeats: bool,
230    gate_capacity: usize,
231    extra_pids: Vec<Pid>,
232}
233
234impl Default for SiDemuxBuilder {
235    fn default() -> Self {
236        Self {
237            follow_pat: true,
238            dvb_si_pids: true,
239            emit_repeats: false,
240            gate_capacity: 65_536,
241            extra_pids: Vec::new(),
242        }
243    }
244}
245
246impl SiDemuxBuilder {
247    /// When `true` (default), an emitted (changed) PAT auto-adds each
248    /// programme's PMT PID to the watch set.
249    #[must_use]
250    pub fn follow_pat(mut self, on: bool) -> Self {
251        self.follow_pat = on;
252        self
253    }
254
255    /// When `true` (default), pre-populate the watch set with the well-known
256    /// DVB/MPEG-2 SI PIDs (PAT, CAT, NIT, SDT/BAT, EIT, RST, TDT/TOT, SAT).
257    #[must_use]
258    pub fn dvb_si_pids(mut self, on: bool) -> Self {
259        self.dvb_si_pids = on;
260        self
261    }
262
263    /// Add a PID to the watch set (additive; may be called repeatedly).
264    #[must_use]
265    pub fn pid(mut self, pid: Pid) -> Self {
266        self.extra_pids.push(pid);
267        self
268    }
269
270    /// When `true`, emit every complete valid section, bypassing the version
271    /// gate's suppression (the gate is still updated). Default `false`.
272    #[must_use]
273    pub fn emit_repeats(mut self, on: bool) -> Self {
274        self.emit_repeats = on;
275        self
276    }
277
278    /// Maximum number of distinct gate keys retained. At capacity the gate
279    /// FIFO-evicts the oldest key. Default 65 536.
280    #[must_use]
281    pub fn gate_capacity(mut self, cap: usize) -> Self {
282        self.gate_capacity = cap;
283        self
284    }
285
286    /// Build the [`SiDemux`].
287    #[must_use]
288    pub fn build(self) -> SiDemux {
289        let mut pids: HashMap<Pid, SectionReassembler> = HashMap::new();
290        if self.dvb_si_pids {
291            use crate::pid::well_known as wk;
292            for pid in [
293                wk::PAT,
294                wk::CAT,
295                wk::NIT,
296                wk::SDT_BAT,
297                wk::EIT,
298                wk::RST,
299                wk::TDT_TOT,
300                wk::SAT,
301            ] {
302                pids.entry(pid).or_default();
303            }
304        }
305        for p in self.extra_pids {
306            pids.entry(p).or_default();
307        }
308        SiDemux {
309            pids,
310            gate: HashMap::new(),
311            gate_order: VecDeque::new(),
312            cfg: Config {
313                follow_pat: self.follow_pat,
314                emit_repeats: self.emit_repeats,
315                gate_capacity: self.gate_capacity,
316            },
317            stats: Stats::default(),
318            scratch: Vec::new(),
319        }
320    }
321}
322
323/// PID-filtered, version-gated SI section demultiplexer.
324///
325/// See the [module docs](crate::demux) for the gate and CRC policies.
326pub struct SiDemux {
327    pids: HashMap<Pid, SectionReassembler>,
328    // TODO(perf): keys are uniform internal u64s — a non-SipHash hasher (e.g.
329    // FxHash) would shave cycles at high section rates; revisit if profiling
330    // shows it.
331    gate: HashMap<u64, GateEntry>,
332    gate_order: VecDeque<u64>,
333    cfg: Config,
334    stats: Stats,
335    scratch: Vec<SectionEvent>,
336}
337
338impl SiDemux {
339    /// Start building a demux. See [`SiDemuxBuilder`] for defaults.
340    #[must_use]
341    pub fn builder() -> SiDemuxBuilder {
342        SiDemuxBuilder::default()
343    }
344
345    /// Accumulated statistics.
346    #[must_use]
347    pub fn stats(&self) -> Stats {
348        self.stats
349    }
350
351    /// Feed one 188-byte TS packet. Infallible: malformed packets are counted
352    /// in [`Stats::malformed_packets`], not raised. Returns an iterator over
353    /// the changed sections this packet completed.
354    pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
355        self.scratch.clear();
356        self.stats.packets += 1;
357
358        match TsPacket::parse(packet) {
359            Err(_) => self.stats.malformed_packets += 1,
360            Ok(ts) => {
361                let pid = Pid::new(ts.header.pid);
362                // Cheap miss: one map lookup for non-watched PIDs.
363                if self.pids.contains_key(&pid) {
364                    let payload = ts.payload.unwrap_or(&[]);
365                    // Feed the reassembler; the borrow is released before
366                    // `consider` (which may insert new PMT PIDs into the map).
367                    self.pids
368                        .get_mut(&pid)
369                        .expect("checked above")
370                        .feed(payload, ts.header.pusi);
371                    while let Some(section) = self
372                        .pids
373                        .get_mut(&pid)
374                        .and_then(SectionReassembler::pop_section)
375                    {
376                        self.stats.sections_completed += 1;
377                        self.consider(pid, section);
378                    }
379                }
380            }
381        }
382
383        self.scratch.drain(..)
384    }
385
386    /// Gate + CRC + (maybe) push to scratch. Handles PAT-follow on emit.
387    fn consider(&mut self, pid: Pid, section: Bytes) {
388        // Guard: sub-3-byte sections cannot carry a header. The reassembler
389        // should never emit one (it needs >= 3 bytes to know `expected`), but
390        // guard defensively and count it as a CRC failure bucket — it is a
391        // structurally invalid section, dropped without emission.
392        if section.len() < MIN_SECTION_LEN {
393            self.stats.crc_failures += 1;
394            return;
395        }
396
397        let table_id = section[0];
398        let long_form = (section[1] & 0x80) != 0;
399        // The TOT is short-form by its SSI bit but uniquely carries a CRC.
400        let has_crc = long_form || table_id == TOT_TABLE_ID;
401
402        // CRC policy: validate CRC-bearing sections before gating.
403        if has_crc {
404            if section.len() < CRC_LEN {
405                self.stats.crc_failures += 1;
406                return;
407            }
408            let covered = &section[..section.len() - CRC_LEN];
409            let declared = u32::from_be_bytes([
410                section[section.len() - 4],
411                section[section.len() - 3],
412                section[section.len() - 2],
413                section[section.len() - 1],
414            ]);
415            let computed = dvb_common::crc32_mpeg2::compute(covered);
416            if computed != declared {
417                self.stats.crc_failures += 1;
418                return;
419            }
420        }
421
422        // Build the gate key and change detector.
423        let (ext, section_number, version, change_crc) =
424            if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
425                let ext = ((section[3] as u16) << 8) | section[4] as u16;
426                let version = (section[5] >> 1) & 0x1F;
427                let section_number = section[6];
428                // For long-form the trailing CRC already uniquely fingerprints the
429                // payload; reuse it as the change hash.
430                let crc = u32::from_be_bytes([
431                    section[section.len() - 4],
432                    section[section.len() - 3],
433                    section[section.len() - 2],
434                    section[section.len() - 1],
435                ]);
436                (ext, section_number, version, crc)
437            } else {
438                // Short-form (incl. TOT and any malformed long-form that slipped
439                // the size check above): no version, ext/section_number = 0,
440                // change detector is a CRC over all the section bytes.
441                (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(&section))
442            };
443
444        let key = (pid.value() as u64)
445            | ((table_id as u64) << 13)
446            | ((ext as u64) << 21)
447            | ((section_number as u64) << 37);
448
449        let entry = GateEntry {
450            version,
451            crc: change_crc,
452        };
453
454        let changed = match self.gate.get(&key) {
455            Some(prev) => *prev != entry,
456            None => true,
457        };
458
459        // Update the gate (FIFO-evict at capacity for newly-seen keys).
460        if !self.gate.contains_key(&key) {
461            if self.gate.len() >= self.cfg.gate_capacity {
462                if let Some(old) = self.gate_order.pop_front() {
463                    self.gate.remove(&old);
464                    self.stats.gate_evictions += 1;
465                }
466            }
467            self.gate_order.push_back(key);
468        }
469        self.gate.insert(key, entry);
470
471        if changed || self.cfg.emit_repeats {
472            let event = SectionEvent {
473                pid,
474                bytes: section,
475            };
476            // PAT-follow happens on an emitted (changed) PAT only.
477            if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
478                self.follow_pat(&event);
479            }
480            self.stats.emitted += 1;
481            self.scratch.push(event);
482        } else {
483            self.stats.suppressed += 1;
484        }
485    }
486
487    /// Parse the PAT and register each programme's PMT PID with a fresh
488    /// reassembler. Parse failures are silently ignored — a malformed PAT that
489    /// nonetheless passed CRC is implausible, and we never panic.
490    fn follow_pat(&mut self, event: &SectionEvent) {
491        use crate::tables::pat::Pat;
492        use dvb_common::Parse;
493        if let Ok(pat) = Pat::parse(&event.bytes) {
494            for entry in &pat.entries {
495                if entry.program_number != 0 {
496                    self.pids.entry(Pid::new(entry.pid)).or_default();
497                }
498            }
499        }
500    }
501}
502
503#[cfg(test)]
504mod tests {
505    use super::*;
506    use crate::ts::{TsHeader, TS_PACKET_SIZE};
507
508    /// Wrap section bytes in a single PUSI TS packet on `pid`, with a
509    /// pointer_field of 0 and 0xFF stuffing tail. Section must fit one packet.
510    fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
511        let mut pkt = [0xFFu8; TS_PACKET_SIZE];
512        let header = TsHeader {
513            tei: false,
514            pusi: true,
515            pid,
516            scrambling: 0,
517            has_adaptation: false,
518            has_payload: true,
519            continuity_counter: 0,
520        };
521        header.serialize_into(&mut pkt);
522        pkt[4] = 0x00; // pointer_field
523        let start = 5;
524        assert!(start + section.len() <= TS_PACKET_SIZE, "section too big");
525        pkt[start..start + section.len()].copy_from_slice(section);
526        pkt
527    }
528
529    /// Build a long-form section with a correct trailing CRC-32.
530    fn long_section(
531        table_id: u8,
532        ext: u16,
533        version: u8,
534        section_number: u8,
535        payload: &[u8],
536    ) -> Vec<u8> {
537        let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
538        let mut v = vec![
539            table_id,
540            0x80 | 0x30 | ((section_length >> 8) as u8 & 0x0F),
541            (section_length & 0xFF) as u8,
542            (ext >> 8) as u8,
543            (ext & 0xFF) as u8,
544            0xC0 | ((version & 0x1F) << 1) | 0x01,
545            section_number,
546            section_number, // last_section_number
547        ];
548        v.extend_from_slice(payload);
549        let crc = dvb_common::crc32_mpeg2::compute(&v);
550        v.extend_from_slice(&crc.to_be_bytes());
551        v
552    }
553
554    /// Build a PAT section (real CRC) mapping (program_number, pmt_pid) pairs.
555    fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
556        let mut body = Vec::new();
557        for &(pn, pid) in entries {
558            body.extend_from_slice(&pn.to_be_bytes());
559            body.push(0xE0 | ((pid >> 8) as u8 & 0x1F));
560            body.push((pid & 0xFF) as u8);
561        }
562        long_section(0x00, tsid, version, 0, &body)
563    }
564
565    /// Build a PMT section (real CRC). One stream entry.
566    fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
567        // pcr_pid(2) + program_info_length(2)=0 + one stream(5):
568        // stream type 0x02 (video), elementary_pid = pcr_pid+1, es_info_len 0.
569        let body = [
570            0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
571            (pcr_pid & 0xFF) as u8,
572            0xF0,
573            0x00,
574            0x02,
575            0xE0 | (((pcr_pid + 1) >> 8) as u8 & 0x1F),
576            ((pcr_pid + 1) & 0xFF) as u8,
577            0xF0,
578            0x00,
579        ];
580        long_section(0x02, program_number, version, 0, &body)
581    }
582
583    #[test]
584    fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
585        let mut demux = SiDemux::builder().build();
586
587        let pat_v0 = pat_section(0x0001, 0, &[(1, 0x0100)]);
588        let pat_v1 = pat_section(0x0001, 1, &[(1, 0x0100)]);
589
590        let pkt_v0 = ts_packet(0x0000, &pat_v0);
591        let pkt_v1 = ts_packet(0x0000, &pat_v1);
592
593        let n0: Vec<_> = demux.feed(&pkt_v0).collect();
594        assert_eq!(n0.len(), 1, "PAT v0 should emit one event");
595        assert_eq!(n0[0].table_id(), 0x00);
596        assert_eq!(n0[0].version(), Some(0));
597
598        let n1: Vec<_> = demux.feed(&pkt_v0).collect();
599        assert_eq!(n1.len(), 0, "repeat PAT should be suppressed");
600
601        let n2: Vec<_> = demux.feed(&pkt_v1).collect();
602        assert_eq!(n2.len(), 1, "PAT v1 should re-emit");
603        assert_eq!(n2[0].version(), Some(1));
604
605        let s = demux.stats();
606        assert_eq!(s.sections_completed, 3);
607        assert_eq!(s.emitted, 2);
608        assert_eq!(s.suppressed, 1);
609        assert_eq!(s.crc_failures, 0);
610    }
611
612    #[test]
613    fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
614        use crate::tables::AnyTable;
615        let mut demux = SiDemux::builder().build();
616
617        // PAT maps programme 1 -> PMT on PID 0x0100.
618        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
619        let pat_evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
620        assert_eq!(pat_evts.len(), 1);
621
622        // Before follow, a PMT packet on 0x0100 would be ignored. After the
623        // PAT was emitted, 0x0100 is watched.
624        let pmt = pmt_section(1, 0, 0x0100);
625        let pmt_evts: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt)).collect();
626        assert_eq!(pmt_evts.len(), 1, "PMT on the followed PID should emit");
627        assert_eq!(pmt_evts[0].pid(), Pid::new(0x0100));
628        match pmt_evts[0].table().unwrap() {
629            AnyTable::Pmt(p) => assert_eq!(p.program_number, 1),
630            other => panic!("expected Pmt, got {other:?}"),
631        }
632    }
633
634    #[test]
635    fn corrupted_crc_sdt_dropped_and_counted() {
636        let mut demux = SiDemux::builder().build();
637        // SDT actual = table_id 0x42, carried on SDT_BAT pid 0x0011.
638        let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
639        // Corrupt a payload byte AFTER the CRC was computed.
640        sdt[8] ^= 0xFF;
641        let evts: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
642        assert_eq!(evts.len(), 0, "corrupted SDT must not emit");
643        let s = demux.stats();
644        assert_eq!(s.crc_failures, 1);
645        assert_eq!(s.emitted, 0);
646        assert_eq!(s.sections_completed, 1);
647    }
648
649    #[test]
650    fn gate_capacity_evicts_fifo_and_reemits() {
651        let mut demux = SiDemux::builder().gate_capacity(2).build();
652
653        // Three distinct EIT sections (table_id 0x4E) by table_id_extension,
654        // all on the EIT pid 0x0012.
655        let a = long_section(0x4E, 0x0001, 0, 0, &[0x01]);
656        let b = long_section(0x4E, 0x0002, 0, 0, &[0x02]);
657        let c = long_section(0x4E, 0x0003, 0, 0, &[0x03]);
658
659        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
660        assert_eq!(demux.feed(&ts_packet(0x0012, &b)).count(), 1);
661        // Inserting c evicts a (the oldest).
662        assert_eq!(demux.feed(&ts_packet(0x0012, &c)).count(), 1);
663        assert_eq!(demux.stats().gate_evictions, 1);
664
665        // a was evicted -> re-feeding it re-emits (treated as newly seen).
666        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
667    }
668
669    #[test]
670    fn garbage_packet_counted_no_panic() {
671        let mut demux = SiDemux::builder().build();
672        let garbage = [0x00u8; TS_PACKET_SIZE]; // bad sync byte
673        let evts: Vec<_> = demux.feed(&garbage).collect();
674        assert_eq!(evts.len(), 0);
675        assert_eq!(demux.stats().malformed_packets, 1);
676        assert_eq!(demux.stats().packets, 1);
677    }
678
679    #[test]
680    fn emit_repeats_bypasses_suppression() {
681        let mut demux = SiDemux::builder().emit_repeats(true).build();
682        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
683        let pkt = ts_packet(0x0000, &pat);
684        assert_eq!(demux.feed(&pkt).count(), 1);
685        assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");
686        assert_eq!(demux.stats().suppressed, 0);
687        assert_eq!(demux.stats().emitted, 2);
688    }
689}