Skip to main content

dvb_si/
demux.rs

1//! [`SiDemux`] — PID-filtered, version-gated SI section pump.
2//!
3//! Feed 188-byte MPEG-TS packets in with [`SiDemux::feed`]; get back an
4//! iterator of [`SectionEvent`]s — one per **changed** complete section.
5//! The demux reassembles sections per PID (via
6//! [`crate::ts::SectionReassembler`]), validates the CRC of CRC-bearing
7//! sections, and suppresses repeats through a version gate so that a steady
8//! carousel of unchanging tables produces no events after the first.
9//!
10//! Events own their bytes ([`bytes::Bytes`]) and are therefore `'static` and
11//! cheap to clone; typed views ([`SectionEvent::table_section`],
12//! [`SectionEvent::parse`]) borrow the event lazily.
13//!
14//! ```
15//! use dvb_common::Serialize;
16//! use dvb_si::demux::SiDemux;
17//! use dvb_si::tables::AnyTableSection;
18//! use dvb_si::tables::pat::{PatSection, PatEntry};
19//!
20//! const PMT_PID: u16 = 0x0100;
21//! const TS_SYNC_BYTE: u8 = 0x47;
22//! const PAYLOAD_UNIT_START_INDICATOR: u8 = 0x40;
23//! const PAT_PID_LOW_BYTE: u8 = 0x00;
24//! const PAYLOAD_ONLY: u8 = 0x10;
25//! const POINTER_FIELD_START: u8 = 0x00;
26//! const STUFFING_BYTE: u8 = 0xFF;
27//!
28//! // Build one PAT section and wrap it in a single 188-byte TS packet so the
29//! // example is self-contained. In real code `packet` comes from your source.
30//! let pat = PatSection {
31//!     transport_stream_id: 1, version_number: 0, current_next_indicator: true,
32//!     section_number: 0, last_section_number: 0,
33//!     entries: vec![PatEntry { program_number: 1, pid: PMT_PID }],
34//! };
35//! let mut section = vec![0u8; pat.serialized_len()];
36//! pat.serialize_into(&mut section).unwrap();
37//! let mut packet = [STUFFING_BYTE; 188];
38//! packet[0] = TS_SYNC_BYTE;
39//! packet[1] = PAYLOAD_UNIT_START_INDICATOR;
40//! packet[2] = PAT_PID_LOW_BYTE;
41//! packet[3] = PAYLOAD_ONLY;
42//! packet[4] = POINTER_FIELD_START;
43//! packet[5..5 + section.len()].copy_from_slice(&section);
44//!
45//! let mut demux = SiDemux::builder().build();
46//! let events: Vec<_> = demux.feed(&packet).collect();
47//! assert_eq!(events.len(), 1);
48//! match events[0].table_section() {
49//!     Ok(AnyTableSection::PatSection(pat)) => {
50//!         println!("PAT v{} on {}", events[0].version().unwrap_or(0), events[0].pid());
51//!         assert_eq!(pat.entries[0].pid, PMT_PID);
52//!     }
53//!     other => panic!("expected PAT, got {other:?}"),
54//! }
55//! ```
56//!
57//! # Version gate
58//!
59//! Each `(pid, table_id, table_id_extension, section_number)` tuple is packed
60//! into a `u64` key. The stored value is a change detector:
61//!
//! - **Long-form** sections (`section_syntax_indicator == 1`) carry a 5-bit
//!   `version_number` and a trailing CRC-32 — the gate stores
//!   `(version, crc32)`. A repeat with the same version *and* CRC is
//!   suppressed.
//! - **Short-form** sections (TDT/RST/ST/DIT, and the TOT) have no version;
//!   the gate stores a CRC-32 *computed over the whole section* purely as a
//!   change hash. The TOT is CRC-validated first (see below), but it is gated
//!   like any other short-form section, so its hash also covers the trailing
//!   CRC bytes. `table_id_extension` and `section_number` collapse to 0 in
//!   the key.
70//!
71//! # CRC policy
72//!
73//! CRC-bearing sections (every long-form section, plus the short-form TOT
74//! which uniquely carries a CRC — ETSI EN 300 468 §5.2.6) are validated
75//! before gating. Failures are dropped and counted in
76//! [`Stats::crc_failures`]; they are never emitted and never update the gate.
77//! TDT carries no CRC and is therefore never dropped for CRC reasons.
78
79use alloc::collections::{BTreeMap, VecDeque};
80use alloc::vec::Vec;
81
82use bytes::Bytes;
83
84use crate::pid::Pid;
85use crate::ts::{SectionReassembler, TsPacket};
86
/// `table_id` of the Program Association Table (PAT). An emitted PAT is what
/// triggers PAT-follow: its programmes' PMT PIDs are added to the watch set.
const PAT_TABLE_ID: u8 = 0x00;
/// `table_id` of the Time Offset Table — short-form (SSI=0) yet CRC-bearing,
/// so it is CRC-validated alongside the long-form sections.
const TOT_TABLE_ID: u8 = 0x73;
/// Minimum bytes required to read a section header (`table_id` plus the two
/// bytes holding the syntax indicator and length field). Shorter sections are
/// dropped before any header byte is indexed.
const MIN_SECTION_LEN: usize = 3;
/// Long-form extension header bytes (after the 3-byte common header):
/// `table_id_extension` (2), the version byte (1), `section_number` (1) and
/// `last_section_number` (1).
const LONG_FORM_EXTRA: usize = 5;
/// Trailing CRC-32 length in bytes.
const CRC_LEN: usize = 4;
97
/// One complete, changed SI section. Owns its bytes — `'static`, cheap clone.
///
/// A `SectionEvent` is only ever constructed for a section that
/// (a) is at least 3 bytes long, and (b) if it carries a CRC, passed CRC
/// validation. So [`SectionEvent::crc_ok`] is always `true` and
/// [`SectionEvent::table_id`] never panics.
///
/// Both fields are private, so code outside this module can only obtain an
/// event from the demux and can only read it through the accessors.
#[derive(Debug, Clone)]
pub struct SectionEvent {
    /// PID the section was reassembled from.
    pid: Pid,
    /// The complete section: header first, trailing CRC-32 last when the
    /// section carries one.
    bytes: Bytes,
}
109
110impl SectionEvent {
111    /// PID this section was carried on.
112    #[must_use]
113    pub fn pid(&self) -> Pid {
114        self.pid
115    }
116
117    /// The full section bytes (header included, CRC included if present).
118    #[must_use]
119    pub fn bytes(&self) -> &Bytes {
120        &self.bytes
121    }
122
123    /// The `table_id` (byte 0). Never panics — events are only built for
124    /// sections of at least 3 bytes.
125    #[must_use]
126    pub fn table_id(&self) -> u8 {
127        self.bytes[0]
128    }
129
130    /// True when this section uses the long-form syntax
131    /// (`section_syntax_indicator == 1`).
132    #[must_use]
133    fn is_long_form(&self) -> bool {
134        (self.bytes[1] & 0x80) != 0
135    }
136
137    /// 5-bit `version_number`, or `None` for short-form sections (which carry
138    /// no version field). Note the TOT, despite being short-form, has no
139    /// version field either, so this is `None` for it.
140    #[must_use]
141    pub fn version(&self) -> Option<u8> {
142        if self.is_long_form() && self.bytes.len() > 5 {
143            Some((self.bytes[5] >> 1) & 0x1F)
144        } else {
145            None
146        }
147    }
148
149    /// 16-bit `table_id_extension`, or `None` for short-form sections.
150    #[must_use]
151    pub fn table_id_extension(&self) -> Option<u16> {
152        if self.is_long_form() && self.bytes.len() > 4 {
153            Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
154        } else {
155            None
156        }
157    }
158
159    /// `section_number`, or `None` for short-form sections.
160    #[must_use]
161    pub fn section_number(&self) -> Option<u8> {
162        if self.is_long_form() && self.bytes.len() > 6 {
163            Some(self.bytes[6])
164        } else {
165            None
166        }
167    }
168
169    /// Always `true`: events are emitted only after CRC validation (or for
170    /// CRC-less short-form sections, where there is nothing to validate).
171    #[must_use]
172    pub fn crc_ok(&self) -> bool {
173        true
174    }
175
176    /// Typed table-section view (lazy, borrows this event's bytes).
177    ///
178    /// # Errors
179    /// Propagates the parse error from the dispatched table-section type.
180    pub fn table_section(&self) -> crate::Result<crate::tables::AnyTableSection<'_>> {
181        crate::tables::AnyTableSection::parse(&self.bytes)
182    }
183
184    /// Typed table-section view with custom-registry support (lazy, borrows
185    /// this event's bytes).
186    ///
187    /// Precedence: custom-registered parser → built-in dispatch → Unknown.
188    /// See [`AnyTableSection::parse_with`][crate::tables::AnyTableSection::parse_with].
189    ///
190    /// # Errors
191    /// Propagates the parse error from the dispatched table-section type.
192    pub fn table_section_with(
193        &self,
194        registry: &crate::tables::registry::TableRegistry,
195    ) -> crate::Result<crate::tables::AnyTableSection<'_>> {
196        crate::tables::AnyTableSection::parse_with(registry, &self.bytes)
197    }
198
199    /// Type-keyed view: `event.parse::<EitSection>()`.
200    ///
201    /// # Errors
202    /// Propagates `T::parse` errors.
203    pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
204        <T as dvb_common::Parse>::parse(&self.bytes)
205    }
206}
207
/// Section statistics, monotonically accumulated across `feed` calls.
///
/// Every counter only ever increases; read a snapshot with
/// [`SiDemux::stats`].
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Stats {
    /// TS packets fed (every `feed` call increments this, including calls
    /// whose packet turns out to be malformed).
    pub packets: u64,
    /// Complete sections produced by the reassemblers (pre-gate, pre-CRC).
    pub sections_completed: u64,
    /// Sections emitted as events (changed, valid).
    pub emitted: u64,
    /// Sections suppressed by the version gate (unchanged repeats).
    pub suppressed: u64,
    /// Structurally invalid (sub-3-byte; cannot occur from the in-crate
    /// reassembler) and CRC-failed sections share this counter. Sections are
    /// dropped before emission; the gate is never updated for them.
    pub crc_failures: u64,
    /// TS packets that failed to parse (bad sync byte, too short).
    pub malformed_packets: u64,
    /// Gate entries evicted because the gate was at capacity.
    pub gate_evictions: u64,
}
229
/// What the gate remembers for one key, to decide "changed?".
///
/// Two entries are compared with `==`: a section counts as changed when
/// either the version or the change hash differs from the stored entry.
#[derive(Clone, Copy, PartialEq, Eq)]
struct GateEntry {
    /// Long-form version_number, or 0 for short-form (unused there).
    version: u8,
    /// CRC-32 over the whole section — the change hash. For long-form this is
    /// the trailing CRC; for short-form it is computed over all bytes.
    crc: u32,
}
239
/// Configuration captured by [`SiDemuxBuilder`].
///
/// Only the settings consulted while sections are processed live here; the
/// builder's PID choices are consumed when the watch set is built.
struct Config {
    /// Add the PMT PIDs listed by an emitted (changed) PAT to the watch set.
    follow_pat: bool,
    /// Emit every valid section, even when the gate says it is unchanged.
    emit_repeats: bool,
    /// Bound on the number of distinct gate keys before FIFO eviction.
    gate_capacity: usize,
}
246
247/// Builder for [`SiDemux`].
248///
249/// Defaults: `follow_pat = true`, `dvb_si_pids = true`,
250/// `emit_repeats = false`, `gate_capacity = 65_536`.
251pub struct SiDemuxBuilder {
252    follow_pat: bool,
253    dvb_si_pids: bool,
254    emit_repeats: bool,
255    gate_capacity: usize,
256    extra_pids: Vec<Pid>,
257}
258
259impl Default for SiDemuxBuilder {
260    fn default() -> Self {
261        Self {
262            follow_pat: true,
263            dvb_si_pids: true,
264            emit_repeats: false,
265            gate_capacity: 65_536,
266            extra_pids: Vec::new(),
267        }
268    }
269}
270
271impl SiDemuxBuilder {
272    /// When `true` (default), an emitted (changed) PAT auto-adds each
273    /// programme's PMT PID to the watch set.
274    #[must_use]
275    pub fn follow_pat(mut self, on: bool) -> Self {
276        self.follow_pat = on;
277        self
278    }
279
280    /// When `true` (default), pre-populate the watch set with the well-known
281    /// DVB/MPEG-2 SI PIDs (PAT, CAT, NIT, SDT/BAT, EIT, RST, TDT/TOT, SAT).
282    #[must_use]
283    pub fn dvb_si_pids(mut self, on: bool) -> Self {
284        self.dvb_si_pids = on;
285        self
286    }
287
288    /// Add a PID to the watch set (additive; may be called repeatedly).
289    #[must_use]
290    pub fn pid(mut self, pid: Pid) -> Self {
291        self.extra_pids.push(pid);
292        self
293    }
294
295    /// When `true`, emit every complete valid section, bypassing the version
296    /// gate's suppression (the gate is still updated). Default `false`.
297    #[must_use]
298    pub fn emit_repeats(mut self, on: bool) -> Self {
299        self.emit_repeats = on;
300        self
301    }
302
303    /// Maximum number of distinct gate keys retained. At capacity the gate
304    /// FIFO-evicts the oldest key. Default 65 536.
305    #[must_use]
306    pub fn gate_capacity(mut self, cap: usize) -> Self {
307        self.gate_capacity = cap;
308        self
309    }
310
311    /// Build the [`SiDemux`].
312    #[must_use]
313    pub fn build(self) -> SiDemux {
314        let mut pids: BTreeMap<Pid, SectionReassembler> = BTreeMap::new();
315        if self.dvb_si_pids {
316            use crate::pid::well_known as wk;
317            for pid in [
318                wk::PAT,
319                wk::CAT,
320                wk::NIT,
321                wk::SDT_BAT,
322                wk::EIT,
323                wk::RST,
324                wk::TDT_TOT,
325                wk::SAT,
326            ] {
327                pids.entry(pid).or_default();
328            }
329        }
330        for p in self.extra_pids {
331            pids.entry(p).or_default();
332        }
333        SiDemux {
334            pids,
335            gate: BTreeMap::new(),
336            gate_order: VecDeque::new(),
337            cfg: Config {
338                follow_pat: self.follow_pat,
339                emit_repeats: self.emit_repeats,
340                gate_capacity: self.gate_capacity,
341            },
342            stats: Stats::default(),
343            scratch: Vec::new(),
344            completed_scratch: Vec::new(),
345        }
346    }
347}
348
/// PID-filtered, version-gated SI section demultiplexer.
///
/// See the [module docs](crate::demux) for the gate and CRC policies.
pub struct SiDemux {
    /// Watch set: one section reassembler per watched PID. Packets on any
    /// other PID are ignored.
    pids: BTreeMap<Pid, SectionReassembler>,
    // NOTE: the gate is a `BTreeMap`, so no hasher is involved today.
    // TODO(perf): keys are uniform internal u64s — if profiling shows gate
    // lookups matter at high section rates, a hash map with a cheap
    // non-SipHash hasher (e.g. FxHash) is an option; revisit then.
    /// Version gate: packed section key → last accepted change detector.
    gate: BTreeMap<u64, GateEntry>,
    /// Gate keys in insertion order, oldest first, for FIFO eviction.
    gate_order: VecDeque<u64>,
    /// Settings captured from the builder.
    cfg: Config,
    /// Counters accumulated across `feed` calls.
    stats: Stats,
    /// Events produced by the current `feed` call; drained by the iterator
    /// that `feed` returns.
    scratch: Vec<SectionEvent>,
    /// Reusable buffer for the sections a single packet completed.
    completed_scratch: Vec<Bytes>,
}
364
365impl SiDemux {
366    /// Start building a demux. See [`SiDemuxBuilder`] for defaults.
367    #[must_use]
368    pub fn builder() -> SiDemuxBuilder {
369        SiDemuxBuilder::default()
370    }
371
372    /// Accumulated statistics.
373    #[must_use]
374    pub fn stats(&self) -> Stats {
375        self.stats
376    }
377
378    /// Feed one 188-byte TS packet. Infallible: malformed packets are counted
379    /// in [`Stats::malformed_packets`], not raised. Returns an iterator over
380    /// the changed sections this packet completed.
381    pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
382        self.scratch.clear();
383        self.stats.packets += 1;
384
385        match TsPacket::parse(packet) {
386            Err(_) => self.stats.malformed_packets += 1,
387            Ok(ts) => {
388                let pid = Pid::new(ts.header.pid);
389                let payload = ts.payload.unwrap_or(&[]);
390                // Single map lookup: feed and drain every section this packet
391                // completed while holding the reassembler borrow, then process
392                // them after the borrow ends (`consider` may insert new PMT
393                // PIDs into the map, so it cannot run while the borrow is live).
394                // A non-watched PID costs exactly one lookup and stops here;
395                // `completed` does not allocate until a section actually pops.
396                let mut completed = core::mem::take(&mut self.completed_scratch);
397                if let Some(reasm) = self.pids.get_mut(&pid) {
398                    reasm.feed(payload, ts.header.pusi);
399                    while let Some(section) = reasm.pop_section() {
400                        completed.push(section);
401                    }
402                }
403                self.stats.sections_completed += completed.len() as u64;
404                for section in completed.drain(..) {
405                    self.consider(pid, section);
406                }
407                self.completed_scratch = completed;
408            }
409        }
410
411        self.scratch.drain(..)
412    }
413
414    /// Gate + CRC + (maybe) push to scratch. Handles PAT-follow on emit.
415    fn consider(&mut self, pid: Pid, section: Bytes) {
416        // Guard: sub-3-byte sections cannot carry a header. The reassembler
417        // should never emit one (it needs >= 3 bytes to know `expected`), but
418        // guard defensively and count it as a CRC failure bucket — it is a
419        // structurally invalid section, dropped without emission.
420        if section.len() < MIN_SECTION_LEN {
421            self.stats.crc_failures += 1;
422            return;
423        }
424
425        let table_id = section[0];
426        let long_form = (section[1] & 0x80) != 0;
427        // The TOT is short-form by its SSI bit but uniquely carries a CRC.
428        let has_crc = long_form || table_id == TOT_TABLE_ID;
429
430        // CRC policy: validate CRC-bearing sections before gating.
431        if has_crc {
432            if section.len() < CRC_LEN {
433                self.stats.crc_failures += 1;
434                return;
435            }
436            let (covered, crc_bytes) = section.split_last_chunk::<CRC_LEN>().unwrap();
437            let declared = u32::from_be_bytes(*crc_bytes);
438            let computed = dvb_common::crc32_mpeg2::compute(covered);
439            if computed != declared {
440                self.stats.crc_failures += 1;
441                return;
442            }
443        }
444
445        // Build the gate key and change detector.
446        let (ext, section_number, version, change_crc) =
447            if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
448                let ext = ((section[3] as u16) << 8) | section[4] as u16;
449                let version = (section[5] >> 1) & 0x1F;
450                let section_number = section[6];
451                // For long-form the trailing CRC already uniquely fingerprints the
452                // payload; reuse it as the change hash.
453                let (_, crc_bytes) = section.split_last_chunk::<CRC_LEN>().unwrap();
454                let crc = u32::from_be_bytes(*crc_bytes);
455                (ext, section_number, version, crc)
456            } else {
457                // Short-form (incl. TOT and any malformed long-form that slipped
458                // the size check above): no version, ext/section_number = 0,
459                // change detector is a CRC over all the section bytes.
460                (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(&section))
461            };
462
463        let key = (pid.value() as u64)
464            | ((table_id as u64) << 13)
465            | ((ext as u64) << 21)
466            | ((section_number as u64) << 37);
467
468        let entry = GateEntry {
469            version,
470            crc: change_crc,
471        };
472
473        let changed = match self.gate.get(&key) {
474            Some(prev) => *prev != entry,
475            None => true,
476        };
477
478        // Update the gate (FIFO-evict at capacity for newly-seen keys).
479        let is_new = !self.gate.contains_key(&key);
480        if is_new && self.gate.len() >= self.cfg.gate_capacity {
481            if let Some(old) = self.gate_order.pop_front() {
482                self.gate.remove(&old);
483                self.stats.gate_evictions += 1;
484            }
485        }
486        if is_new {
487            self.gate_order.push_back(key);
488        }
489        match self.gate.entry(key) {
490            alloc::collections::btree_map::Entry::Occupied(mut oe) => {
491                *oe.get_mut() = entry;
492            }
493            alloc::collections::btree_map::Entry::Vacant(ve) => {
494                ve.insert(entry);
495            }
496        }
497
498        if changed || self.cfg.emit_repeats {
499            let event = SectionEvent {
500                pid,
501                bytes: section,
502            };
503            // PAT-follow happens on an emitted (changed) PAT only.
504            if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
505                self.follow_pat(&event);
506            }
507            self.stats.emitted += 1;
508            self.scratch.push(event);
509        } else {
510            self.stats.suppressed += 1;
511        }
512    }
513
514    /// Parse the PAT and register each programme's PMT PID with a fresh
515    /// reassembler. Parse failures are silently ignored — a malformed PAT that
516    /// nonetheless passed CRC is implausible, and we never panic.
517    fn follow_pat(&mut self, event: &SectionEvent) {
518        use crate::tables::pat::PatSection;
519        use dvb_common::Parse;
520        if let Ok(pat) = PatSection::parse(&event.bytes) {
521            for entry in &pat.entries {
522                if entry.program_number != 0 {
523                    self.pids.entry(Pid::new(entry.pid)).or_default();
524                }
525            }
526        }
527    }
528}
529
/// Unit tests for the demux: each test hand-builds sections, wraps them in
/// single TS packets and checks the emitted events and the [`Stats`] counters.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ts::{TsHeader, TS_PACKET_SIZE};

    /// Wrap section bytes in a single PUSI TS packet on `pid`, with a
    /// pointer_field of 0 and 0xFF stuffing tail. Section must fit one packet.
    fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
        let mut pkt = [0xFFu8; TS_PACKET_SIZE];
        let header = TsHeader {
            tei: false,
            pusi: true,
            pid,
            scrambling: 0,
            has_adaptation: false,
            has_payload: true,
            continuity_counter: 0,
        };
        header.serialize_into(&mut pkt).unwrap();
        pkt[4] = 0x00; // pointer_field
        // The section starts right after the 4-byte header and pointer_field.
        let start = 5;
        assert!(start + section.len() <= TS_PACKET_SIZE, "section too big");
        pkt[start..start + section.len()].copy_from_slice(section);
        pkt
    }

    /// Build a long-form section with a correct trailing CRC-32.
    ///
    /// `last_section_number` is set equal to `section_number`, and the CRC is
    /// computed over every byte that precedes it.
    fn long_section(
        table_id: u8,
        ext: u16,
        version: u8,
        section_number: u8,
        payload: &[u8],
    ) -> Vec<u8> {
        // section_length counts everything after the 3-byte common header.
        let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
        let mut v = vec![
            table_id,
            0x80 | 0x30 | ((section_length >> 8) as u8 & 0x0F),
            (section_length & 0xFF) as u8,
            (ext >> 8) as u8,
            (ext & 0xFF) as u8,
            0xC0 | ((version & 0x1F) << 1) | 0x01,
            section_number,
            section_number, // last_section_number
        ];
        v.extend_from_slice(payload);
        let crc = dvb_common::crc32_mpeg2::compute(&v);
        v.extend_from_slice(&crc.to_be_bytes());
        v
    }

    /// Build a PAT section (real CRC) mapping (program_number, pmt_pid) pairs.
    fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
        let mut body = Vec::new();
        for &(pn, pid) in entries {
            body.extend_from_slice(&pn.to_be_bytes());
            body.push(0xE0 | ((pid >> 8) as u8 & 0x1F));
            body.push((pid & 0xFF) as u8);
        }
        long_section(0x00, tsid, version, 0, &body)
    }

    /// Build a PMT section (real CRC). One stream entry.
    fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
        // pcr_pid(2) + program_info_length(2)=0 + one stream(5):
        // stream type 0x02 (video), elementary_pid = pcr_pid+1, es_info_len 0.
        let body = [
            0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
            (pcr_pid & 0xFF) as u8,
            0xF0,
            0x00,
            0x02,
            0xE0 | (((pcr_pid + 1) >> 8) as u8 & 0x1F),
            ((pcr_pid + 1) & 0xFF) as u8,
            0xF0,
            0x00,
        ];
        long_section(0x02, program_number, version, 0, &body)
    }

    /// The gate emits a PAT once, suppresses an identical repeat, and emits
    /// again when the version changes.
    #[test]
    fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
        let mut demux = SiDemux::builder().build();

        let pat_v0 = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pat_v1 = pat_section(0x0001, 1, &[(1, 0x0100)]);

        let pkt_v0 = ts_packet(0x0000, &pat_v0);
        let pkt_v1 = ts_packet(0x0000, &pat_v1);

        let n0: Vec<_> = demux.feed(&pkt_v0).collect();
        assert_eq!(n0.len(), 1, "PAT v0 should emit one event");
        assert_eq!(n0[0].table_id(), 0x00);
        assert_eq!(n0[0].version(), Some(0));

        let n1: Vec<_> = demux.feed(&pkt_v0).collect();
        assert_eq!(n1.len(), 0, "repeat PAT should be suppressed");

        let n2: Vec<_> = demux.feed(&pkt_v1).collect();
        assert_eq!(n2.len(), 1, "PAT v1 should re-emit");
        assert_eq!(n2[0].version(), Some(1));

        let s = demux.stats();
        assert_eq!(s.sections_completed, 3);
        assert_eq!(s.emitted, 2);
        assert_eq!(s.suppressed, 1);
        assert_eq!(s.crc_failures, 0);
    }

    /// An emitted PAT adds its PMT PID to the watch set, so a PMT arriving on
    /// that PID afterwards is emitted and parses as a typed PMT section.
    #[test]
    fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
        use crate::tables::AnyTableSection;
        let mut demux = SiDemux::builder().build();

        // PAT maps programme 1 -> PMT on PID 0x0100.
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pat_evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
        assert_eq!(pat_evts.len(), 1);

        // Before follow, a PMT packet on 0x0100 would be ignored. After the
        // PAT was emitted, 0x0100 is watched.
        let pmt = pmt_section(1, 0, 0x0100);
        let pmt_evts: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt)).collect();
        assert_eq!(pmt_evts.len(), 1, "PMT on the followed PID should emit");
        assert_eq!(pmt_evts[0].pid(), Pid::new(0x0100));
        match pmt_evts[0].table_section().unwrap() {
            AnyTableSection::PmtSection(p) => assert_eq!(p.program_number, 1),
            other => panic!("expected PmtSection, got {other:?}"),
        }
    }

    /// A long-form section whose CRC no longer matches is dropped and counted
    /// in `crc_failures`, while still counting as a completed section.
    #[test]
    fn corrupted_crc_sdt_dropped_and_counted() {
        let mut demux = SiDemux::builder().build();
        // SDT actual = table_id 0x42, carried on SDT_BAT pid 0x0011.
        let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
        // Corrupt a payload byte AFTER the CRC was computed.
        sdt[8] ^= 0xFF;
        let evts: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
        assert_eq!(evts.len(), 0, "corrupted SDT must not emit");
        let s = demux.stats();
        assert_eq!(s.crc_failures, 1);
        assert_eq!(s.emitted, 0);
        assert_eq!(s.sections_completed, 1);
    }

    /// With a gate capacity of 2, a third distinct key evicts the oldest one,
    /// and the evicted section is emitted again when it reappears.
    #[test]
    fn gate_capacity_evicts_fifo_and_reemits() {
        let mut demux = SiDemux::builder().gate_capacity(2).build();

        // Three distinct EIT sections (table_id 0x4E) by table_id_extension,
        // all on the EIT pid 0x0012.
        let a = long_section(0x4E, 0x0001, 0, 0, &[0x01]);
        let b = long_section(0x4E, 0x0002, 0, 0, &[0x02]);
        let c = long_section(0x4E, 0x0003, 0, 0, &[0x03]);

        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
        assert_eq!(demux.feed(&ts_packet(0x0012, &b)).count(), 1);
        // Inserting c evicts a (the oldest).
        assert_eq!(demux.feed(&ts_packet(0x0012, &c)).count(), 1);
        assert_eq!(demux.stats().gate_evictions, 1);

        // a was evicted -> re-feeding it re-emits (treated as newly seen).
        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
    }

    /// A packet with a bad sync byte yields no events and is counted as both
    /// a fed packet and a malformed packet.
    #[test]
    fn garbage_packet_counted_no_panic() {
        let mut demux = SiDemux::builder().build();
        let garbage = [0x00u8; TS_PACKET_SIZE]; // bad sync byte
        let evts: Vec<_> = demux.feed(&garbage).collect();
        assert_eq!(evts.len(), 0);
        assert_eq!(demux.stats().malformed_packets, 1);
        assert_eq!(demux.stats().packets, 1);
    }

    /// With `emit_repeats(true)` an identical section is emitted every time
    /// and nothing is counted as suppressed.
    #[test]
    fn emit_repeats_bypasses_suppression() {
        let mut demux = SiDemux::builder().emit_repeats(true).build();
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pkt = ts_packet(0x0000, &pat);
        assert_eq!(demux.feed(&pkt).count(), 1);
        assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");
        assert_eq!(demux.stats().suppressed, 0);
        assert_eq!(demux.stats().emitted, 2);
    }

    /// An empty registry changes nothing: both views of a PAT event dispatch
    /// to the built-in `PatSection`.
    #[test]
    fn table_section_with_empty_registry_matches_table_section() {
        use crate::tables::registry::TableRegistry;
        use crate::tables::AnyTableSection;

        let mut demux = SiDemux::builder().build();
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
        assert_eq!(evts.len(), 1);

        let reg = TableRegistry::new();
        let with_reg = evts[0].table_section_with(&reg).unwrap();
        let without = evts[0].table_section().unwrap();
        assert!(matches!(with_reg, AnyTableSection::PatSection(_)));
        assert!(matches!(without, AnyTableSection::PatSection(_)));
    }

    /// A table type registered for a private table_id is returned through the
    /// `Other` variant and can be downcast back to the registered type.
    #[test]
    fn table_section_with_custom_registry_yields_other() {
        use crate::tables::registry::TableRegistry;
        use crate::tables::AnyTableSection;
        use crate::traits::TableDef;
        use dvb_common::Parse;

        // A table_id used only by this test's custom table type.
        const PRIVATE_TID: u8 = 0x90;

        // Minimal custom table: it records nothing but the table_id byte.
        #[derive(Debug)]
        #[cfg_attr(feature = "serde", derive(serde::Serialize))]
        struct PrivateTable {
            table_id: u8,
        }

        impl<'a> Parse<'a> for PrivateTable {
            type Error = crate::Error;
            fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
                if bytes.is_empty() {
                    return Err(crate::Error::BufferTooShort {
                        need: 1,
                        have: 0,
                        what: "PrivateTable",
                    });
                }
                Ok(Self { table_id: bytes[0] })
            }
        }

        impl<'a> TableDef<'a> for PrivateTable {
            const TABLE_ID_RANGES: &'static [(u8, u8)] = &[(PRIVATE_TID, PRIVATE_TID)];
            const NAME: &'static str = "PRIVATE_TABLE";
        }

        let mut reg = TableRegistry::new();
        reg.register::<PrivateTable>();

        // Watch only the private PID so nothing else can produce events.
        let mut demux = SiDemux::builder()
            .dvb_si_pids(false)
            .pid(Pid::new(0x0200))
            .build();

        let section = long_section(PRIVATE_TID, 0x0001, 0, 0, &[0x42]);
        let evts: Vec<_> = demux.feed(&ts_packet(0x0200, &section)).collect();
        assert_eq!(evts.len(), 1);

        let result = evts[0].table_section_with(&reg).unwrap();
        match result {
            AnyTableSection::Other {
                table_id,
                ref value,
            } => {
                assert_eq!(table_id, PRIVATE_TID);
                let pt = value.downcast_ref::<PrivateTable>().unwrap();
                assert_eq!(pt.table_id, PRIVATE_TID);
            }
            other => panic!("expected Other, got {other:?}"),
        }
    }
}