dvb_si/demux.rs
1//! [`SiDemux`] — PID-filtered, version-gated SI section pump.
2//!
3//! Feed 188-byte MPEG-TS packets in with [`SiDemux::feed`]; get back an
4//! iterator of [`SectionEvent`]s — one per **changed** complete section.
5//! The demux reassembles sections per PID (via
6//! [`crate::ts::SectionReassembler`]), validates the CRC of CRC-bearing
7//! sections, and suppresses repeats through a version gate so that a steady
8//! carousel of unchanging tables produces no events after the first.
9//!
10//! Events own their bytes ([`bytes::Bytes`]) and are therefore `'static` and
11//! cheap to clone; typed views ([`SectionEvent::table`],
12//! [`SectionEvent::parse`]) borrow the event lazily.
13//!
14//! ```
15//! use dvb_common::Serialize;
16//! use dvb_si::demux::SiDemux;
17//! use dvb_si::tables::AnyTable;
18//! use dvb_si::tables::pat::{Pat, PatEntry};
19//!
20//! // Build one PAT section and wrap it in a single 188-byte TS packet so the
21//! // example is self-contained. In real code `packet` comes from your source.
22//! let pat = Pat {
23//! transport_stream_id: 1, version_number: 0, current_next_indicator: true,
24//! section_number: 0, last_section_number: 0,
25//! entries: vec![PatEntry { program_number: 1, pid: 0x0100 }],
26//! };
27//! let mut section = vec![0u8; pat.serialized_len()];
28//! pat.serialize_into(&mut section).unwrap();
29//! let mut packet = [0xFFu8; 188];
30//! packet[0] = 0x47; // sync
31//! packet[1] = 0x40; // PUSI=1, PID hi=0
32//! packet[2] = 0x00; // PID lo=0 (PAT)
33//! packet[3] = 0x10; // payload only
34//! packet[4] = 0x00; // pointer_field
//! packet[5..5 + section.len()].copy_from_slice(&section);
36//!
37//! let mut demux = SiDemux::builder().build();
38//! let events: Vec<_> = demux.feed(&packet).collect();
39//! assert_eq!(events.len(), 1);
40//! match events[0].table() {
41//! Ok(AnyTable::Pat(pat)) => {
42//! println!("PAT v{} on {}", events[0].version().unwrap_or(0), events[0].pid());
43//! assert_eq!(pat.entries[0].pid, 0x0100);
44//! }
45//! other => panic!("expected PAT, got {other:?}"),
46//! }
47//! ```
48//!
49//! # Version gate
50//!
51//! Each `(pid, table_id, table_id_extension, section_number)` tuple is packed
52//! into a `u64` key. The stored value is a change detector:
53//!
//! - **Long-form** sections (`section_syntax_indicator == 1`) carry a 5-bit
//!   `version_number` and a trailing CRC-32 — the gate stores
//!   `(version, crc32)`. A repeat with the same version *and* CRC is
//!   suppressed.
//! - **Short-form** sections (TDT/RST/ST/DIT, and also the TOT) have no
//!   version; the gate stores a CRC-32 *computed over the whole section*
//!   purely as a change hash (for the TOT this covers its own trailing CRC
//!   bytes too). `table_id_extension` and `section_number` collapse to 0 in
//!   the key.
62//!
63//! # CRC policy
64//!
65//! CRC-bearing sections (every long-form section, plus the short-form TOT
66//! which uniquely carries a CRC — ETSI EN 300 468 §5.2.6) are validated
67//! before gating. Failures are dropped and counted in
68//! [`Stats::crc_failures`]; they are never emitted and never update the gate.
69//! TDT carries no CRC and is therefore never dropped for CRC reasons.
70
71use std::collections::{HashMap, VecDeque};
72
73use bytes::Bytes;
74
75use crate::pid::Pid;
76use crate::ts::{SectionReassembler, TsPacket};
77
/// `table_id` of the Program Association Table. A changed PAT is what
/// triggers PMT-PID following.
const PAT_TABLE_ID: u8 = 0x00;
/// `table_id` of the Time Offset Table: its SSI bit is 0 (short-form), yet it
/// still ends in a CRC-32.
const TOT_TABLE_ID: u8 = 0x73;
/// Size of the common section header (`table_id` plus the two bytes holding
/// the flags and length field). A shorter section cannot be interpreted.
const MIN_SECTION_LEN: usize = 3;
/// Number of long-form header bytes that follow the 3-byte common header.
const LONG_FORM_EXTRA: usize = 5;
/// Size in bytes of the trailing CRC-32.
const CRC_LEN: usize = 4;
88
89/// One complete, changed SI section. Owns its bytes — `'static`, cheap clone.
90///
91/// A `SectionEvent` is only ever constructed for a section that
92/// (a) is at least 3 bytes long, and (b) if it carries a CRC, passed CRC
93/// validation. So [`SectionEvent::crc_ok`] is always `true` and
94/// [`SectionEvent::table_id`] never panics.
95#[derive(Debug, Clone)]
96pub struct SectionEvent {
97 pid: Pid,
98 bytes: Bytes,
99}
100
101impl SectionEvent {
102 /// PID this section was carried on.
103 #[must_use]
104 pub fn pid(&self) -> Pid {
105 self.pid
106 }
107
108 /// The full section bytes (header included, CRC included if present).
109 #[must_use]
110 pub fn bytes(&self) -> &Bytes {
111 &self.bytes
112 }
113
114 /// The `table_id` (byte 0). Never panics — events are only built for
115 /// sections of at least 3 bytes.
116 #[must_use]
117 pub fn table_id(&self) -> u8 {
118 self.bytes[0]
119 }
120
121 /// True when this section uses the long-form syntax
122 /// (`section_syntax_indicator == 1`).
123 #[must_use]
124 fn is_long_form(&self) -> bool {
125 (self.bytes[1] & 0x80) != 0
126 }
127
128 /// 5-bit `version_number`, or `None` for short-form sections (which carry
129 /// no version field). Note the TOT, despite being short-form, has no
130 /// version field either, so this is `None` for it.
131 #[must_use]
132 pub fn version(&self) -> Option<u8> {
133 if self.is_long_form() && self.bytes.len() > 5 {
134 Some((self.bytes[5] >> 1) & 0x1F)
135 } else {
136 None
137 }
138 }
139
140 /// 16-bit `table_id_extension`, or `None` for short-form sections.
141 #[must_use]
142 pub fn table_id_extension(&self) -> Option<u16> {
143 if self.is_long_form() && self.bytes.len() > 4 {
144 Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
145 } else {
146 None
147 }
148 }
149
150 /// `section_number`, or `None` for short-form sections.
151 #[must_use]
152 pub fn section_number(&self) -> Option<u8> {
153 if self.is_long_form() && self.bytes.len() > 6 {
154 Some(self.bytes[6])
155 } else {
156 None
157 }
158 }
159
160 /// Always `true`: events are emitted only after CRC validation (or for
161 /// CRC-less short-form sections, where there is nothing to validate).
162 #[must_use]
163 pub fn crc_ok(&self) -> bool {
164 true
165 }
166
167 /// Typed view (lazy, borrows this event's bytes).
168 ///
169 /// # Errors
170 /// Propagates the parse error from the dispatched table type.
171 pub fn table(&self) -> crate::Result<crate::tables::AnyTable<'_>> {
172 crate::tables::AnyTable::parse(&self.bytes)
173 }
174
175 /// Type-keyed view: `event.parse::<Eit>()`.
176 ///
177 /// # Errors
178 /// Propagates `T::parse` errors.
179 pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
180 <T as dvb_common::Parse>::parse(&self.bytes)
181 }
182}
183
/// Counters describing what the demux has done so far. They only ever grow,
/// and they accumulate across `feed` calls.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct Stats {
    /// Number of `feed` calls, i.e. TS packets offered (malformed ones
    /// included).
    pub packets: u64,
    /// Complete sections handed over by the reassemblers, counted before any
    /// CRC check or gating.
    pub sections_completed: u64,
    /// Sections that were turned into events.
    pub emitted: u64,
    /// Unchanged repeats that the version gate held back.
    pub suppressed: u64,
    /// Sections dropped before emission: CRC mismatches, plus structurally
    /// invalid sections (shorter than 3 bytes — not expected from the
    /// in-crate reassembler). The gate is never updated for these.
    pub crc_failures: u64,
    /// TS packets that failed to parse (bad sync byte, too short).
    pub malformed_packets: u64,
    /// Gate entries removed to make room once the gate was at capacity.
    pub gate_evictions: u64,
}
204
/// The change detector the gate keeps per key; a section counts as changed
/// when its entry differs from the stored one.
#[derive(Copy, Clone, Eq, PartialEq)]
struct GateEntry {
    /// `version_number` of a long-form section. Short-form sections have no
    /// version, so this is always 0 for them.
    version: u8,
    /// Change hash. Long-form sections reuse their trailing CRC-32; for
    /// short-form sections it is a CRC-32 computed over every section byte.
    crc: u32,
}
214
/// Runtime settings the demux keeps after [`SiDemuxBuilder`] has been
/// consumed.
struct Config {
    /// Register PMT PIDs announced by a changed PAT.
    follow_pat: bool,
    /// Emit every valid section, not only changed ones.
    emit_repeats: bool,
    /// Upper bound on the number of keys the gate keeps.
    gate_capacity: usize,
}
221
222/// Builder for [`SiDemux`].
223///
224/// Defaults: `follow_pat = true`, `dvb_si_pids = true`,
225/// `emit_repeats = false`, `gate_capacity = 65_536`.
226pub struct SiDemuxBuilder {
227 follow_pat: bool,
228 dvb_si_pids: bool,
229 emit_repeats: bool,
230 gate_capacity: usize,
231 extra_pids: Vec<Pid>,
232}
233
234impl Default for SiDemuxBuilder {
235 fn default() -> Self {
236 Self {
237 follow_pat: true,
238 dvb_si_pids: true,
239 emit_repeats: false,
240 gate_capacity: 65_536,
241 extra_pids: Vec::new(),
242 }
243 }
244}
245
246impl SiDemuxBuilder {
247 /// When `true` (default), an emitted (changed) PAT auto-adds each
248 /// programme's PMT PID to the watch set.
249 #[must_use]
250 pub fn follow_pat(mut self, on: bool) -> Self {
251 self.follow_pat = on;
252 self
253 }
254
255 /// When `true` (default), pre-populate the watch set with the well-known
256 /// DVB/MPEG-2 SI PIDs (PAT, CAT, NIT, SDT/BAT, EIT, RST, TDT/TOT, SAT).
257 #[must_use]
258 pub fn dvb_si_pids(mut self, on: bool) -> Self {
259 self.dvb_si_pids = on;
260 self
261 }
262
263 /// Add a PID to the watch set (additive; may be called repeatedly).
264 #[must_use]
265 pub fn pid(mut self, pid: Pid) -> Self {
266 self.extra_pids.push(pid);
267 self
268 }
269
270 /// When `true`, emit every complete valid section, bypassing the version
271 /// gate's suppression (the gate is still updated). Default `false`.
272 #[must_use]
273 pub fn emit_repeats(mut self, on: bool) -> Self {
274 self.emit_repeats = on;
275 self
276 }
277
278 /// Maximum number of distinct gate keys retained. At capacity the gate
279 /// FIFO-evicts the oldest key. Default 65 536.
280 #[must_use]
281 pub fn gate_capacity(mut self, cap: usize) -> Self {
282 self.gate_capacity = cap;
283 self
284 }
285
286 /// Build the [`SiDemux`].
287 #[must_use]
288 pub fn build(self) -> SiDemux {
289 let mut pids: HashMap<Pid, SectionReassembler> = HashMap::new();
290 if self.dvb_si_pids {
291 use crate::pid::well_known as wk;
292 for pid in [
293 wk::PAT,
294 wk::CAT,
295 wk::NIT,
296 wk::SDT_BAT,
297 wk::EIT,
298 wk::RST,
299 wk::TDT_TOT,
300 wk::SAT,
301 ] {
302 pids.entry(pid).or_default();
303 }
304 }
305 for p in self.extra_pids {
306 pids.entry(p).or_default();
307 }
308 SiDemux {
309 pids,
310 gate: HashMap::new(),
311 gate_order: VecDeque::new(),
312 cfg: Config {
313 follow_pat: self.follow_pat,
314 emit_repeats: self.emit_repeats,
315 gate_capacity: self.gate_capacity,
316 },
317 stats: Stats::default(),
318 scratch: Vec::new(),
319 }
320 }
321}
322
323/// PID-filtered, version-gated SI section demultiplexer.
324///
325/// See the [module docs](crate::demux) for the gate and CRC policies.
326pub struct SiDemux {
327 pids: HashMap<Pid, SectionReassembler>,
328 // TODO(perf): keys are uniform internal u64s — a non-SipHash hasher (e.g.
329 // FxHash) would shave cycles at high section rates; revisit if profiling
330 // shows it.
331 gate: HashMap<u64, GateEntry>,
332 gate_order: VecDeque<u64>,
333 cfg: Config,
334 stats: Stats,
335 scratch: Vec<SectionEvent>,
336}
337
338impl SiDemux {
339 /// Start building a demux. See [`SiDemuxBuilder`] for defaults.
340 #[must_use]
341 pub fn builder() -> SiDemuxBuilder {
342 SiDemuxBuilder::default()
343 }
344
345 /// Accumulated statistics.
346 #[must_use]
347 pub fn stats(&self) -> Stats {
348 self.stats
349 }
350
351 /// Feed one 188-byte TS packet. Infallible: malformed packets are counted
352 /// in [`Stats::malformed_packets`], not raised. Returns an iterator over
353 /// the changed sections this packet completed.
354 pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
355 self.scratch.clear();
356 self.stats.packets += 1;
357
358 match TsPacket::parse(packet) {
359 Err(_) => self.stats.malformed_packets += 1,
360 Ok(ts) => {
361 let pid = Pid::new(ts.header.pid);
362 // Cheap miss: one map lookup for non-watched PIDs.
363 if self.pids.contains_key(&pid) {
364 let payload = ts.payload.unwrap_or(&[]);
365 // Feed the reassembler; the borrow is released before
366 // `consider` (which may insert new PMT PIDs into the map).
367 self.pids
368 .get_mut(&pid)
369 .expect("checked above")
370 .feed(payload, ts.header.pusi);
371 while let Some(section) = self
372 .pids
373 .get_mut(&pid)
374 .and_then(SectionReassembler::pop_section)
375 {
376 self.stats.sections_completed += 1;
377 self.consider(pid, section);
378 }
379 }
380 }
381 }
382
383 self.scratch.drain(..)
384 }
385
386 /// Gate + CRC + (maybe) push to scratch. Handles PAT-follow on emit.
387 fn consider(&mut self, pid: Pid, section: Bytes) {
388 // Guard: sub-3-byte sections cannot carry a header. The reassembler
389 // should never emit one (it needs >= 3 bytes to know `expected`), but
390 // guard defensively and count it as a CRC failure bucket — it is a
391 // structurally invalid section, dropped without emission.
392 if section.len() < MIN_SECTION_LEN {
393 self.stats.crc_failures += 1;
394 return;
395 }
396
397 let table_id = section[0];
398 let long_form = (section[1] & 0x80) != 0;
399 // The TOT is short-form by its SSI bit but uniquely carries a CRC.
400 let has_crc = long_form || table_id == TOT_TABLE_ID;
401
402 // CRC policy: validate CRC-bearing sections before gating.
403 if has_crc {
404 if section.len() < CRC_LEN {
405 self.stats.crc_failures += 1;
406 return;
407 }
408 let covered = §ion[..section.len() - CRC_LEN];
409 let declared = u32::from_be_bytes([
410 section[section.len() - 4],
411 section[section.len() - 3],
412 section[section.len() - 2],
413 section[section.len() - 1],
414 ]);
415 let computed = dvb_common::crc32_mpeg2::compute(covered);
416 if computed != declared {
417 self.stats.crc_failures += 1;
418 return;
419 }
420 }
421
422 // Build the gate key and change detector.
423 let (ext, section_number, version, change_crc) =
424 if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
425 let ext = ((section[3] as u16) << 8) | section[4] as u16;
426 let version = (section[5] >> 1) & 0x1F;
427 let section_number = section[6];
428 // For long-form the trailing CRC already uniquely fingerprints the
429 // payload; reuse it as the change hash.
430 let crc = u32::from_be_bytes([
431 section[section.len() - 4],
432 section[section.len() - 3],
433 section[section.len() - 2],
434 section[section.len() - 1],
435 ]);
436 (ext, section_number, version, crc)
437 } else {
438 // Short-form (incl. TOT and any malformed long-form that slipped
439 // the size check above): no version, ext/section_number = 0,
440 // change detector is a CRC over all the section bytes.
441 (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(§ion))
442 };
443
444 let key = (pid.value() as u64)
445 | ((table_id as u64) << 13)
446 | ((ext as u64) << 21)
447 | ((section_number as u64) << 37);
448
449 let entry = GateEntry {
450 version,
451 crc: change_crc,
452 };
453
454 let changed = match self.gate.get(&key) {
455 Some(prev) => *prev != entry,
456 None => true,
457 };
458
459 // Update the gate (FIFO-evict at capacity for newly-seen keys).
460 if !self.gate.contains_key(&key) {
461 if self.gate.len() >= self.cfg.gate_capacity {
462 if let Some(old) = self.gate_order.pop_front() {
463 self.gate.remove(&old);
464 self.stats.gate_evictions += 1;
465 }
466 }
467 self.gate_order.push_back(key);
468 }
469 self.gate.insert(key, entry);
470
471 if changed || self.cfg.emit_repeats {
472 let event = SectionEvent {
473 pid,
474 bytes: section,
475 };
476 // PAT-follow happens on an emitted (changed) PAT only.
477 if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
478 self.follow_pat(&event);
479 }
480 self.stats.emitted += 1;
481 self.scratch.push(event);
482 } else {
483 self.stats.suppressed += 1;
484 }
485 }
486
487 /// Parse the PAT and register each programme's PMT PID with a fresh
488 /// reassembler. Parse failures are silently ignored — a malformed PAT that
489 /// nonetheless passed CRC is implausible, and we never panic.
490 fn follow_pat(&mut self, event: &SectionEvent) {
491 use crate::tables::pat::Pat;
492 use dvb_common::Parse;
493 if let Ok(pat) = Pat::parse(&event.bytes) {
494 for entry in &pat.entries {
495 if entry.program_number != 0 {
496 self.pids.entry(Pid::new(entry.pid)).or_default();
497 }
498 }
499 }
500 }
501}
502
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ts::{TsHeader, TS_PACKET_SIZE};

    /// Builds one PUSI TS packet on `pid` carrying `section`: pointer_field
    /// 0, section bytes, then 0xFF stuffing. The section must fit in a single
    /// packet.
    fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
        // 4-byte TS header, then the pointer_field, then the section.
        const SECTION_START: usize = 5;

        let mut pkt = [0xFFu8; TS_PACKET_SIZE];
        let header = TsHeader {
            pid,
            pusi: true,
            tei: false,
            scrambling: 0,
            has_adaptation: false,
            has_payload: true,
            continuity_counter: 0,
        };
        header.serialize_into(&mut pkt);
        pkt[4] = 0x00; // pointer_field
        let section_end = SECTION_START + section.len();
        assert!(section_end <= TS_PACKET_SIZE, "section too big");
        pkt[SECTION_START..section_end].copy_from_slice(section);
        pkt
    }

    /// Assembles a long-form section around `payload` and appends the
    /// matching CRC-32.
    fn long_section(
        table_id: u8,
        ext: u16,
        version: u8,
        section_number: u8,
        payload: &[u8],
    ) -> Vec<u8> {
        let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
        let [len_hi, len_lo] = section_length.to_be_bytes();
        let [ext_hi, ext_lo] = ext.to_be_bytes();

        let mut section = Vec::new();
        section.extend_from_slice(&[
            table_id,
            0x80 | 0x30 | (len_hi & 0x0F),
            len_lo,
            ext_hi,
            ext_lo,
            0xC0 | ((version & 0x1F) << 1) | 0x01,
            section_number,
            section_number, // last_section_number
        ]);
        section.extend_from_slice(payload);
        let crc = dvb_common::crc32_mpeg2::compute(&section);
        section.extend_from_slice(&crc.to_be_bytes());
        section
    }

    /// Builds a PAT section (valid CRC) from `(program_number, pmt_pid)`
    /// pairs.
    fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
        let body: Vec<u8> = entries
            .iter()
            .flat_map(|&(program_number, pmt_pid)| {
                let [pn_hi, pn_lo] = program_number.to_be_bytes();
                let [pid_hi, pid_lo] = pmt_pid.to_be_bytes();
                [pn_hi, pn_lo, 0xE0 | (pid_hi & 0x1F), pid_lo]
            })
            .collect();
        long_section(0x00, tsid, version, 0, &body)
    }

    /// Builds a PMT section (valid CRC) with a single stream entry.
    fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
        // Layout: pcr_pid(2), program_info_length(2) = 0, then one 5-byte
        // stream: type 0x02 (video), elementary_pid = pcr_pid + 1,
        // es_info_length = 0.
        let [pcr_hi, pcr_lo] = pcr_pid.to_be_bytes();
        let [es_hi, es_lo] = (pcr_pid + 1).to_be_bytes();
        let body = [
            0xE0 | (pcr_hi & 0x1F),
            pcr_lo,
            0xF0,
            0x00,
            0x02,
            0xE0 | (es_hi & 0x1F),
            es_lo,
            0xF0,
            0x00,
        ];
        long_section(0x02, program_number, version, 0, &body)
    }

    #[test]
    fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
        let mut demux = SiDemux::builder().build();

        let pkt_v0 = ts_packet(0x0000, &pat_section(0x0001, 0, &[(1, 0x0100)]));
        let pkt_v1 = ts_packet(0x0000, &pat_section(0x0001, 1, &[(1, 0x0100)]));

        let first: Vec<_> = demux.feed(&pkt_v0).collect();
        assert_eq!(first.len(), 1, "PAT v0 should emit one event");
        assert_eq!(first[0].table_id(), 0x00);
        assert_eq!(first[0].version(), Some(0));

        let repeat: Vec<_> = demux.feed(&pkt_v0).collect();
        assert_eq!(repeat.len(), 0, "repeat PAT should be suppressed");

        let bumped: Vec<_> = demux.feed(&pkt_v1).collect();
        assert_eq!(bumped.len(), 1, "PAT v1 should re-emit");
        assert_eq!(bumped[0].version(), Some(1));

        let stats = demux.stats();
        assert_eq!(stats.sections_completed, 3);
        assert_eq!(stats.emitted, 2);
        assert_eq!(stats.suppressed, 1);
        assert_eq!(stats.crc_failures, 0);
    }

    #[test]
    fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
        use crate::tables::AnyTable;
        let mut demux = SiDemux::builder().build();

        // The PAT announces programme 1 with its PMT on PID 0x0100.
        let pat_pkt = ts_packet(0x0000, &pat_section(0x0001, 0, &[(1, 0x0100)]));
        let pat_events: Vec<_> = demux.feed(&pat_pkt).collect();
        assert_eq!(pat_events.len(), 1);

        // PID 0x0100 only became watched because the PAT above was emitted;
        // without it this packet would be ignored.
        let pmt_pkt = ts_packet(0x0100, &pmt_section(1, 0, 0x0100));
        let pmt_events: Vec<_> = demux.feed(&pmt_pkt).collect();
        assert_eq!(pmt_events.len(), 1, "PMT on the followed PID should emit");
        assert_eq!(pmt_events[0].pid(), Pid::new(0x0100));
        match pmt_events[0].table().unwrap() {
            AnyTable::Pmt(p) => assert_eq!(p.program_number, 1),
            other => panic!("expected Pmt, got {other:?}"),
        }
    }

    #[test]
    fn corrupted_crc_sdt_dropped_and_counted() {
        let mut demux = SiDemux::builder().build();
        // SDT actual (table_id 0x42) travels on the SDT/BAT PID 0x0011.
        let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
        // Flip the first payload byte so the stored CRC no longer matches.
        sdt[8] ^= 0xFF;

        let events: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
        assert_eq!(events.len(), 0, "corrupted SDT must not emit");

        let stats = demux.stats();
        assert_eq!(stats.crc_failures, 1);
        assert_eq!(stats.emitted, 0);
        assert_eq!(stats.sections_completed, 1);
    }

    #[test]
    fn gate_capacity_evicts_fifo_and_reemits() {
        let mut demux = SiDemux::builder().gate_capacity(2).build();

        // Three EIT sections (table_id 0x4E) on the EIT PID 0x0012 that
        // differ in table_id_extension, so each gets its own gate key.
        let pkt_a = ts_packet(0x0012, &long_section(0x4E, 0x0001, 0, 0, &[0x01]));
        let pkt_b = ts_packet(0x0012, &long_section(0x4E, 0x0002, 0, 0, &[0x02]));
        let pkt_c = ts_packet(0x0012, &long_section(0x4E, 0x0003, 0, 0, &[0x03]));

        assert_eq!(demux.feed(&pkt_a).count(), 1);
        assert_eq!(demux.feed(&pkt_b).count(), 1);
        // The gate is full, so admitting c pushes out a, the oldest key.
        assert_eq!(demux.feed(&pkt_c).count(), 1);
        assert_eq!(demux.stats().gate_evictions, 1);

        // With its key gone, a looks new again and is emitted once more.
        assert_eq!(demux.feed(&pkt_a).count(), 1);
    }

    #[test]
    fn garbage_packet_counted_no_panic() {
        let mut demux = SiDemux::builder().build();
        let garbage = [0x00u8; TS_PACKET_SIZE]; // bad sync byte
        assert_eq!(demux.feed(&garbage).count(), 0);

        let stats = demux.stats();
        assert_eq!(stats.malformed_packets, 1);
        assert_eq!(stats.packets, 1);
    }

    #[test]
    fn emit_repeats_bypasses_suppression() {
        let mut demux = SiDemux::builder().emit_repeats(true).build();
        let pkt = ts_packet(0x0000, &pat_section(0x0001, 0, &[(1, 0x0100)]));

        assert_eq!(demux.feed(&pkt).count(), 1);
        assert_eq!(demux.feed(&pkt).count(), 1, "emit_repeats re-emits");

        let stats = demux.stats();
        assert_eq!(stats.suppressed, 0);
        assert_eq!(stats.emitted, 2);
    }
}
689}