Skip to main content

mpeg_ts/
mux.rs

1//! Section → TS packetizer (the byte-exact inverse of
2//! [`SectionReassembler::feed`](crate::ts::SectionReassembler::feed)).
3//!
4//! Per the PSI carriage rules of ITU-T H.222.0 §2.4.4 (= ISO/IEC 13818-1:2007 §2.4.4)
5//! (`docs/iso_13818_1_systems.md`): sections are packed into 188-byte packets
6//! with a `pointer_field` where sections begin, concatenated contiguously, and
7//! 0xFF-stuffed at the batch tail.
8
9use alloc::vec::Vec;
10use core::time::Duration;
11
12use crate::pid::well_known;
13use crate::ts::{TsHeader, CC_MASK, SECTION_LENGTH_HI_MASK, TS_PACKET_SIZE};
14
/// Maximum section bytes carried by a packet that starts with a
/// `pointer_field` (PUSI=1): 188 − 4 header bytes − 1 `pointer_field` byte. §2.4.4.
const PUSI_PAYLOAD_CAP: usize = 183;
/// Maximum section bytes carried by a continuation packet, which has no
/// `pointer_field`: 188 − 4 header bytes. §2.4.4.
const PAYLOAD_CAP: usize = 184;
/// Value written into every payload byte left over once the section data has
/// been copied into a packet (ISO/IEC 13818-1 §2.4.4).
const STUFFING_BYTE: u8 = 0xFF;
21
/// Packetizes PSI/SI sections into 188-byte TS packets.
///
/// This is the byte-exact inverse of
/// [`SectionReassembler::feed`](crate::ts::SectionReassembler::feed): packets
/// produced here, when fed back through the reassembler, yield the same
/// sections in order.
///
/// One packetizer serves one PID and carries that PID's continuity_counter
/// from one packetize call to the next.
///
/// ISO/IEC 13818-1:2007 §2.4.4 (`docs/iso_13818_1_systems.md`).
pub struct SectionPacketizer {
    /// PID written into the header of every emitted packet.
    pid: u16,
    /// continuity_counter for the next emitted packet; advanced by one and
    /// masked with `CC_MASK` after each packet.
    continuity_counter: u8,
}
34
35impl SectionPacketizer {
36    /// Start a packetizer for `pid` with continuity_counter = 0.
37    pub fn new(pid: u16) -> Self {
38        Self {
39            pid,
40            continuity_counter: 0,
41        }
42    }
43
44    /// Start at a specific continuity_counter (0..=15) — for resuming a stream.
45    pub fn with_continuity(pid: u16, cc: u8) -> Self {
46        Self {
47            pid,
48            continuity_counter: cc & CC_MASK,
49        }
50    }
51
52    /// The PID this packetizer emits packets for.
53    pub fn pid(&self) -> u16 {
54        self.pid
55    }
56
57    /// The continuity_counter for the next emitted packet.
58    pub fn continuity_counter(&self) -> u8 {
59        self.continuity_counter
60    }
61
62    /// Packetize a batch of complete sections into 188-byte TS packets,
63    /// appended to `out` (cleared first).
64    ///
65    /// Returns the number of packets appended.
66    pub fn packetize_into(
67        &mut self,
68        sections: &[&[u8]],
69        out: &mut Vec<[u8; TS_PACKET_SIZE]>,
70    ) -> usize {
71        out.clear();
72
73        if sections.is_empty() {
74            return 0;
75        }
76
77        // Concatenate all sections and record section-start byte offsets.
78        let total_len: usize = sections.iter().map(|s| s.len()).sum();
79        if total_len == 0 {
80            return 0;
81        }
82        let mut data = Vec::with_capacity(total_len);
83        let mut starts = Vec::with_capacity(sections.len());
84        for s in sections {
85            starts.push(data.len());
86            data.extend_from_slice(s);
87        }
88
89        let count_before = out.len();
90        let mut pos = 0usize;
91
92        while pos < data.len() {
93            // Smallest section-start offset ≥ pos.
94            let next_start = starts.iter().copied().find(|&s| s >= pos);
95
96            let pusi: bool;
97            let pointer_field: u8;
98            let cap: usize;
99
100            if let Some(ns) = next_start {
101                let diff = ns.saturating_sub(pos);
102                if diff <= PUSI_PAYLOAD_CAP {
103                    pusi = true;
104                    pointer_field = diff as u8;
105                    cap = PUSI_PAYLOAD_CAP;
106                } else {
107                    pusi = false;
108                    pointer_field = 0;
109                    cap = PAYLOAD_CAP;
110                }
111            } else {
112                pusi = false;
113                pointer_field = 0;
114                cap = PAYLOAD_CAP;
115            }
116
117            let mut pkt = [0u8; TS_PACKET_SIZE];
118
119            let header = TsHeader {
120                tei: false,
121                pusi,
122                pid: self.pid,
123                scrambling: 0,
124                has_adaptation: false,
125                has_payload: true,
126                continuity_counter: self.continuity_counter,
127            };
128            header
129                .serialize_into(&mut pkt[..4])
130                .expect("4-byte header buffer");
131
132            self.continuity_counter = (self.continuity_counter + 1) & CC_MASK;
133
134            let mut write_pos = 4usize;
135
136            if pusi {
137                pkt[write_pos] = pointer_field;
138                write_pos += 1;
139            }
140
141            let remaining = data.len() - pos;
142            let to_copy = remaining.min(cap);
143            pkt[write_pos..write_pos + to_copy].copy_from_slice(&data[pos..pos + to_copy]);
144            pos += to_copy;
145            write_pos += to_copy;
146
147            // 0xFF-stuff remaining payload bytes.
148            for b in &mut pkt[write_pos..] {
149                *b = STUFFING_BYTE;
150            }
151
152            out.push(pkt);
153        }
154
155        out.len() - count_before
156    }
157
158    /// Allocating convenience wrapper over [`packetize_into`](Self::packetize_into).
159    pub fn packetize(&mut self, sections: &[&[u8]]) -> Vec<[u8; TS_PACKET_SIZE]> {
160        let mut out = Vec::new();
161        self.packetize_into(sections, &mut out);
162        out
163    }
164}
165
166// ── Default interval constants ────────────────────────────────────────────────
167
/// Maximum NIT repetition interval (10 s) — dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
pub const NIT_MAX_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum BAT repetition interval (10 s) — dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
pub const BAT_MAX_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum SDT actual repetition interval (2 s) — dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
pub const SDT_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
/// Maximum SDT other repetition interval (10 s) — dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
pub const SDT_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum EIT p/f actual repetition interval (2 s) — dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
pub const EIT_PF_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
/// Maximum EIT p/f other repetition interval (10 s) — dvb-si/docs/tr_101_211.md
/// §4.4.1. This is the sat/cable figure; terrestrial is 20 s per §4.4.2.
pub const EIT_PF_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum EIT schedule repetition interval for the first 8 days (10 s) —
/// dvb-si/docs/tr_101_211.md §4.4.1.
pub const EIT_SCHED_MAX_INTERVAL: Duration = Duration::from_secs(10);
/// Maximum EIT schedule repetition interval beyond 8 days (30 s) —
/// dvb-si/docs/tr_101_211.md §4.4.1.
pub const EIT_SCHED_EXT_MAX_INTERVAL: Duration = Duration::from_secs(30);
/// Maximum TDT repetition interval (30 s) — dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
pub const TDT_MAX_INTERVAL: Duration = Duration::from_secs(30);
/// Maximum TOT repetition interval (30 s) — dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
pub const TOT_MAX_INTERVAL: Duration = Duration::from_secs(30);

/// Maximum PAT repetition interval (shall be ≤ 100 ms) —
/// dvb-si/docs/ts_101_154_av_coding.md §4.1.7.
pub const PAT_MAX_INTERVAL: Duration = Duration::from_millis(100);
/// Maximum PMT repetition interval (shall be ≤ 100 ms) —
/// dvb-si/docs/ts_101_154_av_coding.md §4.1.7.
pub const PMT_MAX_INTERVAL: Duration = Duration::from_millis(100);

/// Minimum inter-section interval floor (25 ms, for TSs of ≤100 Mbit/s) —
/// dvb-si/docs/en_300_468.md §5.1.4.1. `SiMux::upsert` debug-asserts that
/// every registered interval is at least this long.
pub const MIN_SECTION_INTERVAL: Duration = Duration::from_millis(25);
198
199// ── SiMux ────────────────────────────────────────────────────────────────────
200
/// Section-repetition scheduler that builds TS packets on a caller-supplied clock.
///
/// Each entry is a PID + concatenated complete-section bytes + an emission
/// interval. Call [`poll_into`](Self::poll_into) with monotonically-increasing
/// `now` values to get 188-byte TS packets for every entry whose interval has
/// elapsed since its last emission.
///
/// The scheduler owns its section bytes — call [`upsert`](Self::upsert) to
/// update them when SI changes. Continuity counters are continuous per-PID
/// across poll cycles.
///
/// Entries are keyed by PID alone: there is at most one entry per PID, and
/// [`upsert`](Self::upsert) on an already-registered PID replaces that
/// entry's bytes and interval. Tables that share a PID therefore have to be
/// supplied together as one concatenated byte string.
///
/// # 25 ms floor
///
/// [`MIN_SECTION_INTERVAL`] is the minimum valid interval (EN 300 468 §5.1.4.1).
/// Supplying an interval below this in [`upsert`](Self::upsert) triggers a
/// `debug_assert` — in release builds the assertion compiles out; the caller
/// must ensure compliance.
pub struct SiMux {
    /// Registered entries in first-registration order; polling visits them in
    /// this order.
    entries: Vec<Entry>,
}
221
/// One scheduled PID: what to send, how often, and when it was last sent.
struct Entry {
    /// PID this entry is keyed by and emitted on.
    pid: u16,
    /// Concatenated complete-section bytes emitted on every cycle.
    sections: Vec<u8>,
    /// Minimum time between two emissions of this entry.
    interval: Duration,
    /// `now` value of the most recent emission; `None` until the first poll,
    /// which makes the entry due immediately.
    last_emit: Option<Duration>,
    /// Per-PID packetizer; holds the continuity counter across poll cycles.
    packetizer: SectionPacketizer,
}
229
230impl SiMux {
231    /// Create an empty scheduler (no entries, no pending emissions).
232    pub fn new() -> Self {
233        Self {
234            entries: Vec::new(),
235        }
236    }
237
238    /// Register or replace the sections emitted on `pid` at `interval`.
239    ///
240    /// `sections` is the concatenated complete-section bytes for one emission
241    /// cycle. Re-calling for the same `pid` updates the bytes and interval
242    /// while preserving the continuity counter.
243    ///
244    /// # Panics (debug only)
245    ///
246    /// `debug_assert!(interval >= MIN_SECTION_INTERVAL)` — EN 300 468 §5.1.4.1
247    /// requires a minimum inter-section interval of 25 ms.
248    pub fn upsert(&mut self, pid: u16, sections: Vec<u8>, interval: Duration) {
249        debug_assert!(
250            interval >= MIN_SECTION_INTERVAL,
251            "interval {interval:?} is below the 25 ms minimum (EN 300 468 §5.1.4.1)"
252        );
253
254        if let Some(entry) = self.entries.iter_mut().find(|e| e.pid == pid) {
255            entry.sections = sections;
256            entry.interval = interval;
257        } else {
258            self.entries.push(Entry {
259                pid,
260                sections,
261                interval,
262                last_emit: None,
263                packetizer: SectionPacketizer::new(pid),
264            });
265        }
266    }
267
268    /// PAT at [`PAT_MAX_INTERVAL`] on PID 0x0000.
269    ///
270    /// Interval cites dvb-si/docs/ts_101_154_av_coding.md §4.1.7.
271    pub fn upsert_pat(&mut self, sections: Vec<u8>) {
272        self.upsert(well_known::PAT.value(), sections, PAT_MAX_INTERVAL);
273    }
274
275    /// PMT at [`PMT_MAX_INTERVAL`] on the caller-supplied `pid`.
276    ///
277    /// Interval cites dvb-si/docs/ts_101_154_av_coding.md §4.1.7.
278    pub fn upsert_pmt(&mut self, pid: u16, sections: Vec<u8>) {
279        self.upsert(pid, sections, PMT_MAX_INTERVAL);
280    }
281
282    /// SDT actual at [`SDT_ACTUAL_MAX_INTERVAL`] on PID 0x0011.
283    ///
284    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
285    pub fn upsert_sdt_actual(&mut self, sections: Vec<u8>) {
286        self.upsert(
287            well_known::SDT_BAT.value(),
288            sections,
289            SDT_ACTUAL_MAX_INTERVAL,
290        );
291    }
292
293    /// NIT at [`NIT_MAX_INTERVAL`] on PID 0x0010.
294    ///
295    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
296    pub fn upsert_nit(&mut self, sections: Vec<u8>) {
297        self.upsert(well_known::NIT.value(), sections, NIT_MAX_INTERVAL);
298    }
299
300    /// TDT at [`TDT_MAX_INTERVAL`] on PID 0x0014.
301    ///
302    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
303    pub fn upsert_tdt(&mut self, sections: Vec<u8>) {
304        self.upsert(well_known::TDT_TOT.value(), sections, TDT_MAX_INTERVAL);
305    }
306
307    /// TOT at [`TOT_MAX_INTERVAL`] on PID 0x0014.
308    ///
309    /// Interval cites dvb-si/docs/tr_101_211.md §4.4.1/§4.4.2.
310    pub fn upsert_tot(&mut self, sections: Vec<u8>) {
311        self.upsert(well_known::TDT_TOT.value(), sections, TOT_MAX_INTERVAL);
312    }
313
314    /// Emit every entry due at `now` (i.e. `now - last_emit >= interval`, and
315    /// first call always due), packetizing via [`SectionPacketizer`], appended
316    /// to `out` (cleared first).
317    ///
318    /// Updates each emitted entry's `last_emit = now`. Deterministic given the
319    /// fed `now` sequence. Returns the packet count appended.
320    pub fn poll_into(&mut self, now: Duration, out: &mut Vec<[u8; TS_PACKET_SIZE]>) -> usize {
321        out.clear();
322        let before = out.len();
323
324        let mut tmp = Vec::new();
325        for entry in &mut self.entries {
326            let due = match entry.last_emit {
327                None => true,
328                Some(last) => now.saturating_sub(last) >= entry.interval,
329            };
330            if due {
331                let refs = split_sections(&entry.sections);
332                if !refs.is_empty() {
333                    entry.packetizer.packetize_into(&refs, &mut tmp);
334                    out.append(&mut tmp);
335                }
336                entry.last_emit = Some(now);
337            }
338        }
339
340        out.len() - before
341    }
342
343    /// Allocating convenience wrapper over [`poll_into`](Self::poll_into).
344    pub fn poll(&mut self, now: Duration) -> Vec<[u8; TS_PACKET_SIZE]> {
345        let mut out = Vec::new();
346        self.poll_into(now, &mut out);
347        out
348    }
349}
350
351impl Default for SiMux {
352    fn default() -> Self {
353        Self::new()
354    }
355}
356
357/// Split concatenated complete-section bytes into individual `&[u8]` slices.
358///
359/// Walks the PSI/SI section headers: byte 0 = table_id, bytes 1-2 =
360/// section_length (12 bits). Each slice is 3 + section_length bytes long.
361/// Trailing bytes that don't form a complete section header are discarded.
362fn split_sections(data: &[u8]) -> Vec<&[u8]> {
363    let mut result = Vec::new();
364    let mut pos = 0;
365    while pos + 3 <= data.len() {
366        let section_length =
367            (((data[pos + 1] & SECTION_LENGTH_HI_MASK) as usize) << 8) | (data[pos + 2] as usize);
368        let end = pos + 3 + section_length;
369        if end > data.len() {
370            break;
371        }
372        result.push(&data[pos..end]);
373        pos = end;
374    }
375    result
376}
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381    use crate::ts::{SectionReassembler, TsPacket};
382    use alloc::vec;
383    use alloc::vec::Vec;
384
385    // ── helpers ──────────────────────────────────────────────────────────────
386
387    /// Build a long-form section with the given table_id and body bytes.
388    /// Returns the full section including its 3-byte header (no CRC — the
389    /// reassembler does not validate CRC).
390    fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
391        let section_length = body_after_length.len() as u16;
392        let mut v = Vec::with_capacity(3 + section_length as usize);
393        v.push(table_id);
394        // SSI=1, PI=0, reserved=11, length upper 4 bits
395        v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
396        v.push((section_length & 0xFF) as u8);
397        v.extend_from_slice(body_after_length);
398        v
399    }
400
401    fn concat_sections(sections: &[Vec<u8>]) -> Vec<u8> {
402        let total: usize = sections.iter().map(|s| s.len()).sum();
403        let mut out = Vec::with_capacity(total);
404        for s in sections {
405            out.extend_from_slice(s);
406        }
407        out
408    }
409
410    /// Round-trip `sections` through packetize → reassembler, asserting
411    /// byte-identical output in order and no leftovers.
412    fn assert_round_trip(sections: &[Vec<u8>]) {
413        let mut packetizer = SectionPacketizer::new(0x0100);
414        let refs: Vec<&[u8]> = sections.iter().map(|s| s.as_slice()).collect();
415        let packets = packetizer.packetize(&refs);
416
417        let mut reasm = SectionReassembler::default();
418        for pkt_raw in &packets {
419            let pkt = TsPacket::parse(pkt_raw).expect("parse generated packet");
420            let payload = pkt.payload.expect("payload present");
421            let pusi = pkt.header.pusi;
422            reasm.feed(payload, pusi);
423        }
424
425        let got: Vec<_> = core::iter::from_fn(|| reasm.pop_section()).collect();
426        assert_eq!(
427            got.len(),
428            sections.len(),
429            "section count mismatch: expected {}, got {}",
430            sections.len(),
431            got.len()
432        );
433        for (i, (orig, round)) in sections.iter().zip(got.iter()).enumerate() {
434            assert_eq!(
435                round.as_ref(),
436                orig.as_slice(),
437                "section {i} round-trip mismatch"
438            );
439        }
440        assert!(reasm.is_empty(), "reassembler should be empty after drain");
441    }
442
443    // ── split_sections ──────────────────────────────────────────────────────
444
445    #[test]
446    fn split_single_section() {
447        let s = build_section(0x42, &[0xAA; 5]);
448        let refs = split_sections(&s);
449        assert_eq!(refs.len(), 1);
450        assert_eq!(refs[0], s.as_slice());
451    }
452
453    #[test]
454    fn split_two_sections() {
455        let s1 = build_section(0x42, &[0x01, 0x02]);
456        let s2 = build_section(0x46, &[0x03, 0x04, 0x05]);
457        let both = concat_sections(&[s1.clone(), s2.clone()]);
458        let refs = split_sections(&both);
459        assert_eq!(refs.len(), 2);
460        assert_eq!(refs[0], s1.as_slice());
461        assert_eq!(refs[1], s2.as_slice());
462    }
463
464    #[test]
465    fn split_empty_input() {
466        let refs = split_sections(&[]);
467        assert!(refs.is_empty());
468    }
469
470    #[test]
471    fn split_trailing_garbage_ignored() {
472        let s = build_section(0x42, &[0xAA; 3]);
473        let mut data = s.clone();
474        data.push(0xFF); // trailing byte that doesn't complete a header
475        let refs = split_sections(&data);
476        assert_eq!(refs.len(), 1);
477        assert_eq!(refs[0], s.as_slice());
478    }
479
480    // ── interval constants pinned to spec ────────────────────────────────────
481
482    #[test]
483    fn interval_constants_match_spec() {
484        // TR 101 211 §4.4.1/§4.4.2
485        assert_eq!(NIT_MAX_INTERVAL, Duration::from_secs(10));
486        assert_eq!(BAT_MAX_INTERVAL, Duration::from_secs(10));
487        assert_eq!(SDT_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
488        assert_eq!(SDT_OTHER_MAX_INTERVAL, Duration::from_secs(10));
489        assert_eq!(EIT_PF_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
490        assert_eq!(EIT_PF_OTHER_MAX_INTERVAL, Duration::from_secs(10));
491        assert_eq!(EIT_SCHED_MAX_INTERVAL, Duration::from_secs(10));
492        assert_eq!(EIT_SCHED_EXT_MAX_INTERVAL, Duration::from_secs(30));
493        assert_eq!(TDT_MAX_INTERVAL, Duration::from_secs(30));
494        assert_eq!(TOT_MAX_INTERVAL, Duration::from_secs(30));
495
496        // TS 101 154 §4.1.7
497        assert_eq!(PAT_MAX_INTERVAL, Duration::from_millis(100));
498        assert_eq!(PMT_MAX_INTERVAL, Duration::from_millis(100));
499
500        // EN 300 468 §5.1.4.1
501        assert_eq!(MIN_SECTION_INTERVAL, Duration::from_millis(25));
502    }
503
504    // ── SiMux: basic behaviour ───────────────────────────────────────────────
505
506    #[test]
507    fn new_simux_is_empty() {
508        let mut mux = SiMux::new();
509        let pkts = mux.poll(Duration::ZERO);
510        assert!(pkts.is_empty());
511    }
512
513    #[test]
514    fn simux_default_is_empty() {
515        let mut mux = SiMux::default();
516        let pkts = mux.poll(Duration::ZERO);
517        assert!(pkts.is_empty());
518    }
519
520    #[test]
521    fn first_poll_always_emits() {
522        let s = build_section(0x42, &[0xAA; 10]);
523        let mut mux = SiMux::new();
524        mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
525        let pkts = mux.poll(Duration::ZERO);
526        assert!(!pkts.is_empty(), "first poll must emit");
527    }
528
529    #[test]
530    fn first_poll_emits_all_entries() {
531        let s1 = build_section(0x42, &[0x01; 5]);
532        let s2 = build_section(0x46, &[0x02; 5]);
533        let mut mux = SiMux::new();
534        mux.upsert(0x0100, s1.clone(), Duration::from_millis(100));
535        mux.upsert(0x0200, s2.clone(), Duration::from_millis(200));
536        let pkts = mux.poll(Duration::ZERO);
537        // Both entries should emit — we count PID occurrences
538        let pids: Vec<u16> = pkts
539            .iter()
540            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
541            .collect();
542        assert!(pids.contains(&0x0100));
543        assert!(pids.contains(&0x0200));
544    }
545
546    // ── deterministic schedule ───────────────────────────────────────────────
547
548    #[test]
549    fn entry_emits_only_when_due() {
550        let s = build_section(0x42, &[0xAA; 10]);
551        let mut mux = SiMux::new();
552        mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
553
554        // First poll at t=0: emits
555        let pkts0 = mux.poll(Duration::ZERO);
556        assert!(!pkts0.is_empty());
557
558        // t=50: not yet due
559        let pkts50 = mux.poll(Duration::from_millis(50));
560        assert!(pkts50.is_empty(), "should not emit at t=50");
561
562        // t=99: still not due
563        let pkts99 = mux.poll(Duration::from_millis(99));
564        assert!(pkts99.is_empty(), "should not emit at t=99");
565
566        // t=100: due again
567        let pkts100 = mux.poll(Duration::from_millis(100));
568        assert!(!pkts100.is_empty(), "should emit at t=100");
569    }
570
571    #[test]
572    fn two_entries_different_cadence() {
573        let s1 = build_section(0x42, &[0x01; 5]);
574        let s2 = build_section(0x46, &[0x02; 5]);
575        let mut mux = SiMux::new();
576        mux.upsert(0x0100, concat_sections(&[s1]), Duration::from_millis(100));
577        mux.upsert(0x0200, concat_sections(&[s2]), Duration::from_millis(200));
578
579        // t=0: both emit
580        let pkts = mux.poll(Duration::ZERO);
581        assert!(!pkts.is_empty());
582
583        // t=100: only 0x0100 emits (due at 100)
584        let pkts100 = mux.poll(Duration::from_millis(100));
585        assert!(!pkts100.is_empty());
586        let pids100: Vec<u16> = pkts100
587            .iter()
588            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
589            .collect();
590        assert!(pids100.contains(&0x0100), "PID 0x0100 should emit at t=100");
591        assert!(
592            !pids100.contains(&0x0200),
593            "PID 0x0200 should NOT emit at t=100"
594        );
595
596        // t=150: neither emits
597        let pkts150 = mux.poll(Duration::from_millis(150));
598        assert!(pkts150.is_empty(), "neither should emit at t=150");
599
600        // t=200: both emit again
601        let pkts200 = mux.poll(Duration::from_millis(200));
602        let pids200: Vec<u16> = pkts200
603            .iter()
604            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
605            .collect();
606        assert!(pids200.contains(&0x0100), "PID 0x0100 should emit at t=200");
607        assert!(pids200.contains(&0x0200), "PID 0x0200 should emit at t=200");
608    }
609
610    #[test]
611    fn entry_emits_again_after_full_interval() {
612        let s = build_section(0x42, &[0xAA; 20]);
613        let mut mux = SiMux::new();
614        mux.upsert(0x0100, s, Duration::from_secs(1));
615
616        // t=0
617        let n0 = mux.poll_into(Duration::ZERO, &mut Vec::new());
618        assert!(n0 > 0);
619
620        // t=999ms: not due
621        let mut buf = Vec::new();
622        let n999 = mux.poll_into(Duration::from_millis(999), &mut buf);
623        assert_eq!(n999, 0);
624
625        // t=1s: due
626        let mut buf2 = Vec::new();
627        let n1000 = mux.poll_into(Duration::from_millis(1000), &mut buf2);
628        assert!(n1000 > 0);
629    }
630
631    // ── upsert replaces existing entry ───────────────────────────────────────
632
633    #[test]
634    fn upsert_updates_existing_entry() {
635        let s1 = build_section(0x42, &[0xAA; 5]);
636        let s2 = build_section(0x46, &[0xBB; 10]);
637        let mut mux = SiMux::new();
638
639        mux.upsert(0x0100, s1, Duration::from_millis(100));
640        // Replace sections on same PID
641        mux.upsert(0x0100, s2.clone(), Duration::from_millis(200));
642
643        let pkts = mux.poll(Duration::ZERO);
644
645        // Round-trip the output to verify the updated sections
646        let mut reasm = SectionReassembler::default();
647        for raw in &pkts {
648            let pkt = TsPacket::parse(raw).unwrap();
649            if pkt.header.pid == 0x0100 {
650                reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
651            }
652        }
653        let got: Vec<_> = core::iter::from_fn(|| reasm.pop_section()).collect();
654        assert_eq!(got.len(), 1);
655        assert_eq!(got[0].as_ref(), s2.as_slice());
656    }
657
658    // ── convenience constructors ─────────────────────────────────────────────
659
660    #[test]
661    fn upsert_pat_uses_correct_pid() {
662        let s = build_section(0x00, &[0x01, 0x02]);
663        let mut mux = SiMux::new();
664        mux.upsert_pat(s);
665        let pkts = mux.poll(Duration::ZERO);
666        let pids: Vec<u16> = pkts
667            .iter()
668            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
669            .collect();
670        assert!(pids.iter().all(|&p| p == well_known::PAT.value()));
671    }
672
673    #[test]
674    fn upsert_sdt_actual_uses_correct_pid() {
675        let s = build_section(0x42, &[0x01]);
676        let mut mux = SiMux::new();
677        mux.upsert_sdt_actual(s);
678        let pkts = mux.poll(Duration::ZERO);
679        let pids: Vec<u16> = pkts
680            .iter()
681            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
682            .collect();
683        assert!(pids.iter().all(|&p| p == well_known::SDT_BAT.value()));
684    }
685
686    #[test]
687    fn upsert_nit_uses_correct_pid() {
688        let s = build_section(0x40, &[0x01]);
689        let mut mux = SiMux::new();
690        mux.upsert_nit(s);
691        let pkts = mux.poll(Duration::ZERO);
692        let pids: Vec<u16> = pkts
693            .iter()
694            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
695            .collect();
696        assert!(pids.iter().all(|&p| p == well_known::NIT.value()));
697    }
698
699    #[test]
700    fn upsert_tdt_uses_correct_pid() {
701        let s = build_section(0x70, &[0x01]);
702        let mut mux = SiMux::new();
703        mux.upsert_tdt(s);
704        let pkts = mux.poll(Duration::ZERO);
705        let pids: Vec<u16> = pkts
706            .iter()
707            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
708            .collect();
709        assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
710    }
711
712    #[test]
713    fn upsert_tot_uses_correct_pid() {
714        let s = build_section(0x73, &[0x01]);
715        let mut mux = SiMux::new();
716        mux.upsert_tot(s);
717        let pkts = mux.poll(Duration::ZERO);
718        let pids: Vec<u16> = pkts
719            .iter()
720            .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
721            .collect();
722        assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
723    }
724
725    // ── round-trip through reassembler ───────────────────────────────────────
726
727    #[test]
728    fn simux_round_trip_single_entry() {
729        let s1 = build_section(0x42, &[0xAA; 20]);
730        let s2 = build_section(0x46, &[0xBB; 15]);
731        let mut mux = SiMux::new();
732        mux.upsert(
733            0x0100,
734            concat_sections(&[s1.clone(), s2.clone()]),
735            Duration::from_millis(100),
736        );
737
738        let pkts = mux.poll(Duration::ZERO);
739
740        let mut reasm = SectionReassembler::default();
741        for raw in &pkts {
742            let pkt = TsPacket::parse(raw).unwrap();
743            if pkt.header.pid == 0x0100 {
744                reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
745            }
746        }
747        let got: Vec<_> = core::iter::from_fn(|| reasm.pop_section()).collect();
748        assert_eq!(got.len(), 2, "round-trip must recover both sections");
749        assert_eq!(got[0].as_ref(), s1.as_slice());
750        assert_eq!(got[1].as_ref(), s2.as_slice());
751    }
752
753    #[test]
754    fn simux_round_trip_multi_pid() {
755        let s_a = build_section(0x42, &[0xA0; 10]);
756        let s_b = build_section(0x46, &[0xB0; 10]);
757        let mut mux = SiMux::new();
758        mux.upsert(0x0100, s_a.clone(), Duration::from_millis(100));
759        mux.upsert(0x0200, s_b.clone(), Duration::from_millis(100));
760
761        let pkts = mux.poll(Duration::ZERO);
762
763        let mut reasm_a = SectionReassembler::default();
764        let mut reasm_b = SectionReassembler::default();
765        for raw in &pkts {
766            let pkt = TsPacket::parse(raw).unwrap();
767            match pkt.header.pid {
768                0x0100 => reasm_a.feed(pkt.payload.unwrap(), pkt.header.pusi),
769                0x0200 => reasm_b.feed(pkt.payload.unwrap(), pkt.header.pusi),
770                _ => {}
771            }
772        }
773        let got_a: Vec<_> = core::iter::from_fn(|| reasm_a.pop_section()).collect();
774        let got_b: Vec<_> = core::iter::from_fn(|| reasm_b.pop_section()).collect();
775        assert_eq!(got_a.len(), 1);
776        assert_eq!(got_b.len(), 1);
777        assert_eq!(got_a[0].as_ref(), s_a.as_slice());
778        assert_eq!(got_b[0].as_ref(), s_b.as_slice());
779    }
780
781    // ── CC continuity across polls ───────────────────────────────────────────
782
783    #[test]
784    fn continuity_counter_continuous_across_polls() {
785        // Section large enough to span at least 2 packets.
786        let s = build_section(0x42, &[0xAA; 250]);
787        let mut mux = SiMux::new();
788        mux.upsert(0x0100, s, Duration::from_millis(100));
789
790        let pkts1 = mux.poll(Duration::ZERO);
791        assert!(pkts1.len() >= 2, "need ≥2 packets to test CC continuity");
792
793        let last_cc_pkts1 = TsPacket::parse(&pkts1[pkts1.len() - 1])
794            .unwrap()
795            .header
796            .continuity_counter;
797
798        let pkts2 = mux.poll(Duration::from_millis(100));
799        assert!(!pkts2.is_empty(), "second poll must emit");
800
801        let first_cc_pkts2 = TsPacket::parse(&pkts2[0])
802            .unwrap()
803            .header
804            .continuity_counter;
805
806        assert_eq!(
807            first_cc_pkts2,
808            (last_cc_pkts1 + 1) & 0x0F,
809            "CC must continue across poll cycles"
810        );
811    }
812
813    // ── poll_into clears output ──────────────────────────────────────────────
814
815    #[test]
816    fn poll_into_clears_out_before_appending() {
817        let s = build_section(0x42, &[0xAA; 5]);
818        let mut mux = SiMux::new();
819        mux.upsert(0x0100, s, Duration::from_millis(100));
820
821        let mut out = vec![[0u8; TS_PACKET_SIZE]; 42];
822        let n = mux.poll_into(Duration::ZERO, &mut out);
823        assert_eq!(n, out.len(), "out must contain only new packets");
824    }
825
826    // ── debug_assert on sub-25 ms interval ───────────────────────────────────
827
/// Debug builds only: `upsert` with a 10 ms interval is expected to panic
/// with a message containing "25 ms".
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "25 ms")]
fn upsert_rejects_sub_25ms_interval_in_debug() {
    let section = build_section(0x42, &[0xAA; 5]);
    let too_short = Duration::from_millis(10);
    let mut mux = SiMux::new();
    mux.upsert(0x0100, section, too_short);
}
836
/// An interval of exactly `MIN_SECTION_INTERVAL` is accepted, and the section
/// is emitted by a poll at time zero.
#[test]
fn upsert_accepts_25ms_interval() {
    let mut mux = SiMux::new();
    mux.upsert(
        0x0100,
        build_section(0x42, &[0xAA; 5]),
        MIN_SECTION_INTERVAL,
    );

    let emitted = mux.poll(Duration::ZERO);
    assert!(!emitted.is_empty());
}
845
846    // ── round-trip property (the mandatory acceptance oracle) ────────────────
847
/// Round-trips one section with a 10-byte body.
#[test]
fn round_trip_single_short_section() {
    assert_round_trip(&[build_section(0x42, &[0xAA; 10])]);
}
853
/// Round-trips a section whose body is a single byte (4 bytes total).
#[test]
fn round_trip_one_byte_body() {
    let tiny = build_section(0x46, &[0xBB]);
    assert_round_trip(&[tiny]);
}
859
/// Round-trips a section whose total length is exactly `PUSI_PAYLOAD_CAP`.
#[test]
fn round_trip_section_exactly_pusi_cap_boundary() {
    // Body sized so that the built section comes out at PUSI_PAYLOAD_CAP (183).
    let section = build_section(0x50, &[0xCC; PUSI_PAYLOAD_CAP - 3]);
    assert_eq!(section.len(), PUSI_PAYLOAD_CAP);
    assert_round_trip(&[section]);
}
868
/// Round-trips a section one byte longer than `PUSI_PAYLOAD_CAP`, which
/// therefore has to spill into a continuation packet.
#[test]
fn round_trip_section_just_over_pusi_cap() {
    let section = build_section(0x52, &[0xDD; PUSI_PAYLOAD_CAP - 2]);
    assert_eq!(section.len(), PUSI_PAYLOAD_CAP + 1);
    assert_round_trip(&[section]);
}
877
/// Round-trips a 2000-byte section, which needs roughly 11 packets.
#[test]
fn round_trip_section_spans_many_packets() {
    let long_section = build_section(0x60, &[0xEE; 2000 - 3]);
    assert_round_trip(&[long_section]);
}
885
/// Round-trips a section of the maximum total size (4096 bytes, the long-form
/// ceiling).
#[test]
fn round_trip_section_at_max_size() {
    // The last continuation packet holds the section tail followed by 0xFF
    // stuffing. Since #148 the reassembler skips that trailing stuffing rather
    // than counting it toward MAX_SECTION_SIZE, so a full-size section
    // survives the round trip.
    let largest = build_section(0x80, &[0x11; 4096 - 3]);
    assert_eq!(largest.len(), 4096);
    assert_round_trip(&[largest]);
}
897
/// Round-trips three short sections (5, 4 and 6 bytes) submitted as one batch.
#[test]
fn round_trip_multiple_short_sections_in_one_batch() {
    let batch = [
        build_section(0x42, &[0x01, 0x02]),
        build_section(0x46, &[0x03]),
        build_section(0x4A, &[0x04, 0x05, 0x06]),
    ];
    assert_round_trip(&batch);
}
905
/// Round-trips a section that is exactly `PUSI_PAYLOAD_CAP` bytes long,
/// followed by a short second section.
#[test]
fn round_trip_section_ends_exactly_at_boundary() {
    // The leading section ends right on the packet boundary, so the follower
    // has to open the next packet with PUSI=1 and pointer_field=0.
    let filler = build_section(0x50, &[0xA1; PUSI_PAYLOAD_CAP - 3]);
    assert_eq!(filler.len(), PUSI_PAYLOAD_CAP);

    let follower = build_section(0x52, &[0xB1, 0xB2]);
    assert_round_trip(&[filler, follower]);
}
918
/// Round-trips a batch that alternates small sections with packet-spanning
/// ones, exercising pointer_field placement and contiguous concatenation.
#[test]
fn round_trip_mix_small_large_sections() {
    let batch = [
        build_section(0x10, &[0xAA; 5]),
        build_section(0x20, &[0xBB; 200]),
        build_section(0x30, &[0xCC; 50]),
        build_section(0x40, &[0xDD; 800]),
        build_section(0x50, &[0xEE]), // body of a single byte
    ];
    assert_round_trip(&batch);
}
932
933    // ── continuity counter ───────────────────────────────────────────────────
934
/// Within one `packetize` call, each packet's continuity_counter is the
/// previous packet's value plus one, modulo 16.
#[test]
fn continuity_counter_increments_per_packet() {
    // 500 body bytes: enough for the section to occupy several packets.
    let section = build_section(0x42, &[0xAA; 500]);
    let mut packetizer = SectionPacketizer::new(0x0100);

    let packets = packetizer.packetize(&[&section]);
    assert!(packets.len() >= 3, "need multiple packets to test CC");

    let counters: Vec<u8> = packets
        .iter()
        .map(|raw| TsPacket::parse(raw).unwrap().header.continuity_counter)
        .collect();

    // Compare every adjacent pair of counters.
    for pair in counters.windows(2) {
        assert_eq!(pair[1], (pair[0] + 1) & 0x0F, "CC must increment per packet");
    }
}
955
/// Starting from CC 14, the counter wraps 14 → 15 → 0 inside one call, and a
/// second `packetize` call continues from where the first one stopped.
#[test]
fn continuity_counter_wraps_and_continues_across_calls() {
    let mut packetizer = SectionPacketizer::with_continuity(0x0100, 14);
    // 500 body bytes make the section span at least 3 packets.
    let section = build_section(0x42, &[0xBB; 500]);

    // First batch: counters must read 14, 15, 0, …
    let first_batch = packetizer.packetize(&[&section]);
    assert!(first_batch.len() >= 3, "section must span ≥3 packets");
    let first_ccs: Vec<u8> = first_batch
        .iter()
        .map(|raw| TsPacket::parse(raw).unwrap().header.continuity_counter)
        .collect();
    for (idx, expected) in [14u8, 15, 0].iter().enumerate() {
        assert_eq!(first_ccs[idx], *expected);
    }

    // Second batch: its first counter follows the last one of the first batch.
    let second_batch = packetizer.packetize(&[&section]);
    let resumed_cc = TsPacket::parse(&second_batch[0])
        .unwrap()
        .header
        .continuity_counter;
    let expected_cc = (first_ccs[first_ccs.len() - 1] + 1) & 0x0F;
    assert_eq!(resumed_cc, expected_cc);
}
982
983    // ── PUSI placement ──────────────────────────────────────────────────────
984
/// The packet in which a section begins must have PUSI set.
#[test]
fn pusi_set_when_section_starts() {
    let mut packetizer = SectionPacketizer::new(0x0100);
    let section = build_section(0x42, &[0xAA; 10]);

    let emitted = packetizer.packetize(&[&section]);
    assert!(!emitted.is_empty());

    let opening = TsPacket::parse(&emitted[0]).unwrap();
    assert!(opening.header.pusi, "first packet must have PUSI=1");
}
994
/// For a section spanning several packets, the first packet has PUSI=1 and
/// the second (a pure continuation) has PUSI=0.
#[test]
fn pusi_not_set_on_mid_section_continuation() {
    let section = build_section(0x42, &[0xAA; 500]);
    let mut packetizer = SectionPacketizer::new(0x0100);

    let emitted = packetizer.packetize(&[&section]);
    assert!(emitted.len() >= 2);

    let opening = TsPacket::parse(&emitted[0]).unwrap();
    let continuation = TsPacket::parse(&emitted[1]).unwrap();
    assert!(opening.header.pusi, "first packet must have PUSI=1");
    assert!(
        !continuation.header.pusi,
        "second packet is continuation, must have PUSI=0"
    );
}
1010
/// When a new section starts in a packet that first carries the tail of the
/// previous section, `pointer_field` must equal the length of that tail and
/// the bytes it skips must be the end of the previous section.
#[test]
fn pointer_field_equals_tail_length_before_new_section() {
    // Section1 = 200 bytes.  Section2 = 50 bytes.
    // Packet 1: PUSI=1, pointer=0, first PUSI_PAYLOAD_CAP bytes of section1.
    // Packet 2: PUSI=1, pointer = remaining tail of section1, then section2.
    let body1 = vec![0xA1; 197]; // 200-byte section
    let s1 = build_section(0x52, &body1);
    assert_eq!(s1.len(), 200);
    let s2 = build_section(0x54, &[0xB1; 47]); // 50-byte section
    assert_eq!(s2.len(), 50);

    let mut p = SectionPacketizer::new(0x0100);
    let packets = p.packetize(&[&s1, &s2]);

    // Find the packet where PUSI=1 and pointer>0.
    let pkt_with_pointer = packets
        .iter()
        .map(|raw| TsPacket::parse(raw).unwrap())
        .find(|pkt| pkt.header.pusi && pkt.payload.is_some_and(|pl| pl.first() != Some(&0)))
        .expect("must have a PUSI packet with non-zero pointer");

    let payload = pkt_with_pointer.payload.unwrap();
    let pointer = payload[0] as usize;
    assert!(pointer > 0, "pointer must be non-zero");

    // Section1's body is a uniform fill byte, so the byte comparison below
    // cannot detect a pointer that is too small. Pin the exact value: packet 1
    // holds PUSI_PAYLOAD_CAP bytes of section1, the rest is the tail.
    assert_eq!(
        pointer,
        s1.len() - PUSI_PAYLOAD_CAP,
        "pointer_field must equal the length of section1's tail"
    );

    // The tail bytes should be from the end of section1.
    let tail_start = s1.len() - pointer;
    assert_eq!(&payload[1..1 + pointer], &s1[tail_start..]);
}
1039
1040    // ── stuffing ─────────────────────────────────────────────────────────────
1041
/// Payload bytes after the end of the only section in a packet must all be
/// `STUFFING_BYTE`.
#[test]
fn final_packet_unused_tail_is_stuffing() {
    let section = build_section(0x42, &[0xAA; 5]); // 8 bytes in total
    let mut packetizer = SectionPacketizer::new(0x0100);
    let emitted = packetizer.packetize(&[&section]);

    let parsed = TsPacket::parse(&emitted[0]).unwrap();
    let payload = parsed.payload.unwrap();
    assert_eq!(payload[0], 0, "pointer_field should be 0");

    // Offset just past the pointer_field byte and the section itself.
    let stuffing_start = section.len() + 1;
    assert!(
        stuffing_start < payload.len(),
        "must have stuffing after section"
    );
    payload[stuffing_start..]
        .iter()
        .for_each(|&byte| assert_eq!(byte, STUFFING_BYTE, "all trailing bytes must be 0xFF"));
}
1061
/// After feeding a two-section batch to the reassembler and popping both
/// sections, the reassembler must report itself empty.
#[test]
fn reassembler_discards_stuffing() {
    let first = build_section(0x42, &[0xAA; 10]);
    let second = build_section(0x46, &[0xBB; 5]);

    let mut packetizer = SectionPacketizer::new(0x0100);
    let emitted = packetizer.packetize(&[&first, &second]);

    let mut reasm = SectionReassembler::default();
    emitted.iter().for_each(|raw| {
        let pkt = TsPacket::parse(raw).unwrap();
        reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
    });

    // Drain every completed section and count them.
    let popped = core::iter::from_fn(|| reasm.pop_section()).count();
    assert_eq!(popped, 2);
    assert!(
        reasm.is_empty(),
        "stuffing tail must be discarded, not buffered"
    );
}
1083
1084    // ── misc ─────────────────────────────────────────────────────────────────
1085
/// Packetizing an empty batch yields an empty packet list.
#[test]
fn empty_batch_produces_no_packets() {
    let mut packetizer = SectionPacketizer::new(0x0100);
    let emitted: Vec<[u8; TS_PACKET_SIZE]> = packetizer.packetize(&[]);
    assert!(emitted.is_empty());
}
1092
/// `packetize_into` must leave only the freshly produced packets in `out`,
/// and those packets must reassemble into the input section.
#[test]
fn packetize_into_clears_out_first() {
    let section = build_section(0x42, &[0xAA; 5]);
    let mut packetizer = SectionPacketizer::new(0x0100);

    // 99 zeroed entries stand in for stale content from an earlier use.
    let mut out = vec![[0u8; TS_PACKET_SIZE]; 99];
    let written = packetizer.packetize_into(&[&section], &mut out);
    assert_eq!(written, out.len(), "out must contain only the new packets");

    // Round-trip what is left in `out`.
    let mut reasm = SectionReassembler::default();
    out.iter().for_each(|raw| {
        let pkt = TsPacket::parse(raw).unwrap();
        reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
    });
    let recovered = reasm.pop_section().unwrap();
    assert_eq!(recovered.as_ref(), section.as_slice());
}
1110
/// `pid()` reports the PID the packetizer was constructed with.
#[test]
fn pid_is_correct() {
    assert_eq!(SectionPacketizer::new(0x1234).pid(), 0x1234);
}
1116
/// `with_continuity` keeps only the low 4 bits of the requested counter:
/// 0xFE becomes 0x0E.
#[test]
fn with_continuity_masks_to_4_bits() {
    let packetizer = SectionPacketizer::with_continuity(0x0100, 0xFE);
    let stored = packetizer.continuity_counter();
    assert_eq!(stored, 0x0E);
}
1122
/// Every emitted packet has the payload flag set, no adaptation field, TEI
/// clear and scrambling control 0.
#[test]
fn has_payload_always_true_no_adaptation() {
    let section = build_section(0x42, &[0xAA; 50]);
    let mut packetizer = SectionPacketizer::new(0x0100);
    let emitted = packetizer.packetize(&[&section]);

    emitted.iter().for_each(|raw| {
        let header = TsPacket::parse(raw).unwrap().header;
        assert!(header.has_payload, "every packet must carry payload");
        assert!(!header.has_adaptation, "no adaptation field is emitted");
        assert!(!header.tei, "TEI must be false");
        assert_eq!(header.scrambling, 0, "scrambling must be 0");
    });
}
1136}