Skip to main content

dvb_si/
mux.rs

1//! Section → TS packetizer (the byte-exact inverse of
2//! [`SectionReassembler::feed`](crate::ts::SectionReassembler::feed)).
3//!
4//! Per the PSI carriage rules of ISO/IEC 13818-1:2007 §2.4.4
5//! (`docs/iso_13818_1_systems.md`): sections are packed into 188-byte packets
6//! with a `pointer_field` where sections begin, concatenated contiguously, and
7//! 0xFF-stuffed at the batch tail.
8
9use alloc::vec::Vec;
10use core::time::Duration;
11
12use crate::pid::well_known;
13use crate::ts::{TsHeader, CC_MASK, SECTION_LENGTH_HI_MASK, TS_PACKET_SIZE};
14
15/// Maximum data bytes in a PUSI=1 packet (188 − 4 header − 1 pointer_field). §2.4.4.
16const PUSI_PAYLOAD_CAP: usize = 183;
17/// Maximum data bytes in a continuation packet (188 − 4 header). §2.4.4.
18const PAYLOAD_CAP: usize = 184;
19/// Stuffing byte for unused TS payload bytes (ISO/IEC 13818-1 §2.4.4).
20const STUFFING_BYTE: u8 = 0xFF;
21
22/// Packetizes PSI/SI sections into 188-byte TS packets.
23///
24/// This is the byte-exact inverse of
25/// [`SectionReassembler::feed`](crate::ts::SectionReassembler::feed): packets
26/// produced here, when fed back through the reassembler, yield the same
27/// sections in order.
28///
29/// ISO/IEC 13818-1:2007 §2.4.4 (`docs/iso_13818_1_systems.md`).
30pub struct SectionPacketizer {
31    pid: u16,
32    continuity_counter: u8,
33}
34
35impl SectionPacketizer {
36    /// Start a packetizer for `pid` with continuity_counter = 0.
37    pub fn new(pid: u16) -> Self {
38        Self {
39            pid,
40            continuity_counter: 0,
41        }
42    }
43
44    /// Start at a specific continuity_counter (0..=15) — for resuming a stream.
45    pub fn with_continuity(pid: u16, cc: u8) -> Self {
46        Self {
47            pid,
48            continuity_counter: cc & CC_MASK,
49        }
50    }
51
52    /// The PID this packetizer emits packets for.
53    pub fn pid(&self) -> u16 {
54        self.pid
55    }
56
57    /// The continuity_counter for the next emitted packet.
58    pub fn continuity_counter(&self) -> u8 {
59        self.continuity_counter
60    }
61
62    /// Packetize a batch of complete sections into 188-byte TS packets,
63    /// appended to `out` (cleared first).
64    ///
65    /// Returns the number of packets appended.
66    pub fn packetize_into(
67        &mut self,
68        sections: &[&[u8]],
69        out: &mut Vec<[u8; TS_PACKET_SIZE]>,
70    ) -> usize {
71        out.clear();
72
73        if sections.is_empty() {
74            return 0;
75        }
76
77        // Concatenate all sections and record section-start byte offsets.
78        let total_len: usize = sections.iter().map(|s| s.len()).sum();
79        if total_len == 0 {
80            return 0;
81        }
82        let mut data = Vec::with_capacity(total_len);
83        let mut starts = Vec::with_capacity(sections.len());
84        for s in sections {
85            starts.push(data.len());
86            data.extend_from_slice(s);
87        }
88
89        let count_before = out.len();
90        let mut pos = 0usize;
91
92        while pos < data.len() {
93            // Smallest section-start offset ≥ pos.
94            let next_start = starts.iter().copied().find(|&s| s >= pos);
95
96            let pusi: bool;
97            let pointer_field: u8;
98            let cap: usize;
99
100            if let Some(ns) = next_start {
101                let diff = ns.saturating_sub(pos);
102                if diff <= PUSI_PAYLOAD_CAP {
103                    pusi = true;
104                    pointer_field = diff as u8;
105                    cap = PUSI_PAYLOAD_CAP;
106                } else {
107                    pusi = false;
108                    pointer_field = 0;
109                    cap = PAYLOAD_CAP;
110                }
111            } else {
112                pusi = false;
113                pointer_field = 0;
114                cap = PAYLOAD_CAP;
115            }
116
117            let mut pkt = [0u8; TS_PACKET_SIZE];
118
119            let header = TsHeader {
120                tei: false,
121                pusi,
122                pid: self.pid,
123                scrambling: 0,
124                has_adaptation: false,
125                has_payload: true,
126                continuity_counter: self.continuity_counter,
127            };
128            header
129                .serialize_into(&mut pkt[..4])
130                .expect("4-byte header buffer");
131
132            self.continuity_counter = (self.continuity_counter + 1) & CC_MASK;
133
134            let mut write_pos = 4usize;
135
136            if pusi {
137                pkt[write_pos] = pointer_field;
138                write_pos += 1;
139            }
140
141            let remaining = data.len() - pos;
142            let to_copy = remaining.min(cap);
143            pkt[write_pos..write_pos + to_copy].copy_from_slice(&data[pos..pos + to_copy]);
144            pos += to_copy;
145            write_pos += to_copy;
146
147            // 0xFF-stuff remaining payload bytes.
148            for b in &mut pkt[write_pos..] {
149                *b = STUFFING_BYTE;
150            }
151
152            out.push(pkt);
153        }
154
155        out.len() - count_before
156    }
157
158    /// Allocating convenience wrapper over [`packetize_into`](Self::packetize_into).
159    pub fn packetize(&mut self, sections: &[&[u8]]) -> Vec<[u8; TS_PACKET_SIZE]> {
160        let mut out = Vec::new();
161        self.packetize_into(sections, &mut out);
162        out
163    }
164}
165
166// ── Default interval constants ────────────────────────────────────────────────
167
168/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — NIT maximum interval.
169pub const NIT_MAX_INTERVAL: Duration = Duration::from_secs(10);
170/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — BAT maximum interval.
171pub const BAT_MAX_INTERVAL: Duration = Duration::from_secs(10);
172/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — SDT actual maximum interval.
173pub const SDT_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
174/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — SDT other maximum interval.
175pub const SDT_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
176/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — EIT p/f actual maximum interval.
177pub const EIT_PF_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
178/// dvb-si/docs/tr_101_211.md §4.4.1 — EIT p/f other maximum interval (sat/cable;
179/// terrestrial is 20 s per §4.4.2).
180pub const EIT_PF_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
181/// dvb-si/docs/tr_101_211.md §4.4.1 — EIT schedule first 8 days maximum interval.
182pub const EIT_SCHED_MAX_INTERVAL: Duration = Duration::from_secs(10);
183/// dvb-si/docs/tr_101_211.md §4.4.1 — EIT schedule beyond 8 days maximum interval.
184pub const EIT_SCHED_EXT_MAX_INTERVAL: Duration = Duration::from_secs(30);
185/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — TDT maximum interval.
186pub const TDT_MAX_INTERVAL: Duration = Duration::from_secs(30);
187/// dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2 — TOT maximum interval.
188pub const TOT_MAX_INTERVAL: Duration = Duration::from_secs(30);
189
190/// dvb-si/docs/ts_101_154_av_coding.md §4.1.7 — PAT maximum interval (shall ≤ 100 ms).
191pub const PAT_MAX_INTERVAL: Duration = Duration::from_millis(100);
192/// dvb-si/docs/ts_101_154_av_coding.md §4.1.7 — PMT maximum interval (shall ≤ 100 ms).
193pub const PMT_MAX_INTERVAL: Duration = Duration::from_millis(100);
194
195/// dvb-si/docs/en_300_468.md §5.1.4.1 — minimum inter-section interval floor
196/// (≤100 Mbit/s TSs).
197pub const MIN_SECTION_INTERVAL: Duration = Duration::from_millis(25);
198
199// ── SiMux ────────────────────────────────────────────────────────────────────
200
201/// Section-repetition scheduler that builds TS packets on a caller-supplied clock.
202///
203/// Each entry is a PID + concatenated complete-section bytes + an emission
204/// interval. Call [`poll_into`](Self::poll_into) with monotonically-increasing
205/// `now` values to get 188-byte TS packets for every entry whose interval has
206/// elapsed since its last emission.
207///
208/// The scheduler owns its section bytes — call [`upsert`](Self::upsert) to
209/// update them when SI changes. Continuity counters are continuous per-PID
210/// across poll cycles.
211///
212/// # 25 ms floor
213///
214/// [`MIN_SECTION_INTERVAL`] is the minimum valid interval (EN 300 468 §5.1.4.1).
215/// Supplying an interval below this in [`upsert`](Self::upsert) triggers a
216/// `debug_assert` — in release builds the assertion compiles out; the caller
217/// must ensure compliance.
218pub struct SiMux {
219    entries: Vec<Entry>,
220}
221
222struct Entry {
223    pid: u16,
224    sections: Vec<u8>,
225    interval: Duration,
226    last_emit: Option<Duration>,
227    packetizer: SectionPacketizer,
228}
229
230impl SiMux {
231    /// Create an empty scheduler (no entries, no pending emissions).
232    pub fn new() -> Self {
233        Self {
234            entries: Vec::new(),
235        }
236    }
237
238    /// Register or replace the sections emitted on `pid` at `interval`.
239    ///
240    /// `sections` is the concatenated complete-section bytes for one emission
241    /// cycle. Re-calling for the same `pid` updates the bytes and interval
242    /// while preserving the continuity counter.
243    ///
244    /// # Panics (debug only)
245    ///
246    /// `debug_assert!(interval >= MIN_SECTION_INTERVAL)` — EN 300 468 §5.1.4.1
247    /// requires a minimum inter-section interval of 25 ms.
248    pub fn upsert(&mut self, pid: u16, sections: Vec<u8>, interval: Duration) {
249        debug_assert!(
250            interval >= MIN_SECTION_INTERVAL,
251            "interval {interval:?} is below the 25 ms minimum (EN 300 468 §5.1.4.1)"
252        );
253
254        if let Some(entry) = self.entries.iter_mut().find(|e| e.pid == pid) {
255            entry.sections = sections;
256            entry.interval = interval;
257        } else {
258            self.entries.push(Entry {
259                pid,
260                sections,
261                interval,
262                last_emit: None,
263                packetizer: SectionPacketizer::new(pid),
264            });
265        }
266    }
267
268    /// PAT at [`PAT_MAX_INTERVAL`] on PID 0x0000.
269    ///
270    /// Interval cites dvb-si/docs/ts_101_154_av_coding.md §4.1.7.
271    pub fn upsert_pat(&mut self, sections: Vec<u8>) {
272        self.upsert(well_known::PAT.value(), sections, PAT_MAX_INTERVAL);
273    }
274
275    /// PMT at [`PMT_MAX_INTERVAL`] on the caller-supplied `pid`.
276    ///
277    /// Interval cites dvb-si/docs/ts_101_154_av_coding.md §4.1.7.
278    pub fn upsert_pmt(&mut self, pid: u16, sections: Vec<u8>) {
279        self.upsert(pid, sections, PMT_MAX_INTERVAL);
280    }
281
282    /// SDT actual at [`SDT_ACTUAL_MAX_INTERVAL`] on PID 0x0011.
283    ///
284    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
285    pub fn upsert_sdt_actual(&mut self, sections: Vec<u8>) {
286        self.upsert(
287            well_known::SDT_BAT.value(),
288            sections,
289            SDT_ACTUAL_MAX_INTERVAL,
290        );
291    }
292
293    /// NIT at [`NIT_MAX_INTERVAL`] on PID 0x0010.
294    ///
295    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
296    pub fn upsert_nit(&mut self, sections: Vec<u8>) {
297        self.upsert(well_known::NIT.value(), sections, NIT_MAX_INTERVAL);
298    }
299
300    /// TDT at [`TDT_MAX_INTERVAL`] on PID 0x0014.
301    ///
302    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
303    pub fn upsert_tdt(&mut self, sections: Vec<u8>) {
304        self.upsert(well_known::TDT_TOT.value(), sections, TDT_MAX_INTERVAL);
305    }
306
307    /// TOT at [`TOT_MAX_INTERVAL`] on PID 0x0014.
308    ///
309    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
310    pub fn upsert_tot(&mut self, sections: Vec<u8>) {
311        self.upsert(well_known::TDT_TOT.value(), sections, TOT_MAX_INTERVAL);
312    }
313
314    /// Emit every entry due at `now` (i.e. `now - last_emit >= interval`, and
315    /// first call always due), packetizing via [`SectionPacketizer`], appended
316    /// to `out` (cleared first).
317    ///
318    /// Updates each emitted entry's `last_emit = now`. Deterministic given the
319    /// fed `now` sequence. Returns the packet count appended.
320    pub fn poll_into(&mut self, now: Duration, out: &mut Vec<[u8; TS_PACKET_SIZE]>) -> usize {
321        out.clear();
322        let before = out.len();
323
324        let mut tmp = Vec::new();
325        for entry in &mut self.entries {
326            let due = match entry.last_emit {
327                None => true,
328                Some(last) => now.saturating_sub(last) >= entry.interval,
329            };
330            if due {
331                let refs = split_sections(&entry.sections);
332                if !refs.is_empty() {
333                    entry.packetizer.packetize_into(&refs, &mut tmp);
334                    out.append(&mut tmp);
335                }
336                entry.last_emit = Some(now);
337            }
338        }
339
340        out.len() - before
341    }
342
343    /// Allocating convenience wrapper over [`poll_into`](Self::poll_into).
344    pub fn poll(&mut self, now: Duration) -> Vec<[u8; TS_PACKET_SIZE]> {
345        let mut out = Vec::new();
346        self.poll_into(now, &mut out);
347        out
348    }
349}
350
351impl Default for SiMux {
352    fn default() -> Self {
353        Self::new()
354    }
355}
356
357/// Split concatenated complete-section bytes into individual `&[u8]` slices.
358///
359/// Walks the PSI/SI section headers: byte 0 = table_id, bytes 1-2 =
360/// section_length (12 bits). Each slice is 3 + section_length bytes long.
361/// Trailing bytes that don't form a complete section header are discarded.
362fn split_sections(data: &[u8]) -> Vec<&[u8]> {
363    let mut result = Vec::new();
364    let mut pos = 0;
365    while pos + 3 <= data.len() {
366        let section_length =
367            (((data[pos + 1] & SECTION_LENGTH_HI_MASK) as usize) << 8) | (data[pos + 2] as usize);
368        let end = pos + 3 + section_length;
369        if end > data.len() {
370            break;
371        }
372        result.push(&data[pos..end]);
373        pos = end;
374    }
375    result
376}
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381    use crate::ts::{SectionReassembler, TsPacket};
382
383    // ── helpers ──────────────────────────────────────────────────────────────
384
385    /// Build a long-form section with the given table_id and body bytes.
386    /// Returns the full section including its 3-byte header (no CRC — the
387    /// reassembler does not validate CRC).
388    fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
389        let section_length = body_after_length.len() as u16;
390        let mut v = Vec::with_capacity(3 + section_length as usize);
391        v.push(table_id);
392        // SSI=1, PI=0, reserved=11, length upper 4 bits
393        v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
394        v.push((section_length & 0xFF) as u8);
395        v.extend_from_slice(body_after_length);
396        v
397    }
398
399    fn concat_sections(sections: &[Vec<u8>]) -> Vec<u8> {
400        let total: usize = sections.iter().map(|s| s.len()).sum();
401        let mut out = Vec::with_capacity(total);
402        for s in sections {
403            out.extend_from_slice(s);
404        }
405        out
406    }
407
408    /// Round-trip `sections` through packetize → reassembler, asserting
409    /// byte-identical output in order and no leftovers.
410    fn assert_round_trip(sections: &[Vec<u8>]) {
411        let mut packetizer = SectionPacketizer::new(0x0100);
412        let refs: Vec<&[u8]> = sections.iter().map(|s| s.as_slice()).collect();
413        let packets = packetizer.packetize(&refs);
414
415        let mut reasm = SectionReassembler::default();
416        for pkt_raw in &packets {
417            let pkt = TsPacket::parse(pkt_raw).expect("parse generated packet");
418            let payload = pkt.payload.expect("payload present");
419            let pusi = pkt.header.pusi;
420            reasm.feed(payload, pusi);
421        }
422
423        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
424        assert_eq!(
425            got.len(),
426            sections.len(),
427            "section count mismatch: expected {}, got {}",
428            sections.len(),
429            got.len()
430        );
431        for (i, (orig, round)) in sections.iter().zip(got.iter()).enumerate() {
432            assert_eq!(
433                round.as_ref(),
434                orig.as_slice(),
435                "section {i} round-trip mismatch"
436            );
437        }
438        assert!(reasm.is_empty(), "reassembler should be empty after drain");
439    }
440
441    // ── split_sections ──────────────────────────────────────────────────────
442
443    #[test]
444    fn split_single_section() {
445        let s = build_section(0x42, &[0xAA; 5]);
446        let refs = split_sections(&s);
447        assert_eq!(refs.len(), 1);
448        assert_eq!(refs[0], s.as_slice());
449    }
450
451    #[test]
452    fn split_two_sections() {
453        let s1 = build_section(0x42, &[0x01, 0x02]);
454        let s2 = build_section(0x46, &[0x03, 0x04, 0x05]);
455        let both = concat_sections(&[s1.clone(), s2.clone()]);
456        let refs = split_sections(&both);
457        assert_eq!(refs.len(), 2);
458        assert_eq!(refs[0], s1.as_slice());
459        assert_eq!(refs[1], s2.as_slice());
460    }
461
462    #[test]
463    fn split_empty_input() {
464        let refs = split_sections(&[]);
465        assert!(refs.is_empty());
466    }
467
468    #[test]
469    fn split_trailing_garbage_ignored() {
470        let s = build_section(0x42, &[0xAA; 3]);
471        let mut data = s.clone();
472        data.push(0xFF); // trailing byte that doesn't complete a header
473        let refs = split_sections(&data);
474        assert_eq!(refs.len(), 1);
475        assert_eq!(refs[0], s.as_slice());
476    }
477
478    // ── interval constants pinned to spec ────────────────────────────────────
479
480    #[test]
481    fn interval_constants_match_spec() {
482        // TR 101 211 §4.4.1/§4.4.2
483        assert_eq!(NIT_MAX_INTERVAL, Duration::from_secs(10));
484        assert_eq!(BAT_MAX_INTERVAL, Duration::from_secs(10));
485        assert_eq!(SDT_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
486        assert_eq!(SDT_OTHER_MAX_INTERVAL, Duration::from_secs(10));
487        assert_eq!(EIT_PF_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
488        assert_eq!(EIT_PF_OTHER_MAX_INTERVAL, Duration::from_secs(10));
489        assert_eq!(EIT_SCHED_MAX_INTERVAL, Duration::from_secs(10));
490        assert_eq!(EIT_SCHED_EXT_MAX_INTERVAL, Duration::from_secs(30));
491        assert_eq!(TDT_MAX_INTERVAL, Duration::from_secs(30));
492        assert_eq!(TOT_MAX_INTERVAL, Duration::from_secs(30));
493
494        // TS 101 154 §4.1.7
495        assert_eq!(PAT_MAX_INTERVAL, Duration::from_millis(100));
496        assert_eq!(PMT_MAX_INTERVAL, Duration::from_millis(100));
497
498        // EN 300 468 §5.1.4.1
499        assert_eq!(MIN_SECTION_INTERVAL, Duration::from_millis(25));
500    }
501
502    // ── SiMux: basic behaviour ───────────────────────────────────────────────
503
504    #[test]
505    fn new_simux_is_empty() {
506        let mut mux = SiMux::new();
507        let pkts = mux.poll(Duration::ZERO);
508        assert!(pkts.is_empty());
509    }
510
511    #[test]
512    fn simux_default_is_empty() {
513        let mut mux = SiMux::default();
514        let pkts = mux.poll(Duration::ZERO);
515        assert!(pkts.is_empty());
516    }
517
518    #[test]
519    fn first_poll_always_emits() {
520        let s = build_section(0x42, &[0xAA; 10]);
521        let mut mux = SiMux::new();
522        mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
523        let pkts = mux.poll(Duration::ZERO);
524        assert!(!pkts.is_empty(), "first poll must emit");
525    }
526
527    #[test]
528    fn first_poll_emits_all_entries() {
529        let s1 = build_section(0x42, &[0x01; 5]);
530        let s2 = build_section(0x46, &[0x02; 5]);
531        let mut mux = SiMux::new();
532        mux.upsert(0x0100, s1.clone(), Duration::from_millis(100));
533        mux.upsert(0x0200, s2.clone(), Duration::from_millis(200));
534        let pkts = mux.poll(Duration::ZERO);
535        // Both entries should emit — we count PID occurrences
536        let pids: Vec<u16> = pkts
537            .iter()
538            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
539            .collect();
540        assert!(pids.contains(&0x0100));
541        assert!(pids.contains(&0x0200));
542    }
543
544    // ── deterministic schedule ───────────────────────────────────────────────
545
546    #[test]
547    fn entry_emits_only_when_due() {
548        let s = build_section(0x42, &[0xAA; 10]);
549        let mut mux = SiMux::new();
550        mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
551
552        // First poll at t=0: emits
553        let pkts0 = mux.poll(Duration::ZERO);
554        assert!(!pkts0.is_empty());
555
556        // t=50: not yet due
557        let pkts50 = mux.poll(Duration::from_millis(50));
558        assert!(pkts50.is_empty(), "should not emit at t=50");
559
560        // t=99: still not due
561        let pkts99 = mux.poll(Duration::from_millis(99));
562        assert!(pkts99.is_empty(), "should not emit at t=99");
563
564        // t=100: due again
565        let pkts100 = mux.poll(Duration::from_millis(100));
566        assert!(!pkts100.is_empty(), "should emit at t=100");
567    }
568
569    #[test]
570    fn two_entries_different_cadence() {
571        let s1 = build_section(0x42, &[0x01; 5]);
572        let s2 = build_section(0x46, &[0x02; 5]);
573        let mut mux = SiMux::new();
574        mux.upsert(0x0100, concat_sections(&[s1]), Duration::from_millis(100));
575        mux.upsert(0x0200, concat_sections(&[s2]), Duration::from_millis(200));
576
577        // t=0: both emit
578        let pkts = mux.poll(Duration::ZERO);
579        assert!(!pkts.is_empty());
580
581        // t=100: only 0x0100 emits (due at 100)
582        let pkts100 = mux.poll(Duration::from_millis(100));
583        assert!(!pkts100.is_empty());
584        let pids100: Vec<u16> = pkts100
585            .iter()
586            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
587            .collect();
588        assert!(pids100.contains(&0x0100), "PID 0x0100 should emit at t=100");
589        assert!(
590            !pids100.contains(&0x0200),
591            "PID 0x0200 should NOT emit at t=100"
592        );
593
594        // t=150: neither emits
595        let pkts150 = mux.poll(Duration::from_millis(150));
596        assert!(pkts150.is_empty(), "neither should emit at t=150");
597
598        // t=200: both emit again
599        let pkts200 = mux.poll(Duration::from_millis(200));
600        let pids200: Vec<u16> = pkts200
601            .iter()
602            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
603            .collect();
604        assert!(pids200.contains(&0x0100), "PID 0x0100 should emit at t=200");
605        assert!(pids200.contains(&0x0200), "PID 0x0200 should emit at t=200");
606    }
607
608    #[test]
609    fn entry_emits_again_after_full_interval() {
610        let s = build_section(0x42, &[0xAA; 20]);
611        let mut mux = SiMux::new();
612        mux.upsert(0x0100, s, Duration::from_secs(1));
613
614        // t=0
615        let n0 = mux.poll_into(Duration::ZERO, &mut Vec::new());
616        assert!(n0 > 0);
617
618        // t=999ms: not due
619        let mut buf = Vec::new();
620        let n999 = mux.poll_into(Duration::from_millis(999), &mut buf);
621        assert_eq!(n999, 0);
622
623        // t=1s: due
624        let mut buf2 = Vec::new();
625        let n1000 = mux.poll_into(Duration::from_millis(1000), &mut buf2);
626        assert!(n1000 > 0);
627    }
628
629    // ── upsert replaces existing entry ───────────────────────────────────────
630
631    #[test]
632    fn upsert_updates_existing_entry() {
633        let s1 = build_section(0x42, &[0xAA; 5]);
634        let s2 = build_section(0x46, &[0xBB; 10]);
635        let mut mux = SiMux::new();
636
637        mux.upsert(0x0100, s1, Duration::from_millis(100));
638        // Replace sections on same PID
639        mux.upsert(0x0100, s2.clone(), Duration::from_millis(200));
640
641        let pkts = mux.poll(Duration::ZERO);
642
643        // Round-trip the output to verify the updated sections
644        let mut reasm = SectionReassembler::default();
645        for raw in &pkts {
646            let pkt = TsPacket::parse(raw).unwrap();
647            if pkt.header.pid == 0x0100 {
648                reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
649            }
650        }
651        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
652        assert_eq!(got.len(), 1);
653        assert_eq!(got[0].as_ref(), s2.as_slice());
654    }
655
656    // ── convenience constructors ─────────────────────────────────────────────
657
658    #[test]
659    fn upsert_pat_uses_correct_pid() {
660        let s = build_section(0x00, &[0x01, 0x02]);
661        let mut mux = SiMux::new();
662        mux.upsert_pat(s);
663        let pkts = mux.poll(Duration::ZERO);
664        let pids: Vec<u16> = pkts
665            .iter()
666            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
667            .collect();
668        assert!(pids.iter().all(|&p| p == well_known::PAT.value()));
669    }
670
671    #[test]
672    fn upsert_sdt_actual_uses_correct_pid() {
673        let s = build_section(0x42, &[0x01]);
674        let mut mux = SiMux::new();
675        mux.upsert_sdt_actual(s);
676        let pkts = mux.poll(Duration::ZERO);
677        let pids: Vec<u16> = pkts
678            .iter()
679            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
680            .collect();
681        assert!(pids.iter().all(|&p| p == well_known::SDT_BAT.value()));
682    }
683
684    #[test]
685    fn upsert_nit_uses_correct_pid() {
686        let s = build_section(0x40, &[0x01]);
687        let mut mux = SiMux::new();
688        mux.upsert_nit(s);
689        let pkts = mux.poll(Duration::ZERO);
690        let pids: Vec<u16> = pkts
691            .iter()
692            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
693            .collect();
694        assert!(pids.iter().all(|&p| p == well_known::NIT.value()));
695    }
696
697    #[test]
698    fn upsert_tdt_uses_correct_pid() {
699        let s = build_section(0x70, &[0x01]);
700        let mut mux = SiMux::new();
701        mux.upsert_tdt(s);
702        let pkts = mux.poll(Duration::ZERO);
703        let pids: Vec<u16> = pkts
704            .iter()
705            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
706            .collect();
707        assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
708    }
709
710    #[test]
711    fn upsert_tot_uses_correct_pid() {
712        let s = build_section(0x73, &[0x01]);
713        let mut mux = SiMux::new();
714        mux.upsert_tot(s);
715        let pkts = mux.poll(Duration::ZERO);
716        let pids: Vec<u16> = pkts
717            .iter()
718            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
719            .collect();
720        assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
721    }
722
723    // ── round-trip through reassembler ───────────────────────────────────────
724
725    #[test]
726    fn simux_round_trip_single_entry() {
727        let s1 = build_section(0x42, &[0xAA; 20]);
728        let s2 = build_section(0x46, &[0xBB; 15]);
729        let mut mux = SiMux::new();
730        mux.upsert(
731            0x0100,
732            concat_sections(&[s1.clone(), s2.clone()]),
733            Duration::from_millis(100),
734        );
735
736        let pkts = mux.poll(Duration::ZERO);
737
738        let mut reasm = SectionReassembler::default();
739        for raw in &pkts {
740            let pkt = TsPacket::parse(raw).unwrap();
741            if pkt.header.pid == 0x0100 {
742                reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
743            }
744        }
745        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
746        assert_eq!(got.len(), 2, "round-trip must recover both sections");
747        assert_eq!(got[0].as_ref(), s1.as_slice());
748        assert_eq!(got[1].as_ref(), s2.as_slice());
749    }
750
751    #[test]
752    fn simux_round_trip_multi_pid() {
753        let s_a = build_section(0x42, &[0xA0; 10]);
754        let s_b = build_section(0x46, &[0xB0; 10]);
755        let mut mux = SiMux::new();
756        mux.upsert(0x0100, s_a.clone(), Duration::from_millis(100));
757        mux.upsert(0x0200, s_b.clone(), Duration::from_millis(100));
758
759        let pkts = mux.poll(Duration::ZERO);
760
761        let mut reasm_a = SectionReassembler::default();
762        let mut reasm_b = SectionReassembler::default();
763        for raw in &pkts {
764            let pkt = TsPacket::parse(raw).unwrap();
765            match pkt.header.pid {
766                0x0100 => reasm_a.feed(pkt.payload.unwrap(), pkt.header.pusi),
767                0x0200 => reasm_b.feed(pkt.payload.unwrap(), pkt.header.pusi),
768                _ => {}
769            }
770        }
771        let got_a: Vec<_> = std::iter::from_fn(|| reasm_a.pop_section()).collect();
772        let got_b: Vec<_> = std::iter::from_fn(|| reasm_b.pop_section()).collect();
773        assert_eq!(got_a.len(), 1);
774        assert_eq!(got_b.len(), 1);
775        assert_eq!(got_a[0].as_ref(), s_a.as_slice());
776        assert_eq!(got_b[0].as_ref(), s_b.as_slice());
777    }
778
779    // ── CC continuity across polls ───────────────────────────────────────────
780
781    #[test]
782    fn continuity_counter_continuous_across_polls() {
783        // Section large enough to span at least 2 packets.
784        let s = build_section(0x42, &[0xAA; 250]);
785        let mut mux = SiMux::new();
786        mux.upsert(0x0100, s, Duration::from_millis(100));
787
788        let pkts1 = mux.poll(Duration::ZERO);
789        assert!(pkts1.len() >= 2, "need ≥2 packets to test CC continuity");
790
791        let last_cc_pkts1 = TsPacket::parse(&pkts1[pkts1.len() - 1])
792            .unwrap()
793            .header
794            .continuity_counter;
795
796        let pkts2 = mux.poll(Duration::from_millis(100));
797        assert!(!pkts2.is_empty(), "second poll must emit");
798
799        let first_cc_pkts2 = TsPacket::parse(&pkts2[0])
800            .unwrap()
801            .header
802            .continuity_counter;
803
804        assert_eq!(
805            first_cc_pkts2,
806            (last_cc_pkts1 + 1) & 0x0F,
807            "CC must continue across poll cycles"
808        );
809    }
810
811    // ── poll_into clears output ──────────────────────────────────────────────
812
813    #[test]
814    fn poll_into_clears_out_before_appending() {
815        let s = build_section(0x42, &[0xAA; 5]);
816        let mut mux = SiMux::new();
817        mux.upsert(0x0100, s, Duration::from_millis(100));
818
819        let mut out = vec![[0u8; TS_PACKET_SIZE]; 42];
820        let n = mux.poll_into(Duration::ZERO, &mut out);
821        assert_eq!(n, out.len(), "out must contain only new packets");
822    }
823
824    // ── debug_assert on sub-25 ms interval ───────────────────────────────────
825
826    #[test]
827    #[cfg(debug_assertions)]
828    #[should_panic(expected = "25 ms")]
829    fn upsert_rejects_sub_25ms_interval_in_debug() {
830        let s = build_section(0x42, &[0xAA; 5]);
831        let mut mux = SiMux::new();
832        mux.upsert(0x0100, s, Duration::from_millis(10));
833    }
834
835    #[test]
836    fn upsert_accepts_25ms_interval() {
837        let s = build_section(0x42, &[0xAA; 5]);
838        let mut mux = SiMux::new();
839        mux.upsert(0x0100, s, MIN_SECTION_INTERVAL);
840        let pkts = mux.poll(Duration::ZERO);
841        assert!(!pkts.is_empty());
842    }
843
844    // ── round-trip property (the mandatory acceptance oracle) ────────────────
845
846    #[test]
847    fn round_trip_single_short_section() {
848        let s = build_section(0x42, &[0xAA; 10]);
849        assert_round_trip(&[s]);
850    }
851
852    #[test]
853    fn round_trip_one_byte_body() {
854        let s = build_section(0x46, &[0xBB]); // 4 bytes total
855        assert_round_trip(&[s]);
856    }
857
858    #[test]
859    fn round_trip_section_exactly_pusi_cap_boundary() {
860        // A section whose total length is exactly PUSI_PAYLOAD_CAP (183).
861        let body = vec![0xCC; PUSI_PAYLOAD_CAP - 3];
862        let s = build_section(0x50, &body);
863        assert_eq!(s.len(), PUSI_PAYLOAD_CAP);
864        assert_round_trip(&[s]);
865    }
866
867    #[test]
868    fn round_trip_section_just_over_pusi_cap() {
869        // One byte more than fits in a PUSI packet → must span to continuation.
870        let body = vec![0xDD; PUSI_PAYLOAD_CAP - 3 + 1];
871        let s = build_section(0x52, &body);
872        assert_eq!(s.len(), PUSI_PAYLOAD_CAP + 1);
873        assert_round_trip(&[s]);
874    }
875
876    #[test]
877    fn round_trip_section_spans_many_packets() {
878        // A 2000-byte section spans ~11 packets.
879        let body = vec![0xEE; 2000 - 3];
880        let s = build_section(0x60, &body);
881        assert_round_trip(&[s]);
882    }
883
884    #[test]
885    fn round_trip_section_at_max_size() {
886        // The maximum section (4096 total, the long-form ceiling), whose final
887        // continuation packet carries the tail followed by 0xFF stuffing. Since
888        // #148 the reassembler ignores that trailing stuffing instead of
889        // counting it toward MAX_SECTION_SIZE, so a full-size section round-trips.
890        let body = vec![0x11; 4096 - 3];
891        let s = build_section(0x80, &body);
892        assert_eq!(s.len(), 4096);
893        assert_round_trip(&[s]);
894    }
895
896    #[test]
897    fn round_trip_multiple_short_sections_in_one_batch() {
898        let s1 = build_section(0x42, &[0x01, 0x02]); // 5 bytes
899        let s2 = build_section(0x46, &[0x03]); // 4 bytes
900        let s3 = build_section(0x4A, &[0x04, 0x05, 0x06]); // 6 bytes
901        assert_round_trip(&[s1, s2, s3]);
902    }
903
904    #[test]
905    fn round_trip_section_ends_exactly_at_boundary() {
906        // First section is exactly PUSI_PAYLOAD_CAP bytes — ends at packet
907        // boundary.  Second section starts fresh in the next packet with
908        // PUSI=1, pointer_field=0.
909        let body1 = vec![0xA1; PUSI_PAYLOAD_CAP - 3];
910        let s1 = build_section(0x50, &body1);
911        assert_eq!(s1.len(), PUSI_PAYLOAD_CAP);
912
913        let s2 = build_section(0x52, &[0xB1, 0xB2]);
914        assert_round_trip(&[s1, s2]);
915    }
916
917    #[test]
918    fn round_trip_mix_small_large_sections() {
919        // Mix of small and spanning sections that stress pointer_field and
920        // concatenation.
921        let s1 = build_section(0x10, &[0xAA; 5]);
922        let body2 = vec![0xBB; 200];
923        let s2 = build_section(0x20, &body2);
924        let s3 = build_section(0x30, &[0xCC; 50]);
925        let body4 = vec![0xDD; 800];
926        let s4 = build_section(0x40, &body4);
927        let s5 = build_section(0x50, &[0xEE]); // 1-byte body
928        assert_round_trip(&[s1, s2, s3, s4, s5]);
929    }
930
931    // ── continuity counter ───────────────────────────────────────────────────
932
933    #[test]
934    fn continuity_counter_increments_per_packet() {
935        // Use a section large enough to span several packets.
936        let body = vec![0xAA; 500];
937        let section = build_section(0x42, &body);
938        let mut p = SectionPacketizer::new(0x0100);
939
940        let packets = p.packetize(&[&section]);
941        assert!(packets.len() >= 3, "need multiple packets to test CC");
942
943        let mut last_cc: Option<u8> = None;
944        for pkt_raw in &packets {
945            let pkt = TsPacket::parse(pkt_raw).unwrap();
946            let cc = pkt.header.continuity_counter;
947            if let Some(last) = last_cc {
948                assert_eq!(cc, (last + 1) & 0x0F, "CC must increment per packet");
949            }
950            last_cc = Some(cc);
951        }
952    }
953
954    #[test]
955    fn continuity_counter_wraps_and_continues_across_calls() {
956        let mut p = SectionPacketizer::with_continuity(0x0100, 14);
957        // Section large enough to span at least 3 packets.
958        let body = vec![0xBB; 500];
959        let s = build_section(0x42, &body);
960
961        // First call: CC 14, 15, 0, …
962        let pkts1 = p.packetize(&[&s]);
963        assert!(pkts1.len() >= 3, "section must span ≥3 packets");
964        let ccs1: Vec<u8> = pkts1
965            .iter()
966            .map(|b| TsPacket::parse(b).unwrap().header.continuity_counter)
967            .collect();
968        assert_eq!(ccs1[0], 14);
969        assert_eq!(ccs1[1], 15);
970        assert_eq!(ccs1[2], 0);
971
972        // Second call: CC continues from where first left off.
973        let pkts2 = p.packetize(&[&s]);
974        let cc_first_pkt2 = TsPacket::parse(&pkts2[0])
975            .unwrap()
976            .header
977            .continuity_counter;
978        assert_eq!(cc_first_pkt2, ccs1.last().map(|c| (c + 1) & 0x0F).unwrap());
979    }
980
981    // ── PUSI placement ──────────────────────────────────────────────────────
982
983    #[test]
984    fn pusi_set_when_section_starts() {
985        let s = build_section(0x42, &[0xAA; 10]);
986        let mut p = SectionPacketizer::new(0x0100);
987        let packets = p.packetize(&[&s]);
988        assert!(!packets.is_empty());
989        let pkt = TsPacket::parse(&packets[0]).unwrap();
990        assert!(pkt.header.pusi, "first packet must have PUSI=1");
991    }
992
993    #[test]
994    fn pusi_not_set_on_mid_section_continuation() {
995        let body = vec![0xAA; 500];
996        let s = build_section(0x42, &body);
997        let mut p = SectionPacketizer::new(0x0100);
998        let packets = p.packetize(&[&s]);
999        assert!(packets.len() >= 2);
1000        let pkt1 = TsPacket::parse(&packets[0]).unwrap();
1001        let pkt2 = TsPacket::parse(&packets[1]).unwrap();
1002        assert!(pkt1.header.pusi, "first packet must have PUSI=1");
1003        assert!(
1004            !pkt2.header.pusi,
1005            "second packet is continuation, must have PUSI=0"
1006        );
1007    }
1008
1009    #[test]
1010    fn pointer_field_equals_tail_length_before_new_section() {
1011        // Section1 = 200 bytes.  Section2 = 50 bytes.
1012        // Packet 1: PUSI=1, pointer=0, section1 head.
1013        // Packet 2: PUSI=1, pointer > 0 (tail of section1 before section2).
1014        let body1 = vec![0xA1; 197]; // 200-byte section
1015        let s1 = build_section(0x52, &body1);
1016        assert_eq!(s1.len(), 200);
1017        let s2 = build_section(0x54, &[0xB1; 47]); // 50-byte section
1018        assert_eq!(s2.len(), 50);
1019
1020        let mut p = SectionPacketizer::new(0x0100);
1021        let packets = p.packetize(&[&s1, &s2]);
1022
1023        // Find the packet where PUSI=1 and pointer>0.
1024        let pkt_with_pointer = packets
1025            .iter()
1026            .map(|raw| TsPacket::parse(raw).unwrap())
1027            .find(|pkt| pkt.header.pusi && pkt.payload.is_some_and(|pl| pl.first() != Some(&0)))
1028            .expect("must have a PUSI packet with non-zero pointer");
1029
1030        let payload = pkt_with_pointer.payload.unwrap();
1031        let pointer = payload[0] as usize;
1032        assert!(pointer > 0, "pointer must be non-zero");
1033        // The tail bytes should be from the end of section1.
1034        let tail_start = s1.len() - pointer;
1035        assert_eq!(&payload[1..1 + pointer], &s1[tail_start..]);
1036    }
1037
1038    // ── stuffing ─────────────────────────────────────────────────────────────
1039
1040    #[test]
1041    fn final_packet_unused_tail_is_stuffing() {
1042        let s = build_section(0x42, &[0xAA; 5]); // 8 bytes total
1043        let mut p = SectionPacketizer::new(0x0100);
1044        let packets = p.packetize(&[&s]);
1045
1046        let pkt = TsPacket::parse(&packets[0]).unwrap();
1047        let payload = pkt.payload.unwrap();
1048        assert_eq!(payload[0], 0, "pointer_field should be 0");
1049
1050        let section_end = 1 + s.len(); // after pointer + section
1051        assert!(
1052            section_end < payload.len(),
1053            "must have stuffing after section"
1054        );
1055        for &b in &payload[section_end..] {
1056            assert_eq!(b, STUFFING_BYTE, "all trailing bytes must be 0xFF");
1057        }
1058    }
1059
1060    #[test]
1061    fn reassembler_discards_stuffing() {
1062        let s1 = build_section(0x42, &[0xAA; 10]);
1063        let s2 = build_section(0x46, &[0xBB; 5]);
1064
1065        let mut p = SectionPacketizer::new(0x0100);
1066        let packets = p.packetize(&[&s1, &s2]);
1067
1068        let mut reasm = SectionReassembler::default();
1069        for pkt_raw in &packets {
1070            let pkt = TsPacket::parse(pkt_raw).unwrap();
1071            reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
1072        }
1073
1074        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
1075        assert_eq!(got.len(), 2);
1076        assert!(
1077            reasm.is_empty(),
1078            "stuffing tail must be discarded, not buffered"
1079        );
1080    }
1081
1082    // ── misc ─────────────────────────────────────────────────────────────────
1083
1084    #[test]
1085    fn empty_batch_produces_no_packets() {
1086        let mut p = SectionPacketizer::new(0x0100);
1087        let packets: Vec<[u8; TS_PACKET_SIZE]> = p.packetize(&[]);
1088        assert!(packets.is_empty());
1089    }
1090
1091    #[test]
1092    fn packetize_into_clears_out_first() {
1093        let s = build_section(0x42, &[0xAA; 5]);
1094        let mut p = SectionPacketizer::new(0x0100);
1095
1096        let mut out = vec![[0u8; TS_PACKET_SIZE]; 99]; // pre-existing junk
1097        let n = p.packetize_into(&[&s], &mut out);
1098        assert_eq!(n, out.len(), "out must contain only the new packets");
1099        // Verify the output is correct (round-trip).
1100        let mut reasm = SectionReassembler::default();
1101        for pkt_raw in &out {
1102            let pkt = TsPacket::parse(pkt_raw).unwrap();
1103            reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
1104        }
1105        let got = reasm.pop_section().unwrap();
1106        assert_eq!(got.as_ref(), s.as_slice());
1107    }
1108
1109    #[test]
1110    fn pid_is_correct() {
1111        let p = SectionPacketizer::new(0x1234);
1112        assert_eq!(p.pid(), 0x1234);
1113    }
1114
1115    #[test]
1116    fn with_continuity_masks_to_4_bits() {
1117        let p = SectionPacketizer::with_continuity(0x0100, 0xFE);
1118        assert_eq!(p.continuity_counter(), 0x0E);
1119    }
1120
1121    #[test]
1122    fn has_payload_always_true_no_adaptation() {
1123        let s = build_section(0x42, &[0xAA; 50]);
1124        let mut p = SectionPacketizer::new(0x0100);
1125        let packets = p.packetize(&[&s]);
1126        for pkt_raw in &packets {
1127            let pkt = TsPacket::parse(pkt_raw).unwrap();
1128            assert!(pkt.header.has_payload, "every packet must carry payload");
1129            assert!(!pkt.header.has_adaptation, "no adaptation field is emitted");
1130            assert!(!pkt.header.tei, "TEI must be false");
1131            assert_eq!(pkt.header.scrambling, 0, "scrambling must be 0");
1132        }
1133    }
1134}