Skip to main content

dvb_si/
demux.rs

1//! [`SiDemux`] — PID-filtered, version-gated SI section pump.
2//!
3//! Feed 188-byte MPEG-TS packets in with [`SiDemux::feed`]; get back an
4//! iterator of [`SectionEvent`]s — one per **changed** complete section.
5//! The demux reassembles sections per PID (via
6//! [`crate::ts::SectionReassembler`]), validates the CRC of CRC-bearing
7//! sections, and suppresses repeats through a version gate so that a steady
8//! carousel of unchanging tables produces no events after the first.
9//!
10//! Events own their bytes ([`bytes::Bytes`]) and are therefore `'static` and
11//! cheap to clone; typed views ([`SectionEvent::table_section`],
12//! [`SectionEvent::parse`]) borrow the event lazily.
13//!
14//! ```
15//! use dvb_common::Serialize;
16//! use dvb_si::demux::SiDemux;
17//! use dvb_si::tables::AnyTableSection;
18//! use dvb_si::tables::pat::{PatSection, PatEntry};
19//!
20//! const PMT_PID: u16 = 0x0100;
21//! const TS_SYNC_BYTE: u8 = 0x47;
22//! const PAYLOAD_UNIT_START_INDICATOR: u8 = 0x40;
23//! const PAT_PID_LOW_BYTE: u8 = 0x00;
24//! const PAYLOAD_ONLY: u8 = 0x10;
25//! const POINTER_FIELD_START: u8 = 0x00;
26//! const STUFFING_BYTE: u8 = 0xFF;
27//!
28//! // Build one PAT section and wrap it in a single 188-byte TS packet so the
29//! // example is self-contained. In real code `packet` comes from your source.
30//! let pat = PatSection {
31//!     transport_stream_id: 1, version_number: 0, current_next_indicator: true,
32//!     section_number: 0, last_section_number: 0,
33//!     entries: vec![PatEntry { program_number: 1, pid: PMT_PID }],
34//! };
35//! let mut section = vec![0u8; pat.serialized_len()];
36//! pat.serialize_into(&mut section).unwrap();
37//! let mut packet = [STUFFING_BYTE; 188];
38//! packet[0] = TS_SYNC_BYTE;
39//! packet[1] = PAYLOAD_UNIT_START_INDICATOR;
40//! packet[2] = PAT_PID_LOW_BYTE;
41//! packet[3] = PAYLOAD_ONLY;
42//! packet[4] = POINTER_FIELD_START;
43//! packet[5..5 + section.len()].copy_from_slice(&section);
44//!
45//! let mut demux = SiDemux::builder().build();
46//! let events: Vec<_> = demux.feed(&packet).collect();
47//! assert_eq!(events.len(), 1);
48//! match events[0].table_section() {
49//!     Ok(AnyTableSection::PatSection(pat)) => {
50//!         println!("PAT v{} on {}", events[0].version().unwrap_or(0), events[0].pid());
51//!         assert_eq!(pat.entries[0].pid, PMT_PID);
52//!     }
53//!     other => panic!("expected PAT, got {other:?}"),
54//! }
55//! ```
56//!
57//! # Version gate
58//!
59//! Each `(pid, table_id, table_id_extension, section_number)` tuple is packed
60//! into a `u64` key. The stored value is a change detector:
61//!
//! - **Long-form** sections (`section_syntax_indicator == 1`) carry a 5-bit
//!   `version_number` and a trailing CRC-32 — the gate stores
//!   `(version, crc32)`. A repeat with the same version *and* CRC is
//!   suppressed.
//! - **Short-form** sections have no version field, so the gate stores
//!   version 0 together with a CRC-32 used purely as a change hash; for the
//!   CRC-less tables (TDT/RST/ST/DIT) it is *computed over the whole
//!   section*. The TOT is short-form too (it carries a CRC but no version).
//!   `table_id_extension` and `section_number` collapse to 0 in the key.
70//!
71//! # CRC policy
72//!
73//! CRC-bearing sections (every long-form section, plus the short-form TOT
74//! which uniquely carries a CRC — ETSI EN 300 468 §5.2.6) are validated
75//! before gating. Failures are dropped and counted in
76//! [`Stats::crc_failures`]; they are never emitted and never update the gate.
77//! TDT carries no CRC and is therefore never dropped for CRC reasons.
78
79use alloc::collections::{BTreeMap, VecDeque};
80use alloc::vec::Vec;
81
82use bytes::Bytes;
83
84use crate::pid::Pid;
85use crate::ts::{SectionReassembler, TsPacket};
86
/// `table_id` of the Program Association Table (PAT). A changed PAT is
/// followed to discover PMT PIDs.
const PAT_TABLE_ID: u8 = 0x00;
/// `table_id` of the Time Offset Table (TOT) — short-form (SSI = 0) yet
/// CRC-bearing.
const TOT_TABLE_ID: u8 = 0x73;
/// Minimum bytes required to read a section header: `table_id` plus the two
/// bytes holding the syntax flags and the length field.
const MIN_SECTION_LEN: usize = 3;
/// Long-form extension header bytes (after the 3-byte common header):
/// `table_id_extension` (2), version / current_next (1), `section_number` (1)
/// and `last_section_number` (1).
const LONG_FORM_EXTRA: usize = 5;
/// Trailing CRC-32 length, in bytes.
const CRC_LEN: usize = 4;
97
/// One complete SI section handed to the caller. Owns its bytes — `'static`,
/// cheap clone.
///
/// Normally an event means the section *changed*; with `emit_repeats`
/// enabled on the builder, unchanged repeats are delivered as events too.
///
/// A `SectionEvent` is only ever constructed for a section that
/// (a) is at least 3 bytes long, and (b) if it carries a CRC, passed CRC
/// validation. So [`SectionEvent::crc_ok`] is always `true` and
/// [`SectionEvent::table_id`] never panics.
#[derive(Debug, Clone)]
pub struct SectionEvent {
    /// PID the section was carried on.
    pid: Pid,
    /// Full section bytes: header, body and (when present) the trailing CRC.
    bytes: Bytes,
}
109
110impl SectionEvent {
111    /// PID this section was carried on.
112    #[must_use]
113    pub fn pid(&self) -> Pid {
114        self.pid
115    }
116
117    /// The full section bytes (header included, CRC included if present).
118    #[must_use]
119    pub fn bytes(&self) -> &Bytes {
120        &self.bytes
121    }
122
123    /// The `table_id` (byte 0). Never panics — events are only built for
124    /// sections of at least 3 bytes.
125    #[must_use]
126    pub fn table_id(&self) -> u8 {
127        self.bytes[0]
128    }
129
130    /// True when this section uses the long-form syntax
131    /// (`section_syntax_indicator == 1`).
132    #[must_use]
133    fn is_long_form(&self) -> bool {
134        (self.bytes[1] & 0x80) != 0
135    }
136
137    /// 5-bit `version_number`, or `None` for short-form sections (which carry
138    /// no version field). Note the TOT, despite being short-form, has no
139    /// version field either, so this is `None` for it.
140    #[must_use]
141    pub fn version(&self) -> Option<u8> {
142        if self.is_long_form() && self.bytes.len() > 5 {
143            Some((self.bytes[5] >> 1) & 0x1F)
144        } else {
145            None
146        }
147    }
148
149    /// 16-bit `table_id_extension`, or `None` for short-form sections.
150    #[must_use]
151    pub fn table_id_extension(&self) -> Option<u16> {
152        if self.is_long_form() && self.bytes.len() > 4 {
153            Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
154        } else {
155            None
156        }
157    }
158
159    /// `section_number`, or `None` for short-form sections.
160    #[must_use]
161    pub fn section_number(&self) -> Option<u8> {
162        if self.is_long_form() && self.bytes.len() > 6 {
163            Some(self.bytes[6])
164        } else {
165            None
166        }
167    }
168
169    /// Always `true`: events are emitted only after CRC validation (or for
170    /// CRC-less short-form sections, where there is nothing to validate).
171    #[must_use]
172    pub fn crc_ok(&self) -> bool {
173        true
174    }
175
176    /// Typed table-section view (lazy, borrows this event's bytes).
177    ///
178    /// # Errors
179    /// Propagates the parse error from the dispatched table-section type.
180    pub fn table_section(&self) -> crate::Result<crate::tables::AnyTableSection<'_>> {
181        crate::tables::AnyTableSection::parse(&self.bytes)
182    }
183
184    /// Typed table-section view with custom-registry support (lazy, borrows
185    /// this event's bytes).
186    ///
187    /// Precedence: custom-registered parser → built-in dispatch → Unknown.
188    /// See [`AnyTableSection::parse_with`][crate::tables::AnyTableSection::parse_with].
189    ///
190    /// # Errors
191    /// Propagates the parse error from the dispatched table-section type.
192    pub fn table_section_with(
193        &self,
194        registry: &crate::tables::registry::TableRegistry,
195    ) -> crate::Result<crate::tables::AnyTableSection<'_>> {
196        crate::tables::AnyTableSection::parse_with(registry, &self.bytes)
197    }
198
199    /// Type-keyed view: `event.parse::<EitSection>()`.
200    ///
201    /// # Errors
202    /// Propagates `T::parse` errors.
203    pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
204        <T as dvb_common::Parse>::parse(&self.bytes)
205    }
206}
207
/// Section statistics, monotonically accumulated across `feed` calls.
///
/// Obtain a copy with [`SiDemux::stats`]; the counters are never reset by the
/// demux itself.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Stats {
    /// TS packets fed (every `feed` call increments this, including calls
    /// whose packet turns out to be malformed).
    pub packets: u64,
    /// Complete sections produced by the reassemblers (pre-gate, pre-CRC).
    pub sections_completed: u64,
    /// Sections emitted as events: changed and valid, plus valid repeats when
    /// `emit_repeats` is enabled.
    pub emitted: u64,
    /// Sections suppressed by the version gate (unchanged repeats).
    pub suppressed: u64,
    /// Structurally invalid (sub-3-byte; cannot occur from the in-crate
    /// reassembler) and CRC-failed sections share this counter. Sections are
    /// dropped before emission; the gate is never updated for them.
    pub crc_failures: u64,
    /// TS packets that failed to parse (e.g. bad sync byte, too short).
    pub malformed_packets: u64,
    /// Gate entries evicted because the gate was at capacity.
    pub gate_evictions: u64,
}
229
/// What the gate remembers for one key, to decide "changed?".
///
/// Two entries compare equal only when both the version and the change hash
/// match; any difference makes the section count as changed.
#[derive(Clone, Copy, PartialEq, Eq)]
struct GateEntry {
    /// Long-form version_number, or 0 for short-form (unused there).
    version: u8,
    /// CRC-32 acting as the change hash. For long-form sections this is the
    /// section's own trailing CRC; for the remaining sections it is a CRC-32
    /// fingerprint chosen by the gate logic in `SiDemux::consider`.
    crc: u32,
}
239
/// Configuration captured by [`SiDemuxBuilder`].
///
/// Only the settings still needed after construction live here; the PID
/// selection is consumed by `build` when it seeds the watch set.
struct Config {
    /// Follow a changed PAT and start watching the PMT PIDs it lists.
    follow_pat: bool,
    /// Emit valid sections even when the gate considers them unchanged.
    emit_repeats: bool,
    /// Maximum number of gate keys retained before FIFO eviction starts.
    gate_capacity: usize,
}
246
247/// Builder for [`SiDemux`].
248///
249/// Defaults: `follow_pat = true`, `dvb_si_pids = true`,
250/// `emit_repeats = false`, `gate_capacity = 65_536`.
251pub struct SiDemuxBuilder {
252    follow_pat: bool,
253    dvb_si_pids: bool,
254    emit_repeats: bool,
255    gate_capacity: usize,
256    extra_pids: Vec<Pid>,
257}
258
259impl Default for SiDemuxBuilder {
260    fn default() -> Self {
261        Self {
262            follow_pat: true,
263            dvb_si_pids: true,
264            emit_repeats: false,
265            gate_capacity: 65_536,
266            extra_pids: Vec::new(),
267        }
268    }
269}
270
271impl SiDemuxBuilder {
272    /// When `true` (default), an emitted (changed) PAT auto-adds each
273    /// programme's PMT PID to the watch set.
274    #[must_use]
275    pub fn follow_pat(mut self, on: bool) -> Self {
276        self.follow_pat = on;
277        self
278    }
279
280    /// When `true` (default), pre-populate the watch set with the well-known
281    /// DVB/MPEG-2 SI PIDs (PAT, CAT, NIT, SDT/BAT, EIT, RST, TDT/TOT, SAT).
282    #[must_use]
283    pub fn dvb_si_pids(mut self, on: bool) -> Self {
284        self.dvb_si_pids = on;
285        self
286    }
287
288    /// Add a PID to the watch set (additive; may be called repeatedly).
289    #[must_use]
290    pub fn pid(mut self, pid: Pid) -> Self {
291        self.extra_pids.push(pid);
292        self
293    }
294
295    /// When `true`, emit every complete valid section, bypassing the version
296    /// gate's suppression (the gate is still updated). Default `false`.
297    #[must_use]
298    pub fn emit_repeats(mut self, on: bool) -> Self {
299        self.emit_repeats = on;
300        self
301    }
302
303    /// Maximum number of distinct gate keys retained. At capacity the gate
304    /// FIFO-evicts the oldest key. Default 65 536.
305    #[must_use]
306    pub fn gate_capacity(mut self, cap: usize) -> Self {
307        self.gate_capacity = cap;
308        self
309    }
310
311    /// Build the [`SiDemux`].
312    #[must_use]
313    pub fn build(self) -> SiDemux {
314        let mut pids: BTreeMap<Pid, SectionReassembler> = BTreeMap::new();
315        if self.dvb_si_pids {
316            use crate::pid::well_known as wk;
317            for pid in [
318                wk::PAT,
319                wk::CAT,
320                wk::NIT,
321                wk::SDT_BAT,
322                wk::EIT,
323                wk::RST,
324                wk::TDT_TOT,
325                wk::SAT,
326            ] {
327                pids.entry(pid).or_default();
328            }
329        }
330        for p in self.extra_pids {
331            pids.entry(p).or_default();
332        }
333        SiDemux {
334            pids,
335            gate: BTreeMap::new(),
336            gate_order: VecDeque::new(),
337            cfg: Config {
338                follow_pat: self.follow_pat,
339                emit_repeats: self.emit_repeats,
340                gate_capacity: self.gate_capacity,
341            },
342            stats: Stats::default(),
343            scratch: Vec::new(),
344            completed_scratch: Vec::new(),
345        }
346    }
347}
348
/// PID-filtered, version-gated SI section demultiplexer.
///
/// See the [module docs](crate::demux) for the gate and CRC policies.
pub struct SiDemux {
    /// Watch set: one section reassembler per watched PID. Packets on PIDs
    /// absent from this map are ignored.
    pids: BTreeMap<Pid, SectionReassembler>,
    // TODO(perf): the gate is an ordered `BTreeMap` keyed by uniform internal
    // u64s — a hash map with a cheap non-SipHash hasher (e.g. FxHash) could
    // shave cycles at high section rates; revisit if profiling shows it.
    /// Version gate: packed section key -> last change detector seen.
    gate: BTreeMap<u64, GateEntry>,
    /// Gate keys in insertion order, oldest first; drives FIFO eviction.
    gate_order: VecDeque<u64>,
    /// Settings captured from the builder.
    cfg: Config,
    /// Counters accumulated across `feed` calls.
    stats: Stats,
    /// Events produced by the current `feed` call; cleared at the start of
    /// each call and drained into the iterator it returns.
    scratch: Vec<SectionEvent>,
    /// Reusable buffer for sections popped from a reassembler during `feed`.
    completed_scratch: Vec<Bytes>,
}
364
365impl SiDemux {
366    /// Start building a demux. See [`SiDemuxBuilder`] for defaults.
367    #[must_use]
368    pub fn builder() -> SiDemuxBuilder {
369        SiDemuxBuilder::default()
370    }
371
372    /// Accumulated statistics.
373    #[must_use]
374    pub fn stats(&self) -> Stats {
375        self.stats
376    }
377
378    /// Feed one 188-byte TS packet. Infallible: malformed packets are counted
379    /// in [`Stats::malformed_packets`], not raised. Returns an iterator over
380    /// the changed sections this packet completed.
381    pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
382        self.scratch.clear();
383        self.stats.packets += 1;
384
385        match TsPacket::parse(packet) {
386            Err(_) => self.stats.malformed_packets += 1,
387            Ok(ts) => {
388                let pid = Pid::new(ts.header.pid);
389                let payload = ts.payload.unwrap_or(&[]);
390                // Single map lookup: feed and drain every section this packet
391                // completed while holding the reassembler borrow, then process
392                // them after the borrow ends (`consider` may insert new PMT
393                // PIDs into the map, so it cannot run while the borrow is live).
394                // A non-watched PID costs exactly one lookup and stops here;
395                // `completed` does not allocate until a section actually pops.
396                let mut completed = core::mem::take(&mut self.completed_scratch);
397                if let Some(reasm) = self.pids.get_mut(&pid) {
398                    reasm.feed(payload, ts.header.pusi);
399                    while let Some(section) = reasm.pop_section() {
400                        completed.push(section);
401                    }
402                }
403                self.stats.sections_completed += completed.len() as u64;
404                for section in completed.drain(..) {
405                    self.consider(pid, section);
406                }
407                self.completed_scratch = completed;
408            }
409        }
410
411        self.scratch.drain(..)
412    }
413
414    /// Gate + CRC + (maybe) push to scratch. Handles PAT-follow on emit.
415    fn consider(&mut self, pid: Pid, section: Bytes) {
416        // Guard: sub-3-byte sections cannot carry a header. The reassembler
417        // should never emit one (it needs >= 3 bytes to know `expected`), but
418        // guard defensively and count it as a CRC failure bucket — it is a
419        // structurally invalid section, dropped without emission.
420        if section.len() < MIN_SECTION_LEN {
421            self.stats.crc_failures += 1;
422            return;
423        }
424
425        let table_id = section[0];
426        let long_form = (section[1] & 0x80) != 0;
427        // The TOT is short-form by its SSI bit but uniquely carries a CRC.
428        let has_crc = long_form || table_id == TOT_TABLE_ID;
429
430        // CRC policy: validate CRC-bearing sections before gating.
431        if has_crc {
432            if section.len() < CRC_LEN {
433                self.stats.crc_failures += 1;
434                return;
435            }
436            let covered = &section[..section.len() - CRC_LEN];
437            let declared = u32::from_be_bytes([
438                section[section.len() - 4],
439                section[section.len() - 3],
440                section[section.len() - 2],
441                section[section.len() - 1],
442            ]);
443            let computed = dvb_common::crc32_mpeg2::compute(covered);
444            if computed != declared {
445                self.stats.crc_failures += 1;
446                return;
447            }
448        }
449
450        // Build the gate key and change detector.
451        let (ext, section_number, version, change_crc) =
452            if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
453                let ext = ((section[3] as u16) << 8) | section[4] as u16;
454                let version = (section[5] >> 1) & 0x1F;
455                let section_number = section[6];
456                // For long-form the trailing CRC already uniquely fingerprints the
457                // payload; reuse it as the change hash.
458                let crc = u32::from_be_bytes([
459                    section[section.len() - 4],
460                    section[section.len() - 3],
461                    section[section.len() - 2],
462                    section[section.len() - 1],
463                ]);
464                (ext, section_number, version, crc)
465            } else {
466                // Short-form (incl. TOT and any malformed long-form that slipped
467                // the size check above): no version, ext/section_number = 0,
468                // change detector is a CRC over all the section bytes.
469                (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(&section))
470            };
471
472        let key = (pid.value() as u64)
473            | ((table_id as u64) << 13)
474            | ((ext as u64) << 21)
475            | ((section_number as u64) << 37);
476
477        let entry = GateEntry {
478            version,
479            crc: change_crc,
480        };
481
482        let changed = match self.gate.get(&key) {
483            Some(prev) => *prev != entry,
484            None => true,
485        };
486
487        // Update the gate (FIFO-evict at capacity for newly-seen keys).
488        let is_new = !self.gate.contains_key(&key);
489        if is_new && self.gate.len() >= self.cfg.gate_capacity {
490            if let Some(old) = self.gate_order.pop_front() {
491                self.gate.remove(&old);
492                self.stats.gate_evictions += 1;
493            }
494        }
495        if is_new {
496            self.gate_order.push_back(key);
497        }
498        match self.gate.entry(key) {
499            alloc::collections::btree_map::Entry::Occupied(mut oe) => {
500                *oe.get_mut() = entry;
501            }
502            alloc::collections::btree_map::Entry::Vacant(ve) => {
503                ve.insert(entry);
504            }
505        }
506
507        if changed || self.cfg.emit_repeats {
508            let event = SectionEvent {
509                pid,
510                bytes: section,
511            };
512            // PAT-follow happens on an emitted (changed) PAT only.
513            if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
514                self.follow_pat(&event);
515            }
516            self.stats.emitted += 1;
517            self.scratch.push(event);
518        } else {
519            self.stats.suppressed += 1;
520        }
521    }
522
523    /// Parse the PAT and register each programme's PMT PID with a fresh
524    /// reassembler. Parse failures are silently ignored — a malformed PAT that
525    /// nonetheless passed CRC is implausible, and we never panic.
526    fn follow_pat(&mut self, event: &SectionEvent) {
527        use crate::tables::pat::PatSection;
528        use dvb_common::Parse;
529        if let Ok(pat) = PatSection::parse(&event.bytes) {
530            for entry in &pat.entries {
531                if entry.program_number != 0 {
532                    self.pids.entry(Pid::new(entry.pid)).or_default();
533                }
534            }
535        }
536    }
537}
538
// Unit tests for the demux: each test builds sections with real CRCs, wraps
// them in single TS packets and checks the emitted events and the counters.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ts::{TsHeader, TS_PACKET_SIZE};

    /// Wrap section bytes in a single PUSI TS packet on `pid`, with a
    /// pointer_field of 0 and 0xFF stuffing tail. Section must fit one packet.
    fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
        let mut pkt = [0xFFu8; TS_PACKET_SIZE];
        let header = TsHeader {
            tei: false,
            pusi: true,
            pid,
            scrambling: 0,
            has_adaptation: false,
            has_payload: true,
            continuity_counter: 0,
        };
        header.serialize_into(&mut pkt).unwrap();
        pkt[4] = 0x00; // pointer_field
        let start = 5;
        assert!(start + section.len() <= TS_PACKET_SIZE, "section too big");
        pkt[start..start + section.len()].copy_from_slice(section);
        pkt
    }

    /// Build a long-form section with a correct trailing CRC-32.
    ///
    /// `last_section_number` is set equal to `section_number`, and the CRC is
    /// computed over every byte that precedes it.
    fn long_section(
        table_id: u8,
        ext: u16,
        version: u8,
        section_number: u8,
        payload: &[u8],
    ) -> Vec<u8> {
        // section_length counts everything after the 3-byte common header.
        let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
        let mut v = vec![
            table_id,
            0x80 | 0x30 | ((section_length >> 8) as u8 & 0x0F),
            (section_length & 0xFF) as u8,
            (ext >> 8) as u8,
            (ext & 0xFF) as u8,
            0xC0 | ((version & 0x1F) << 1) | 0x01,
            section_number,
            section_number, // last_section_number
        ];
        v.extend_from_slice(payload);
        let crc = dvb_common::crc32_mpeg2::compute(&v);
        v.extend_from_slice(&crc.to_be_bytes());
        v
    }

    /// Build a PAT section (real CRC) mapping (program_number, pmt_pid) pairs.
    fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
        let mut body = Vec::new();
        for &(pn, pid) in entries {
            body.extend_from_slice(&pn.to_be_bytes());
            body.push(0xE0 | ((pid >> 8) as u8 & 0x1F));
            body.push((pid & 0xFF) as u8);
        }
        long_section(0x00, tsid, version, 0, &body)
    }

    /// Build a PMT section (real CRC). One stream entry.
    fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
        // pcr_pid(2) + program_info_length(2)=0 + one stream(5):
        // stream type 0x02 (video), elementary_pid = pcr_pid+1, es_info_len 0.
        let body = [
            0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
            (pcr_pid & 0xFF) as u8,
            0xF0,
            0x00,
            0x02,
            0xE0 | (((pcr_pid + 1) >> 8) as u8 & 0x1F),
            ((pcr_pid + 1) & 0xFF) as u8,
            0xF0,
            0x00,
        ];
        long_section(0x02, program_number, version, 0, &body)
    }

    // First sight emits, an identical repeat is suppressed, and a version
    // bump emits again; the counters must reflect all three sections.
    #[test]
    fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
        let mut demux = SiDemux::builder().build();

        let pat_v0 = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pat_v1 = pat_section(0x0001, 1, &[(1, 0x0100)]);

        let pkt_v0 = ts_packet(0x0000, &pat_v0);
        let pkt_v1 = ts_packet(0x0000, &pat_v1);

        let n0: Vec<_> = demux.feed(&pkt_v0).collect();
        assert_eq!(n0.len(), 1, "PAT v0 should emit one event");
        assert_eq!(n0[0].table_id(), 0x00);
        assert_eq!(n0[0].version(), Some(0));

        let n1: Vec<_> = demux.feed(&pkt_v0).collect();
        assert_eq!(n1.len(), 0, "repeat PAT should be suppressed");

        let n2: Vec<_> = demux.feed(&pkt_v1).collect();
        assert_eq!(n2.len(), 1, "PAT v1 should re-emit");
        assert_eq!(n2[0].version(), Some(1));

        let s = demux.stats();
        assert_eq!(s.sections_completed, 3);
        assert_eq!(s.emitted, 2);
        assert_eq!(s.suppressed, 1);
        assert_eq!(s.crc_failures, 0);
    }

    // An emitted PAT must make the demux watch the PMT PID it announces.
    #[test]
    fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
        use crate::tables::AnyTableSection;
        let mut demux = SiDemux::builder().build();

        // PAT maps programme 1 -> PMT on PID 0x0100.
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pat_evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
        assert_eq!(pat_evts.len(), 1);

        // Before follow, a PMT packet on 0x0100 would be ignored. After the
        // PAT was emitted, 0x0100 is watched.
        let pmt = pmt_section(1, 0, 0x0100);
        let pmt_evts: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt)).collect();
        assert_eq!(pmt_evts.len(), 1, "PMT on the followed PID should emit");
        assert_eq!(pmt_evts[0].pid(), Pid::new(0x0100));
        match pmt_evts[0].table_section().unwrap() {
            AnyTableSection::PmtSection(p) => assert_eq!(p.program_number, 1),
            other => panic!("expected PmtSection, got {other:?}"),
        }
    }

    // A section whose CRC no longer matches is dropped and counted.
    #[test]
    fn corrupted_crc_sdt_dropped_and_counted() {
        let mut demux = SiDemux::builder().build();
        // SDT actual = table_id 0x42, carried on SDT_BAT pid 0x0011.
        let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
        // Corrupt a payload byte AFTER the CRC was computed.
        sdt[8] ^= 0xFF;
        let evts: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
        assert_eq!(evts.len(), 0, "corrupted SDT must not emit");
        let s = demux.stats();
        assert_eq!(s.crc_failures, 1);
        assert_eq!(s.emitted, 0);
        assert_eq!(s.sections_completed, 1);
    }

    // With room for two keys, a third distinct key evicts the oldest one.
    #[test]
    fn gate_capacity_evicts_fifo_and_reemits() {
        let mut demux = SiDemux::builder().gate_capacity(2).build();

        // Three distinct EIT sections (table_id 0x4E) by table_id_extension,
        // all on the EIT pid 0x0012.
        let a = long_section(0x4E, 0x0001, 0, 0, &[0x01]);
        let b = long_section(0x4E, 0x0002, 0, 0, &[0x02]);
        let c = long_section(0x4E, 0x0003, 0, 0, &[0x03]);

        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
        assert_eq!(demux.feed(&ts_packet(0x0012, &b)).count(), 1);
        // Inserting c evicts a (the oldest).
        assert_eq!(demux.feed(&ts_packet(0x0012, &c)).count(), 1);
        assert_eq!(demux.stats().gate_evictions, 1);

        // a was evicted -> re-feeding it re-emits (treated as newly seen).
        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
    }

    // A packet that fails TS parsing is counted but never raises or panics.
    #[test]
    fn garbage_packet_counted_no_panic() {
        let mut demux = SiDemux::builder().build();
        let garbage = [0x00u8; TS_PACKET_SIZE]; // bad sync byte
        let evts: Vec<_> = demux.feed(&garbage).collect();
        assert_eq!(evts.len(), 0);
        assert_eq!(demux.stats().malformed_packets, 1);
        assert_eq!(demux.stats().packets, 1);
    }

    // With emit_repeats on, an identical section is emitted every time.
    #[test]
    fn emit_repeats_bypasses_suppression() {
        let mut demux = SiDemux::builder().emit_repeats(true).build();
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pkt = ts_packet(0x0000, &pat);
        assert_eq!(demux.feed(&pkt).count(), 1);
        assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");
        assert_eq!(demux.stats().suppressed, 0);
        assert_eq!(demux.stats().emitted, 2);
    }

    // An empty registry must not change built-in dispatch.
    #[test]
    fn table_section_with_empty_registry_matches_table_section() {
        use crate::tables::registry::TableRegistry;
        use crate::tables::AnyTableSection;

        let mut demux = SiDemux::builder().build();
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
        assert_eq!(evts.len(), 1);

        let reg = TableRegistry::new();
        let with_reg = evts[0].table_section_with(&reg).unwrap();
        let without = evts[0].table_section().unwrap();
        assert!(matches!(with_reg, AnyTableSection::PatSection(_)));
        assert!(matches!(without, AnyTableSection::PatSection(_)));
    }

    // A parser registered for a private table_id is surfaced as `Other`.
    #[test]
    fn table_section_with_custom_registry_yields_other() {
        use crate::tables::registry::TableRegistry;
        use crate::tables::AnyTableSection;
        use crate::traits::TableDef;
        use dvb_common::Parse;

        const PRIVATE_TID: u8 = 0x90;

        // Minimal custom table type: remembers only the table_id it parsed.
        #[derive(Debug)]
        #[cfg_attr(feature = "serde", derive(serde::Serialize))]
        struct PrivateTable {
            table_id: u8,
        }

        impl<'a> Parse<'a> for PrivateTable {
            type Error = crate::Error;
            fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
                if bytes.is_empty() {
                    return Err(crate::Error::BufferTooShort {
                        need: 1,
                        have: 0,
                        what: "PrivateTable",
                    });
                }
                Ok(Self { table_id: bytes[0] })
            }
        }

        impl<'a> TableDef<'a> for PrivateTable {
            const TABLE_ID_RANGES: &'static [(u8, u8)] = &[(PRIVATE_TID, PRIVATE_TID)];
            const NAME: &'static str = "PRIVATE_TABLE";
        }

        let mut reg = TableRegistry::new();
        reg.register::<PrivateTable>();

        // Watch only the private PID so nothing else can produce events.
        let mut demux = SiDemux::builder()
            .dvb_si_pids(false)
            .pid(Pid::new(0x0200))
            .build();

        let section = long_section(PRIVATE_TID, 0x0001, 0, 0, &[0x42]);
        let evts: Vec<_> = demux.feed(&ts_packet(0x0200, &section)).collect();
        assert_eq!(evts.len(), 1);

        let result = evts[0].table_section_with(&reg).unwrap();
        match result {
            AnyTableSection::Other {
                table_id,
                ref value,
            } => {
                assert_eq!(table_id, PRIVATE_TID);
                let pt = value.downcast_ref::<PrivateTable>().unwrap();
                assert_eq!(pt.table_id, PRIVATE_TID);
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }
}