Skip to main content

dvb_si/
demux.rs

1//! [`SiDemux`] — PID-filtered, version-gated SI section pump.
2//!
3//! Feed 188-byte MPEG-TS packets in with [`SiDemux::feed`]; get back an
4//! iterator of [`SectionEvent`]s — one per **changed** complete section.
5//! The demux reassembles sections per PID (via
6//! [`crate::ts::SectionReassembler`]), validates the CRC of CRC-bearing
7//! sections, and suppresses repeats through a version gate so that a steady
8//! carousel of unchanging tables produces no events after the first.
9//!
10//! Events own their bytes ([`bytes::Bytes`]) and are therefore `'static` and
11//! cheap to clone; typed views ([`SectionEvent::table_section`],
12//! [`SectionEvent::parse`]) borrow the event lazily.
13//!
14//! ```
15//! use dvb_common::Serialize;
16//! use dvb_si::demux::SiDemux;
17//! use dvb_si::tables::AnyTableSection;
18//! use dvb_si::tables::pat::{PatSection, PatEntry};
19//!
20//! const PMT_PID: u16 = 0x0100;
21//! const TS_SYNC_BYTE: u8 = 0x47;
22//! const PAYLOAD_UNIT_START_INDICATOR: u8 = 0x40;
23//! const PAT_PID_LOW_BYTE: u8 = 0x00;
24//! const PAYLOAD_ONLY: u8 = 0x10;
25//! const POINTER_FIELD_START: u8 = 0x00;
26//! const STUFFING_BYTE: u8 = 0xFF;
27//!
28//! // Build one PAT section and wrap it in a single 188-byte TS packet so the
29//! // example is self-contained. In real code `packet` comes from your source.
30//! let pat = PatSection {
31//!     transport_stream_id: 1, version_number: 0, current_next_indicator: true,
32//!     section_number: 0, last_section_number: 0,
33//!     entries: vec![PatEntry { program_number: 1, pid: PMT_PID }],
34//! };
35//! let mut section = vec![0u8; pat.serialized_len()];
36//! pat.serialize_into(&mut section).unwrap();
37//! let mut packet = [STUFFING_BYTE; 188];
38//! packet[0] = TS_SYNC_BYTE;
39//! packet[1] = PAYLOAD_UNIT_START_INDICATOR;
40//! packet[2] = PAT_PID_LOW_BYTE;
41//! packet[3] = PAYLOAD_ONLY;
42//! packet[4] = POINTER_FIELD_START;
43//! packet[5..5 + section.len()].copy_from_slice(&section);
44//!
45//! let mut demux = SiDemux::builder().build();
46//! let events: Vec<_> = demux.feed(&packet).collect();
47//! assert_eq!(events.len(), 1);
48//! match events[0].table_section() {
49//!     Ok(AnyTableSection::PatSection(pat)) => {
50//!         println!("PAT v{} on {}", events[0].version().unwrap_or(0), events[0].pid());
51//!         assert_eq!(pat.entries[0].pid, PMT_PID);
52//!     }
53//!     other => panic!("expected PAT, got {other:?}"),
54//! }
55//! ```
56//!
57//! # Version gate
58//!
59//! Each `(pid, table_id, table_id_extension, section_number)` tuple is packed
60//! into a `u64` key. The stored value is a change detector:
61//!
//! - **CRC-bearing** sections — every long-form section
//!   (`section_syntax_indicator == 1`) plus the short-form TOT — end in a
//!   trailing CRC-32. Long-form sections also carry a 5-bit `version_number`;
//!   the TOT has no version field, so its version is taken as 0. The gate
//!   stores `(version, crc32)`. A repeat with the same version *and* CRC is
//!   suppressed.
66//! - **Short-form** sections without a CRC (TDT/RST/ST/DIT) have no version;
67//!   the gate stores a CRC-32 *computed over the whole section* purely as a
68//!   change hash. `table_id_extension` and `section_number` collapse to 0 in
69//!   the key.
70//!
71//! # CRC policy
72//!
73//! CRC-bearing sections (every long-form section, plus the short-form TOT
74//! which uniquely carries a CRC — ETSI EN 300 468 §5.2.6) are validated
75//! before gating. Failures are dropped and counted in
76//! [`Stats::crc_failures`]; they are never emitted and never update the gate.
77//! TDT carries no CRC and is therefore never dropped for CRC reasons.
78
79use std::collections::{HashMap, VecDeque};
80
81use bytes::Bytes;
82
83use crate::pid::Pid;
84use crate::ts::{SectionReassembler, TsPacket};
85
86/// table_id of the Program Association Table (PAT) — followed for PMT PIDs.
87const PAT_TABLE_ID: u8 = 0x00;
88/// table_id of the Time Offset Table — short-form (SSI=0) yet CRC-bearing.
89const TOT_TABLE_ID: u8 = 0x73;
90/// Minimum bytes required to read a section header (table_id + length field).
91const MIN_SECTION_LEN: usize = 3;
92/// Long-form extension header bytes (after the 3-byte common header).
93const LONG_FORM_EXTRA: usize = 5;
94/// Trailing CRC-32 length.
95const CRC_LEN: usize = 4;
96
97/// One complete, changed SI section. Owns its bytes — `'static`, cheap clone.
98///
99/// A `SectionEvent` is only ever constructed for a section that
100/// (a) is at least 3 bytes long, and (b) if it carries a CRC, passed CRC
101/// validation. So [`SectionEvent::crc_ok`] is always `true` and
102/// [`SectionEvent::table_id`] never panics.
103#[derive(Debug, Clone)]
104pub struct SectionEvent {
105    pid: Pid,
106    bytes: Bytes,
107}
108
109impl SectionEvent {
110    /// PID this section was carried on.
111    #[must_use]
112    pub fn pid(&self) -> Pid {
113        self.pid
114    }
115
116    /// The full section bytes (header included, CRC included if present).
117    #[must_use]
118    pub fn bytes(&self) -> &Bytes {
119        &self.bytes
120    }
121
122    /// The `table_id` (byte 0). Never panics — events are only built for
123    /// sections of at least 3 bytes.
124    #[must_use]
125    pub fn table_id(&self) -> u8 {
126        self.bytes[0]
127    }
128
129    /// True when this section uses the long-form syntax
130    /// (`section_syntax_indicator == 1`).
131    #[must_use]
132    fn is_long_form(&self) -> bool {
133        (self.bytes[1] & 0x80) != 0
134    }
135
136    /// 5-bit `version_number`, or `None` for short-form sections (which carry
137    /// no version field). Note the TOT, despite being short-form, has no
138    /// version field either, so this is `None` for it.
139    #[must_use]
140    pub fn version(&self) -> Option<u8> {
141        if self.is_long_form() && self.bytes.len() > 5 {
142            Some((self.bytes[5] >> 1) & 0x1F)
143        } else {
144            None
145        }
146    }
147
148    /// 16-bit `table_id_extension`, or `None` for short-form sections.
149    #[must_use]
150    pub fn table_id_extension(&self) -> Option<u16> {
151        if self.is_long_form() && self.bytes.len() > 4 {
152            Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
153        } else {
154            None
155        }
156    }
157
158    /// `section_number`, or `None` for short-form sections.
159    #[must_use]
160    pub fn section_number(&self) -> Option<u8> {
161        if self.is_long_form() && self.bytes.len() > 6 {
162            Some(self.bytes[6])
163        } else {
164            None
165        }
166    }
167
168    /// Always `true`: events are emitted only after CRC validation (or for
169    /// CRC-less short-form sections, where there is nothing to validate).
170    #[must_use]
171    pub fn crc_ok(&self) -> bool {
172        true
173    }
174
175    /// Typed table-section view (lazy, borrows this event's bytes).
176    ///
177    /// # Errors
178    /// Propagates the parse error from the dispatched table-section type.
179    pub fn table_section(&self) -> crate::Result<crate::tables::AnyTableSection<'_>> {
180        crate::tables::AnyTableSection::parse(&self.bytes)
181    }
182
183    /// Typed table-section view with custom-registry support (lazy, borrows
184    /// this event's bytes).
185    ///
186    /// Precedence: custom-registered parser → built-in dispatch → Unknown.
187    /// See [`AnyTableSection::parse_with`][crate::tables::AnyTableSection::parse_with].
188    ///
189    /// # Errors
190    /// Propagates the parse error from the dispatched table-section type.
191    pub fn table_section_with(
192        &self,
193        registry: &crate::tables::registry::TableRegistry,
194    ) -> crate::Result<crate::tables::AnyTableSection<'_>> {
195        crate::tables::AnyTableSection::parse_with(registry, &self.bytes)
196    }
197
198    /// Type-keyed view: `event.parse::<EitSection>()`.
199    ///
200    /// # Errors
201    /// Propagates `T::parse` errors.
202    pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
203        <T as dvb_common::Parse>::parse(&self.bytes)
204    }
205}
206
/// Counters describing what the demux has seen. They only ever grow, and they
/// accumulate across successive `feed` calls.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Stats {
    /// Number of TS packets fed; bumped once per `feed` call.
    pub packets: u64,
    /// Complete sections handed over by the reassemblers, counted before CRC
    /// validation and before the version gate.
    pub sections_completed: u64,
    /// Sections turned into events (valid and changed).
    pub emitted: u64,
    /// Unchanged repeats that the version gate held back.
    pub suppressed: u64,
    /// Sections dropped before emission: CRC mismatches, plus structurally
    /// invalid sub-3-byte sections (which the in-crate reassembler cannot
    /// produce). The gate is never updated for these.
    pub crc_failures: u64,
    /// TS packets that could not be parsed (bad sync byte, too short).
    pub malformed_packets: u64,
    /// Gate entries dropped to make room once the gate was full.
    pub gate_evictions: u64,
}
228
/// The gate's memory for one key. A section counts as "changed" when the
/// entry built for it differs from the stored one.
#[derive(Clone, Copy, PartialEq, Eq)]
struct GateEntry {
    /// `version_number` of a long-form section; 0 when the section has no
    /// version field (the value is then not meaningful).
    version: u8,
    /// CRC-32 used as the change hash: either the section's own trailing CRC
    /// or a CRC the gate computed over the section bytes.
    crc: u32,
}
238
/// Runtime settings the demux keeps after [`SiDemuxBuilder`] has been
/// consumed.
struct Config {
    /// Register PMT PIDs announced by a changed PAT.
    follow_pat: bool,
    /// Emit valid sections even when the gate considers them unchanged.
    emit_repeats: bool,
    /// Upper bound on the number of keys the gate remembers.
    gate_capacity: usize,
}
245
246/// Builder for [`SiDemux`].
247///
248/// Defaults: `follow_pat = true`, `dvb_si_pids = true`,
249/// `emit_repeats = false`, `gate_capacity = 65_536`.
250pub struct SiDemuxBuilder {
251    follow_pat: bool,
252    dvb_si_pids: bool,
253    emit_repeats: bool,
254    gate_capacity: usize,
255    extra_pids: Vec<Pid>,
256}
257
258impl Default for SiDemuxBuilder {
259    fn default() -> Self {
260        Self {
261            follow_pat: true,
262            dvb_si_pids: true,
263            emit_repeats: false,
264            gate_capacity: 65_536,
265            extra_pids: Vec::new(),
266        }
267    }
268}
269
270impl SiDemuxBuilder {
271    /// When `true` (default), an emitted (changed) PAT auto-adds each
272    /// programme's PMT PID to the watch set.
273    #[must_use]
274    pub fn follow_pat(mut self, on: bool) -> Self {
275        self.follow_pat = on;
276        self
277    }
278
279    /// When `true` (default), pre-populate the watch set with the well-known
280    /// DVB/MPEG-2 SI PIDs (PAT, CAT, NIT, SDT/BAT, EIT, RST, TDT/TOT, SAT).
281    #[must_use]
282    pub fn dvb_si_pids(mut self, on: bool) -> Self {
283        self.dvb_si_pids = on;
284        self
285    }
286
287    /// Add a PID to the watch set (additive; may be called repeatedly).
288    #[must_use]
289    pub fn pid(mut self, pid: Pid) -> Self {
290        self.extra_pids.push(pid);
291        self
292    }
293
294    /// When `true`, emit every complete valid section, bypassing the version
295    /// gate's suppression (the gate is still updated). Default `false`.
296    #[must_use]
297    pub fn emit_repeats(mut self, on: bool) -> Self {
298        self.emit_repeats = on;
299        self
300    }
301
302    /// Maximum number of distinct gate keys retained. At capacity the gate
303    /// FIFO-evicts the oldest key. Default 65 536.
304    #[must_use]
305    pub fn gate_capacity(mut self, cap: usize) -> Self {
306        self.gate_capacity = cap;
307        self
308    }
309
310    /// Build the [`SiDemux`].
311    #[must_use]
312    pub fn build(self) -> SiDemux {
313        let mut pids: HashMap<Pid, SectionReassembler> = HashMap::new();
314        if self.dvb_si_pids {
315            use crate::pid::well_known as wk;
316            for pid in [
317                wk::PAT,
318                wk::CAT,
319                wk::NIT,
320                wk::SDT_BAT,
321                wk::EIT,
322                wk::RST,
323                wk::TDT_TOT,
324                wk::SAT,
325            ] {
326                pids.entry(pid).or_default();
327            }
328        }
329        for p in self.extra_pids {
330            pids.entry(p).or_default();
331        }
332        SiDemux {
333            pids,
334            gate: HashMap::new(),
335            gate_order: VecDeque::new(),
336            cfg: Config {
337                follow_pat: self.follow_pat,
338                emit_repeats: self.emit_repeats,
339                gate_capacity: self.gate_capacity,
340            },
341            stats: Stats::default(),
342            scratch: Vec::new(),
343            completed_scratch: Vec::new(),
344        }
345    }
346}
347
348/// PID-filtered, version-gated SI section demultiplexer.
349///
350/// See the [module docs](crate::demux) for the gate and CRC policies.
351pub struct SiDemux {
352    pids: HashMap<Pid, SectionReassembler>,
353    // TODO(perf): keys are uniform internal u64s — a non-SipHash hasher (e.g.
354    // FxHash) would shave cycles at high section rates; revisit if profiling
355    // shows it.
356    gate: HashMap<u64, GateEntry>,
357    gate_order: VecDeque<u64>,
358    cfg: Config,
359    stats: Stats,
360    scratch: Vec<SectionEvent>,
361    completed_scratch: Vec<Bytes>,
362}
363
364impl SiDemux {
365    /// Start building a demux. See [`SiDemuxBuilder`] for defaults.
366    #[must_use]
367    pub fn builder() -> SiDemuxBuilder {
368        SiDemuxBuilder::default()
369    }
370
371    /// Accumulated statistics.
372    #[must_use]
373    pub fn stats(&self) -> Stats {
374        self.stats
375    }
376
377    /// Feed one 188-byte TS packet. Infallible: malformed packets are counted
378    /// in [`Stats::malformed_packets`], not raised. Returns an iterator over
379    /// the changed sections this packet completed.
380    pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
381        self.scratch.clear();
382        self.stats.packets += 1;
383
384        match TsPacket::parse(packet) {
385            Err(_) => self.stats.malformed_packets += 1,
386            Ok(ts) => {
387                let pid = Pid::new(ts.header.pid);
388                let payload = ts.payload.unwrap_or(&[]);
389                // Single map lookup: feed and drain every section this packet
390                // completed while holding the reassembler borrow, then process
391                // them after the borrow ends (`consider` may insert new PMT
392                // PIDs into the map, so it cannot run while the borrow is live).
393                // A non-watched PID costs exactly one lookup and stops here;
394                // `completed` does not allocate until a section actually pops.
395                let mut completed = std::mem::take(&mut self.completed_scratch);
396                if let Some(reasm) = self.pids.get_mut(&pid) {
397                    reasm.feed(payload, ts.header.pusi);
398                    while let Some(section) = reasm.pop_section() {
399                        completed.push(section);
400                    }
401                }
402                self.stats.sections_completed += completed.len() as u64;
403                for section in completed.drain(..) {
404                    self.consider(pid, section);
405                }
406                self.completed_scratch = completed;
407            }
408        }
409
410        self.scratch.drain(..)
411    }
412
413    /// Gate + CRC + (maybe) push to scratch. Handles PAT-follow on emit.
414    fn consider(&mut self, pid: Pid, section: Bytes) {
415        // Guard: sub-3-byte sections cannot carry a header. The reassembler
416        // should never emit one (it needs >= 3 bytes to know `expected`), but
417        // guard defensively and count it as a CRC failure bucket — it is a
418        // structurally invalid section, dropped without emission.
419        if section.len() < MIN_SECTION_LEN {
420            self.stats.crc_failures += 1;
421            return;
422        }
423
424        let table_id = section[0];
425        let long_form = (section[1] & 0x80) != 0;
426        // The TOT is short-form by its SSI bit but uniquely carries a CRC.
427        let has_crc = long_form || table_id == TOT_TABLE_ID;
428
429        // CRC policy: validate CRC-bearing sections before gating.
430        if has_crc {
431            if section.len() < CRC_LEN {
432                self.stats.crc_failures += 1;
433                return;
434            }
435            let covered = &section[..section.len() - CRC_LEN];
436            let declared = u32::from_be_bytes([
437                section[section.len() - 4],
438                section[section.len() - 3],
439                section[section.len() - 2],
440                section[section.len() - 1],
441            ]);
442            let computed = dvb_common::crc32_mpeg2::compute(covered);
443            if computed != declared {
444                self.stats.crc_failures += 1;
445                return;
446            }
447        }
448
449        // Build the gate key and change detector.
450        let (ext, section_number, version, change_crc) =
451            if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
452                let ext = ((section[3] as u16) << 8) | section[4] as u16;
453                let version = (section[5] >> 1) & 0x1F;
454                let section_number = section[6];
455                // For long-form the trailing CRC already uniquely fingerprints the
456                // payload; reuse it as the change hash.
457                let crc = u32::from_be_bytes([
458                    section[section.len() - 4],
459                    section[section.len() - 3],
460                    section[section.len() - 2],
461                    section[section.len() - 1],
462                ]);
463                (ext, section_number, version, crc)
464            } else {
465                // Short-form (incl. TOT and any malformed long-form that slipped
466                // the size check above): no version, ext/section_number = 0,
467                // change detector is a CRC over all the section bytes.
468                (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(&section))
469            };
470
471        let key = (pid.value() as u64)
472            | ((table_id as u64) << 13)
473            | ((ext as u64) << 21)
474            | ((section_number as u64) << 37);
475
476        let entry = GateEntry {
477            version,
478            crc: change_crc,
479        };
480
481        let changed = match self.gate.get(&key) {
482            Some(prev) => *prev != entry,
483            None => true,
484        };
485
486        // Update the gate (FIFO-evict at capacity for newly-seen keys).
487        let is_new = !self.gate.contains_key(&key);
488        if is_new && self.gate.len() >= self.cfg.gate_capacity {
489            if let Some(old) = self.gate_order.pop_front() {
490                self.gate.remove(&old);
491                self.stats.gate_evictions += 1;
492            }
493        }
494        if is_new {
495            self.gate_order.push_back(key);
496        }
497        match self.gate.entry(key) {
498            std::collections::hash_map::Entry::Occupied(mut oe) => {
499                *oe.get_mut() = entry;
500            }
501            std::collections::hash_map::Entry::Vacant(ve) => {
502                ve.insert(entry);
503            }
504        }
505
506        if changed || self.cfg.emit_repeats {
507            let event = SectionEvent {
508                pid,
509                bytes: section,
510            };
511            // PAT-follow happens on an emitted (changed) PAT only.
512            if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
513                self.follow_pat(&event);
514            }
515            self.stats.emitted += 1;
516            self.scratch.push(event);
517        } else {
518            self.stats.suppressed += 1;
519        }
520    }
521
522    /// Parse the PAT and register each programme's PMT PID with a fresh
523    /// reassembler. Parse failures are silently ignored — a malformed PAT that
524    /// nonetheless passed CRC is implausible, and we never panic.
525    fn follow_pat(&mut self, event: &SectionEvent) {
526        use crate::tables::pat::PatSection;
527        use dvb_common::Parse;
528        if let Ok(pat) = PatSection::parse(&event.bytes) {
529            for entry in &pat.entries {
530                if entry.program_number != 0 {
531                    self.pids.entry(Pid::new(entry.pid)).or_default();
532                }
533            }
534        }
535    }
536}
537
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ts::{TsHeader, TS_PACKET_SIZE};

    /// Builds one PUSI TS packet on `pid` carrying `section`: 4-byte header,
    /// a zero pointer_field, the section, then 0xFF stuffing. The section
    /// must fit in a single packet.
    fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
        let mut packet = [0xFFu8; TS_PACKET_SIZE];
        TsHeader {
            tei: false,
            pusi: true,
            pid,
            scrambling: 0,
            has_adaptation: false,
            has_payload: true,
            continuity_counter: 0,
        }
        .serialize_into(&mut packet)
        .unwrap();
        packet[4] = 0x00; // pointer_field
        let end = 5 + section.len();
        assert!(end <= TS_PACKET_SIZE, "section too big");
        packet[5..end].copy_from_slice(section);
        packet
    }

    /// Builds a long-form section around `payload` and appends the correct
    /// CRC-32.
    fn long_section(
        table_id: u8,
        ext: u16,
        version: u8,
        section_number: u8,
        payload: &[u8],
    ) -> Vec<u8> {
        let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
        let [len_hi, len_lo] = section_length.to_be_bytes();
        let [ext_hi, ext_lo] = ext.to_be_bytes();

        let mut bytes = Vec::new();
        bytes.extend_from_slice(&[
            table_id,
            0xB0 | (len_hi & 0x0F), // SSI + reserved bits + length high nibble
            len_lo,
            ext_hi,
            ext_lo,
            0xC1 | ((version & 0x1F) << 1), // reserved + version + current_next
            section_number,
            section_number, // last_section_number
        ]);
        bytes.extend_from_slice(payload);
        let crc = dvb_common::crc32_mpeg2::compute(&bytes);
        bytes.extend_from_slice(&crc.to_be_bytes());
        bytes
    }

    /// Builds a PAT section (valid CRC) from `(program_number, pmt_pid)`
    /// pairs.
    fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
        let body: Vec<u8> = entries
            .iter()
            .flat_map(|&(program_number, pmt_pid)| {
                let [pn_hi, pn_lo] = program_number.to_be_bytes();
                let [pid_hi, pid_lo] = pmt_pid.to_be_bytes();
                [pn_hi, pn_lo, 0xE0 | (pid_hi & 0x1F), pid_lo]
            })
            .collect();
        long_section(0x00, tsid, version, 0, &body)
    }

    /// Builds a PMT section (valid CRC) with a single stream entry.
    fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
        // Layout: pcr_pid(2) + program_info_length(2)=0 + one stream(5) with
        // stream type 0x02 (video), elementary_pid = pcr_pid+1, es_info_len 0.
        let [pcr_hi, pcr_lo] = pcr_pid.to_be_bytes();
        let [es_hi, es_lo] = (pcr_pid + 1).to_be_bytes();
        let body = [
            0xE0 | (pcr_hi & 0x1F),
            pcr_lo,
            0xF0,
            0x00,
            0x02,
            0xE0 | (es_hi & 0x1F),
            es_lo,
            0xF0,
            0x00,
        ];
        long_section(0x02, program_number, version, 0, &body)
    }

    #[test]
    fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
        let mut demux = SiDemux::builder().build();

        let packet_for =
            |version: u8| ts_packet(0x0000, &pat_section(0x0001, version, &[(1, 0x0100)]));
        let old_pat = packet_for(0);
        let new_pat = packet_for(1);

        let first: Vec<_> = demux.feed(&old_pat).collect();
        assert_eq!(first.len(), 1, "PAT v0 should emit one event");
        assert_eq!(first[0].table_id(), 0x00);
        assert_eq!(first[0].version(), Some(0));

        let repeat: Vec<_> = demux.feed(&old_pat).collect();
        assert_eq!(repeat.len(), 0, "repeat PAT should be suppressed");

        let bumped: Vec<_> = demux.feed(&new_pat).collect();
        assert_eq!(bumped.len(), 1, "PAT v1 should re-emit");
        assert_eq!(bumped[0].version(), Some(1));

        let Stats {
            sections_completed,
            emitted,
            suppressed,
            crc_failures,
            ..
        } = demux.stats();
        assert_eq!(sections_completed, 3);
        assert_eq!(emitted, 2);
        assert_eq!(suppressed, 1);
        assert_eq!(crc_failures, 0);
    }

    #[test]
    fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
        use crate::tables::AnyTableSection;
        let mut demux = SiDemux::builder().build();

        // The PAT announces programme 1 with its PMT on PID 0x0100.
        let pat_packet = ts_packet(0x0000, &pat_section(0x0001, 0, &[(1, 0x0100)]));
        let from_pat: Vec<_> = demux.feed(&pat_packet).collect();
        assert_eq!(from_pat.len(), 1);

        // PID 0x0100 is not watched up front; emitting the PAT is what makes
        // the demux start listening on it.
        let pmt_packet = ts_packet(0x0100, &pmt_section(1, 0, 0x0100));
        let from_pmt: Vec<_> = demux.feed(&pmt_packet).collect();
        assert_eq!(from_pmt.len(), 1, "PMT on the followed PID should emit");
        let event = &from_pmt[0];
        assert_eq!(event.pid(), Pid::new(0x0100));
        match event.table_section().unwrap() {
            AnyTableSection::PmtSection(p) => assert_eq!(p.program_number, 1),
            other => panic!("expected PmtSection, got {other:?}"),
        }
    }

    #[test]
    fn corrupted_crc_sdt_dropped_and_counted() {
        let mut demux = SiDemux::builder().build();
        // SDT actual (table_id 0x42) travels on the SDT/BAT PID 0x0011.
        let mut damaged = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
        // Flip a payload byte after the CRC has already been appended.
        damaged[8] ^= 0xFF;

        let emitted = demux.feed(&ts_packet(0x0011, &damaged)).count();
        assert_eq!(emitted, 0, "corrupted SDT must not emit");

        let stats = demux.stats();
        assert_eq!(stats.crc_failures, 1);
        assert_eq!(stats.emitted, 0);
        assert_eq!(stats.sections_completed, 1);
    }

    #[test]
    fn gate_capacity_evicts_fifo_and_reemits() {
        let mut demux = SiDemux::builder().gate_capacity(2).build();

        // Three EIT sections (table_id 0x4E) on the EIT PID 0x0012 that
        // differ in table_id_extension, so each gets its own gate key.
        let [a, b, c] = [1u16, 2, 3]
            .map(|ext| ts_packet(0x0012, &long_section(0x4E, ext, 0, 0, &[ext as u8])));

        assert_eq!(demux.feed(&a).count(), 1);
        assert_eq!(demux.feed(&b).count(), 1);
        // The gate is full, so admitting c pushes out a (the oldest key).
        assert_eq!(demux.feed(&c).count(), 1);
        assert_eq!(demux.stats().gate_evictions, 1);

        // With a forgotten, feeding it again looks like a first sighting.
        assert_eq!(demux.feed(&a).count(), 1);
    }

    #[test]
    fn garbage_packet_counted_no_panic() {
        let mut demux = SiDemux::builder().build();
        // All zeroes: the sync byte is wrong.
        let junk = [0x00u8; TS_PACKET_SIZE];
        assert_eq!(demux.feed(&junk).count(), 0);

        let stats = demux.stats();
        assert_eq!(stats.malformed_packets, 1);
        assert_eq!(stats.packets, 1);
    }

    #[test]
    fn emit_repeats_bypasses_suppression() {
        let mut demux = SiDemux::builder().emit_repeats(true).build();
        let packet = ts_packet(0x0000, &pat_section(0x0001, 0, &[(1, 0x0100)]));

        assert_eq!(demux.feed(&packet).count(), 1);
        assert_eq!(demux.feed(&packet).count(), 1, "emit_repeats re-emits");

        let stats = demux.stats();
        assert_eq!(stats.suppressed, 0);
        assert_eq!(stats.emitted, 2);
    }

    #[test]
    fn table_section_with_empty_registry_matches_table_section() {
        use crate::tables::registry::TableRegistry;
        use crate::tables::AnyTableSection;

        let mut demux = SiDemux::builder().build();
        let packet = ts_packet(0x0000, &pat_section(0x0001, 0, &[(1, 0x0100)]));
        let events: Vec<_> = demux.feed(&packet).collect();
        assert_eq!(events.len(), 1);
        let event = &events[0];

        let empty = TableRegistry::new();
        let via_registry = event.table_section_with(&empty).unwrap();
        let built_in = event.table_section().unwrap();
        assert!(matches!(via_registry, AnyTableSection::PatSection(_)));
        assert!(matches!(built_in, AnyTableSection::PatSection(_)));
    }

    #[test]
    fn table_section_with_custom_registry_yields_other() {
        use crate::tables::registry::TableRegistry;
        use crate::tables::AnyTableSection;
        use crate::traits::TableDef;
        use dvb_common::Parse;

        const PRIVATE_TID: u8 = 0x90;

        /// Minimal private table that only remembers its `table_id`.
        #[derive(Debug)]
        #[cfg_attr(feature = "serde", derive(serde::Serialize))]
        struct PrivateTable {
            table_id: u8,
        }

        impl<'a> Parse<'a> for PrivateTable {
            type Error = crate::Error;
            fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
                match bytes.first() {
                    Some(&table_id) => Ok(Self { table_id }),
                    None => Err(crate::Error::BufferTooShort {
                        need: 1,
                        have: 0,
                        what: "PrivateTable",
                    }),
                }
            }
        }

        impl<'a> TableDef<'a> for PrivateTable {
            const TABLE_ID_RANGES: &'static [(u8, u8)] = &[(PRIVATE_TID, PRIVATE_TID)];
            const NAME: &'static str = "PRIVATE_TABLE";
        }

        let mut registry = TableRegistry::new();
        registry.register::<PrivateTable>();

        // Watch only the private PID so nothing else can produce events.
        let mut demux = SiDemux::builder()
            .dvb_si_pids(false)
            .pid(Pid::new(0x0200))
            .build();

        let packet = ts_packet(0x0200, &long_section(PRIVATE_TID, 0x0001, 0, 0, &[0x42]));
        let events: Vec<_> = demux.feed(&packet).collect();
        assert_eq!(events.len(), 1);

        match events[0].table_section_with(&registry).unwrap() {
            AnyTableSection::Other {
                table_id,
                ref value,
            } => {
                assert_eq!(table_id, PRIVATE_TID);
                let private = value.downcast_ref::<PrivateTable>().unwrap();
                assert_eq!(private.table_id, PRIVATE_TID);
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }
}