Skip to main content

dvb_si/
demux.rs

1//! [`SiDemux`] — PID-filtered, version-gated SI section pump.
2//!
3//! Feed 188-byte MPEG-TS packets in with [`SiDemux::feed`]; get back an
4//! iterator of [`SectionEvent`]s — one per **changed** complete section.
5//! The demux reassembles sections per PID (via
6//! [`crate::ts::SectionReassembler`]), validates the CRC of CRC-bearing
7//! sections, and suppresses repeats through a version gate so that a steady
8//! carousel of unchanging tables produces no events after the first.
9//!
10//! Events own their bytes ([`bytes::Bytes`]) and are therefore `'static` and
11//! cheap to clone; typed views ([`SectionEvent::table_section`],
12//! [`SectionEvent::parse`]) borrow the event lazily.
13//!
14//! ```
15//! use dvb_common::Serialize;
16//! use dvb_si::demux::SiDemux;
17//! use dvb_si::tables::AnyTableSection;
18//! use dvb_si::tables::pat::{PatSection, PatEntry};
19//!
20//! const PMT_PID: u16 = 0x0100;
21//! const TS_SYNC_BYTE: u8 = 0x47;
22//! const PAYLOAD_UNIT_START_INDICATOR: u8 = 0x40;
23//! const PAT_PID_LOW_BYTE: u8 = 0x00;
24//! const PAYLOAD_ONLY: u8 = 0x10;
25//! const POINTER_FIELD_START: u8 = 0x00;
26//! const STUFFING_BYTE: u8 = 0xFF;
27//!
28//! // Build one PAT section and wrap it in a single 188-byte TS packet so the
29//! // example is self-contained. In real code `packet` comes from your source.
30//! let pat = PatSection {
31//!     transport_stream_id: 1, version_number: 0, current_next_indicator: true,
32//!     section_number: 0, last_section_number: 0,
33//!     entries: vec![PatEntry { program_number: 1, pid: PMT_PID }],
34//! };
35//! let mut section = vec![0u8; pat.serialized_len()];
36//! pat.serialize_into(&mut section).unwrap();
37//! let mut packet = [STUFFING_BYTE; 188];
38//! packet[0] = TS_SYNC_BYTE;
39//! packet[1] = PAYLOAD_UNIT_START_INDICATOR;
40//! packet[2] = PAT_PID_LOW_BYTE;
41//! packet[3] = PAYLOAD_ONLY;
42//! packet[4] = POINTER_FIELD_START;
43//! packet[5..5 + section.len()].copy_from_slice(&section);
44//!
45//! let mut demux = SiDemux::builder().build();
46//! let events: Vec<_> = demux.feed(&packet).collect();
47//! assert_eq!(events.len(), 1);
48//! match events[0].table_section() {
49//!     Ok(AnyTableSection::PatSection(pat)) => {
50//!         println!("PAT v{} on {}", events[0].version().unwrap_or(0), events[0].pid());
51//!         assert_eq!(pat.entries[0].pid, PMT_PID);
52//!     }
53//!     other => panic!("expected PAT, got {other:?}"),
54//! }
55//! ```
56//!
57//! # Version gate
58//!
59//! Each `(pid, table_id, table_id_extension, section_number)` tuple is packed
60//! into a `u64` key. The stored value is a change detector:
61//!
62//! - **Long-form** sections (`section_syntax_indicator == 1`, plus the TOT
63//!   exception) carry a 5-bit `version_number` and a trailing CRC-32 — the
64//!   gate stores `(version, crc32)`. A repeat with the same version *and* CRC
65//!   is suppressed.
66//! - **Short-form** sections without a CRC (TDT/RST/ST/DIT) have no version;
67//!   the gate stores a CRC-32 *computed over the whole section* purely as a
68//!   change hash. `table_id_extension` and `section_number` collapse to 0 in
69//!   the key.
70//!
71//! # CRC policy
72//!
73//! CRC-bearing sections (every long-form section, plus the short-form TOT
74//! which uniquely carries a CRC — ETSI EN 300 468 §5.2.6) are validated
75//! before gating. Failures are dropped and counted in
76//! [`Stats::crc_failures`]; they are never emitted and never update the gate.
77//! TDT carries no CRC and is therefore never dropped for CRC reasons.
78
79use std::collections::{HashMap, VecDeque};
80
81use bytes::Bytes;
82
83use crate::pid::Pid;
84use crate::ts::{SectionReassembler, TsPacket};
85
/// `table_id` of the Program Association Table; a changed PAT is what drives
/// PMT-PID following.
const PAT_TABLE_ID: u8 = 0x00;
/// `table_id` of the Time Offset Table, the one short-form (SSI = 0) table
/// that still ends in a CRC-32.
const TOT_TABLE_ID: u8 = 0x73;
/// Size of the common section header (`table_id` plus the 2-byte flags/length
/// field); anything shorter cannot be interpreted as a section.
const MIN_SECTION_LEN: usize = 3;
/// Size of the long-form extension header that follows the common header.
const LONG_FORM_EXTRA: usize = 5;
/// Size of the trailing CRC-32.
const CRC_LEN: usize = 4;
96
97/// One complete, changed SI section. Owns its bytes — `'static`, cheap clone.
98///
99/// A `SectionEvent` is only ever constructed for a section that
100/// (a) is at least 3 bytes long, and (b) if it carries a CRC, passed CRC
101/// validation. So [`SectionEvent::crc_ok`] is always `true` and
102/// [`SectionEvent::table_id`] never panics.
103#[derive(Debug, Clone)]
104pub struct SectionEvent {
105    pid: Pid,
106    bytes: Bytes,
107}
108
109impl SectionEvent {
110    /// PID this section was carried on.
111    #[must_use]
112    pub fn pid(&self) -> Pid {
113        self.pid
114    }
115
116    /// The full section bytes (header included, CRC included if present).
117    #[must_use]
118    pub fn bytes(&self) -> &Bytes {
119        &self.bytes
120    }
121
122    /// The `table_id` (byte 0). Never panics — events are only built for
123    /// sections of at least 3 bytes.
124    #[must_use]
125    pub fn table_id(&self) -> u8 {
126        self.bytes[0]
127    }
128
129    /// True when this section uses the long-form syntax
130    /// (`section_syntax_indicator == 1`).
131    #[must_use]
132    fn is_long_form(&self) -> bool {
133        (self.bytes[1] & 0x80) != 0
134    }
135
136    /// 5-bit `version_number`, or `None` for short-form sections (which carry
137    /// no version field). Note the TOT, despite being short-form, has no
138    /// version field either, so this is `None` for it.
139    #[must_use]
140    pub fn version(&self) -> Option<u8> {
141        if self.is_long_form() && self.bytes.len() > 5 {
142            Some((self.bytes[5] >> 1) & 0x1F)
143        } else {
144            None
145        }
146    }
147
148    /// 16-bit `table_id_extension`, or `None` for short-form sections.
149    #[must_use]
150    pub fn table_id_extension(&self) -> Option<u16> {
151        if self.is_long_form() && self.bytes.len() > 4 {
152            Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
153        } else {
154            None
155        }
156    }
157
158    /// `section_number`, or `None` for short-form sections.
159    #[must_use]
160    pub fn section_number(&self) -> Option<u8> {
161        if self.is_long_form() && self.bytes.len() > 6 {
162            Some(self.bytes[6])
163        } else {
164            None
165        }
166    }
167
168    /// Always `true`: events are emitted only after CRC validation (or for
169    /// CRC-less short-form sections, where there is nothing to validate).
170    #[must_use]
171    pub fn crc_ok(&self) -> bool {
172        true
173    }
174
175    /// Typed table-section view (lazy, borrows this event's bytes).
176    ///
177    /// # Errors
178    /// Propagates the parse error from the dispatched table-section type.
179    pub fn table_section(&self) -> crate::Result<crate::tables::AnyTableSection<'_>> {
180        crate::tables::AnyTableSection::parse(&self.bytes)
181    }
182
183    /// Typed table-section view with custom-registry support (lazy, borrows
184    /// this event's bytes).
185    ///
186    /// Precedence: custom-registered parser → built-in dispatch → Unknown.
187    /// See [`AnyTableSection::parse_with`][crate::tables::AnyTableSection::parse_with].
188    ///
189    /// # Errors
190    /// Propagates the parse error from the dispatched table-section type.
191    pub fn table_section_with(
192        &self,
193        registry: &crate::tables::registry::TableRegistry,
194    ) -> crate::Result<crate::tables::AnyTableSection<'_>> {
195        crate::tables::AnyTableSection::parse_with(registry, &self.bytes)
196    }
197
198    /// Type-keyed view: `event.parse::<EitSection>()`.
199    ///
200    /// # Errors
201    /// Propagates `T::parse` errors.
202    pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
203        <T as dvb_common::Parse>::parse(&self.bytes)
204    }
205}
206
/// Running counters for a [`SiDemux`]. They only ever grow, and they persist
/// across `feed` calls.
#[non_exhaustive]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct Stats {
    /// Number of `feed` calls, i.e. TS packets handed to the demux (malformed
    /// ones included).
    pub packets: u64,
    /// Complete sections handed over by the reassemblers, counted before CRC
    /// validation and before the version gate.
    pub sections_completed: u64,
    /// Sections turned into events (valid and changed).
    pub emitted: u64,
    /// Unchanged repeats swallowed by the version gate.
    pub suppressed: u64,
    /// Sections dropped before emission without updating the gate: CRC
    /// mismatches, plus structurally invalid sub-3-byte sections (which the
    /// in-crate reassembler cannot produce).
    pub crc_failures: u64,
    /// TS packets that could not be parsed (bad sync byte, too short).
    pub malformed_packets: u64,
    /// Gate entries dropped to make room once the gate reached capacity.
    pub gate_evictions: u64,
}
228
/// The per-key memory of the version gate. A section counts as "changed" when
/// the entry computed for it differs from the stored one.
#[derive(PartialEq, Eq, Clone, Copy)]
struct GateEntry {
    /// `version_number` of a long-form section; short-form sections have none
    /// and store 0 here.
    version: u8,
    /// CRC-32 used as the change hash: the trailing CRC for long-form
    /// sections, a CRC computed over all bytes for short-form ones.
    crc: u32,
}
238
/// The [`SiDemuxBuilder`] settings that the running [`SiDemux`] still needs
/// after construction.
struct Config {
    /// Register PMT PIDs announced by a changed PAT.
    follow_pat: bool,
    /// Emit every valid section, even unchanged repeats.
    emit_repeats: bool,
    /// Upper bound on the number of keys the version gate retains.
    gate_capacity: usize,
}
245
246/// Builder for [`SiDemux`].
247///
248/// Defaults: `follow_pat = true`, `dvb_si_pids = true`,
249/// `emit_repeats = false`, `gate_capacity = 65_536`.
250pub struct SiDemuxBuilder {
251    follow_pat: bool,
252    dvb_si_pids: bool,
253    emit_repeats: bool,
254    gate_capacity: usize,
255    extra_pids: Vec<Pid>,
256}
257
258impl Default for SiDemuxBuilder {
259    fn default() -> Self {
260        Self {
261            follow_pat: true,
262            dvb_si_pids: true,
263            emit_repeats: false,
264            gate_capacity: 65_536,
265            extra_pids: Vec::new(),
266        }
267    }
268}
269
270impl SiDemuxBuilder {
271    /// When `true` (default), an emitted (changed) PAT auto-adds each
272    /// programme's PMT PID to the watch set.
273    #[must_use]
274    pub fn follow_pat(mut self, on: bool) -> Self {
275        self.follow_pat = on;
276        self
277    }
278
279    /// When `true` (default), pre-populate the watch set with the well-known
280    /// DVB/MPEG-2 SI PIDs (PAT, CAT, NIT, SDT/BAT, EIT, RST, TDT/TOT, SAT).
281    #[must_use]
282    pub fn dvb_si_pids(mut self, on: bool) -> Self {
283        self.dvb_si_pids = on;
284        self
285    }
286
287    /// Add a PID to the watch set (additive; may be called repeatedly).
288    #[must_use]
289    pub fn pid(mut self, pid: Pid) -> Self {
290        self.extra_pids.push(pid);
291        self
292    }
293
294    /// When `true`, emit every complete valid section, bypassing the version
295    /// gate's suppression (the gate is still updated). Default `false`.
296    #[must_use]
297    pub fn emit_repeats(mut self, on: bool) -> Self {
298        self.emit_repeats = on;
299        self
300    }
301
302    /// Maximum number of distinct gate keys retained. At capacity the gate
303    /// FIFO-evicts the oldest key. Default 65 536.
304    #[must_use]
305    pub fn gate_capacity(mut self, cap: usize) -> Self {
306        self.gate_capacity = cap;
307        self
308    }
309
310    /// Build the [`SiDemux`].
311    #[must_use]
312    pub fn build(self) -> SiDemux {
313        let mut pids: HashMap<Pid, SectionReassembler> = HashMap::new();
314        if self.dvb_si_pids {
315            use crate::pid::well_known as wk;
316            for pid in [
317                wk::PAT,
318                wk::CAT,
319                wk::NIT,
320                wk::SDT_BAT,
321                wk::EIT,
322                wk::RST,
323                wk::TDT_TOT,
324                wk::SAT,
325            ] {
326                pids.entry(pid).or_default();
327            }
328        }
329        for p in self.extra_pids {
330            pids.entry(p).or_default();
331        }
332        SiDemux {
333            pids,
334            gate: HashMap::new(),
335            gate_order: VecDeque::new(),
336            cfg: Config {
337                follow_pat: self.follow_pat,
338                emit_repeats: self.emit_repeats,
339                gate_capacity: self.gate_capacity,
340            },
341            stats: Stats::default(),
342            scratch: Vec::new(),
343        }
344    }
345}
346
347/// PID-filtered, version-gated SI section demultiplexer.
348///
349/// See the [module docs](crate::demux) for the gate and CRC policies.
350pub struct SiDemux {
351    pids: HashMap<Pid, SectionReassembler>,
352    // TODO(perf): keys are uniform internal u64s — a non-SipHash hasher (e.g.
353    // FxHash) would shave cycles at high section rates; revisit if profiling
354    // shows it.
355    gate: HashMap<u64, GateEntry>,
356    gate_order: VecDeque<u64>,
357    cfg: Config,
358    stats: Stats,
359    scratch: Vec<SectionEvent>,
360}
361
362impl SiDemux {
363    /// Start building a demux. See [`SiDemuxBuilder`] for defaults.
364    #[must_use]
365    pub fn builder() -> SiDemuxBuilder {
366        SiDemuxBuilder::default()
367    }
368
369    /// Accumulated statistics.
370    #[must_use]
371    pub fn stats(&self) -> Stats {
372        self.stats
373    }
374
375    /// Feed one 188-byte TS packet. Infallible: malformed packets are counted
376    /// in [`Stats::malformed_packets`], not raised. Returns an iterator over
377    /// the changed sections this packet completed.
378    pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
379        self.scratch.clear();
380        self.stats.packets += 1;
381
382        match TsPacket::parse(packet) {
383            Err(_) => self.stats.malformed_packets += 1,
384            Ok(ts) => {
385                let pid = Pid::new(ts.header.pid);
386                let payload = ts.payload.unwrap_or(&[]);
387                // Single map lookup: feed and drain every section this packet
388                // completed while holding the reassembler borrow, then process
389                // them after the borrow ends (`consider` may insert new PMT
390                // PIDs into the map, so it cannot run while the borrow is live).
391                // A non-watched PID costs exactly one lookup and stops here;
392                // `completed` does not allocate until a section actually pops.
393                let mut completed: Vec<Bytes> = Vec::new();
394                if let Some(reasm) = self.pids.get_mut(&pid) {
395                    reasm.feed(payload, ts.header.pusi);
396                    while let Some(section) = reasm.pop_section() {
397                        completed.push(section);
398                    }
399                }
400                self.stats.sections_completed += completed.len() as u64;
401                for section in completed {
402                    self.consider(pid, section);
403                }
404            }
405        }
406
407        self.scratch.drain(..)
408    }
409
410    /// Gate + CRC + (maybe) push to scratch. Handles PAT-follow on emit.
411    fn consider(&mut self, pid: Pid, section: Bytes) {
412        // Guard: sub-3-byte sections cannot carry a header. The reassembler
413        // should never emit one (it needs >= 3 bytes to know `expected`), but
414        // guard defensively and count it as a CRC failure bucket — it is a
415        // structurally invalid section, dropped without emission.
416        if section.len() < MIN_SECTION_LEN {
417            self.stats.crc_failures += 1;
418            return;
419        }
420
421        let table_id = section[0];
422        let long_form = (section[1] & 0x80) != 0;
423        // The TOT is short-form by its SSI bit but uniquely carries a CRC.
424        let has_crc = long_form || table_id == TOT_TABLE_ID;
425
426        // CRC policy: validate CRC-bearing sections before gating.
427        if has_crc {
428            if section.len() < CRC_LEN {
429                self.stats.crc_failures += 1;
430                return;
431            }
432            let covered = &section[..section.len() - CRC_LEN];
433            let declared = u32::from_be_bytes([
434                section[section.len() - 4],
435                section[section.len() - 3],
436                section[section.len() - 2],
437                section[section.len() - 1],
438            ]);
439            let computed = dvb_common::crc32_mpeg2::compute(covered);
440            if computed != declared {
441                self.stats.crc_failures += 1;
442                return;
443            }
444        }
445
446        // Build the gate key and change detector.
447        let (ext, section_number, version, change_crc) =
448            if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
449                let ext = ((section[3] as u16) << 8) | section[4] as u16;
450                let version = (section[5] >> 1) & 0x1F;
451                let section_number = section[6];
452                // For long-form the trailing CRC already uniquely fingerprints the
453                // payload; reuse it as the change hash.
454                let crc = u32::from_be_bytes([
455                    section[section.len() - 4],
456                    section[section.len() - 3],
457                    section[section.len() - 2],
458                    section[section.len() - 1],
459                ]);
460                (ext, section_number, version, crc)
461            } else {
462                // Short-form (incl. TOT and any malformed long-form that slipped
463                // the size check above): no version, ext/section_number = 0,
464                // change detector is a CRC over all the section bytes.
465                (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(&section))
466            };
467
468        let key = (pid.value() as u64)
469            | ((table_id as u64) << 13)
470            | ((ext as u64) << 21)
471            | ((section_number as u64) << 37);
472
473        let entry = GateEntry {
474            version,
475            crc: change_crc,
476        };
477
478        let changed = match self.gate.get(&key) {
479            Some(prev) => *prev != entry,
480            None => true,
481        };
482
483        // Update the gate (FIFO-evict at capacity for newly-seen keys).
484        if !self.gate.contains_key(&key) {
485            if self.gate.len() >= self.cfg.gate_capacity {
486                if let Some(old) = self.gate_order.pop_front() {
487                    self.gate.remove(&old);
488                    self.stats.gate_evictions += 1;
489                }
490            }
491            self.gate_order.push_back(key);
492        }
493        self.gate.insert(key, entry);
494
495        if changed || self.cfg.emit_repeats {
496            let event = SectionEvent {
497                pid,
498                bytes: section,
499            };
500            // PAT-follow happens on an emitted (changed) PAT only.
501            if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
502                self.follow_pat(&event);
503            }
504            self.stats.emitted += 1;
505            self.scratch.push(event);
506        } else {
507            self.stats.suppressed += 1;
508        }
509    }
510
511    /// Parse the PAT and register each programme's PMT PID with a fresh
512    /// reassembler. Parse failures are silently ignored — a malformed PAT that
513    /// nonetheless passed CRC is implausible, and we never panic.
514    fn follow_pat(&mut self, event: &SectionEvent) {
515        use crate::tables::pat::PatSection;
516        use dvb_common::Parse;
517        if let Ok(pat) = PatSection::parse(&event.bytes) {
518            for entry in &pat.entries {
519                if entry.program_number != 0 {
520                    self.pids.entry(Pid::new(entry.pid)).or_default();
521                }
522            }
523        }
524    }
525}
526
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ts::{TsHeader, TS_PACKET_SIZE};

    /// Put `section` into one PUSI packet on `pid`: 4-byte TS header, a zero
    /// pointer_field, the section, then 0xFF stuffing up to the packet size.
    /// Panics if the section does not fit in a single packet.
    fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
        let first = 5;
        let end = first + section.len();
        assert!(end <= TS_PACKET_SIZE, "section too big");

        let mut packet = [0xFFu8; TS_PACKET_SIZE];
        let header = TsHeader {
            tei: false,
            pusi: true,
            pid,
            scrambling: 0,
            has_adaptation: false,
            has_payload: true,
            continuity_counter: 0,
        };
        header.serialize_into(&mut packet).unwrap();
        packet[4] = 0x00; // pointer_field
        packet[first..end].copy_from_slice(section);
        packet
    }

    /// Assemble a long-form section around `payload` and append the CRC-32
    /// computed over everything before it.
    fn long_section(
        table_id: u8,
        ext: u16,
        version: u8,
        section_number: u8,
        payload: &[u8],
    ) -> Vec<u8> {
        let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
        let [len_hi, len_lo] = section_length.to_be_bytes();
        let [ext_hi, ext_lo] = ext.to_be_bytes();

        let mut out = Vec::with_capacity(MIN_SECTION_LEN + usize::from(section_length));
        out.extend_from_slice(&[
            table_id,
            0x80 | 0x30 | (len_hi & 0x0F),
            len_lo,
            ext_hi,
            ext_lo,
            0xC0 | ((version & 0x1F) << 1) | 0x01,
            section_number,
            section_number, // last_section_number
        ]);
        out.extend_from_slice(payload);
        let crc = dvb_common::crc32_mpeg2::compute(&out);
        out.extend_from_slice(&crc.to_be_bytes());
        out
    }

    /// PAT section with a valid CRC, built from `(program_number, pmt_pid)`
    /// pairs.
    fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
        let body: Vec<u8> = entries
            .iter()
            .flat_map(|&(program_number, pmt_pid)| {
                let [pn_hi, pn_lo] = program_number.to_be_bytes();
                let [pid_hi, pid_lo] = pmt_pid.to_be_bytes();
                [pn_hi, pn_lo, 0xE0 | (pid_hi & 0x1F), pid_lo]
            })
            .collect();
        long_section(0x00, tsid, version, 0, &body)
    }

    /// PMT section with a valid CRC and exactly one elementary stream.
    fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
        // Layout: pcr_pid(2) + program_info_length(2)=0 + one stream(5) with
        // stream type 0x02 (video), elementary_pid = pcr_pid+1, es_info_len 0.
        let es_pid = pcr_pid + 1;
        let [pcr_hi, pcr_lo] = pcr_pid.to_be_bytes();
        let [es_hi, es_lo] = es_pid.to_be_bytes();
        let body = [
            0xE0 | (pcr_hi & 0x1F),
            pcr_lo,
            0xF0,
            0x00,
            0x02,
            0xE0 | (es_hi & 0x1F),
            es_lo,
            0xF0,
            0x00,
        ];
        long_section(0x02, program_number, version, 0, &body)
    }

    #[test]
    fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
        let mut demux = SiDemux::builder().build();

        let first_pkt = ts_packet(0x0000, &pat_section(0x0001, 0, &[(1, 0x0100)]));
        let second_pkt = ts_packet(0x0000, &pat_section(0x0001, 1, &[(1, 0x0100)]));

        // First sighting of version 0.
        let initial: Vec<_> = demux.feed(&first_pkt).collect();
        assert_eq!(initial.len(), 1, "PAT v0 should emit one event");
        assert_eq!(initial[0].table_id(), 0x00);
        assert_eq!(initial[0].version(), Some(0));

        // Identical repeat.
        let repeat: Vec<_> = demux.feed(&first_pkt).collect();
        assert_eq!(repeat.len(), 0, "repeat PAT should be suppressed");

        // Version bump.
        let bumped: Vec<_> = demux.feed(&second_pkt).collect();
        assert_eq!(bumped.len(), 1, "PAT v1 should re-emit");
        assert_eq!(bumped[0].version(), Some(1));

        let stats = demux.stats();
        assert_eq!(stats.sections_completed, 3);
        assert_eq!(stats.emitted, 2);
        assert_eq!(stats.suppressed, 1);
        assert_eq!(stats.crc_failures, 0);
    }

    #[test]
    fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
        use crate::tables::AnyTableSection;
        let mut demux = SiDemux::builder().build();

        // The PAT announces programme 1 with its PMT on PID 0x0100.
        let pat_bytes = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let from_pat: Vec<_> = demux.feed(&ts_packet(0x0000, &pat_bytes)).collect();
        assert_eq!(from_pat.len(), 1);

        // 0x0100 was not watched initially; emitting the PAT added it, so the
        // PMT carried there must now come through.
        let pmt_bytes = pmt_section(1, 0, 0x0100);
        let from_pmt: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt_bytes)).collect();
        assert_eq!(from_pmt.len(), 1, "PMT on the followed PID should emit");
        assert_eq!(from_pmt[0].pid(), Pid::new(0x0100));
        match from_pmt[0].table_section().unwrap() {
            AnyTableSection::PmtSection(p) => assert_eq!(p.program_number, 1),
            other => panic!("expected PmtSection, got {other:?}"),
        }
    }

    #[test]
    fn corrupted_crc_sdt_dropped_and_counted() {
        let mut demux = SiDemux::builder().build();
        // SDT actual (table_id 0x42) travels on the SDT/BAT PID 0x0011.
        let mut damaged = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
        // Flip a payload byte after the CRC was appended so it no longer matches.
        damaged[8] ^= 0xFF;
        let events: Vec<_> = demux.feed(&ts_packet(0x0011, &damaged)).collect();
        assert_eq!(events.len(), 0, "corrupted SDT must not emit");
        let stats = demux.stats();
        assert_eq!(stats.crc_failures, 1);
        assert_eq!(stats.emitted, 0);
        assert_eq!(stats.sections_completed, 1);
    }

    #[test]
    fn gate_capacity_evicts_fifo_and_reemits() {
        let mut demux = SiDemux::builder().gate_capacity(2).build();

        // Three EIT sections (table_id 0x4E) on the EIT PID 0x0012 that differ
        // in table_id_extension, hence occupy three distinct gate keys.
        let first = ts_packet(0x0012, &long_section(0x4E, 0x0001, 0, 0, &[0x01]));
        let second = ts_packet(0x0012, &long_section(0x4E, 0x0002, 0, 0, &[0x02]));
        let third = ts_packet(0x0012, &long_section(0x4E, 0x0003, 0, 0, &[0x03]));

        assert_eq!(demux.feed(&first).count(), 1);
        assert_eq!(demux.feed(&second).count(), 1);
        // The third key does not fit, so the oldest (first) is evicted.
        assert_eq!(demux.feed(&third).count(), 1);
        assert_eq!(demux.stats().gate_evictions, 1);

        // With its key gone, the first section looks new again and re-emits.
        assert_eq!(demux.feed(&first).count(), 1);
    }

    #[test]
    fn garbage_packet_counted_no_panic() {
        let mut demux = SiDemux::builder().build();
        let junk = [0x00u8; TS_PACKET_SIZE]; // bad sync byte
        let events: Vec<_> = demux.feed(&junk).collect();
        assert_eq!(events.len(), 0);
        let stats = demux.stats();
        assert_eq!(stats.malformed_packets, 1);
        assert_eq!(stats.packets, 1);
    }

    #[test]
    fn emit_repeats_bypasses_suppression() {
        let mut demux = SiDemux::builder().emit_repeats(true).build();
        let packet = ts_packet(0x0000, &pat_section(0x0001, 0, &[(1, 0x0100)]));
        assert_eq!(demux.feed(&packet).count(), 1);
        assert_eq!(demux.feed(&packet).count(), 1, "emit_repeats re-emits");
        let stats = demux.stats();
        assert_eq!(stats.suppressed, 0);
        assert_eq!(stats.emitted, 2);
    }

    #[test]
    fn table_section_with_empty_registry_matches_table_section() {
        use crate::tables::registry::TableRegistry;
        use crate::tables::AnyTableSection;

        let mut demux = SiDemux::builder().build();
        let pat_bytes = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let events: Vec<_> = demux.feed(&ts_packet(0x0000, &pat_bytes)).collect();
        assert_eq!(events.len(), 1);
        let event = &events[0];

        // An empty registry must fall through to the built-in dispatch.
        let registry = TableRegistry::new();
        let via_registry = event.table_section_with(&registry).unwrap();
        let builtin = event.table_section().unwrap();
        assert!(matches!(via_registry, AnyTableSection::PatSection(_)));
        assert!(matches!(builtin, AnyTableSection::PatSection(_)));
    }

    #[test]
    fn table_section_with_custom_registry_yields_other() {
        use crate::tables::registry::TableRegistry;
        use crate::tables::AnyTableSection;
        use crate::traits::TableDef;
        use dvb_common::Parse;

        const PRIVATE_TID: u8 = 0x90;

        /// Minimal user-defined table: remembers only its table_id.
        #[derive(Debug)]
        #[cfg_attr(feature = "serde", derive(serde::Serialize))]
        struct PrivateTable {
            table_id: u8,
        }

        impl<'a> Parse<'a> for PrivateTable {
            type Error = crate::Error;
            fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
                match bytes.first() {
                    Some(&table_id) => Ok(Self { table_id }),
                    None => Err(crate::Error::BufferTooShort {
                        need: 1,
                        have: 0,
                        what: "PrivateTable",
                    }),
                }
            }
        }

        impl<'a> TableDef<'a> for PrivateTable {
            const TABLE_ID_RANGES: &'static [(u8, u8)] = &[(PRIVATE_TID, PRIVATE_TID)];
            const NAME: &'static str = "PRIVATE_TABLE";
        }

        let mut registry = TableRegistry::new();
        registry.register::<PrivateTable>();

        // Watch only the private PID; the well-known SI PIDs are irrelevant.
        let mut demux = SiDemux::builder()
            .dvb_si_pids(false)
            .pid(Pid::new(0x0200))
            .build();

        let private = long_section(PRIVATE_TID, 0x0001, 0, 0, &[0x42]);
        let events: Vec<_> = demux.feed(&ts_packet(0x0200, &private)).collect();
        assert_eq!(events.len(), 1);

        let parsed = events[0].table_section_with(&registry).unwrap();
        match parsed {
            AnyTableSection::Other {
                table_id,
                ref value,
            } => {
                assert_eq!(table_id, PRIVATE_TID);
                let private_table = value.downcast_ref::<PrivateTable>().unwrap();
                assert_eq!(private_table.table_id, PRIVATE_TID);
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }
}