dvb-si 2.0.0

ETSI EN 300 468 DVB Service Information parser + builder. MPEG-2 PSI included.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
//! [`SiDemux`] — PID-filtered, version-gated SI section pump.
//!
//! Feed 188-byte MPEG-TS packets in with [`SiDemux::feed`]; get back an
//! iterator of [`SectionEvent`]s — one per **changed** complete section.
//! The demux reassembles sections per PID (via
//! [`crate::ts::SectionReassembler`]), validates the CRC of CRC-bearing
//! sections, and suppresses repeats through a version gate so that a steady
//! carousel of unchanging tables produces no events after the first.
//!
//! Events own their bytes ([`bytes::Bytes`]) and are therefore `'static` and
//! cheap to clone; typed views ([`SectionEvent::table`],
//! [`SectionEvent::parse`]) borrow the event lazily.
//!
//! ```
//! use dvb_common::Serialize;
//! use dvb_si::demux::SiDemux;
//! use dvb_si::tables::AnyTable;
//! use dvb_si::tables::pat::{Pat, PatEntry};
//!
//! // Build one PAT section and wrap it in a single 188-byte TS packet so the
//! // example is self-contained. In real code `packet` comes from your source.
//! let pat = Pat {
//!     transport_stream_id: 1, version_number: 0, current_next_indicator: true,
//!     section_number: 0, last_section_number: 0,
//!     entries: vec![PatEntry { program_number: 1, pid: 0x0100 }],
//! };
//! let mut section = vec![0u8; pat.serialized_len()];
//! pat.serialize_into(&mut section).unwrap();
//! let mut packet = [0xFFu8; 188];
//! packet[0] = 0x47;  // sync
//! packet[1] = 0x40;  // PUSI=1, PID hi=0
//! packet[2] = 0x00;  // PID lo=0 (PAT)
//! packet[3] = 0x10;  // payload only
//! packet[4] = 0x00;  // pointer_field
//! packet[5..5 + section.len()].copy_from_slice(&section);
//!
//! let mut demux = SiDemux::builder().build();
//! let events: Vec<_> = demux.feed(&packet).collect();
//! assert_eq!(events.len(), 1);
//! match events[0].table() {
//!     Ok(AnyTable::Pat(pat)) => {
//!         println!("PAT v{} on {}", events[0].version().unwrap_or(0), events[0].pid());
//!         assert_eq!(pat.entries[0].pid, 0x0100);
//!     }
//!     other => panic!("expected PAT, got {other:?}"),
//! }
//! ```
//!
//! # Version gate
//!
//! Each `(pid, table_id, table_id_extension, section_number)` tuple is packed
//! into a `u64` key. The stored value is a change detector:
//!
//! - **Long-form** sections (`section_syntax_indicator == 1`, plus the TOT
//!   exception) carry a 5-bit `version_number` and a trailing CRC-32 — the
//!   gate stores `(version, crc32)`. A repeat with the same version *and* CRC
//!   is suppressed.
//! - **Short-form** sections without a CRC (TDT/RST/ST/DIT) have no version;
//!   the gate stores a CRC-32 *computed over the whole section* purely as a
//!   change hash. `table_id_extension` and `section_number` collapse to 0 in
//!   the key.
//!
//! # CRC policy
//!
//! CRC-bearing sections (every long-form section, plus the short-form TOT
//! which uniquely carries a CRC — ETSI EN 300 468 §5.2.6) are validated
//! before gating. Failures are dropped and counted in
//! [`Stats::crc_failures`]; they are never emitted and never update the gate.
//! TDT carries no CRC and is therefore never dropped for CRC reasons.

use std::collections::{HashMap, VecDeque};

use bytes::Bytes;

use crate::pid::Pid;
use crate::ts::{SectionReassembler, TsPacket};

/// table_id of the Program Association Table (PAT) — followed for PMT PIDs.
const PAT_TABLE_ID: u8 = 0x00;
/// table_id of the Time Offset Table — short-form (SSI=0) yet CRC-bearing.
const TOT_TABLE_ID: u8 = 0x73;
/// Minimum bytes required to read a section header (table_id + length field).
const MIN_SECTION_LEN: usize = 3;
/// Long-form extension header bytes (after the 3-byte common header).
const LONG_FORM_EXTRA: usize = 5;
/// Trailing CRC-32 length.
const CRC_LEN: usize = 4;

/// One complete, changed SI section. Owns its bytes — `'static`, cheap clone.
///
/// A `SectionEvent` is only ever constructed for a section that
/// (a) is at least 3 bytes long, and (b) if it carries a CRC, passed CRC
/// validation. So [`SectionEvent::crc_ok`] is always `true` and
/// [`SectionEvent::table_id`] never panics.
#[derive(Debug, Clone)]
pub struct SectionEvent {
    pid: Pid,
    bytes: Bytes,
}

impl SectionEvent {
    /// PID this section was carried on.
    #[must_use]
    pub fn pid(&self) -> Pid {
        self.pid
    }

    /// The full section bytes (header included, CRC included if present).
    #[must_use]
    pub fn bytes(&self) -> &Bytes {
        &self.bytes
    }

    /// The `table_id` (byte 0). Never panics — events are only built for
    /// sections of at least 3 bytes.
    #[must_use]
    pub fn table_id(&self) -> u8 {
        self.bytes[0]
    }

    /// True when this section uses the long-form syntax
    /// (`section_syntax_indicator == 1`).
    #[must_use]
    fn is_long_form(&self) -> bool {
        (self.bytes[1] & 0x80) != 0
    }

    /// 5-bit `version_number`, or `None` for short-form sections (which carry
    /// no version field). Note the TOT, despite being short-form, has no
    /// version field either, so this is `None` for it.
    #[must_use]
    pub fn version(&self) -> Option<u8> {
        if self.is_long_form() && self.bytes.len() > 5 {
            Some((self.bytes[5] >> 1) & 0x1F)
        } else {
            None
        }
    }

    /// 16-bit `table_id_extension`, or `None` for short-form sections.
    #[must_use]
    pub fn table_id_extension(&self) -> Option<u16> {
        if self.is_long_form() && self.bytes.len() > 4 {
            Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
        } else {
            None
        }
    }

    /// `section_number`, or `None` for short-form sections.
    #[must_use]
    pub fn section_number(&self) -> Option<u8> {
        if self.is_long_form() && self.bytes.len() > 6 {
            Some(self.bytes[6])
        } else {
            None
        }
    }

    /// Always `true`: events are emitted only after CRC validation (or for
    /// CRC-less short-form sections, where there is nothing to validate).
    #[must_use]
    pub fn crc_ok(&self) -> bool {
        true
    }

    /// Typed view (lazy, borrows this event's bytes).
    ///
    /// # Errors
    /// Propagates the parse error from the dispatched table type.
    pub fn table(&self) -> crate::Result<crate::tables::AnyTable<'_>> {
        crate::tables::AnyTable::parse(&self.bytes)
    }

    /// Type-keyed view: `event.parse::<Eit>()`.
    ///
    /// # Errors
    /// Propagates `T::parse` errors.
    pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
        <T as dvb_common::Parse>::parse(&self.bytes)
    }
}

/// Section statistics, monotonically accumulated across `feed` calls.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Stats {
    /// TS packets fed (every `feed` call increments this).
    pub packets: u64,
    /// Complete sections produced by the reassemblers (pre-gate, pre-CRC).
    pub sections_completed: u64,
    /// Sections emitted as events (changed, valid).
    pub emitted: u64,
    /// Sections suppressed by the version gate (unchanged repeats).
    pub suppressed: u64,
    /// Structurally invalid (sub-3-byte; cannot occur from the in-crate
    /// reassembler) and CRC-failed sections share this counter. Sections are
    /// dropped before emission; the gate is never updated for them.
    pub crc_failures: u64,
    /// TS packets that failed to parse (bad sync byte, too short).
    pub malformed_packets: u64,
    /// Gate entries evicted because the gate was at capacity.
    pub gate_evictions: u64,
}

/// What the gate remembers for one key, to decide "changed?".
#[derive(Clone, Copy, PartialEq, Eq)]
struct GateEntry {
    /// Long-form version_number, or 0 for short-form (unused there).
    version: u8,
    /// CRC-32 over the whole section — the change hash. For long-form this is
    /// the trailing CRC; for short-form it is computed over all bytes.
    crc: u32,
}

/// Configuration captured by [`SiDemuxBuilder`].
struct Config {
    follow_pat: bool,
    emit_repeats: bool,
    gate_capacity: usize,
}

/// Builder for [`SiDemux`].
///
/// Defaults: `follow_pat = true`, `dvb_si_pids = true`,
/// `emit_repeats = false`, `gate_capacity = 65_536`.
pub struct SiDemuxBuilder {
    follow_pat: bool,
    dvb_si_pids: bool,
    emit_repeats: bool,
    gate_capacity: usize,
    extra_pids: Vec<Pid>,
}

impl Default for SiDemuxBuilder {
    fn default() -> Self {
        Self {
            follow_pat: true,
            dvb_si_pids: true,
            emit_repeats: false,
            gate_capacity: 65_536,
            extra_pids: Vec::new(),
        }
    }
}

impl SiDemuxBuilder {
    /// When `true` (default), an emitted (changed) PAT auto-adds each
    /// programme's PMT PID to the watch set.
    #[must_use]
    pub fn follow_pat(mut self, on: bool) -> Self {
        self.follow_pat = on;
        self
    }

    /// When `true` (default), pre-populate the watch set with the well-known
    /// DVB/MPEG-2 SI PIDs (PAT, CAT, NIT, SDT/BAT, EIT, RST, TDT/TOT, SAT).
    #[must_use]
    pub fn dvb_si_pids(mut self, on: bool) -> Self {
        self.dvb_si_pids = on;
        self
    }

    /// Add a PID to the watch set (additive; may be called repeatedly).
    #[must_use]
    pub fn pid(mut self, pid: Pid) -> Self {
        self.extra_pids.push(pid);
        self
    }

    /// When `true`, emit every complete valid section, bypassing the version
    /// gate's suppression (the gate is still updated). Default `false`.
    #[must_use]
    pub fn emit_repeats(mut self, on: bool) -> Self {
        self.emit_repeats = on;
        self
    }

    /// Maximum number of distinct gate keys retained. At capacity the gate
    /// FIFO-evicts the oldest key. Default 65 536.
    #[must_use]
    pub fn gate_capacity(mut self, cap: usize) -> Self {
        self.gate_capacity = cap;
        self
    }

    /// Build the [`SiDemux`].
    #[must_use]
    pub fn build(self) -> SiDemux {
        let mut pids: HashMap<Pid, SectionReassembler> = HashMap::new();
        if self.dvb_si_pids {
            use crate::pid::well_known as wk;
            for pid in [
                wk::PAT,
                wk::CAT,
                wk::NIT,
                wk::SDT_BAT,
                wk::EIT,
                wk::RST,
                wk::TDT_TOT,
                wk::SAT,
            ] {
                pids.entry(pid).or_default();
            }
        }
        for p in self.extra_pids {
            pids.entry(p).or_default();
        }
        SiDemux {
            pids,
            gate: HashMap::new(),
            gate_order: VecDeque::new(),
            cfg: Config {
                follow_pat: self.follow_pat,
                emit_repeats: self.emit_repeats,
                gate_capacity: self.gate_capacity,
            },
            stats: Stats::default(),
            scratch: Vec::new(),
        }
    }
}

/// PID-filtered, version-gated SI section demultiplexer.
///
/// See the [module docs](crate::demux) for the gate and CRC policies.
pub struct SiDemux {
    pids: HashMap<Pid, SectionReassembler>,
    // TODO(perf): keys are uniform internal u64s — a non-SipHash hasher (e.g.
    // FxHash) would shave cycles at high section rates; revisit if profiling
    // shows it.
    gate: HashMap<u64, GateEntry>,
    gate_order: VecDeque<u64>,
    cfg: Config,
    stats: Stats,
    scratch: Vec<SectionEvent>,
}

impl SiDemux {
    /// Start building a demux. See [`SiDemuxBuilder`] for defaults.
    #[must_use]
    pub fn builder() -> SiDemuxBuilder {
        SiDemuxBuilder::default()
    }

    /// Accumulated statistics.
    #[must_use]
    pub fn stats(&self) -> Stats {
        self.stats
    }

    /// Feed one 188-byte TS packet. Infallible: malformed packets are counted
    /// in [`Stats::malformed_packets`], not raised. Returns an iterator over
    /// the changed sections this packet completed.
    pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
        self.scratch.clear();
        self.stats.packets += 1;

        match TsPacket::parse(packet) {
            Err(_) => self.stats.malformed_packets += 1,
            Ok(ts) => {
                let pid = Pid::new(ts.header.pid);
                // Cheap miss: one map lookup for non-watched PIDs.
                if self.pids.contains_key(&pid) {
                    let payload = ts.payload.unwrap_or(&[]);
                    // Feed the reassembler; the borrow is released before
                    // `consider` (which may insert new PMT PIDs into the map).
                    self.pids
                        .get_mut(&pid)
                        .expect("checked above")
                        .feed(payload, ts.header.pusi);
                    while let Some(section) = self
                        .pids
                        .get_mut(&pid)
                        .and_then(SectionReassembler::pop_section)
                    {
                        self.stats.sections_completed += 1;
                        self.consider(pid, section);
                    }
                }
            }
        }

        self.scratch.drain(..)
    }

    /// Gate + CRC + (maybe) push to scratch. Handles PAT-follow on emit.
    fn consider(&mut self, pid: Pid, section: Bytes) {
        // Guard: sub-3-byte sections cannot carry a header. The reassembler
        // should never emit one (it needs >= 3 bytes to know `expected`), but
        // guard defensively and count it as a CRC failure bucket — it is a
        // structurally invalid section, dropped without emission.
        if section.len() < MIN_SECTION_LEN {
            self.stats.crc_failures += 1;
            return;
        }

        let table_id = section[0];
        let long_form = (section[1] & 0x80) != 0;
        // The TOT is short-form by its SSI bit but uniquely carries a CRC.
        let has_crc = long_form || table_id == TOT_TABLE_ID;

        // CRC policy: validate CRC-bearing sections before gating.
        if has_crc {
            if section.len() < CRC_LEN {
                self.stats.crc_failures += 1;
                return;
            }
            let covered = &section[..section.len() - CRC_LEN];
            let declared = u32::from_be_bytes([
                section[section.len() - 4],
                section[section.len() - 3],
                section[section.len() - 2],
                section[section.len() - 1],
            ]);
            let computed = dvb_common::crc32_mpeg2::compute(covered);
            if computed != declared {
                self.stats.crc_failures += 1;
                return;
            }
        }

        // Build the gate key and change detector.
        let (ext, section_number, version, change_crc) =
            if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
                let ext = ((section[3] as u16) << 8) | section[4] as u16;
                let version = (section[5] >> 1) & 0x1F;
                let section_number = section[6];
                // For long-form the trailing CRC already uniquely fingerprints the
                // payload; reuse it as the change hash.
                let crc = u32::from_be_bytes([
                    section[section.len() - 4],
                    section[section.len() - 3],
                    section[section.len() - 2],
                    section[section.len() - 1],
                ]);
                (ext, section_number, version, crc)
            } else {
                // Short-form (incl. TOT and any malformed long-form that slipped
                // the size check above): no version, ext/section_number = 0,
                // change detector is a CRC over all the section bytes.
                (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(&section))
            };

        let key = (pid.value() as u64)
            | ((table_id as u64) << 13)
            | ((ext as u64) << 21)
            | ((section_number as u64) << 37);

        let entry = GateEntry {
            version,
            crc: change_crc,
        };

        let changed = match self.gate.get(&key) {
            Some(prev) => *prev != entry,
            None => true,
        };

        // Update the gate (FIFO-evict at capacity for newly-seen keys).
        if !self.gate.contains_key(&key) {
            if self.gate.len() >= self.cfg.gate_capacity {
                if let Some(old) = self.gate_order.pop_front() {
                    self.gate.remove(&old);
                    self.stats.gate_evictions += 1;
                }
            }
            self.gate_order.push_back(key);
        }
        self.gate.insert(key, entry);

        if changed || self.cfg.emit_repeats {
            let event = SectionEvent {
                pid,
                bytes: section,
            };
            // PAT-follow happens on an emitted (changed) PAT only.
            if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
                self.follow_pat(&event);
            }
            self.stats.emitted += 1;
            self.scratch.push(event);
        } else {
            self.stats.suppressed += 1;
        }
    }

    /// Parse the PAT and register each programme's PMT PID with a fresh
    /// reassembler. Parse failures are silently ignored — a malformed PAT that
    /// nonetheless passed CRC is implausible, and we never panic.
    fn follow_pat(&mut self, event: &SectionEvent) {
        use crate::tables::pat::Pat;
        use dvb_common::Parse;
        if let Ok(pat) = Pat::parse(&event.bytes) {
            for entry in &pat.entries {
                if entry.program_number != 0 {
                    self.pids.entry(Pid::new(entry.pid)).or_default();
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::ts::{TsHeader, TS_PACKET_SIZE};

    /// Wrap section bytes in a single PUSI TS packet on `pid`, with a
    /// pointer_field of 0 and 0xFF stuffing tail. Section must fit one packet.
    fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
        let mut pkt = [0xFFu8; TS_PACKET_SIZE];
        let header = TsHeader {
            tei: false,
            pusi: true,
            pid,
            scrambling: 0,
            has_adaptation: false,
            has_payload: true,
            continuity_counter: 0,
        };
        header.serialize_into(&mut pkt);
        pkt[4] = 0x00; // pointer_field
        let start = 5;
        assert!(start + section.len() <= TS_PACKET_SIZE, "section too big");
        pkt[start..start + section.len()].copy_from_slice(section);
        pkt
    }

    /// Build a long-form section with a correct trailing CRC-32.
    fn long_section(
        table_id: u8,
        ext: u16,
        version: u8,
        section_number: u8,
        payload: &[u8],
    ) -> Vec<u8> {
        let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
        let mut v = vec![
            table_id,
            0x80 | 0x30 | ((section_length >> 8) as u8 & 0x0F),
            (section_length & 0xFF) as u8,
            (ext >> 8) as u8,
            (ext & 0xFF) as u8,
            0xC0 | ((version & 0x1F) << 1) | 0x01,
            section_number,
            section_number, // last_section_number
        ];
        v.extend_from_slice(payload);
        let crc = dvb_common::crc32_mpeg2::compute(&v);
        v.extend_from_slice(&crc.to_be_bytes());
        v
    }

    /// Build a PAT section (real CRC) mapping (program_number, pmt_pid) pairs.
    fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
        let mut body = Vec::new();
        for &(pn, pid) in entries {
            body.extend_from_slice(&pn.to_be_bytes());
            body.push(0xE0 | ((pid >> 8) as u8 & 0x1F));
            body.push((pid & 0xFF) as u8);
        }
        long_section(0x00, tsid, version, 0, &body)
    }

    /// Build a PMT section (real CRC). One stream entry.
    fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
        // pcr_pid(2) + program_info_length(2)=0 + one stream(5):
        // stream type 0x02 (video), elementary_pid = pcr_pid+1, es_info_len 0.
        let body = [
            0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
            (pcr_pid & 0xFF) as u8,
            0xF0,
            0x00,
            0x02,
            0xE0 | (((pcr_pid + 1) >> 8) as u8 & 0x1F),
            ((pcr_pid + 1) & 0xFF) as u8,
            0xF0,
            0x00,
        ];
        long_section(0x02, program_number, version, 0, &body)
    }

    #[test]
    fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
        let mut demux = SiDemux::builder().build();

        let pat_v0 = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pat_v1 = pat_section(0x0001, 1, &[(1, 0x0100)]);

        let pkt_v0 = ts_packet(0x0000, &pat_v0);
        let pkt_v1 = ts_packet(0x0000, &pat_v1);

        let n0: Vec<_> = demux.feed(&pkt_v0).collect();
        assert_eq!(n0.len(), 1, "PAT v0 should emit one event");
        assert_eq!(n0[0].table_id(), 0x00);
        assert_eq!(n0[0].version(), Some(0));

        let n1: Vec<_> = demux.feed(&pkt_v0).collect();
        assert_eq!(n1.len(), 0, "repeat PAT should be suppressed");

        let n2: Vec<_> = demux.feed(&pkt_v1).collect();
        assert_eq!(n2.len(), 1, "PAT v1 should re-emit");
        assert_eq!(n2[0].version(), Some(1));

        let s = demux.stats();
        assert_eq!(s.sections_completed, 3);
        assert_eq!(s.emitted, 2);
        assert_eq!(s.suppressed, 1);
        assert_eq!(s.crc_failures, 0);
    }

    #[test]
    fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
        use crate::tables::AnyTable;
        let mut demux = SiDemux::builder().build();

        // PAT maps programme 1 -> PMT on PID 0x0100.
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pat_evts: Vec<_> = demux.feed(&ts_packet(0x0000, &pat)).collect();
        assert_eq!(pat_evts.len(), 1);

        // Before follow, a PMT packet on 0x0100 would be ignored. After the
        // PAT was emitted, 0x0100 is watched.
        let pmt = pmt_section(1, 0, 0x0100);
        let pmt_evts: Vec<_> = demux.feed(&ts_packet(0x0100, &pmt)).collect();
        assert_eq!(pmt_evts.len(), 1, "PMT on the followed PID should emit");
        assert_eq!(pmt_evts[0].pid(), Pid::new(0x0100));
        match pmt_evts[0].table().unwrap() {
            AnyTable::Pmt(p) => assert_eq!(p.program_number, 1),
            other => panic!("expected Pmt, got {other:?}"),
        }
    }

    #[test]
    fn corrupted_crc_sdt_dropped_and_counted() {
        let mut demux = SiDemux::builder().build();
        // SDT actual = table_id 0x42, carried on SDT_BAT pid 0x0011.
        let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
        // Corrupt a payload byte AFTER the CRC was computed.
        sdt[8] ^= 0xFF;
        let evts: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
        assert_eq!(evts.len(), 0, "corrupted SDT must not emit");
        let s = demux.stats();
        assert_eq!(s.crc_failures, 1);
        assert_eq!(s.emitted, 0);
        assert_eq!(s.sections_completed, 1);
    }

    #[test]
    fn gate_capacity_evicts_fifo_and_reemits() {
        let mut demux = SiDemux::builder().gate_capacity(2).build();

        // Three distinct EIT sections (table_id 0x4E) by table_id_extension,
        // all on the EIT pid 0x0012.
        let a = long_section(0x4E, 0x0001, 0, 0, &[0x01]);
        let b = long_section(0x4E, 0x0002, 0, 0, &[0x02]);
        let c = long_section(0x4E, 0x0003, 0, 0, &[0x03]);

        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
        assert_eq!(demux.feed(&ts_packet(0x0012, &b)).count(), 1);
        // Inserting c evicts a (the oldest).
        assert_eq!(demux.feed(&ts_packet(0x0012, &c)).count(), 1);
        assert_eq!(demux.stats().gate_evictions, 1);

        // a was evicted -> re-feeding it re-emits (treated as newly seen).
        assert_eq!(demux.feed(&ts_packet(0x0012, &a)).count(), 1);
    }

    #[test]
    fn garbage_packet_counted_no_panic() {
        let mut demux = SiDemux::builder().build();
        let garbage = [0x00u8; TS_PACKET_SIZE]; // bad sync byte
        let evts: Vec<_> = demux.feed(&garbage).collect();
        assert_eq!(evts.len(), 0);
        assert_eq!(demux.stats().malformed_packets, 1);
        assert_eq!(demux.stats().packets, 1);
    }

    #[test]
    fn emit_repeats_bypasses_suppression() {
        let mut demux = SiDemux::builder().emit_repeats(true).build();
        let pat = pat_section(0x0001, 0, &[(1, 0x0100)]);
        let pkt = ts_packet(0x0000, &pat);
        assert_eq!(demux.feed(&pkt).count(), 1);
        assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");
        assert_eq!(demux.stats().suppressed, 0);
        assert_eq!(demux.stats().emitted, 2);
    }
}