Skip to main content

dvb_si/
mux.rs

1//! Section → TS packetizer (the byte-exact inverse of
2//! [`SectionReassembler::feed`](crate::ts::SectionReassembler::feed)).
3//!
4//! Per the PSI carriage rules of ISO/IEC 13818-1:2007 §2.4.4
5//! (`docs/iso_13818_1_systems.md`): sections are packed into 188-byte packets
6//! with a `pointer_field` where sections begin, concatenated contiguously, and
7//! 0xFF-stuffed at the batch tail.
8
9use core::time::Duration;
10
11use crate::pid::well_known;
12use crate::ts::{TsHeader, CC_MASK, SECTION_LENGTH_HI_MASK, TS_PACKET_SIZE};
13
/// Maximum section-data bytes in a packet that starts with a `pointer_field`
/// (PUSI = 1): 188 − 4 header − 1 pointer_field. §2.4.4.
const PUSI_PAYLOAD_CAP: usize = 183;
/// Maximum section-data bytes in a continuation packet (PUSI = 0, no
/// `pointer_field`): 188 − 4 header. §2.4.4.
const PAYLOAD_CAP: usize = 184;
/// Stuffing byte written to unused TS payload bytes after the last section
/// byte of a packet (ISO/IEC 13818-1 §2.4.4).
const STUFFING_BYTE: u8 = 0xFF;
20
21/// Packetizes PSI/SI sections into 188-byte TS packets.
22///
23/// This is the byte-exact inverse of
24/// [`SectionReassembler::feed`](crate::ts::SectionReassembler::feed): packets
25/// produced here, when fed back through the reassembler, yield the same
26/// sections in order.
27///
28/// ISO/IEC 13818-1:2007 §2.4.4 (`docs/iso_13818_1_systems.md`).
29pub struct SectionPacketizer {
30    pid: u16,
31    continuity_counter: u8,
32}
33
34impl SectionPacketizer {
35    /// Start a packetizer for `pid` with continuity_counter = 0.
36    pub fn new(pid: u16) -> Self {
37        Self {
38            pid,
39            continuity_counter: 0,
40        }
41    }
42
43    /// Start at a specific continuity_counter (0..=15) — for resuming a stream.
44    pub fn with_continuity(pid: u16, cc: u8) -> Self {
45        Self {
46            pid,
47            continuity_counter: cc & CC_MASK,
48        }
49    }
50
51    /// The PID this packetizer emits packets for.
52    pub fn pid(&self) -> u16 {
53        self.pid
54    }
55
56    /// The continuity_counter for the next emitted packet.
57    pub fn continuity_counter(&self) -> u8 {
58        self.continuity_counter
59    }
60
61    /// Packetize a batch of complete sections into 188-byte TS packets,
62    /// appended to `out` (cleared first).
63    ///
64    /// Returns the number of packets appended.
65    pub fn packetize_into(
66        &mut self,
67        sections: &[&[u8]],
68        out: &mut Vec<[u8; TS_PACKET_SIZE]>,
69    ) -> usize {
70        out.clear();
71
72        if sections.is_empty() {
73            return 0;
74        }
75
76        // Concatenate all sections and record section-start byte offsets.
77        let total_len: usize = sections.iter().map(|s| s.len()).sum();
78        if total_len == 0 {
79            return 0;
80        }
81        let mut data = Vec::with_capacity(total_len);
82        let mut starts = Vec::with_capacity(sections.len());
83        for s in sections {
84            starts.push(data.len());
85            data.extend_from_slice(s);
86        }
87
88        let count_before = out.len();
89        let mut pos = 0usize;
90
91        while pos < data.len() {
92            // Smallest section-start offset ≥ pos.
93            let next_start = starts.iter().copied().find(|&s| s >= pos);
94
95            let pusi: bool;
96            let pointer_field: u8;
97            let cap: usize;
98
99            if let Some(ns) = next_start {
100                let diff = ns.saturating_sub(pos);
101                if diff <= PUSI_PAYLOAD_CAP {
102                    pusi = true;
103                    pointer_field = diff as u8;
104                    cap = PUSI_PAYLOAD_CAP;
105                } else {
106                    pusi = false;
107                    pointer_field = 0;
108                    cap = PAYLOAD_CAP;
109                }
110            } else {
111                pusi = false;
112                pointer_field = 0;
113                cap = PAYLOAD_CAP;
114            }
115
116            let mut pkt = [0u8; TS_PACKET_SIZE];
117
118            let header = TsHeader {
119                tei: false,
120                pusi,
121                pid: self.pid,
122                scrambling: 0,
123                has_adaptation: false,
124                has_payload: true,
125                continuity_counter: self.continuity_counter,
126            };
127            header
128                .serialize_into(&mut pkt[..4])
129                .expect("4-byte header buffer");
130
131            self.continuity_counter = (self.continuity_counter + 1) & CC_MASK;
132
133            let mut write_pos = 4usize;
134
135            if pusi {
136                pkt[write_pos] = pointer_field;
137                write_pos += 1;
138            }
139
140            let remaining = data.len() - pos;
141            let to_copy = remaining.min(cap);
142            pkt[write_pos..write_pos + to_copy].copy_from_slice(&data[pos..pos + to_copy]);
143            pos += to_copy;
144            write_pos += to_copy;
145
146            // 0xFF-stuff remaining payload bytes.
147            for b in &mut pkt[write_pos..] {
148                *b = STUFFING_BYTE;
149            }
150
151            out.push(pkt);
152        }
153
154        out.len() - count_before
155    }
156
157    /// Allocating convenience wrapper over [`packetize_into`](Self::packetize_into).
158    pub fn packetize(&mut self, sections: &[&[u8]]) -> Vec<[u8; TS_PACKET_SIZE]> {
159        let mut out = Vec::new();
160        self.packetize_into(sections, &mut out);
161        out
162    }
163}
164
165// ── Default interval constants ────────────────────────────────────────────────
166
/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — NIT maximum interval.
///
/// Used by [`SiMux::upsert_nit`].
pub const NIT_MAX_INTERVAL: Duration = Duration::from_secs(10);
/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — BAT maximum interval.
pub const BAT_MAX_INTERVAL: Duration = Duration::from_secs(10);
/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — SDT actual maximum interval.
///
/// Used by [`SiMux::upsert_sdt_actual`].
pub const SDT_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — SDT other maximum interval.
pub const SDT_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — EIT p/f actual maximum interval.
pub const EIT_PF_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
/// dvb-si/docs/tr_101_211.md §4.4.1 — EIT p/f other maximum interval (sat/cable;
/// terrestrial is 20 s per §4.4.2).
pub const EIT_PF_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
/// dvb-si/docs/tr_101_211.md §4.4.1 — EIT schedule first 8 days maximum interval.
pub const EIT_SCHED_MAX_INTERVAL: Duration = Duration::from_secs(10);
/// dvb-si/docs/tr_101_211.md §4.4.1 — EIT schedule beyond 8 days maximum interval.
pub const EIT_SCHED_EXT_MAX_INTERVAL: Duration = Duration::from_secs(30);
/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — TDT maximum interval.
///
/// Used by [`SiMux::upsert_tdt`].
pub const TDT_MAX_INTERVAL: Duration = Duration::from_secs(30);
/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — TOT maximum interval.
///
/// Used by [`SiMux::upsert_tot`].
pub const TOT_MAX_INTERVAL: Duration = Duration::from_secs(30);

/// dvb-si/docs/ts_101_154_av_coding.md §4.1.7 — PAT maximum interval (shall ≤ 100 ms).
///
/// Used by [`SiMux::upsert_pat`].
pub const PAT_MAX_INTERVAL: Duration = Duration::from_millis(100);
/// dvb-si/docs/ts_101_154_av_coding.md §4.1.7 — PMT maximum interval (shall ≤ 100 ms).
///
/// Used by [`SiMux::upsert_pmt`].
pub const PMT_MAX_INTERVAL: Duration = Duration::from_millis(100);

/// dvb-si/docs/en_300_468.md §5.1.4.1 — minimum inter-section interval floor
/// (≤100 Mbit/s TSs).
///
/// [`SiMux::upsert`] checks intervals against this floor with a
/// `debug_assert!`, i.e. in debug builds only.
pub const MIN_SECTION_INTERVAL: Duration = Duration::from_millis(25);
197
198// ── SiMux ────────────────────────────────────────────────────────────────────
199
200/// Section-repetition scheduler that builds TS packets on a caller-supplied clock.
201///
202/// Each entry is a PID + concatenated complete-section bytes + an emission
203/// interval. Call [`poll_into`](Self::poll_into) with monotonically-increasing
204/// `now` values to get 188-byte TS packets for every entry whose interval has
205/// elapsed since its last emission.
206///
207/// The scheduler owns its section bytes — call [`upsert`](Self::upsert) to
208/// update them when SI changes. Continuity counters are continuous per-PID
209/// across poll cycles.
210///
211/// # 25 ms floor
212///
213/// [`MIN_SECTION_INTERVAL`] is the minimum valid interval (EN 300 468 §5.1.4.1).
214/// Supplying an interval below this in [`upsert`](Self::upsert) triggers a
215/// `debug_assert` — in release builds the assertion compiles out; the caller
216/// must ensure compliance.
217pub struct SiMux {
218    entries: Vec<Entry>,
219}
220
221struct Entry {
222    pid: u16,
223    sections: Vec<u8>,
224    interval: Duration,
225    last_emit: Option<Duration>,
226    packetizer: SectionPacketizer,
227}
228
229impl SiMux {
230    /// Create an empty scheduler (no entries, no pending emissions).
231    pub fn new() -> Self {
232        Self {
233            entries: Vec::new(),
234        }
235    }
236
237    /// Register or replace the sections emitted on `pid` at `interval`.
238    ///
239    /// `sections` is the concatenated complete-section bytes for one emission
240    /// cycle. Re-calling for the same `pid` updates the bytes and interval
241    /// while preserving the continuity counter.
242    ///
243    /// # Panics (debug only)
244    ///
245    /// `debug_assert!(interval >= MIN_SECTION_INTERVAL)` — EN 300 468 §5.1.4.1
246    /// requires a minimum inter-section interval of 25 ms.
247    pub fn upsert(&mut self, pid: u16, sections: Vec<u8>, interval: Duration) {
248        debug_assert!(
249            interval >= MIN_SECTION_INTERVAL,
250            "interval {interval:?} is below the 25 ms minimum (EN 300 468 §5.1.4.1)"
251        );
252
253        if let Some(entry) = self.entries.iter_mut().find(|e| e.pid == pid) {
254            entry.sections = sections;
255            entry.interval = interval;
256        } else {
257            self.entries.push(Entry {
258                pid,
259                sections,
260                interval,
261                last_emit: None,
262                packetizer: SectionPacketizer::new(pid),
263            });
264        }
265    }
266
267    /// PAT at [`PAT_MAX_INTERVAL`] on PID 0x0000.
268    ///
269    /// Interval cites dvb-si/docs/ts_101_154_av_coding.md §4.1.7.
270    pub fn upsert_pat(&mut self, sections: Vec<u8>) {
271        self.upsert(well_known::PAT.value(), sections, PAT_MAX_INTERVAL);
272    }
273
274    /// PMT at [`PMT_MAX_INTERVAL`] on the caller-supplied `pid`.
275    ///
276    /// Interval cites dvb-si/docs/ts_101_154_av_coding.md §4.1.7.
277    pub fn upsert_pmt(&mut self, pid: u16, sections: Vec<u8>) {
278        self.upsert(pid, sections, PMT_MAX_INTERVAL);
279    }
280
281    /// SDT actual at [`SDT_ACTUAL_MAX_INTERVAL`] on PID 0x0011.
282    ///
283    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
284    pub fn upsert_sdt_actual(&mut self, sections: Vec<u8>) {
285        self.upsert(
286            well_known::SDT_BAT.value(),
287            sections,
288            SDT_ACTUAL_MAX_INTERVAL,
289        );
290    }
291
292    /// NIT at [`NIT_MAX_INTERVAL`] on PID 0x0010.
293    ///
294    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
295    pub fn upsert_nit(&mut self, sections: Vec<u8>) {
296        self.upsert(well_known::NIT.value(), sections, NIT_MAX_INTERVAL);
297    }
298
299    /// TDT at [`TDT_MAX_INTERVAL`] on PID 0x0014.
300    ///
301    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
302    pub fn upsert_tdt(&mut self, sections: Vec<u8>) {
303        self.upsert(well_known::TDT_TOT.value(), sections, TDT_MAX_INTERVAL);
304    }
305
306    /// TOT at [`TOT_MAX_INTERVAL`] on PID 0x0014.
307    ///
308    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
309    pub fn upsert_tot(&mut self, sections: Vec<u8>) {
310        self.upsert(well_known::TDT_TOT.value(), sections, TOT_MAX_INTERVAL);
311    }
312
313    /// Emit every entry due at `now` (i.e. `now - last_emit >= interval`, and
314    /// first call always due), packetizing via [`SectionPacketizer`], appended
315    /// to `out` (cleared first).
316    ///
317    /// Updates each emitted entry's `last_emit = now`. Deterministic given the
318    /// fed `now` sequence. Returns the packet count appended.
319    pub fn poll_into(&mut self, now: Duration, out: &mut Vec<[u8; TS_PACKET_SIZE]>) -> usize {
320        out.clear();
321        let before = out.len();
322
323        let mut tmp = Vec::new();
324        for entry in &mut self.entries {
325            let due = match entry.last_emit {
326                None => true,
327                Some(last) => now.saturating_sub(last) >= entry.interval,
328            };
329            if due {
330                let refs = split_sections(&entry.sections);
331                if !refs.is_empty() {
332                    entry.packetizer.packetize_into(&refs, &mut tmp);
333                    out.append(&mut tmp);
334                }
335                entry.last_emit = Some(now);
336            }
337        }
338
339        out.len() - before
340    }
341
342    /// Allocating convenience wrapper over [`poll_into`](Self::poll_into).
343    pub fn poll(&mut self, now: Duration) -> Vec<[u8; TS_PACKET_SIZE]> {
344        let mut out = Vec::new();
345        self.poll_into(now, &mut out);
346        out
347    }
348}
349
350impl Default for SiMux {
351    fn default() -> Self {
352        Self::new()
353    }
354}
355
356/// Split concatenated complete-section bytes into individual `&[u8]` slices.
357///
358/// Walks the PSI/SI section headers: byte 0 = table_id, bytes 1-2 =
359/// section_length (12 bits). Each slice is 3 + section_length bytes long.
360/// Trailing bytes that don't form a complete section header are discarded.
361fn split_sections(data: &[u8]) -> Vec<&[u8]> {
362    let mut result = Vec::new();
363    let mut pos = 0;
364    while pos + 3 <= data.len() {
365        let section_length =
366            (((data[pos + 1] & SECTION_LENGTH_HI_MASK) as usize) << 8) | (data[pos + 2] as usize);
367        let end = pos + 3 + section_length;
368        if end > data.len() {
369            break;
370        }
371        result.push(&data[pos..end]);
372        pos = end;
373    }
374    result
375}
376
377#[cfg(test)]
378mod tests {
379    use super::*;
380    use crate::ts::{SectionReassembler, TsPacket};
381
382    // ── helpers ──────────────────────────────────────────────────────────────
383
384    /// Build a long-form section with the given table_id and body bytes.
385    /// Returns the full section including its 3-byte header (no CRC — the
386    /// reassembler does not validate CRC).
387    fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
388        let section_length = body_after_length.len() as u16;
389        let mut v = Vec::with_capacity(3 + section_length as usize);
390        v.push(table_id);
391        // SSI=1, PI=0, reserved=11, length upper 4 bits
392        v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
393        v.push((section_length & 0xFF) as u8);
394        v.extend_from_slice(body_after_length);
395        v
396    }
397
398    fn concat_sections(sections: &[Vec<u8>]) -> Vec<u8> {
399        let total: usize = sections.iter().map(|s| s.len()).sum();
400        let mut out = Vec::with_capacity(total);
401        for s in sections {
402            out.extend_from_slice(s);
403        }
404        out
405    }
406
407    /// Round-trip `sections` through packetize → reassembler, asserting
408    /// byte-identical output in order and no leftovers.
409    fn assert_round_trip(sections: &[Vec<u8>]) {
410        let mut packetizer = SectionPacketizer::new(0x0100);
411        let refs: Vec<&[u8]> = sections.iter().map(|s| s.as_slice()).collect();
412        let packets = packetizer.packetize(&refs);
413
414        let mut reasm = SectionReassembler::default();
415        for pkt_raw in &packets {
416            let pkt = TsPacket::parse(pkt_raw).expect("parse generated packet");
417            let payload = pkt.payload.expect("payload present");
418            let pusi = pkt.header.pusi;
419            reasm.feed(payload, pusi);
420        }
421
422        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
423        assert_eq!(
424            got.len(),
425            sections.len(),
426            "section count mismatch: expected {}, got {}",
427            sections.len(),
428            got.len()
429        );
430        for (i, (orig, round)) in sections.iter().zip(got.iter()).enumerate() {
431            assert_eq!(
432                round.as_ref(),
433                orig.as_slice(),
434                "section {i} round-trip mismatch"
435            );
436        }
437        assert!(reasm.is_empty(), "reassembler should be empty after drain");
438    }
439
440    // ── split_sections ──────────────────────────────────────────────────────
441
442    #[test]
443    fn split_single_section() {
444        let s = build_section(0x42, &[0xAA; 5]);
445        let refs = split_sections(&s);
446        assert_eq!(refs.len(), 1);
447        assert_eq!(refs[0], s.as_slice());
448    }
449
450    #[test]
451    fn split_two_sections() {
452        let s1 = build_section(0x42, &[0x01, 0x02]);
453        let s2 = build_section(0x46, &[0x03, 0x04, 0x05]);
454        let both = concat_sections(&[s1.clone(), s2.clone()]);
455        let refs = split_sections(&both);
456        assert_eq!(refs.len(), 2);
457        assert_eq!(refs[0], s1.as_slice());
458        assert_eq!(refs[1], s2.as_slice());
459    }
460
461    #[test]
462    fn split_empty_input() {
463        let refs = split_sections(&[]);
464        assert!(refs.is_empty());
465    }
466
467    #[test]
468    fn split_trailing_garbage_ignored() {
469        let s = build_section(0x42, &[0xAA; 3]);
470        let mut data = s.clone();
471        data.push(0xFF); // trailing byte that doesn't complete a header
472        let refs = split_sections(&data);
473        assert_eq!(refs.len(), 1);
474        assert_eq!(refs[0], s.as_slice());
475    }
476
477    // ── interval constants pinned to spec ────────────────────────────────────
478
479    #[test]
480    fn interval_constants_match_spec() {
481        // TR 101 211 §4.4.1/§4.4.2
482        assert_eq!(NIT_MAX_INTERVAL, Duration::from_secs(10));
483        assert_eq!(BAT_MAX_INTERVAL, Duration::from_secs(10));
484        assert_eq!(SDT_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
485        assert_eq!(SDT_OTHER_MAX_INTERVAL, Duration::from_secs(10));
486        assert_eq!(EIT_PF_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
487        assert_eq!(EIT_PF_OTHER_MAX_INTERVAL, Duration::from_secs(10));
488        assert_eq!(EIT_SCHED_MAX_INTERVAL, Duration::from_secs(10));
489        assert_eq!(EIT_SCHED_EXT_MAX_INTERVAL, Duration::from_secs(30));
490        assert_eq!(TDT_MAX_INTERVAL, Duration::from_secs(30));
491        assert_eq!(TOT_MAX_INTERVAL, Duration::from_secs(30));
492
493        // TS 101 154 §4.1.7
494        assert_eq!(PAT_MAX_INTERVAL, Duration::from_millis(100));
495        assert_eq!(PMT_MAX_INTERVAL, Duration::from_millis(100));
496
497        // EN 300 468 §5.1.4.1
498        assert_eq!(MIN_SECTION_INTERVAL, Duration::from_millis(25));
499    }
500
501    // ── SiMux: basic behaviour ───────────────────────────────────────────────
502
503    #[test]
504    fn new_simux_is_empty() {
505        let mut mux = SiMux::new();
506        let pkts = mux.poll(Duration::ZERO);
507        assert!(pkts.is_empty());
508    }
509
510    #[test]
511    fn simux_default_is_empty() {
512        let mut mux = SiMux::default();
513        let pkts = mux.poll(Duration::ZERO);
514        assert!(pkts.is_empty());
515    }
516
517    #[test]
518    fn first_poll_always_emits() {
519        let s = build_section(0x42, &[0xAA; 10]);
520        let mut mux = SiMux::new();
521        mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
522        let pkts = mux.poll(Duration::ZERO);
523        assert!(!pkts.is_empty(), "first poll must emit");
524    }
525
526    #[test]
527    fn first_poll_emits_all_entries() {
528        let s1 = build_section(0x42, &[0x01; 5]);
529        let s2 = build_section(0x46, &[0x02; 5]);
530        let mut mux = SiMux::new();
531        mux.upsert(0x0100, s1.clone(), Duration::from_millis(100));
532        mux.upsert(0x0200, s2.clone(), Duration::from_millis(200));
533        let pkts = mux.poll(Duration::ZERO);
534        // Both entries should emit — we count PID occurrences
535        let pids: Vec<u16> = pkts
536            .iter()
537            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
538            .collect();
539        assert!(pids.contains(&0x0100));
540        assert!(pids.contains(&0x0200));
541    }
542
543    // ── deterministic schedule ───────────────────────────────────────────────
544
545    #[test]
546    fn entry_emits_only_when_due() {
547        let s = build_section(0x42, &[0xAA; 10]);
548        let mut mux = SiMux::new();
549        mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
550
551        // First poll at t=0: emits
552        let pkts0 = mux.poll(Duration::ZERO);
553        assert!(!pkts0.is_empty());
554
555        // t=50: not yet due
556        let pkts50 = mux.poll(Duration::from_millis(50));
557        assert!(pkts50.is_empty(), "should not emit at t=50");
558
559        // t=99: still not due
560        let pkts99 = mux.poll(Duration::from_millis(99));
561        assert!(pkts99.is_empty(), "should not emit at t=99");
562
563        // t=100: due again
564        let pkts100 = mux.poll(Duration::from_millis(100));
565        assert!(!pkts100.is_empty(), "should emit at t=100");
566    }
567
568    #[test]
569    fn two_entries_different_cadence() {
570        let s1 = build_section(0x42, &[0x01; 5]);
571        let s2 = build_section(0x46, &[0x02; 5]);
572        let mut mux = SiMux::new();
573        mux.upsert(0x0100, concat_sections(&[s1]), Duration::from_millis(100));
574        mux.upsert(0x0200, concat_sections(&[s2]), Duration::from_millis(200));
575
576        // t=0: both emit
577        let pkts = mux.poll(Duration::ZERO);
578        assert!(!pkts.is_empty());
579
580        // t=100: only 0x0100 emits (due at 100)
581        let pkts100 = mux.poll(Duration::from_millis(100));
582        assert!(!pkts100.is_empty());
583        let pids100: Vec<u16> = pkts100
584            .iter()
585            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
586            .collect();
587        assert!(pids100.contains(&0x0100), "PID 0x0100 should emit at t=100");
588        assert!(
589            !pids100.contains(&0x0200),
590            "PID 0x0200 should NOT emit at t=100"
591        );
592
593        // t=150: neither emits
594        let pkts150 = mux.poll(Duration::from_millis(150));
595        assert!(pkts150.is_empty(), "neither should emit at t=150");
596
597        // t=200: both emit again
598        let pkts200 = mux.poll(Duration::from_millis(200));
599        let pids200: Vec<u16> = pkts200
600            .iter()
601            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
602            .collect();
603        assert!(pids200.contains(&0x0100), "PID 0x0100 should emit at t=200");
604        assert!(pids200.contains(&0x0200), "PID 0x0200 should emit at t=200");
605    }
606
607    #[test]
608    fn entry_emits_again_after_full_interval() {
609        let s = build_section(0x42, &[0xAA; 20]);
610        let mut mux = SiMux::new();
611        mux.upsert(0x0100, s, Duration::from_secs(1));
612
613        // t=0
614        let n0 = mux.poll_into(Duration::ZERO, &mut Vec::new());
615        assert!(n0 > 0);
616
617        // t=999ms: not due
618        let mut buf = Vec::new();
619        let n999 = mux.poll_into(Duration::from_millis(999), &mut buf);
620        assert_eq!(n999, 0);
621
622        // t=1s: due
623        let mut buf2 = Vec::new();
624        let n1000 = mux.poll_into(Duration::from_millis(1000), &mut buf2);
625        assert!(n1000 > 0);
626    }
627
628    // ── upsert replaces existing entry ───────────────────────────────────────
629
630    #[test]
631    fn upsert_updates_existing_entry() {
632        let s1 = build_section(0x42, &[0xAA; 5]);
633        let s2 = build_section(0x46, &[0xBB; 10]);
634        let mut mux = SiMux::new();
635
636        mux.upsert(0x0100, s1, Duration::from_millis(100));
637        // Replace sections on same PID
638        mux.upsert(0x0100, s2.clone(), Duration::from_millis(200));
639
640        let pkts = mux.poll(Duration::ZERO);
641
642        // Round-trip the output to verify the updated sections
643        let mut reasm = SectionReassembler::default();
644        for raw in &pkts {
645            let pkt = TsPacket::parse(raw).unwrap();
646            if pkt.header.pid == 0x0100 {
647                reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
648            }
649        }
650        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
651        assert_eq!(got.len(), 1);
652        assert_eq!(got[0].as_ref(), s2.as_slice());
653    }
654
655    // ── convenience constructors ─────────────────────────────────────────────
656
657    #[test]
658    fn upsert_pat_uses_correct_pid() {
659        let s = build_section(0x00, &[0x01, 0x02]);
660        let mut mux = SiMux::new();
661        mux.upsert_pat(s);
662        let pkts = mux.poll(Duration::ZERO);
663        let pids: Vec<u16> = pkts
664            .iter()
665            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
666            .collect();
667        assert!(pids.iter().all(|&p| p == well_known::PAT.value()));
668    }
669
670    #[test]
671    fn upsert_sdt_actual_uses_correct_pid() {
672        let s = build_section(0x42, &[0x01]);
673        let mut mux = SiMux::new();
674        mux.upsert_sdt_actual(s);
675        let pkts = mux.poll(Duration::ZERO);
676        let pids: Vec<u16> = pkts
677            .iter()
678            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
679            .collect();
680        assert!(pids.iter().all(|&p| p == well_known::SDT_BAT.value()));
681    }
682
683    #[test]
684    fn upsert_nit_uses_correct_pid() {
685        let s = build_section(0x40, &[0x01]);
686        let mut mux = SiMux::new();
687        mux.upsert_nit(s);
688        let pkts = mux.poll(Duration::ZERO);
689        let pids: Vec<u16> = pkts
690            .iter()
691            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
692            .collect();
693        assert!(pids.iter().all(|&p| p == well_known::NIT.value()));
694    }
695
696    #[test]
697    fn upsert_tdt_uses_correct_pid() {
698        let s = build_section(0x70, &[0x01]);
699        let mut mux = SiMux::new();
700        mux.upsert_tdt(s);
701        let pkts = mux.poll(Duration::ZERO);
702        let pids: Vec<u16> = pkts
703            .iter()
704            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
705            .collect();
706        assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
707    }
708
709    #[test]
710    fn upsert_tot_uses_correct_pid() {
711        let s = build_section(0x73, &[0x01]);
712        let mut mux = SiMux::new();
713        mux.upsert_tot(s);
714        let pkts = mux.poll(Duration::ZERO);
715        let pids: Vec<u16> = pkts
716            .iter()
717            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
718            .collect();
719        assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
720    }
721
722    // ── round-trip through reassembler ───────────────────────────────────────
723
724    #[test]
725    fn simux_round_trip_single_entry() {
726        let s1 = build_section(0x42, &[0xAA; 20]);
727        let s2 = build_section(0x46, &[0xBB; 15]);
728        let mut mux = SiMux::new();
729        mux.upsert(
730            0x0100,
731            concat_sections(&[s1.clone(), s2.clone()]),
732            Duration::from_millis(100),
733        );
734
735        let pkts = mux.poll(Duration::ZERO);
736
737        let mut reasm = SectionReassembler::default();
738        for raw in &pkts {
739            let pkt = TsPacket::parse(raw).unwrap();
740            if pkt.header.pid == 0x0100 {
741                reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
742            }
743        }
744        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
745        assert_eq!(got.len(), 2, "round-trip must recover both sections");
746        assert_eq!(got[0].as_ref(), s1.as_slice());
747        assert_eq!(got[1].as_ref(), s2.as_slice());
748    }
749
750    #[test]
751    fn simux_round_trip_multi_pid() {
752        let s_a = build_section(0x42, &[0xA0; 10]);
753        let s_b = build_section(0x46, &[0xB0; 10]);
754        let mut mux = SiMux::new();
755        mux.upsert(0x0100, s_a.clone(), Duration::from_millis(100));
756        mux.upsert(0x0200, s_b.clone(), Duration::from_millis(100));
757
758        let pkts = mux.poll(Duration::ZERO);
759
760        let mut reasm_a = SectionReassembler::default();
761        let mut reasm_b = SectionReassembler::default();
762        for raw in &pkts {
763            let pkt = TsPacket::parse(raw).unwrap();
764            match pkt.header.pid {
765                0x0100 => reasm_a.feed(pkt.payload.unwrap(), pkt.header.pusi),
766                0x0200 => reasm_b.feed(pkt.payload.unwrap(), pkt.header.pusi),
767                _ => {}
768            }
769        }
770        let got_a: Vec<_> = std::iter::from_fn(|| reasm_a.pop_section()).collect();
771        let got_b: Vec<_> = std::iter::from_fn(|| reasm_b.pop_section()).collect();
772        assert_eq!(got_a.len(), 1);
773        assert_eq!(got_b.len(), 1);
774        assert_eq!(got_a[0].as_ref(), s_a.as_slice());
775        assert_eq!(got_b[0].as_ref(), s_b.as_slice());
776    }
777
778    // ── CC continuity across polls ───────────────────────────────────────────
779
780    #[test]
781    fn continuity_counter_continuous_across_polls() {
782        // Section large enough to span at least 2 packets.
783        let s = build_section(0x42, &[0xAA; 250]);
784        let mut mux = SiMux::new();
785        mux.upsert(0x0100, s, Duration::from_millis(100));
786
787        let pkts1 = mux.poll(Duration::ZERO);
788        assert!(pkts1.len() >= 2, "need ≥2 packets to test CC continuity");
789
790        let last_cc_pkts1 = TsPacket::parse(&pkts1[pkts1.len() - 1])
791            .unwrap()
792            .header
793            .continuity_counter;
794
795        let pkts2 = mux.poll(Duration::from_millis(100));
796        assert!(!pkts2.is_empty(), "second poll must emit");
797
798        let first_cc_pkts2 = TsPacket::parse(&pkts2[0])
799            .unwrap()
800            .header
801            .continuity_counter;
802
803        assert_eq!(
804            first_cc_pkts2,
805            (last_cc_pkts1 + 1) & 0x0F,
806            "CC must continue across poll cycles"
807        );
808    }
809
810    // ── poll_into clears output ──────────────────────────────────────────────
811
812    #[test]
813    fn poll_into_clears_out_before_appending() {
814        let s = build_section(0x42, &[0xAA; 5]);
815        let mut mux = SiMux::new();
816        mux.upsert(0x0100, s, Duration::from_millis(100));
817
818        let mut out = vec![[0u8; TS_PACKET_SIZE]; 42];
819        let n = mux.poll_into(Duration::ZERO, &mut out);
820        assert_eq!(n, out.len(), "out must contain only new packets");
821    }
822
823    // ── debug_assert on sub-25 ms interval ───────────────────────────────────
824
825    #[test]
826    #[cfg(debug_assertions)]
827    #[should_panic(expected = "25 ms")]
828    fn upsert_rejects_sub_25ms_interval_in_debug() {
829        let s = build_section(0x42, &[0xAA; 5]);
830        let mut mux = SiMux::new();
831        mux.upsert(0x0100, s, Duration::from_millis(10));
832    }
833
834    #[test]
835    fn upsert_accepts_25ms_interval() {
836        let s = build_section(0x42, &[0xAA; 5]);
837        let mut mux = SiMux::new();
838        mux.upsert(0x0100, s, MIN_SECTION_INTERVAL);
839        let pkts = mux.poll(Duration::ZERO);
840        assert!(!pkts.is_empty());
841    }
842
843    // ── round-trip property (the mandatory acceptance oracle) ────────────────
844
845    #[test]
846    fn round_trip_single_short_section() {
847        let s = build_section(0x42, &[0xAA; 10]);
848        assert_round_trip(&[s]);
849    }
850
851    #[test]
852    fn round_trip_one_byte_body() {
853        let s = build_section(0x46, &[0xBB]); // 4 bytes total
854        assert_round_trip(&[s]);
855    }
856
857    #[test]
858    fn round_trip_section_exactly_pusi_cap_boundary() {
859        // A section whose total length is exactly PUSI_PAYLOAD_CAP (183).
860        let body = vec![0xCC; PUSI_PAYLOAD_CAP - 3];
861        let s = build_section(0x50, &body);
862        assert_eq!(s.len(), PUSI_PAYLOAD_CAP);
863        assert_round_trip(&[s]);
864    }
865
866    #[test]
867    fn round_trip_section_just_over_pusi_cap() {
868        // One byte more than fits in a PUSI packet → must span to continuation.
869        let body = vec![0xDD; PUSI_PAYLOAD_CAP - 3 + 1];
870        let s = build_section(0x52, &body);
871        assert_eq!(s.len(), PUSI_PAYLOAD_CAP + 1);
872        assert_round_trip(&[s]);
873    }
874
875    #[test]
876    fn round_trip_section_spans_many_packets() {
877        // A 2000-byte section spans ~11 packets.
878        let body = vec![0xEE; 2000 - 3];
879        let s = build_section(0x60, &body);
880        assert_round_trip(&[s]);
881    }
882
883    #[test]
884    fn round_trip_section_at_max_size() {
885        // The maximum section (4096 total, the long-form ceiling), whose final
886        // continuation packet carries the tail followed by 0xFF stuffing. Since
887        // #148 the reassembler ignores that trailing stuffing instead of
888        // counting it toward MAX_SECTION_SIZE, so a full-size section round-trips.
889        let body = vec![0x11; 4096 - 3];
890        let s = build_section(0x80, &body);
891        assert_eq!(s.len(), 4096);
892        assert_round_trip(&[s]);
893    }
894
895    #[test]
896    fn round_trip_multiple_short_sections_in_one_batch() {
897        let s1 = build_section(0x42, &[0x01, 0x02]); // 5 bytes
898        let s2 = build_section(0x46, &[0x03]); // 4 bytes
899        let s3 = build_section(0x4A, &[0x04, 0x05, 0x06]); // 6 bytes
900        assert_round_trip(&[s1, s2, s3]);
901    }
902
903    #[test]
904    fn round_trip_section_ends_exactly_at_boundary() {
905        // First section is exactly PUSI_PAYLOAD_CAP bytes — ends at packet
906        // boundary.  Second section starts fresh in the next packet with
907        // PUSI=1, pointer_field=0.
908        let body1 = vec![0xA1; PUSI_PAYLOAD_CAP - 3];
909        let s1 = build_section(0x50, &body1);
910        assert_eq!(s1.len(), PUSI_PAYLOAD_CAP);
911
912        let s2 = build_section(0x52, &[0xB1, 0xB2]);
913        assert_round_trip(&[s1, s2]);
914    }
915
916    #[test]
917    fn round_trip_mix_small_large_sections() {
918        // Mix of small and spanning sections that stress pointer_field and
919        // concatenation.
920        let s1 = build_section(0x10, &[0xAA; 5]);
921        let body2 = vec![0xBB; 200];
922        let s2 = build_section(0x20, &body2);
923        let s3 = build_section(0x30, &[0xCC; 50]);
924        let body4 = vec![0xDD; 800];
925        let s4 = build_section(0x40, &body4);
926        let s5 = build_section(0x50, &[0xEE]); // 1-byte body
927        assert_round_trip(&[s1, s2, s3, s4, s5]);
928    }
929
930    // ── continuity counter ───────────────────────────────────────────────────
931
932    #[test]
933    fn continuity_counter_increments_per_packet() {
934        // Use a section large enough to span several packets.
935        let body = vec![0xAA; 500];
936        let section = build_section(0x42, &body);
937        let mut p = SectionPacketizer::new(0x0100);
938
939        let packets = p.packetize(&[&section]);
940        assert!(packets.len() >= 3, "need multiple packets to test CC");
941
942        let mut last_cc: Option<u8> = None;
943        for pkt_raw in &packets {
944            let pkt = TsPacket::parse(pkt_raw).unwrap();
945            let cc = pkt.header.continuity_counter;
946            if let Some(last) = last_cc {
947                assert_eq!(cc, (last + 1) & 0x0F, "CC must increment per packet");
948            }
949            last_cc = Some(cc);
950        }
951    }
952
953    #[test]
954    fn continuity_counter_wraps_and_continues_across_calls() {
955        let mut p = SectionPacketizer::with_continuity(0x0100, 14);
956        // Section large enough to span at least 3 packets.
957        let body = vec![0xBB; 500];
958        let s = build_section(0x42, &body);
959
960        // First call: CC 14, 15, 0, …
961        let pkts1 = p.packetize(&[&s]);
962        assert!(pkts1.len() >= 3, "section must span ≥3 packets");
963        let ccs1: Vec<u8> = pkts1
964            .iter()
965            .map(|b| TsPacket::parse(b).unwrap().header.continuity_counter)
966            .collect();
967        assert_eq!(ccs1[0], 14);
968        assert_eq!(ccs1[1], 15);
969        assert_eq!(ccs1[2], 0);
970
971        // Second call: CC continues from where first left off.
972        let pkts2 = p.packetize(&[&s]);
973        let cc_first_pkt2 = TsPacket::parse(&pkts2[0])
974            .unwrap()
975            .header
976            .continuity_counter;
977        assert_eq!(cc_first_pkt2, ccs1.last().map(|c| (c + 1) & 0x0F).unwrap());
978    }
979
980    // ── PUSI placement ──────────────────────────────────────────────────────
981
982    #[test]
983    fn pusi_set_when_section_starts() {
984        let s = build_section(0x42, &[0xAA; 10]);
985        let mut p = SectionPacketizer::new(0x0100);
986        let packets = p.packetize(&[&s]);
987        assert!(!packets.is_empty());
988        let pkt = TsPacket::parse(&packets[0]).unwrap();
989        assert!(pkt.header.pusi, "first packet must have PUSI=1");
990    }
991
992    #[test]
993    fn pusi_not_set_on_mid_section_continuation() {
994        let body = vec![0xAA; 500];
995        let s = build_section(0x42, &body);
996        let mut p = SectionPacketizer::new(0x0100);
997        let packets = p.packetize(&[&s]);
998        assert!(packets.len() >= 2);
999        let pkt1 = TsPacket::parse(&packets[0]).unwrap();
1000        let pkt2 = TsPacket::parse(&packets[1]).unwrap();
1001        assert!(pkt1.header.pusi, "first packet must have PUSI=1");
1002        assert!(
1003            !pkt2.header.pusi,
1004            "second packet is continuation, must have PUSI=0"
1005        );
1006    }
1007
1008    #[test]
1009    fn pointer_field_equals_tail_length_before_new_section() {
1010        // Section1 = 200 bytes.  Section2 = 50 bytes.
1011        // Packet 1: PUSI=1, pointer=0, section1 head.
1012        // Packet 2: PUSI=1, pointer > 0 (tail of section1 before section2).
1013        let body1 = vec![0xA1; 197]; // 200-byte section
1014        let s1 = build_section(0x52, &body1);
1015        assert_eq!(s1.len(), 200);
1016        let s2 = build_section(0x54, &[0xB1; 47]); // 50-byte section
1017        assert_eq!(s2.len(), 50);
1018
1019        let mut p = SectionPacketizer::new(0x0100);
1020        let packets = p.packetize(&[&s1, &s2]);
1021
1022        // Find the packet where PUSI=1 and pointer>0.
1023        let pkt_with_pointer = packets
1024            .iter()
1025            .map(|raw| TsPacket::parse(raw).unwrap())
1026            .find(|pkt| pkt.header.pusi && pkt.payload.is_some_and(|pl| pl.first() != Some(&0)))
1027            .expect("must have a PUSI packet with non-zero pointer");
1028
1029        let payload = pkt_with_pointer.payload.unwrap();
1030        let pointer = payload[0] as usize;
1031        assert!(pointer > 0, "pointer must be non-zero");
1032        // The tail bytes should be from the end of section1.
1033        let tail_start = s1.len() - pointer;
1034        assert_eq!(&payload[1..1 + pointer], &s1[tail_start..]);
1035    }
1036
1037    // ── stuffing ─────────────────────────────────────────────────────────────
1038
1039    #[test]
1040    fn final_packet_unused_tail_is_stuffing() {
1041        let s = build_section(0x42, &[0xAA; 5]); // 8 bytes total
1042        let mut p = SectionPacketizer::new(0x0100);
1043        let packets = p.packetize(&[&s]);
1044
1045        let pkt = TsPacket::parse(&packets[0]).unwrap();
1046        let payload = pkt.payload.unwrap();
1047        assert_eq!(payload[0], 0, "pointer_field should be 0");
1048
1049        let section_end = 1 + s.len(); // after pointer + section
1050        assert!(
1051            section_end < payload.len(),
1052            "must have stuffing after section"
1053        );
1054        for &b in &payload[section_end..] {
1055            assert_eq!(b, STUFFING_BYTE, "all trailing bytes must be 0xFF");
1056        }
1057    }
1058
1059    #[test]
1060    fn reassembler_discards_stuffing() {
1061        let s1 = build_section(0x42, &[0xAA; 10]);
1062        let s2 = build_section(0x46, &[0xBB; 5]);
1063
1064        let mut p = SectionPacketizer::new(0x0100);
1065        let packets = p.packetize(&[&s1, &s2]);
1066
1067        let mut reasm = SectionReassembler::default();
1068        for pkt_raw in &packets {
1069            let pkt = TsPacket::parse(pkt_raw).unwrap();
1070            reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
1071        }
1072
1073        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
1074        assert_eq!(got.len(), 2);
1075        assert!(
1076            reasm.is_empty(),
1077            "stuffing tail must be discarded, not buffered"
1078        );
1079    }
1080
1081    // ── misc ─────────────────────────────────────────────────────────────────
1082
1083    #[test]
1084    fn empty_batch_produces_no_packets() {
1085        let mut p = SectionPacketizer::new(0x0100);
1086        let packets: Vec<[u8; TS_PACKET_SIZE]> = p.packetize(&[]);
1087        assert!(packets.is_empty());
1088    }
1089
1090    #[test]
1091    fn packetize_into_clears_out_first() {
1092        let s = build_section(0x42, &[0xAA; 5]);
1093        let mut p = SectionPacketizer::new(0x0100);
1094
1095        let mut out = vec![[0u8; TS_PACKET_SIZE]; 99]; // pre-existing junk
1096        let n = p.packetize_into(&[&s], &mut out);
1097        assert_eq!(n, out.len(), "out must contain only the new packets");
1098        // Verify the output is correct (round-trip).
1099        let mut reasm = SectionReassembler::default();
1100        for pkt_raw in &out {
1101            let pkt = TsPacket::parse(pkt_raw).unwrap();
1102            reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
1103        }
1104        let got = reasm.pop_section().unwrap();
1105        assert_eq!(got.as_ref(), s.as_slice());
1106    }
1107
1108    #[test]
1109    fn pid_is_correct() {
1110        let p = SectionPacketizer::new(0x1234);
1111        assert_eq!(p.pid(), 0x1234);
1112    }
1113
1114    #[test]
1115    fn with_continuity_masks_to_4_bits() {
1116        let p = SectionPacketizer::with_continuity(0x0100, 0xFE);
1117        assert_eq!(p.continuity_counter(), 0x0E);
1118    }
1119
1120    #[test]
1121    fn has_payload_always_true_no_adaptation() {
1122        let s = build_section(0x42, &[0xAA; 50]);
1123        let mut p = SectionPacketizer::new(0x0100);
1124        let packets = p.packetize(&[&s]);
1125        for pkt_raw in &packets {
1126            let pkt = TsPacket::parse(pkt_raw).unwrap();
1127            assert!(pkt.header.has_payload, "every packet must carry payload");
1128            assert!(!pkt.header.has_adaptation, "no adaptation field is emitted");
1129            assert!(!pkt.header.tei, "TEI must be false");
1130            assert_eq!(pkt.header.scrambling, 0, "scrambling must be 0");
1131        }
1132    }
1133}