dvb_si/demux.rs
1//! [`SiDemux`] — PID-filtered, version-gated SI section pump.
2//!
3//! Feed 188-byte MPEG-TS packets in with [`SiDemux::feed`]; get back an
4//! iterator of [`SectionEvent`]s — one per **changed** complete section.
5//! The demux reassembles sections per PID (via
6//! [`crate::ts::SectionReassembler`]), validates the CRC of CRC-bearing
7//! sections, and suppresses repeats through a version gate so that a steady
8//! carousel of unchanging tables produces no events after the first.
9//!
10//! Events own their bytes ([`bytes::Bytes`]) and are therefore `'static` and
11//! cheap to clone; typed views ([`SectionEvent::table_section`],
12//! [`SectionEvent::parse`]) borrow the event lazily.
13//!
14//! ```
15//! use dvb_common::Serialize;
16//! use dvb_si::demux::SiDemux;
17//! use dvb_si::tables::AnyTableSection;
18//! use dvb_si::tables::pat::{PatSection, PatEntry};
19//!
20//! const PMT_PID: u16 = 0x0100;
21//! const TS_SYNC_BYTE: u8 = 0x47;
22//! const PAYLOAD_UNIT_START_INDICATOR: u8 = 0x40;
23//! const PAT_PID_LOW_BYTE: u8 = 0x00;
24//! const PAYLOAD_ONLY: u8 = 0x10;
25//! const POINTER_FIELD_START: u8 = 0x00;
26//! const STUFFING_BYTE: u8 = 0xFF;
27//!
28//! // Build one PAT section and wrap it in a single 188-byte TS packet so the
29//! // example is self-contained. In real code `packet` comes from your source.
30//! let pat = PatSection {
31//! transport_stream_id: 1, version_number: 0, current_next_indicator: true,
32//! section_number: 0, last_section_number: 0,
33//! entries: vec![PatEntry { program_number: 1, pid: PMT_PID }],
34//! };
35//! let mut section = vec![0u8; pat.serialized_len()];
36//! pat.serialize_into(&mut section).unwrap();
37//! let mut packet = [STUFFING_BYTE; 188];
38//! packet[0] = TS_SYNC_BYTE;
39//! packet[1] = PAYLOAD_UNIT_START_INDICATOR;
40//! packet[2] = PAT_PID_LOW_BYTE;
41//! packet[3] = PAYLOAD_ONLY;
42//! packet[4] = POINTER_FIELD_START;
//! packet[5..5 + section.len()].copy_from_slice(&section);
44//!
45//! let mut demux = SiDemux::builder().build();
46//! let events: Vec<_> = demux.feed(&packet).collect();
47//! assert_eq!(events.len(), 1);
48//! match events[0].table_section() {
49//! Ok(AnyTableSection::PatSection(pat)) => {
50//! println!("PAT v{} on {}", events[0].version().unwrap_or(0), events[0].pid());
51//! assert_eq!(pat.entries[0].pid, PMT_PID);
52//! }
53//! other => panic!("expected PAT, got {other:?}"),
54//! }
55//! ```
56//!
57//! # Version gate
58//!
59//! Each `(pid, table_id, table_id_extension, section_number)` tuple is packed
60//! into a `u64` key. The stored value is a change detector:
61//!
//! - **CRC-bearing** sections — every long-form section
//!   (`section_syntax_indicator == 1`) plus the short-form TOT — end in a
//!   trailing CRC-32. Long-form sections also carry a 5-bit `version_number`;
//!   the TOT has no version field, so its version is recorded as 0. The gate
//!   stores `(version, crc32)`. A repeat with the same version *and* CRC is
//!   suppressed.
66//! - **Short-form** sections without a CRC (TDT/RST/ST/DIT) have no version;
67//! the gate stores a CRC-32 *computed over the whole section* purely as a
68//! change hash. `table_id_extension` and `section_number` collapse to 0 in
69//! the key.
70//!
71//! # CRC policy
72//!
73//! CRC-bearing sections (every long-form section, plus the short-form TOT
74//! which uniquely carries a CRC — ETSI EN 300 468 §5.2.6) are validated
75//! before gating. Failures are dropped and counted in
76//! [`Stats::crc_failures`]; they are never emitted and never update the gate.
77//! TDT carries no CRC and is therefore never dropped for CRC reasons.
78
79use std::collections::{HashMap, VecDeque};
80
81use bytes::Bytes;
82
83use crate::pid::Pid;
84use crate::ts::{SectionReassembler, TsPacket};
85
/// `table_id` value of the Program Association Table; a changed PAT is what
/// PAT-following inspects to learn the PMT PIDs.
const PAT_TABLE_ID: u8 = 0x00;
/// `table_id` value of the Time Offset Table, which has SSI = 0 (short-form)
/// yet still ends in a CRC-32.
const TOT_TABLE_ID: u8 = 0x73;
/// Size of the common section header (`table_id` plus the two bytes that hold
/// the flags and the length field); nothing shorter can be interpreted.
const MIN_SECTION_LEN: usize = 3;
/// Size of the long-form extension header that follows the 3-byte common
/// header.
const LONG_FORM_EXTRA: usize = 5;
/// Size of the trailing CRC-32 field.
const CRC_LEN: usize = 4;
96
97/// One complete, changed SI section. Owns its bytes — `'static`, cheap clone.
98///
99/// A `SectionEvent` is only ever constructed for a section that
100/// (a) is at least 3 bytes long, and (b) if it carries a CRC, passed CRC
101/// validation. So [`SectionEvent::crc_ok`] is always `true` and
102/// [`SectionEvent::table_id`] never panics.
103#[derive(Debug, Clone)]
104pub struct SectionEvent {
105 pid: Pid,
106 bytes: Bytes,
107}
108
109impl SectionEvent {
110 /// PID this section was carried on.
111 #[must_use]
112 pub fn pid(&self) -> Pid {
113 self.pid
114 }
115
116 /// The full section bytes (header included, CRC included if present).
117 #[must_use]
118 pub fn bytes(&self) -> &Bytes {
119 &self.bytes
120 }
121
122 /// The `table_id` (byte 0). Never panics — events are only built for
123 /// sections of at least 3 bytes.
124 #[must_use]
125 pub fn table_id(&self) -> u8 {
126 self.bytes[0]
127 }
128
129 /// True when this section uses the long-form syntax
130 /// (`section_syntax_indicator == 1`).
131 #[must_use]
132 fn is_long_form(&self) -> bool {
133 (self.bytes[1] & 0x80) != 0
134 }
135
136 /// 5-bit `version_number`, or `None` for short-form sections (which carry
137 /// no version field). Note the TOT, despite being short-form, has no
138 /// version field either, so this is `None` for it.
139 #[must_use]
140 pub fn version(&self) -> Option<u8> {
141 if self.is_long_form() && self.bytes.len() > 5 {
142 Some((self.bytes[5] >> 1) & 0x1F)
143 } else {
144 None
145 }
146 }
147
148 /// 16-bit `table_id_extension`, or `None` for short-form sections.
149 #[must_use]
150 pub fn table_id_extension(&self) -> Option<u16> {
151 if self.is_long_form() && self.bytes.len() > 4 {
152 Some(((self.bytes[3] as u16) << 8) | self.bytes[4] as u16)
153 } else {
154 None
155 }
156 }
157
158 /// `section_number`, or `None` for short-form sections.
159 #[must_use]
160 pub fn section_number(&self) -> Option<u8> {
161 if self.is_long_form() && self.bytes.len() > 6 {
162 Some(self.bytes[6])
163 } else {
164 None
165 }
166 }
167
168 /// Always `true`: events are emitted only after CRC validation (or for
169 /// CRC-less short-form sections, where there is nothing to validate).
170 #[must_use]
171 pub fn crc_ok(&self) -> bool {
172 true
173 }
174
175 /// Typed table-section view (lazy, borrows this event's bytes).
176 ///
177 /// # Errors
178 /// Propagates the parse error from the dispatched table-section type.
179 pub fn table_section(&self) -> crate::Result<crate::tables::AnyTableSection<'_>> {
180 crate::tables::AnyTableSection::parse(&self.bytes)
181 }
182
183 /// Type-keyed view: `event.parse::<EitSection>()`.
184 ///
185 /// # Errors
186 /// Propagates `T::parse` errors.
187 pub fn parse<'s, T: crate::traits::TableDef<'s>>(&'s self) -> crate::Result<T> {
188 <T as dvb_common::Parse>::parse(&self.bytes)
189 }
190}
191
/// Counters describing what the demux has done so far.
///
/// Every counter only ever grows; values accumulate across `feed` calls.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Stats {
    /// Number of `feed` calls, i.e. TS packets handed to the demux.
    pub packets: u64,
    /// Complete sections popped from the reassemblers, counted before the
    /// CRC check and before the version gate.
    pub sections_completed: u64,
    /// Sections that were turned into events.
    pub emitted: u64,
    /// Valid sections withheld by the version gate because they were
    /// unchanged repeats.
    pub suppressed: u64,
    /// Sections dropped before emission: CRC mismatches, plus structurally
    /// invalid (sub-3-byte) sections, which the in-crate reassembler should
    /// never produce. Dropped sections never update the gate.
    pub crc_failures: u64,
    /// TS packets that could not be parsed (for example a bad sync byte or a
    /// packet that is too short).
    pub malformed_packets: u64,
    /// Gate entries removed to make room because the gate was full.
    pub gate_evictions: u64,
}
212
/// The fingerprint the gate keeps per key; a section counts as changed when
/// its fingerprint differs from the stored one.
#[derive(Clone, Copy, PartialEq, Eq)]
struct GateEntry {
    /// `version_number` of a long-form section; 0 (and meaningless) when the
    /// section has no version field.
    version: u8,
    /// CRC-32 value used as the change hash: either the section's own
    /// trailing CRC or a CRC computed over the section bytes.
    crc: u32,
}
222
/// Runtime settings of a [`SiDemux`], copied from its [`SiDemuxBuilder`].
struct Config {
    /// Add the PMT PIDs listed in a changed PAT to the watch set.
    follow_pat: bool,
    /// Emit valid sections even when the gate considers them unchanged.
    emit_repeats: bool,
    /// Upper bound on the number of keys the gate keeps.
    gate_capacity: usize,
}
229
230/// Builder for [`SiDemux`].
231///
232/// Defaults: `follow_pat = true`, `dvb_si_pids = true`,
233/// `emit_repeats = false`, `gate_capacity = 65_536`.
234pub struct SiDemuxBuilder {
235 follow_pat: bool,
236 dvb_si_pids: bool,
237 emit_repeats: bool,
238 gate_capacity: usize,
239 extra_pids: Vec<Pid>,
240}
241
242impl Default for SiDemuxBuilder {
243 fn default() -> Self {
244 Self {
245 follow_pat: true,
246 dvb_si_pids: true,
247 emit_repeats: false,
248 gate_capacity: 65_536,
249 extra_pids: Vec::new(),
250 }
251 }
252}
253
254impl SiDemuxBuilder {
255 /// When `true` (default), an emitted (changed) PAT auto-adds each
256 /// programme's PMT PID to the watch set.
257 #[must_use]
258 pub fn follow_pat(mut self, on: bool) -> Self {
259 self.follow_pat = on;
260 self
261 }
262
263 /// When `true` (default), pre-populate the watch set with the well-known
264 /// DVB/MPEG-2 SI PIDs (PAT, CAT, NIT, SDT/BAT, EIT, RST, TDT/TOT, SAT).
265 #[must_use]
266 pub fn dvb_si_pids(mut self, on: bool) -> Self {
267 self.dvb_si_pids = on;
268 self
269 }
270
271 /// Add a PID to the watch set (additive; may be called repeatedly).
272 #[must_use]
273 pub fn pid(mut self, pid: Pid) -> Self {
274 self.extra_pids.push(pid);
275 self
276 }
277
278 /// When `true`, emit every complete valid section, bypassing the version
279 /// gate's suppression (the gate is still updated). Default `false`.
280 #[must_use]
281 pub fn emit_repeats(mut self, on: bool) -> Self {
282 self.emit_repeats = on;
283 self
284 }
285
286 /// Maximum number of distinct gate keys retained. At capacity the gate
287 /// FIFO-evicts the oldest key. Default 65 536.
288 #[must_use]
289 pub fn gate_capacity(mut self, cap: usize) -> Self {
290 self.gate_capacity = cap;
291 self
292 }
293
294 /// Build the [`SiDemux`].
295 #[must_use]
296 pub fn build(self) -> SiDemux {
297 let mut pids: HashMap<Pid, SectionReassembler> = HashMap::new();
298 if self.dvb_si_pids {
299 use crate::pid::well_known as wk;
300 for pid in [
301 wk::PAT,
302 wk::CAT,
303 wk::NIT,
304 wk::SDT_BAT,
305 wk::EIT,
306 wk::RST,
307 wk::TDT_TOT,
308 wk::SAT,
309 ] {
310 pids.entry(pid).or_default();
311 }
312 }
313 for p in self.extra_pids {
314 pids.entry(p).or_default();
315 }
316 SiDemux {
317 pids,
318 gate: HashMap::new(),
319 gate_order: VecDeque::new(),
320 cfg: Config {
321 follow_pat: self.follow_pat,
322 emit_repeats: self.emit_repeats,
323 gate_capacity: self.gate_capacity,
324 },
325 stats: Stats::default(),
326 scratch: Vec::new(),
327 }
328 }
329}
330
331/// PID-filtered, version-gated SI section demultiplexer.
332///
333/// See the [module docs](crate::demux) for the gate and CRC policies.
334pub struct SiDemux {
335 pids: HashMap<Pid, SectionReassembler>,
336 // TODO(perf): keys are uniform internal u64s — a non-SipHash hasher (e.g.
337 // FxHash) would shave cycles at high section rates; revisit if profiling
338 // shows it.
339 gate: HashMap<u64, GateEntry>,
340 gate_order: VecDeque<u64>,
341 cfg: Config,
342 stats: Stats,
343 scratch: Vec<SectionEvent>,
344}
345
346impl SiDemux {
347 /// Start building a demux. See [`SiDemuxBuilder`] for defaults.
348 #[must_use]
349 pub fn builder() -> SiDemuxBuilder {
350 SiDemuxBuilder::default()
351 }
352
353 /// Accumulated statistics.
354 #[must_use]
355 pub fn stats(&self) -> Stats {
356 self.stats
357 }
358
359 /// Feed one 188-byte TS packet. Infallible: malformed packets are counted
360 /// in [`Stats::malformed_packets`], not raised. Returns an iterator over
361 /// the changed sections this packet completed.
362 pub fn feed(&mut self, packet: &[u8]) -> impl Iterator<Item = SectionEvent> + '_ {
363 self.scratch.clear();
364 self.stats.packets += 1;
365
366 match TsPacket::parse(packet) {
367 Err(_) => self.stats.malformed_packets += 1,
368 Ok(ts) => {
369 let pid = Pid::new(ts.header.pid);
370 // Cheap miss: one map lookup for non-watched PIDs.
371 if self.pids.contains_key(&pid) {
372 let payload = ts.payload.unwrap_or(&[]);
373 // Feed the reassembler; the borrow is released before
374 // `consider` (which may insert new PMT PIDs into the map).
375 self.pids
376 .get_mut(&pid)
377 .expect("checked above")
378 .feed(payload, ts.header.pusi);
379 while let Some(section) = self
380 .pids
381 .get_mut(&pid)
382 .and_then(SectionReassembler::pop_section)
383 {
384 self.stats.sections_completed += 1;
385 self.consider(pid, section);
386 }
387 }
388 }
389 }
390
391 self.scratch.drain(..)
392 }
393
394 /// Gate + CRC + (maybe) push to scratch. Handles PAT-follow on emit.
395 fn consider(&mut self, pid: Pid, section: Bytes) {
396 // Guard: sub-3-byte sections cannot carry a header. The reassembler
397 // should never emit one (it needs >= 3 bytes to know `expected`), but
398 // guard defensively and count it as a CRC failure bucket — it is a
399 // structurally invalid section, dropped without emission.
400 if section.len() < MIN_SECTION_LEN {
401 self.stats.crc_failures += 1;
402 return;
403 }
404
405 let table_id = section[0];
406 let long_form = (section[1] & 0x80) != 0;
407 // The TOT is short-form by its SSI bit but uniquely carries a CRC.
408 let has_crc = long_form || table_id == TOT_TABLE_ID;
409
410 // CRC policy: validate CRC-bearing sections before gating.
411 if has_crc {
412 if section.len() < CRC_LEN {
413 self.stats.crc_failures += 1;
414 return;
415 }
416 let covered = §ion[..section.len() - CRC_LEN];
417 let declared = u32::from_be_bytes([
418 section[section.len() - 4],
419 section[section.len() - 3],
420 section[section.len() - 2],
421 section[section.len() - 1],
422 ]);
423 let computed = dvb_common::crc32_mpeg2::compute(covered);
424 if computed != declared {
425 self.stats.crc_failures += 1;
426 return;
427 }
428 }
429
430 // Build the gate key and change detector.
431 let (ext, section_number, version, change_crc) =
432 if long_form && section.len() >= MIN_SECTION_LEN + LONG_FORM_EXTRA + CRC_LEN {
433 let ext = ((section[3] as u16) << 8) | section[4] as u16;
434 let version = (section[5] >> 1) & 0x1F;
435 let section_number = section[6];
436 // For long-form the trailing CRC already uniquely fingerprints the
437 // payload; reuse it as the change hash.
438 let crc = u32::from_be_bytes([
439 section[section.len() - 4],
440 section[section.len() - 3],
441 section[section.len() - 2],
442 section[section.len() - 1],
443 ]);
444 (ext, section_number, version, crc)
445 } else {
446 // Short-form (incl. TOT and any malformed long-form that slipped
447 // the size check above): no version, ext/section_number = 0,
448 // change detector is a CRC over all the section bytes.
449 (0u16, 0u8, 0u8, dvb_common::crc32_mpeg2::compute(§ion))
450 };
451
452 let key = (pid.value() as u64)
453 | ((table_id as u64) << 13)
454 | ((ext as u64) << 21)
455 | ((section_number as u64) << 37);
456
457 let entry = GateEntry {
458 version,
459 crc: change_crc,
460 };
461
462 let changed = match self.gate.get(&key) {
463 Some(prev) => *prev != entry,
464 None => true,
465 };
466
467 // Update the gate (FIFO-evict at capacity for newly-seen keys).
468 if !self.gate.contains_key(&key) {
469 if self.gate.len() >= self.cfg.gate_capacity {
470 if let Some(old) = self.gate_order.pop_front() {
471 self.gate.remove(&old);
472 self.stats.gate_evictions += 1;
473 }
474 }
475 self.gate_order.push_back(key);
476 }
477 self.gate.insert(key, entry);
478
479 if changed || self.cfg.emit_repeats {
480 let event = SectionEvent {
481 pid,
482 bytes: section,
483 };
484 // PAT-follow happens on an emitted (changed) PAT only.
485 if self.cfg.follow_pat && changed && table_id == PAT_TABLE_ID {
486 self.follow_pat(&event);
487 }
488 self.stats.emitted += 1;
489 self.scratch.push(event);
490 } else {
491 self.stats.suppressed += 1;
492 }
493 }
494
495 /// Parse the PAT and register each programme's PMT PID with a fresh
496 /// reassembler. Parse failures are silently ignored — a malformed PAT that
497 /// nonetheless passed CRC is implausible, and we never panic.
498 fn follow_pat(&mut self, event: &SectionEvent) {
499 use crate::tables::pat::PatSection;
500 use dvb_common::Parse;
501 if let Ok(pat) = PatSection::parse(&event.bytes) {
502 for entry in &pat.entries {
503 if entry.program_number != 0 {
504 self.pids.entry(Pid::new(entry.pid)).or_default();
505 }
506 }
507 }
508 }
509}
510
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ts::{TsHeader, TS_PACKET_SIZE};

    /// Puts `section` into one TS packet on `pid`: PUSI set, pointer_field 0,
    /// and 0xFF stuffing after the section. Panics if the section does not
    /// fit into a single packet.
    fn ts_packet(pid: u16, section: &[u8]) -> [u8; TS_PACKET_SIZE] {
        // 4 header bytes + 1 pointer_field byte precede the section.
        const SECTION_START: usize = 5;

        let mut packet = [0xFFu8; TS_PACKET_SIZE];
        let header = TsHeader {
            tei: false,
            pusi: true,
            pid,
            scrambling: 0,
            has_adaptation: false,
            has_payload: true,
            continuity_counter: 0,
        };
        header.serialize_into(&mut packet);
        packet[4] = 0x00; // pointer_field
        let end = SECTION_START + section.len();
        assert!(end <= TS_PACKET_SIZE, "section too big");
        packet[SECTION_START..end].copy_from_slice(section);
        packet
    }

    /// Assembles a long-form section around `payload` and appends the
    /// matching CRC-32. `last_section_number` equals `section_number`.
    fn long_section(
        table_id: u8,
        ext: u16,
        version: u8,
        section_number: u8,
        payload: &[u8],
    ) -> Vec<u8> {
        let section_length = (LONG_FORM_EXTRA + payload.len() + CRC_LEN) as u16;
        let [length_hi, length_lo] = section_length.to_be_bytes();
        let [ext_hi, ext_lo] = ext.to_be_bytes();

        let mut section = Vec::new();
        section.push(table_id);
        // SSI = 1, reserved bits set, top 4 bits of section_length.
        section.push(0x80 | 0x30 | (length_hi & 0x0F));
        section.push(length_lo);
        section.push(ext_hi);
        section.push(ext_lo);
        // reserved bits, version_number, current_next_indicator = 1.
        section.push(0xC0 | ((version & 0x1F) << 1) | 0x01);
        section.push(section_number);
        section.push(section_number); // last_section_number
        section.extend_from_slice(payload);

        let crc = dvb_common::crc32_mpeg2::compute(&section);
        section.extend_from_slice(&crc.to_be_bytes());
        section
    }

    /// Builds a PAT section with a valid CRC from `(program_number, pmt_pid)`
    /// pairs.
    fn pat_section(tsid: u16, version: u8, entries: &[(u16, u16)]) -> Vec<u8> {
        let body: Vec<u8> = entries
            .iter()
            .flat_map(|&(program_number, pmt_pid)| {
                let [pn_hi, pn_lo] = program_number.to_be_bytes();
                [
                    pn_hi,
                    pn_lo,
                    0xE0 | ((pmt_pid >> 8) as u8 & 0x1F),
                    (pmt_pid & 0xFF) as u8,
                ]
            })
            .collect();
        long_section(0x00, tsid, version, 0, &body)
    }

    /// Builds a PMT section with a valid CRC and a single stream entry.
    fn pmt_section(program_number: u16, version: u8, pcr_pid: u16) -> Vec<u8> {
        // Layout: pcr_pid (2 bytes), program_info_length = 0 (2 bytes), then
        // one 5-byte stream entry: stream type 0x02 (video), elementary PID
        // pcr_pid + 1, es_info_length 0.
        let es_pid = pcr_pid + 1;
        let body = [
            0xE0 | ((pcr_pid >> 8) as u8 & 0x1F),
            (pcr_pid & 0xFF) as u8,
            0xF0,
            0x00,
            0x02,
            0xE0 | ((es_pid >> 8) as u8 & 0x1F),
            (es_pid & 0xFF) as u8,
            0xF0,
            0x00,
        ];
        long_section(0x02, program_number, version, 0, &body)
    }

    #[test]
    fn pat_emits_once_suppresses_repeat_reemits_on_version_change() {
        let programmes: [(u16, u16); 1] = [(1, 0x0100)];
        let first = ts_packet(0x0000, &pat_section(0x0001, 0, &programmes));
        let bumped = ts_packet(0x0000, &pat_section(0x0001, 1, &programmes));

        let mut demux = SiDemux::builder().build();

        let initial: Vec<_> = demux.feed(&first).collect();
        assert_eq!(initial.len(), 1, "PAT v0 should emit one event");
        assert_eq!(initial[0].table_id(), 0x00);
        assert_eq!(initial[0].version(), Some(0));

        let repeated: Vec<_> = demux.feed(&first).collect();
        assert_eq!(repeated.len(), 0, "repeat PAT should be suppressed");

        let updated: Vec<_> = demux.feed(&bumped).collect();
        assert_eq!(updated.len(), 1, "PAT v1 should re-emit");
        assert_eq!(updated[0].version(), Some(1));

        let Stats {
            sections_completed,
            emitted,
            suppressed,
            crc_failures,
            ..
        } = demux.stats();
        assert_eq!(sections_completed, 3);
        assert_eq!(emitted, 2);
        assert_eq!(suppressed, 1);
        assert_eq!(crc_failures, 0);
    }

    #[test]
    fn follow_pat_registers_pmt_pid_and_emits_typed_pmt() {
        use crate::tables::AnyTableSection;

        let mut demux = SiDemux::builder().build();

        // The PAT announces programme 1 with its PMT on PID 0x0100.
        let pat_packet = ts_packet(0x0000, &pat_section(0x0001, 0, &[(1, 0x0100)]));
        let pat_events: Vec<_> = demux.feed(&pat_packet).collect();
        assert_eq!(pat_events.len(), 1);

        // PID 0x0100 is not watched by default; it is only watched because
        // the PAT above was emitted and followed.
        let pmt_packet = ts_packet(0x0100, &pmt_section(1, 0, 0x0100));
        let pmt_events: Vec<_> = demux.feed(&pmt_packet).collect();
        assert_eq!(pmt_events.len(), 1, "PMT on the followed PID should emit");

        let pmt_event = &pmt_events[0];
        assert_eq!(pmt_event.pid(), Pid::new(0x0100));
        match pmt_event.table_section().unwrap() {
            AnyTableSection::PmtSection(p) => assert_eq!(p.program_number, 1),
            other => panic!("expected PmtSection, got {other:?}"),
        }
    }

    #[test]
    fn corrupted_crc_sdt_dropped_and_counted() {
        // SDT actual (table_id 0x42), carried on the SDT/BAT PID 0x0011.
        let mut sdt = long_section(0x42, 0x0001, 0, 0, &[0xDE, 0xAD, 0xBE, 0xEF]);
        // Flip a payload byte so the already-computed CRC no longer matches.
        sdt[8] ^= 0xFF;

        let mut demux = SiDemux::builder().build();
        let events: Vec<_> = demux.feed(&ts_packet(0x0011, &sdt)).collect();
        assert_eq!(events.len(), 0, "corrupted SDT must not emit");

        let stats = demux.stats();
        assert_eq!(stats.crc_failures, 1);
        assert_eq!(stats.emitted, 0);
        assert_eq!(stats.sections_completed, 1);
    }

    #[test]
    fn gate_capacity_evicts_fifo_and_reemits() {
        const EIT_PID: u16 = 0x0012;

        let mut demux = SiDemux::builder().gate_capacity(2).build();

        // Three EIT sections (table_id 0x4E) that differ in their
        // table_id_extension, so each one occupies its own gate key.
        let first = ts_packet(EIT_PID, &long_section(0x4E, 0x0001, 0, 0, &[0x01]));
        let second = ts_packet(EIT_PID, &long_section(0x4E, 0x0002, 0, 0, &[0x02]));
        let third = ts_packet(EIT_PID, &long_section(0x4E, 0x0003, 0, 0, &[0x03]));

        assert_eq!(demux.feed(&first).count(), 1);
        assert_eq!(demux.feed(&second).count(), 1);
        // The gate is full, so admitting the third key evicts the first.
        assert_eq!(demux.feed(&third).count(), 1);
        assert_eq!(demux.stats().gate_evictions, 1);

        // With its key evicted, the first section looks new again.
        assert_eq!(demux.feed(&first).count(), 1);
    }

    #[test]
    fn garbage_packet_counted_no_panic() {
        // All zeros: the sync byte is wrong, so the packet cannot parse.
        let garbage = [0x00u8; TS_PACKET_SIZE];

        let mut demux = SiDemux::builder().build();
        let events: Vec<_> = demux.feed(&garbage).collect();
        assert_eq!(events.len(), 0);

        let stats = demux.stats();
        assert_eq!(stats.malformed_packets, 1);
        assert_eq!(stats.packets, 1);
    }

    #[test]
    fn emit_repeats_bypasses_suppression() {
        let packet = ts_packet(0x0000, &pat_section(0x0001, 0, &[(1, 0x0100)]));

        let mut demux = SiDemux::builder().emit_repeats(true).build();
        assert_eq!(demux.feed(&packet).count(), 1);
        assert_eq!(demux.feed(&packet).count(), 1, "emit_repeats re-emits");

        let stats = demux.stats();
        assert_eq!(stats.suppressed, 0);
        assert_eq!(stats.emitted, 2);
    }
}
697}