1use core::time::Duration;
10
11use crate::pid::well_known;
12use crate::ts::{TsHeader, CC_MASK, SECTION_LENGTH_HI_MASK, TS_PACKET_SIZE};
13
14const PUSI_PAYLOAD_CAP: usize = 183;
16const PAYLOAD_CAP: usize = 184;
18const STUFFING_BYTE: u8 = 0xFF;
20
21pub struct SectionPacketizer {
30 pid: u16,
31 continuity_counter: u8,
32}
33
34impl SectionPacketizer {
35 pub fn new(pid: u16) -> Self {
37 Self {
38 pid,
39 continuity_counter: 0,
40 }
41 }
42
43 pub fn with_continuity(pid: u16, cc: u8) -> Self {
45 Self {
46 pid,
47 continuity_counter: cc & CC_MASK,
48 }
49 }
50
51 pub fn pid(&self) -> u16 {
53 self.pid
54 }
55
56 pub fn continuity_counter(&self) -> u8 {
58 self.continuity_counter
59 }
60
61 pub fn packetize_into(
66 &mut self,
67 sections: &[&[u8]],
68 out: &mut Vec<[u8; TS_PACKET_SIZE]>,
69 ) -> usize {
70 out.clear();
71
72 if sections.is_empty() {
73 return 0;
74 }
75
76 let total_len: usize = sections.iter().map(|s| s.len()).sum();
78 if total_len == 0 {
79 return 0;
80 }
81 let mut data = Vec::with_capacity(total_len);
82 let mut starts = Vec::with_capacity(sections.len());
83 for s in sections {
84 starts.push(data.len());
85 data.extend_from_slice(s);
86 }
87
88 let count_before = out.len();
89 let mut pos = 0usize;
90
91 while pos < data.len() {
92 let next_start = starts.iter().copied().find(|&s| s >= pos);
94
95 let pusi: bool;
96 let pointer_field: u8;
97 let cap: usize;
98
99 if let Some(ns) = next_start {
100 let diff = ns.saturating_sub(pos);
101 if diff <= PUSI_PAYLOAD_CAP {
102 pusi = true;
103 pointer_field = diff as u8;
104 cap = PUSI_PAYLOAD_CAP;
105 } else {
106 pusi = false;
107 pointer_field = 0;
108 cap = PAYLOAD_CAP;
109 }
110 } else {
111 pusi = false;
112 pointer_field = 0;
113 cap = PAYLOAD_CAP;
114 }
115
116 let mut pkt = [0u8; TS_PACKET_SIZE];
117
118 let header = TsHeader {
119 tei: false,
120 pusi,
121 pid: self.pid,
122 scrambling: 0,
123 has_adaptation: false,
124 has_payload: true,
125 continuity_counter: self.continuity_counter,
126 };
127 header
128 .serialize_into(&mut pkt[..4])
129 .expect("4-byte header buffer");
130
131 self.continuity_counter = (self.continuity_counter + 1) & CC_MASK;
132
133 let mut write_pos = 4usize;
134
135 if pusi {
136 pkt[write_pos] = pointer_field;
137 write_pos += 1;
138 }
139
140 let remaining = data.len() - pos;
141 let to_copy = remaining.min(cap);
142 pkt[write_pos..write_pos + to_copy].copy_from_slice(&data[pos..pos + to_copy]);
143 pos += to_copy;
144 write_pos += to_copy;
145
146 for b in &mut pkt[write_pos..] {
148 *b = STUFFING_BYTE;
149 }
150
151 out.push(pkt);
152 }
153
154 out.len() - count_before
155 }
156
157 pub fn packetize(&mut self, sections: &[&[u8]]) -> Vec<[u8; TS_PACKET_SIZE]> {
159 let mut out = Vec::new();
160 self.packetize_into(sections, &mut out);
161 out
162 }
163}
164
165pub const NIT_MAX_INTERVAL: Duration = Duration::from_secs(10);
169pub const BAT_MAX_INTERVAL: Duration = Duration::from_secs(10);
171pub const SDT_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
173pub const SDT_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
175pub const EIT_PF_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
177pub const EIT_PF_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
180pub const EIT_SCHED_MAX_INTERVAL: Duration = Duration::from_secs(10);
182pub const EIT_SCHED_EXT_MAX_INTERVAL: Duration = Duration::from_secs(30);
184pub const TDT_MAX_INTERVAL: Duration = Duration::from_secs(30);
186pub const TOT_MAX_INTERVAL: Duration = Duration::from_secs(30);
188
189pub const PAT_MAX_INTERVAL: Duration = Duration::from_millis(100);
191pub const PMT_MAX_INTERVAL: Duration = Duration::from_millis(100);
193
194pub const MIN_SECTION_INTERVAL: Duration = Duration::from_millis(25);
197
198pub struct SiMux {
218 entries: Vec<Entry>,
219}
220
221struct Entry {
222 pid: u16,
223 sections: Vec<u8>,
224 interval: Duration,
225 last_emit: Option<Duration>,
226 packetizer: SectionPacketizer,
227}
228
229impl SiMux {
230 pub fn new() -> Self {
232 Self {
233 entries: Vec::new(),
234 }
235 }
236
237 pub fn upsert(&mut self, pid: u16, sections: Vec<u8>, interval: Duration) {
248 debug_assert!(
249 interval >= MIN_SECTION_INTERVAL,
250 "interval {interval:?} is below the 25 ms minimum (EN 300 468 §5.1.4.1)"
251 );
252
253 if let Some(entry) = self.entries.iter_mut().find(|e| e.pid == pid) {
254 entry.sections = sections;
255 entry.interval = interval;
256 } else {
257 self.entries.push(Entry {
258 pid,
259 sections,
260 interval,
261 last_emit: None,
262 packetizer: SectionPacketizer::new(pid),
263 });
264 }
265 }
266
267 pub fn upsert_pat(&mut self, sections: Vec<u8>) {
271 self.upsert(well_known::PAT.value(), sections, PAT_MAX_INTERVAL);
272 }
273
274 pub fn upsert_pmt(&mut self, pid: u16, sections: Vec<u8>) {
278 self.upsert(pid, sections, PMT_MAX_INTERVAL);
279 }
280
281 pub fn upsert_sdt_actual(&mut self, sections: Vec<u8>) {
285 self.upsert(
286 well_known::SDT_BAT.value(),
287 sections,
288 SDT_ACTUAL_MAX_INTERVAL,
289 );
290 }
291
292 pub fn upsert_nit(&mut self, sections: Vec<u8>) {
296 self.upsert(well_known::NIT.value(), sections, NIT_MAX_INTERVAL);
297 }
298
299 pub fn upsert_tdt(&mut self, sections: Vec<u8>) {
303 self.upsert(well_known::TDT_TOT.value(), sections, TDT_MAX_INTERVAL);
304 }
305
306 pub fn upsert_tot(&mut self, sections: Vec<u8>) {
310 self.upsert(well_known::TDT_TOT.value(), sections, TOT_MAX_INTERVAL);
311 }
312
313 pub fn poll_into(&mut self, now: Duration, out: &mut Vec<[u8; TS_PACKET_SIZE]>) -> usize {
320 out.clear();
321 let before = out.len();
322
323 let mut tmp = Vec::new();
324 for entry in &mut self.entries {
325 let due = match entry.last_emit {
326 None => true,
327 Some(last) => now.saturating_sub(last) >= entry.interval,
328 };
329 if due {
330 let refs = split_sections(&entry.sections);
331 if !refs.is_empty() {
332 entry.packetizer.packetize_into(&refs, &mut tmp);
333 out.append(&mut tmp);
334 }
335 entry.last_emit = Some(now);
336 }
337 }
338
339 out.len() - before
340 }
341
342 pub fn poll(&mut self, now: Duration) -> Vec<[u8; TS_PACKET_SIZE]> {
344 let mut out = Vec::new();
345 self.poll_into(now, &mut out);
346 out
347 }
348}
349
350impl Default for SiMux {
351 fn default() -> Self {
352 Self::new()
353 }
354}
355
356fn split_sections(data: &[u8]) -> Vec<&[u8]> {
362 let mut result = Vec::new();
363 let mut pos = 0;
364 while pos + 3 <= data.len() {
365 let section_length =
366 (((data[pos + 1] & SECTION_LENGTH_HI_MASK) as usize) << 8) | (data[pos + 2] as usize);
367 let end = pos + 3 + section_length;
368 if end > data.len() {
369 break;
370 }
371 result.push(&data[pos..end]);
372 pos = end;
373 }
374 result
375}
376
377#[cfg(test)]
378mod tests {
379 use super::*;
380 use crate::ts::{SectionReassembler, TsPacket};
381
382 fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
388 let section_length = body_after_length.len() as u16;
389 let mut v = Vec::with_capacity(3 + section_length as usize);
390 v.push(table_id);
391 v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
393 v.push((section_length & 0xFF) as u8);
394 v.extend_from_slice(body_after_length);
395 v
396 }
397
398 fn concat_sections(sections: &[Vec<u8>]) -> Vec<u8> {
399 let total: usize = sections.iter().map(|s| s.len()).sum();
400 let mut out = Vec::with_capacity(total);
401 for s in sections {
402 out.extend_from_slice(s);
403 }
404 out
405 }
406
407 fn assert_round_trip(sections: &[Vec<u8>]) {
410 let mut packetizer = SectionPacketizer::new(0x0100);
411 let refs: Vec<&[u8]> = sections.iter().map(|s| s.as_slice()).collect();
412 let packets = packetizer.packetize(&refs);
413
414 let mut reasm = SectionReassembler::default();
415 for pkt_raw in &packets {
416 let pkt = TsPacket::parse(pkt_raw).expect("parse generated packet");
417 let payload = pkt.payload.expect("payload present");
418 let pusi = pkt.header.pusi;
419 reasm.feed(payload, pusi);
420 }
421
422 let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
423 assert_eq!(
424 got.len(),
425 sections.len(),
426 "section count mismatch: expected {}, got {}",
427 sections.len(),
428 got.len()
429 );
430 for (i, (orig, round)) in sections.iter().zip(got.iter()).enumerate() {
431 assert_eq!(
432 round.as_ref(),
433 orig.as_slice(),
434 "section {i} round-trip mismatch"
435 );
436 }
437 assert!(reasm.is_empty(), "reassembler should be empty after drain");
438 }
439
440 #[test]
443 fn split_single_section() {
444 let s = build_section(0x42, &[0xAA; 5]);
445 let refs = split_sections(&s);
446 assert_eq!(refs.len(), 1);
447 assert_eq!(refs[0], s.as_slice());
448 }
449
450 #[test]
451 fn split_two_sections() {
452 let s1 = build_section(0x42, &[0x01, 0x02]);
453 let s2 = build_section(0x46, &[0x03, 0x04, 0x05]);
454 let both = concat_sections(&[s1.clone(), s2.clone()]);
455 let refs = split_sections(&both);
456 assert_eq!(refs.len(), 2);
457 assert_eq!(refs[0], s1.as_slice());
458 assert_eq!(refs[1], s2.as_slice());
459 }
460
461 #[test]
462 fn split_empty_input() {
463 let refs = split_sections(&[]);
464 assert!(refs.is_empty());
465 }
466
467 #[test]
468 fn split_trailing_garbage_ignored() {
469 let s = build_section(0x42, &[0xAA; 3]);
470 let mut data = s.clone();
471 data.push(0xFF); let refs = split_sections(&data);
473 assert_eq!(refs.len(), 1);
474 assert_eq!(refs[0], s.as_slice());
475 }
476
477 #[test]
480 fn interval_constants_match_spec() {
481 assert_eq!(NIT_MAX_INTERVAL, Duration::from_secs(10));
483 assert_eq!(BAT_MAX_INTERVAL, Duration::from_secs(10));
484 assert_eq!(SDT_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
485 assert_eq!(SDT_OTHER_MAX_INTERVAL, Duration::from_secs(10));
486 assert_eq!(EIT_PF_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
487 assert_eq!(EIT_PF_OTHER_MAX_INTERVAL, Duration::from_secs(10));
488 assert_eq!(EIT_SCHED_MAX_INTERVAL, Duration::from_secs(10));
489 assert_eq!(EIT_SCHED_EXT_MAX_INTERVAL, Duration::from_secs(30));
490 assert_eq!(TDT_MAX_INTERVAL, Duration::from_secs(30));
491 assert_eq!(TOT_MAX_INTERVAL, Duration::from_secs(30));
492
493 assert_eq!(PAT_MAX_INTERVAL, Duration::from_millis(100));
495 assert_eq!(PMT_MAX_INTERVAL, Duration::from_millis(100));
496
497 assert_eq!(MIN_SECTION_INTERVAL, Duration::from_millis(25));
499 }
500
501 #[test]
504 fn new_simux_is_empty() {
505 let mut mux = SiMux::new();
506 let pkts = mux.poll(Duration::ZERO);
507 assert!(pkts.is_empty());
508 }
509
510 #[test]
511 fn simux_default_is_empty() {
512 let mut mux = SiMux::default();
513 let pkts = mux.poll(Duration::ZERO);
514 assert!(pkts.is_empty());
515 }
516
517 #[test]
518 fn first_poll_always_emits() {
519 let s = build_section(0x42, &[0xAA; 10]);
520 let mut mux = SiMux::new();
521 mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
522 let pkts = mux.poll(Duration::ZERO);
523 assert!(!pkts.is_empty(), "first poll must emit");
524 }
525
526 #[test]
527 fn first_poll_emits_all_entries() {
528 let s1 = build_section(0x42, &[0x01; 5]);
529 let s2 = build_section(0x46, &[0x02; 5]);
530 let mut mux = SiMux::new();
531 mux.upsert(0x0100, s1.clone(), Duration::from_millis(100));
532 mux.upsert(0x0200, s2.clone(), Duration::from_millis(200));
533 let pkts = mux.poll(Duration::ZERO);
534 let pids: Vec<u16> = pkts
536 .iter()
537 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
538 .collect();
539 assert!(pids.contains(&0x0100));
540 assert!(pids.contains(&0x0200));
541 }
542
543 #[test]
546 fn entry_emits_only_when_due() {
547 let s = build_section(0x42, &[0xAA; 10]);
548 let mut mux = SiMux::new();
549 mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
550
551 let pkts0 = mux.poll(Duration::ZERO);
553 assert!(!pkts0.is_empty());
554
555 let pkts50 = mux.poll(Duration::from_millis(50));
557 assert!(pkts50.is_empty(), "should not emit at t=50");
558
559 let pkts99 = mux.poll(Duration::from_millis(99));
561 assert!(pkts99.is_empty(), "should not emit at t=99");
562
563 let pkts100 = mux.poll(Duration::from_millis(100));
565 assert!(!pkts100.is_empty(), "should emit at t=100");
566 }
567
568 #[test]
569 fn two_entries_different_cadence() {
570 let s1 = build_section(0x42, &[0x01; 5]);
571 let s2 = build_section(0x46, &[0x02; 5]);
572 let mut mux = SiMux::new();
573 mux.upsert(0x0100, concat_sections(&[s1]), Duration::from_millis(100));
574 mux.upsert(0x0200, concat_sections(&[s2]), Duration::from_millis(200));
575
576 let pkts = mux.poll(Duration::ZERO);
578 assert!(!pkts.is_empty());
579
580 let pkts100 = mux.poll(Duration::from_millis(100));
582 assert!(!pkts100.is_empty());
583 let pids100: Vec<u16> = pkts100
584 .iter()
585 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
586 .collect();
587 assert!(pids100.contains(&0x0100), "PID 0x0100 should emit at t=100");
588 assert!(
589 !pids100.contains(&0x0200),
590 "PID 0x0200 should NOT emit at t=100"
591 );
592
593 let pkts150 = mux.poll(Duration::from_millis(150));
595 assert!(pkts150.is_empty(), "neither should emit at t=150");
596
597 let pkts200 = mux.poll(Duration::from_millis(200));
599 let pids200: Vec<u16> = pkts200
600 .iter()
601 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
602 .collect();
603 assert!(pids200.contains(&0x0100), "PID 0x0100 should emit at t=200");
604 assert!(pids200.contains(&0x0200), "PID 0x0200 should emit at t=200");
605 }
606
607 #[test]
608 fn entry_emits_again_after_full_interval() {
609 let s = build_section(0x42, &[0xAA; 20]);
610 let mut mux = SiMux::new();
611 mux.upsert(0x0100, s, Duration::from_secs(1));
612
613 let n0 = mux.poll_into(Duration::ZERO, &mut Vec::new());
615 assert!(n0 > 0);
616
617 let mut buf = Vec::new();
619 let n999 = mux.poll_into(Duration::from_millis(999), &mut buf);
620 assert_eq!(n999, 0);
621
622 let mut buf2 = Vec::new();
624 let n1000 = mux.poll_into(Duration::from_millis(1000), &mut buf2);
625 assert!(n1000 > 0);
626 }
627
628 #[test]
631 fn upsert_updates_existing_entry() {
632 let s1 = build_section(0x42, &[0xAA; 5]);
633 let s2 = build_section(0x46, &[0xBB; 10]);
634 let mut mux = SiMux::new();
635
636 mux.upsert(0x0100, s1, Duration::from_millis(100));
637 mux.upsert(0x0100, s2.clone(), Duration::from_millis(200));
639
640 let pkts = mux.poll(Duration::ZERO);
641
642 let mut reasm = SectionReassembler::default();
644 for raw in &pkts {
645 let pkt = TsPacket::parse(raw).unwrap();
646 if pkt.header.pid == 0x0100 {
647 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
648 }
649 }
650 let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
651 assert_eq!(got.len(), 1);
652 assert_eq!(got[0].as_ref(), s2.as_slice());
653 }
654
655 #[test]
658 fn upsert_pat_uses_correct_pid() {
659 let s = build_section(0x00, &[0x01, 0x02]);
660 let mut mux = SiMux::new();
661 mux.upsert_pat(s);
662 let pkts = mux.poll(Duration::ZERO);
663 let pids: Vec<u16> = pkts
664 .iter()
665 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
666 .collect();
667 assert!(pids.iter().all(|&p| p == well_known::PAT.value()));
668 }
669
670 #[test]
671 fn upsert_sdt_actual_uses_correct_pid() {
672 let s = build_section(0x42, &[0x01]);
673 let mut mux = SiMux::new();
674 mux.upsert_sdt_actual(s);
675 let pkts = mux.poll(Duration::ZERO);
676 let pids: Vec<u16> = pkts
677 .iter()
678 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
679 .collect();
680 assert!(pids.iter().all(|&p| p == well_known::SDT_BAT.value()));
681 }
682
683 #[test]
684 fn upsert_nit_uses_correct_pid() {
685 let s = build_section(0x40, &[0x01]);
686 let mut mux = SiMux::new();
687 mux.upsert_nit(s);
688 let pkts = mux.poll(Duration::ZERO);
689 let pids: Vec<u16> = pkts
690 .iter()
691 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
692 .collect();
693 assert!(pids.iter().all(|&p| p == well_known::NIT.value()));
694 }
695
696 #[test]
697 fn upsert_tdt_uses_correct_pid() {
698 let s = build_section(0x70, &[0x01]);
699 let mut mux = SiMux::new();
700 mux.upsert_tdt(s);
701 let pkts = mux.poll(Duration::ZERO);
702 let pids: Vec<u16> = pkts
703 .iter()
704 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
705 .collect();
706 assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
707 }
708
709 #[test]
710 fn upsert_tot_uses_correct_pid() {
711 let s = build_section(0x73, &[0x01]);
712 let mut mux = SiMux::new();
713 mux.upsert_tot(s);
714 let pkts = mux.poll(Duration::ZERO);
715 let pids: Vec<u16> = pkts
716 .iter()
717 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
718 .collect();
719 assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
720 }
721
722 #[test]
725 fn simux_round_trip_single_entry() {
726 let s1 = build_section(0x42, &[0xAA; 20]);
727 let s2 = build_section(0x46, &[0xBB; 15]);
728 let mut mux = SiMux::new();
729 mux.upsert(
730 0x0100,
731 concat_sections(&[s1.clone(), s2.clone()]),
732 Duration::from_millis(100),
733 );
734
735 let pkts = mux.poll(Duration::ZERO);
736
737 let mut reasm = SectionReassembler::default();
738 for raw in &pkts {
739 let pkt = TsPacket::parse(raw).unwrap();
740 if pkt.header.pid == 0x0100 {
741 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
742 }
743 }
744 let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
745 assert_eq!(got.len(), 2, "round-trip must recover both sections");
746 assert_eq!(got[0].as_ref(), s1.as_slice());
747 assert_eq!(got[1].as_ref(), s2.as_slice());
748 }
749
750 #[test]
751 fn simux_round_trip_multi_pid() {
752 let s_a = build_section(0x42, &[0xA0; 10]);
753 let s_b = build_section(0x46, &[0xB0; 10]);
754 let mut mux = SiMux::new();
755 mux.upsert(0x0100, s_a.clone(), Duration::from_millis(100));
756 mux.upsert(0x0200, s_b.clone(), Duration::from_millis(100));
757
758 let pkts = mux.poll(Duration::ZERO);
759
760 let mut reasm_a = SectionReassembler::default();
761 let mut reasm_b = SectionReassembler::default();
762 for raw in &pkts {
763 let pkt = TsPacket::parse(raw).unwrap();
764 match pkt.header.pid {
765 0x0100 => reasm_a.feed(pkt.payload.unwrap(), pkt.header.pusi),
766 0x0200 => reasm_b.feed(pkt.payload.unwrap(), pkt.header.pusi),
767 _ => {}
768 }
769 }
770 let got_a: Vec<_> = std::iter::from_fn(|| reasm_a.pop_section()).collect();
771 let got_b: Vec<_> = std::iter::from_fn(|| reasm_b.pop_section()).collect();
772 assert_eq!(got_a.len(), 1);
773 assert_eq!(got_b.len(), 1);
774 assert_eq!(got_a[0].as_ref(), s_a.as_slice());
775 assert_eq!(got_b[0].as_ref(), s_b.as_slice());
776 }
777
778 #[test]
781 fn continuity_counter_continuous_across_polls() {
782 let s = build_section(0x42, &[0xAA; 250]);
784 let mut mux = SiMux::new();
785 mux.upsert(0x0100, s, Duration::from_millis(100));
786
787 let pkts1 = mux.poll(Duration::ZERO);
788 assert!(pkts1.len() >= 2, "need ≥2 packets to test CC continuity");
789
790 let last_cc_pkts1 = TsPacket::parse(&pkts1[pkts1.len() - 1])
791 .unwrap()
792 .header
793 .continuity_counter;
794
795 let pkts2 = mux.poll(Duration::from_millis(100));
796 assert!(!pkts2.is_empty(), "second poll must emit");
797
798 let first_cc_pkts2 = TsPacket::parse(&pkts2[0])
799 .unwrap()
800 .header
801 .continuity_counter;
802
803 assert_eq!(
804 first_cc_pkts2,
805 (last_cc_pkts1 + 1) & 0x0F,
806 "CC must continue across poll cycles"
807 );
808 }
809
810 #[test]
813 fn poll_into_clears_out_before_appending() {
814 let s = build_section(0x42, &[0xAA; 5]);
815 let mut mux = SiMux::new();
816 mux.upsert(0x0100, s, Duration::from_millis(100));
817
818 let mut out = vec![[0u8; TS_PACKET_SIZE]; 42];
819 let n = mux.poll_into(Duration::ZERO, &mut out);
820 assert_eq!(n, out.len(), "out must contain only new packets");
821 }
822
823 #[test]
826 #[cfg(debug_assertions)]
827 #[should_panic(expected = "25 ms")]
828 fn upsert_rejects_sub_25ms_interval_in_debug() {
829 let s = build_section(0x42, &[0xAA; 5]);
830 let mut mux = SiMux::new();
831 mux.upsert(0x0100, s, Duration::from_millis(10));
832 }
833
834 #[test]
835 fn upsert_accepts_25ms_interval() {
836 let s = build_section(0x42, &[0xAA; 5]);
837 let mut mux = SiMux::new();
838 mux.upsert(0x0100, s, MIN_SECTION_INTERVAL);
839 let pkts = mux.poll(Duration::ZERO);
840 assert!(!pkts.is_empty());
841 }
842
843 #[test]
846 fn round_trip_single_short_section() {
847 let s = build_section(0x42, &[0xAA; 10]);
848 assert_round_trip(&[s]);
849 }
850
851 #[test]
852 fn round_trip_one_byte_body() {
853 let s = build_section(0x46, &[0xBB]); assert_round_trip(&[s]);
855 }
856
857 #[test]
858 fn round_trip_section_exactly_pusi_cap_boundary() {
859 let body = vec![0xCC; PUSI_PAYLOAD_CAP - 3];
861 let s = build_section(0x50, &body);
862 assert_eq!(s.len(), PUSI_PAYLOAD_CAP);
863 assert_round_trip(&[s]);
864 }
865
866 #[test]
867 fn round_trip_section_just_over_pusi_cap() {
868 let body = vec![0xDD; PUSI_PAYLOAD_CAP - 3 + 1];
870 let s = build_section(0x52, &body);
871 assert_eq!(s.len(), PUSI_PAYLOAD_CAP + 1);
872 assert_round_trip(&[s]);
873 }
874
875 #[test]
876 fn round_trip_section_spans_many_packets() {
877 let body = vec![0xEE; 2000 - 3];
879 let s = build_section(0x60, &body);
880 assert_round_trip(&[s]);
881 }
882
883 #[test]
884 fn round_trip_section_at_max_size() {
885 let body = vec![0x11; 4096 - 3];
890 let s = build_section(0x80, &body);
891 assert_eq!(s.len(), 4096);
892 assert_round_trip(&[s]);
893 }
894
895 #[test]
896 fn round_trip_multiple_short_sections_in_one_batch() {
897 let s1 = build_section(0x42, &[0x01, 0x02]); let s2 = build_section(0x46, &[0x03]); let s3 = build_section(0x4A, &[0x04, 0x05, 0x06]); assert_round_trip(&[s1, s2, s3]);
901 }
902
903 #[test]
904 fn round_trip_section_ends_exactly_at_boundary() {
905 let body1 = vec![0xA1; PUSI_PAYLOAD_CAP - 3];
909 let s1 = build_section(0x50, &body1);
910 assert_eq!(s1.len(), PUSI_PAYLOAD_CAP);
911
912 let s2 = build_section(0x52, &[0xB1, 0xB2]);
913 assert_round_trip(&[s1, s2]);
914 }
915
916 #[test]
917 fn round_trip_mix_small_large_sections() {
918 let s1 = build_section(0x10, &[0xAA; 5]);
921 let body2 = vec![0xBB; 200];
922 let s2 = build_section(0x20, &body2);
923 let s3 = build_section(0x30, &[0xCC; 50]);
924 let body4 = vec![0xDD; 800];
925 let s4 = build_section(0x40, &body4);
926 let s5 = build_section(0x50, &[0xEE]); assert_round_trip(&[s1, s2, s3, s4, s5]);
928 }
929
930 #[test]
933 fn continuity_counter_increments_per_packet() {
934 let body = vec![0xAA; 500];
936 let section = build_section(0x42, &body);
937 let mut p = SectionPacketizer::new(0x0100);
938
939 let packets = p.packetize(&[§ion]);
940 assert!(packets.len() >= 3, "need multiple packets to test CC");
941
942 let mut last_cc: Option<u8> = None;
943 for pkt_raw in &packets {
944 let pkt = TsPacket::parse(pkt_raw).unwrap();
945 let cc = pkt.header.continuity_counter;
946 if let Some(last) = last_cc {
947 assert_eq!(cc, (last + 1) & 0x0F, "CC must increment per packet");
948 }
949 last_cc = Some(cc);
950 }
951 }
952
953 #[test]
954 fn continuity_counter_wraps_and_continues_across_calls() {
955 let mut p = SectionPacketizer::with_continuity(0x0100, 14);
956 let body = vec![0xBB; 500];
958 let s = build_section(0x42, &body);
959
960 let pkts1 = p.packetize(&[&s]);
962 assert!(pkts1.len() >= 3, "section must span ≥3 packets");
963 let ccs1: Vec<u8> = pkts1
964 .iter()
965 .map(|b| TsPacket::parse(b).unwrap().header.continuity_counter)
966 .collect();
967 assert_eq!(ccs1[0], 14);
968 assert_eq!(ccs1[1], 15);
969 assert_eq!(ccs1[2], 0);
970
971 let pkts2 = p.packetize(&[&s]);
973 let cc_first_pkt2 = TsPacket::parse(&pkts2[0])
974 .unwrap()
975 .header
976 .continuity_counter;
977 assert_eq!(cc_first_pkt2, ccs1.last().map(|c| (c + 1) & 0x0F).unwrap());
978 }
979
980 #[test]
983 fn pusi_set_when_section_starts() {
984 let s = build_section(0x42, &[0xAA; 10]);
985 let mut p = SectionPacketizer::new(0x0100);
986 let packets = p.packetize(&[&s]);
987 assert!(!packets.is_empty());
988 let pkt = TsPacket::parse(&packets[0]).unwrap();
989 assert!(pkt.header.pusi, "first packet must have PUSI=1");
990 }
991
992 #[test]
993 fn pusi_not_set_on_mid_section_continuation() {
994 let body = vec![0xAA; 500];
995 let s = build_section(0x42, &body);
996 let mut p = SectionPacketizer::new(0x0100);
997 let packets = p.packetize(&[&s]);
998 assert!(packets.len() >= 2);
999 let pkt1 = TsPacket::parse(&packets[0]).unwrap();
1000 let pkt2 = TsPacket::parse(&packets[1]).unwrap();
1001 assert!(pkt1.header.pusi, "first packet must have PUSI=1");
1002 assert!(
1003 !pkt2.header.pusi,
1004 "second packet is continuation, must have PUSI=0"
1005 );
1006 }
1007
1008 #[test]
1009 fn pointer_field_equals_tail_length_before_new_section() {
1010 let body1 = vec![0xA1; 197]; let s1 = build_section(0x52, &body1);
1015 assert_eq!(s1.len(), 200);
1016 let s2 = build_section(0x54, &[0xB1; 47]); assert_eq!(s2.len(), 50);
1018
1019 let mut p = SectionPacketizer::new(0x0100);
1020 let packets = p.packetize(&[&s1, &s2]);
1021
1022 let pkt_with_pointer = packets
1024 .iter()
1025 .map(|raw| TsPacket::parse(raw).unwrap())
1026 .find(|pkt| pkt.header.pusi && pkt.payload.is_some_and(|pl| pl.first() != Some(&0)))
1027 .expect("must have a PUSI packet with non-zero pointer");
1028
1029 let payload = pkt_with_pointer.payload.unwrap();
1030 let pointer = payload[0] as usize;
1031 assert!(pointer > 0, "pointer must be non-zero");
1032 let tail_start = s1.len() - pointer;
1034 assert_eq!(&payload[1..1 + pointer], &s1[tail_start..]);
1035 }
1036
1037 #[test]
1040 fn final_packet_unused_tail_is_stuffing() {
1041 let s = build_section(0x42, &[0xAA; 5]); let mut p = SectionPacketizer::new(0x0100);
1043 let packets = p.packetize(&[&s]);
1044
1045 let pkt = TsPacket::parse(&packets[0]).unwrap();
1046 let payload = pkt.payload.unwrap();
1047 assert_eq!(payload[0], 0, "pointer_field should be 0");
1048
1049 let section_end = 1 + s.len(); assert!(
1051 section_end < payload.len(),
1052 "must have stuffing after section"
1053 );
1054 for &b in &payload[section_end..] {
1055 assert_eq!(b, STUFFING_BYTE, "all trailing bytes must be 0xFF");
1056 }
1057 }
1058
1059 #[test]
1060 fn reassembler_discards_stuffing() {
1061 let s1 = build_section(0x42, &[0xAA; 10]);
1062 let s2 = build_section(0x46, &[0xBB; 5]);
1063
1064 let mut p = SectionPacketizer::new(0x0100);
1065 let packets = p.packetize(&[&s1, &s2]);
1066
1067 let mut reasm = SectionReassembler::default();
1068 for pkt_raw in &packets {
1069 let pkt = TsPacket::parse(pkt_raw).unwrap();
1070 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
1071 }
1072
1073 let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
1074 assert_eq!(got.len(), 2);
1075 assert!(
1076 reasm.is_empty(),
1077 "stuffing tail must be discarded, not buffered"
1078 );
1079 }
1080
1081 #[test]
1084 fn empty_batch_produces_no_packets() {
1085 let mut p = SectionPacketizer::new(0x0100);
1086 let packets: Vec<[u8; TS_PACKET_SIZE]> = p.packetize(&[]);
1087 assert!(packets.is_empty());
1088 }
1089
1090 #[test]
1091 fn packetize_into_clears_out_first() {
1092 let s = build_section(0x42, &[0xAA; 5]);
1093 let mut p = SectionPacketizer::new(0x0100);
1094
1095 let mut out = vec![[0u8; TS_PACKET_SIZE]; 99]; let n = p.packetize_into(&[&s], &mut out);
1097 assert_eq!(n, out.len(), "out must contain only the new packets");
1098 let mut reasm = SectionReassembler::default();
1100 for pkt_raw in &out {
1101 let pkt = TsPacket::parse(pkt_raw).unwrap();
1102 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
1103 }
1104 let got = reasm.pop_section().unwrap();
1105 assert_eq!(got.as_ref(), s.as_slice());
1106 }
1107
1108 #[test]
1109 fn pid_is_correct() {
1110 let p = SectionPacketizer::new(0x1234);
1111 assert_eq!(p.pid(), 0x1234);
1112 }
1113
1114 #[test]
1115 fn with_continuity_masks_to_4_bits() {
1116 let p = SectionPacketizer::with_continuity(0x0100, 0xFE);
1117 assert_eq!(p.continuity_counter(), 0x0E);
1118 }
1119
1120 #[test]
1121 fn has_payload_always_true_no_adaptation() {
1122 let s = build_section(0x42, &[0xAA; 50]);
1123 let mut p = SectionPacketizer::new(0x0100);
1124 let packets = p.packetize(&[&s]);
1125 for pkt_raw in &packets {
1126 let pkt = TsPacket::parse(pkt_raw).unwrap();
1127 assert!(pkt.header.has_payload, "every packet must carry payload");
1128 assert!(!pkt.header.has_adaptation, "no adaptation field is emitted");
1129 assert!(!pkt.header.tei, "TEI must be false");
1130 assert_eq!(pkt.header.scrambling, 0, "scrambling must be 0");
1131 }
1132 }
1133}