1use alloc::vec::Vec;
10use core::time::Duration;
11
12use crate::pid::well_known;
13use crate::ts::{TsHeader, CC_MASK, SECTION_LENGTH_HI_MASK, TS_PACKET_SIZE};
14
15const PUSI_PAYLOAD_CAP: usize = 183;
17const PAYLOAD_CAP: usize = 184;
19const STUFFING_BYTE: u8 = 0xFF;
21
22pub struct SectionPacketizer {
31 pid: u16,
32 continuity_counter: u8,
33}
34
35impl SectionPacketizer {
36 pub fn new(pid: u16) -> Self {
38 Self {
39 pid,
40 continuity_counter: 0,
41 }
42 }
43
44 pub fn with_continuity(pid: u16, cc: u8) -> Self {
46 Self {
47 pid,
48 continuity_counter: cc & CC_MASK,
49 }
50 }
51
52 pub fn pid(&self) -> u16 {
54 self.pid
55 }
56
57 pub fn continuity_counter(&self) -> u8 {
59 self.continuity_counter
60 }
61
62 pub fn packetize_into(
67 &mut self,
68 sections: &[&[u8]],
69 out: &mut Vec<[u8; TS_PACKET_SIZE]>,
70 ) -> usize {
71 out.clear();
72
73 if sections.is_empty() {
74 return 0;
75 }
76
77 let total_len: usize = sections.iter().map(|s| s.len()).sum();
79 if total_len == 0 {
80 return 0;
81 }
82 let mut data = Vec::with_capacity(total_len);
83 let mut starts = Vec::with_capacity(sections.len());
84 for s in sections {
85 starts.push(data.len());
86 data.extend_from_slice(s);
87 }
88
89 let count_before = out.len();
90 let mut pos = 0usize;
91
92 while pos < data.len() {
93 let next_start = starts.iter().copied().find(|&s| s >= pos);
95
96 let pusi: bool;
97 let pointer_field: u8;
98 let cap: usize;
99
100 if let Some(ns) = next_start {
101 let diff = ns.saturating_sub(pos);
102 if diff <= PUSI_PAYLOAD_CAP {
103 pusi = true;
104 pointer_field = diff as u8;
105 cap = PUSI_PAYLOAD_CAP;
106 } else {
107 pusi = false;
108 pointer_field = 0;
109 cap = PAYLOAD_CAP;
110 }
111 } else {
112 pusi = false;
113 pointer_field = 0;
114 cap = PAYLOAD_CAP;
115 }
116
117 let mut pkt = [0u8; TS_PACKET_SIZE];
118
119 let header = TsHeader {
120 tei: false,
121 pusi,
122 pid: self.pid,
123 scrambling: 0,
124 has_adaptation: false,
125 has_payload: true,
126 continuity_counter: self.continuity_counter,
127 };
128 header
129 .serialize_into(&mut pkt[..4])
130 .expect("4-byte header buffer");
131
132 self.continuity_counter = (self.continuity_counter + 1) & CC_MASK;
133
134 let mut write_pos = 4usize;
135
136 if pusi {
137 pkt[write_pos] = pointer_field;
138 write_pos += 1;
139 }
140
141 let remaining = data.len() - pos;
142 let to_copy = remaining.min(cap);
143 pkt[write_pos..write_pos + to_copy].copy_from_slice(&data[pos..pos + to_copy]);
144 pos += to_copy;
145 write_pos += to_copy;
146
147 for b in &mut pkt[write_pos..] {
149 *b = STUFFING_BYTE;
150 }
151
152 out.push(pkt);
153 }
154
155 out.len() - count_before
156 }
157
158 pub fn packetize(&mut self, sections: &[&[u8]]) -> Vec<[u8; TS_PACKET_SIZE]> {
160 let mut out = Vec::new();
161 self.packetize_into(sections, &mut out);
162 out
163 }
164}
165
166pub const NIT_MAX_INTERVAL: Duration = Duration::from_secs(10);
170pub const BAT_MAX_INTERVAL: Duration = Duration::from_secs(10);
172pub const SDT_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
174pub const SDT_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
176pub const EIT_PF_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
178pub const EIT_PF_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
181pub const EIT_SCHED_MAX_INTERVAL: Duration = Duration::from_secs(10);
183pub const EIT_SCHED_EXT_MAX_INTERVAL: Duration = Duration::from_secs(30);
185pub const TDT_MAX_INTERVAL: Duration = Duration::from_secs(30);
187pub const TOT_MAX_INTERVAL: Duration = Duration::from_secs(30);
189
190pub const PAT_MAX_INTERVAL: Duration = Duration::from_millis(100);
192pub const PMT_MAX_INTERVAL: Duration = Duration::from_millis(100);
194
195pub const MIN_SECTION_INTERVAL: Duration = Duration::from_millis(25);
198
199pub struct SiMux {
219 entries: Vec<Entry>,
220}
221
222struct Entry {
223 pid: u16,
224 sections: Vec<u8>,
225 interval: Duration,
226 last_emit: Option<Duration>,
227 packetizer: SectionPacketizer,
228}
229
230impl SiMux {
231 pub fn new() -> Self {
233 Self {
234 entries: Vec::new(),
235 }
236 }
237
238 pub fn upsert(&mut self, pid: u16, sections: Vec<u8>, interval: Duration) {
249 debug_assert!(
250 interval >= MIN_SECTION_INTERVAL,
251 "interval {interval:?} is below the 25 ms minimum (EN 300 468 §5.1.4.1)"
252 );
253
254 if let Some(entry) = self.entries.iter_mut().find(|e| e.pid == pid) {
255 entry.sections = sections;
256 entry.interval = interval;
257 } else {
258 self.entries.push(Entry {
259 pid,
260 sections,
261 interval,
262 last_emit: None,
263 packetizer: SectionPacketizer::new(pid),
264 });
265 }
266 }
267
268 pub fn upsert_pat(&mut self, sections: Vec<u8>) {
272 self.upsert(well_known::PAT.value(), sections, PAT_MAX_INTERVAL);
273 }
274
275 pub fn upsert_pmt(&mut self, pid: u16, sections: Vec<u8>) {
279 self.upsert(pid, sections, PMT_MAX_INTERVAL);
280 }
281
282 pub fn upsert_sdt_actual(&mut self, sections: Vec<u8>) {
286 self.upsert(
287 well_known::SDT_BAT.value(),
288 sections,
289 SDT_ACTUAL_MAX_INTERVAL,
290 );
291 }
292
293 pub fn upsert_nit(&mut self, sections: Vec<u8>) {
297 self.upsert(well_known::NIT.value(), sections, NIT_MAX_INTERVAL);
298 }
299
300 pub fn upsert_tdt(&mut self, sections: Vec<u8>) {
304 self.upsert(well_known::TDT_TOT.value(), sections, TDT_MAX_INTERVAL);
305 }
306
307 pub fn upsert_tot(&mut self, sections: Vec<u8>) {
311 self.upsert(well_known::TDT_TOT.value(), sections, TOT_MAX_INTERVAL);
312 }
313
314 pub fn poll_into(&mut self, now: Duration, out: &mut Vec<[u8; TS_PACKET_SIZE]>) -> usize {
321 out.clear();
322 let before = out.len();
323
324 let mut tmp = Vec::new();
325 for entry in &mut self.entries {
326 let due = match entry.last_emit {
327 None => true,
328 Some(last) => now.saturating_sub(last) >= entry.interval,
329 };
330 if due {
331 let refs = split_sections(&entry.sections);
332 if !refs.is_empty() {
333 entry.packetizer.packetize_into(&refs, &mut tmp);
334 out.append(&mut tmp);
335 }
336 entry.last_emit = Some(now);
337 }
338 }
339
340 out.len() - before
341 }
342
343 pub fn poll(&mut self, now: Duration) -> Vec<[u8; TS_PACKET_SIZE]> {
345 let mut out = Vec::new();
346 self.poll_into(now, &mut out);
347 out
348 }
349}
350
351impl Default for SiMux {
352 fn default() -> Self {
353 Self::new()
354 }
355}
356
357fn split_sections(data: &[u8]) -> Vec<&[u8]> {
363 let mut result = Vec::new();
364 let mut pos = 0;
365 while pos + 3 <= data.len() {
366 let section_length =
367 (((data[pos + 1] & SECTION_LENGTH_HI_MASK) as usize) << 8) | (data[pos + 2] as usize);
368 let end = pos + 3 + section_length;
369 if end > data.len() {
370 break;
371 }
372 result.push(&data[pos..end]);
373 pos = end;
374 }
375 result
376}
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381 use crate::ts::{SectionReassembler, TsPacket};
382 use alloc::vec;
383 use alloc::vec::Vec;
384
385 fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
391 let section_length = body_after_length.len() as u16;
392 let mut v = Vec::with_capacity(3 + section_length as usize);
393 v.push(table_id);
394 v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
396 v.push((section_length & 0xFF) as u8);
397 v.extend_from_slice(body_after_length);
398 v
399 }
400
401 fn concat_sections(sections: &[Vec<u8>]) -> Vec<u8> {
402 let total: usize = sections.iter().map(|s| s.len()).sum();
403 let mut out = Vec::with_capacity(total);
404 for s in sections {
405 out.extend_from_slice(s);
406 }
407 out
408 }
409
410 fn assert_round_trip(sections: &[Vec<u8>]) {
413 let mut packetizer = SectionPacketizer::new(0x0100);
414 let refs: Vec<&[u8]> = sections.iter().map(|s| s.as_slice()).collect();
415 let packets = packetizer.packetize(&refs);
416
417 let mut reasm = SectionReassembler::default();
418 for pkt_raw in &packets {
419 let pkt = TsPacket::parse(pkt_raw).expect("parse generated packet");
420 let payload = pkt.payload.expect("payload present");
421 let pusi = pkt.header.pusi;
422 reasm.feed(payload, pusi);
423 }
424
425 let got: Vec<_> = core::iter::from_fn(|| reasm.pop_section()).collect();
426 assert_eq!(
427 got.len(),
428 sections.len(),
429 "section count mismatch: expected {}, got {}",
430 sections.len(),
431 got.len()
432 );
433 for (i, (orig, round)) in sections.iter().zip(got.iter()).enumerate() {
434 assert_eq!(
435 round.as_ref(),
436 orig.as_slice(),
437 "section {i} round-trip mismatch"
438 );
439 }
440 assert!(reasm.is_empty(), "reassembler should be empty after drain");
441 }
442
443 #[test]
446 fn split_single_section() {
447 let s = build_section(0x42, &[0xAA; 5]);
448 let refs = split_sections(&s);
449 assert_eq!(refs.len(), 1);
450 assert_eq!(refs[0], s.as_slice());
451 }
452
453 #[test]
454 fn split_two_sections() {
455 let s1 = build_section(0x42, &[0x01, 0x02]);
456 let s2 = build_section(0x46, &[0x03, 0x04, 0x05]);
457 let both = concat_sections(&[s1.clone(), s2.clone()]);
458 let refs = split_sections(&both);
459 assert_eq!(refs.len(), 2);
460 assert_eq!(refs[0], s1.as_slice());
461 assert_eq!(refs[1], s2.as_slice());
462 }
463
464 #[test]
465 fn split_empty_input() {
466 let refs = split_sections(&[]);
467 assert!(refs.is_empty());
468 }
469
470 #[test]
471 fn split_trailing_garbage_ignored() {
472 let s = build_section(0x42, &[0xAA; 3]);
473 let mut data = s.clone();
474 data.push(0xFF); let refs = split_sections(&data);
476 assert_eq!(refs.len(), 1);
477 assert_eq!(refs[0], s.as_slice());
478 }
479
480 #[test]
483 fn interval_constants_match_spec() {
484 assert_eq!(NIT_MAX_INTERVAL, Duration::from_secs(10));
486 assert_eq!(BAT_MAX_INTERVAL, Duration::from_secs(10));
487 assert_eq!(SDT_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
488 assert_eq!(SDT_OTHER_MAX_INTERVAL, Duration::from_secs(10));
489 assert_eq!(EIT_PF_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
490 assert_eq!(EIT_PF_OTHER_MAX_INTERVAL, Duration::from_secs(10));
491 assert_eq!(EIT_SCHED_MAX_INTERVAL, Duration::from_secs(10));
492 assert_eq!(EIT_SCHED_EXT_MAX_INTERVAL, Duration::from_secs(30));
493 assert_eq!(TDT_MAX_INTERVAL, Duration::from_secs(30));
494 assert_eq!(TOT_MAX_INTERVAL, Duration::from_secs(30));
495
496 assert_eq!(PAT_MAX_INTERVAL, Duration::from_millis(100));
498 assert_eq!(PMT_MAX_INTERVAL, Duration::from_millis(100));
499
500 assert_eq!(MIN_SECTION_INTERVAL, Duration::from_millis(25));
502 }
503
504 #[test]
507 fn new_simux_is_empty() {
508 let mut mux = SiMux::new();
509 let pkts = mux.poll(Duration::ZERO);
510 assert!(pkts.is_empty());
511 }
512
513 #[test]
514 fn simux_default_is_empty() {
515 let mut mux = SiMux::default();
516 let pkts = mux.poll(Duration::ZERO);
517 assert!(pkts.is_empty());
518 }
519
520 #[test]
521 fn first_poll_always_emits() {
522 let s = build_section(0x42, &[0xAA; 10]);
523 let mut mux = SiMux::new();
524 mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
525 let pkts = mux.poll(Duration::ZERO);
526 assert!(!pkts.is_empty(), "first poll must emit");
527 }
528
529 #[test]
530 fn first_poll_emits_all_entries() {
531 let s1 = build_section(0x42, &[0x01; 5]);
532 let s2 = build_section(0x46, &[0x02; 5]);
533 let mut mux = SiMux::new();
534 mux.upsert(0x0100, s1.clone(), Duration::from_millis(100));
535 mux.upsert(0x0200, s2.clone(), Duration::from_millis(200));
536 let pkts = mux.poll(Duration::ZERO);
537 let pids: Vec<u16> = pkts
539 .iter()
540 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
541 .collect();
542 assert!(pids.contains(&0x0100));
543 assert!(pids.contains(&0x0200));
544 }
545
546 #[test]
549 fn entry_emits_only_when_due() {
550 let s = build_section(0x42, &[0xAA; 10]);
551 let mut mux = SiMux::new();
552 mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
553
554 let pkts0 = mux.poll(Duration::ZERO);
556 assert!(!pkts0.is_empty());
557
558 let pkts50 = mux.poll(Duration::from_millis(50));
560 assert!(pkts50.is_empty(), "should not emit at t=50");
561
562 let pkts99 = mux.poll(Duration::from_millis(99));
564 assert!(pkts99.is_empty(), "should not emit at t=99");
565
566 let pkts100 = mux.poll(Duration::from_millis(100));
568 assert!(!pkts100.is_empty(), "should emit at t=100");
569 }
570
571 #[test]
572 fn two_entries_different_cadence() {
573 let s1 = build_section(0x42, &[0x01; 5]);
574 let s2 = build_section(0x46, &[0x02; 5]);
575 let mut mux = SiMux::new();
576 mux.upsert(0x0100, concat_sections(&[s1]), Duration::from_millis(100));
577 mux.upsert(0x0200, concat_sections(&[s2]), Duration::from_millis(200));
578
579 let pkts = mux.poll(Duration::ZERO);
581 assert!(!pkts.is_empty());
582
583 let pkts100 = mux.poll(Duration::from_millis(100));
585 assert!(!pkts100.is_empty());
586 let pids100: Vec<u16> = pkts100
587 .iter()
588 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
589 .collect();
590 assert!(pids100.contains(&0x0100), "PID 0x0100 should emit at t=100");
591 assert!(
592 !pids100.contains(&0x0200),
593 "PID 0x0200 should NOT emit at t=100"
594 );
595
596 let pkts150 = mux.poll(Duration::from_millis(150));
598 assert!(pkts150.is_empty(), "neither should emit at t=150");
599
600 let pkts200 = mux.poll(Duration::from_millis(200));
602 let pids200: Vec<u16> = pkts200
603 .iter()
604 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
605 .collect();
606 assert!(pids200.contains(&0x0100), "PID 0x0100 should emit at t=200");
607 assert!(pids200.contains(&0x0200), "PID 0x0200 should emit at t=200");
608 }
609
610 #[test]
611 fn entry_emits_again_after_full_interval() {
612 let s = build_section(0x42, &[0xAA; 20]);
613 let mut mux = SiMux::new();
614 mux.upsert(0x0100, s, Duration::from_secs(1));
615
616 let n0 = mux.poll_into(Duration::ZERO, &mut Vec::new());
618 assert!(n0 > 0);
619
620 let mut buf = Vec::new();
622 let n999 = mux.poll_into(Duration::from_millis(999), &mut buf);
623 assert_eq!(n999, 0);
624
625 let mut buf2 = Vec::new();
627 let n1000 = mux.poll_into(Duration::from_millis(1000), &mut buf2);
628 assert!(n1000 > 0);
629 }
630
631 #[test]
634 fn upsert_updates_existing_entry() {
635 let s1 = build_section(0x42, &[0xAA; 5]);
636 let s2 = build_section(0x46, &[0xBB; 10]);
637 let mut mux = SiMux::new();
638
639 mux.upsert(0x0100, s1, Duration::from_millis(100));
640 mux.upsert(0x0100, s2.clone(), Duration::from_millis(200));
642
643 let pkts = mux.poll(Duration::ZERO);
644
645 let mut reasm = SectionReassembler::default();
647 for raw in &pkts {
648 let pkt = TsPacket::parse(raw).unwrap();
649 if pkt.header.pid == 0x0100 {
650 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
651 }
652 }
653 let got: Vec<_> = core::iter::from_fn(|| reasm.pop_section()).collect();
654 assert_eq!(got.len(), 1);
655 assert_eq!(got[0].as_ref(), s2.as_slice());
656 }
657
658 #[test]
661 fn upsert_pat_uses_correct_pid() {
662 let s = build_section(0x00, &[0x01, 0x02]);
663 let mut mux = SiMux::new();
664 mux.upsert_pat(s);
665 let pkts = mux.poll(Duration::ZERO);
666 let pids: Vec<u16> = pkts
667 .iter()
668 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
669 .collect();
670 assert!(pids.iter().all(|&p| p == well_known::PAT.value()));
671 }
672
673 #[test]
674 fn upsert_sdt_actual_uses_correct_pid() {
675 let s = build_section(0x42, &[0x01]);
676 let mut mux = SiMux::new();
677 mux.upsert_sdt_actual(s);
678 let pkts = mux.poll(Duration::ZERO);
679 let pids: Vec<u16> = pkts
680 .iter()
681 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
682 .collect();
683 assert!(pids.iter().all(|&p| p == well_known::SDT_BAT.value()));
684 }
685
686 #[test]
687 fn upsert_nit_uses_correct_pid() {
688 let s = build_section(0x40, &[0x01]);
689 let mut mux = SiMux::new();
690 mux.upsert_nit(s);
691 let pkts = mux.poll(Duration::ZERO);
692 let pids: Vec<u16> = pkts
693 .iter()
694 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
695 .collect();
696 assert!(pids.iter().all(|&p| p == well_known::NIT.value()));
697 }
698
699 #[test]
700 fn upsert_tdt_uses_correct_pid() {
701 let s = build_section(0x70, &[0x01]);
702 let mut mux = SiMux::new();
703 mux.upsert_tdt(s);
704 let pkts = mux.poll(Duration::ZERO);
705 let pids: Vec<u16> = pkts
706 .iter()
707 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
708 .collect();
709 assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
710 }
711
712 #[test]
713 fn upsert_tot_uses_correct_pid() {
714 let s = build_section(0x73, &[0x01]);
715 let mut mux = SiMux::new();
716 mux.upsert_tot(s);
717 let pkts = mux.poll(Duration::ZERO);
718 let pids: Vec<u16> = pkts
719 .iter()
720 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
721 .collect();
722 assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
723 }
724
725 #[test]
728 fn simux_round_trip_single_entry() {
729 let s1 = build_section(0x42, &[0xAA; 20]);
730 let s2 = build_section(0x46, &[0xBB; 15]);
731 let mut mux = SiMux::new();
732 mux.upsert(
733 0x0100,
734 concat_sections(&[s1.clone(), s2.clone()]),
735 Duration::from_millis(100),
736 );
737
738 let pkts = mux.poll(Duration::ZERO);
739
740 let mut reasm = SectionReassembler::default();
741 for raw in &pkts {
742 let pkt = TsPacket::parse(raw).unwrap();
743 if pkt.header.pid == 0x0100 {
744 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
745 }
746 }
747 let got: Vec<_> = core::iter::from_fn(|| reasm.pop_section()).collect();
748 assert_eq!(got.len(), 2, "round-trip must recover both sections");
749 assert_eq!(got[0].as_ref(), s1.as_slice());
750 assert_eq!(got[1].as_ref(), s2.as_slice());
751 }
752
753 #[test]
754 fn simux_round_trip_multi_pid() {
755 let s_a = build_section(0x42, &[0xA0; 10]);
756 let s_b = build_section(0x46, &[0xB0; 10]);
757 let mut mux = SiMux::new();
758 mux.upsert(0x0100, s_a.clone(), Duration::from_millis(100));
759 mux.upsert(0x0200, s_b.clone(), Duration::from_millis(100));
760
761 let pkts = mux.poll(Duration::ZERO);
762
763 let mut reasm_a = SectionReassembler::default();
764 let mut reasm_b = SectionReassembler::default();
765 for raw in &pkts {
766 let pkt = TsPacket::parse(raw).unwrap();
767 match pkt.header.pid {
768 0x0100 => reasm_a.feed(pkt.payload.unwrap(), pkt.header.pusi),
769 0x0200 => reasm_b.feed(pkt.payload.unwrap(), pkt.header.pusi),
770 _ => {}
771 }
772 }
773 let got_a: Vec<_> = core::iter::from_fn(|| reasm_a.pop_section()).collect();
774 let got_b: Vec<_> = core::iter::from_fn(|| reasm_b.pop_section()).collect();
775 assert_eq!(got_a.len(), 1);
776 assert_eq!(got_b.len(), 1);
777 assert_eq!(got_a[0].as_ref(), s_a.as_slice());
778 assert_eq!(got_b[0].as_ref(), s_b.as_slice());
779 }
780
781 #[test]
784 fn continuity_counter_continuous_across_polls() {
785 let s = build_section(0x42, &[0xAA; 250]);
787 let mut mux = SiMux::new();
788 mux.upsert(0x0100, s, Duration::from_millis(100));
789
790 let pkts1 = mux.poll(Duration::ZERO);
791 assert!(pkts1.len() >= 2, "need ≥2 packets to test CC continuity");
792
793 let last_cc_pkts1 = TsPacket::parse(&pkts1[pkts1.len() - 1])
794 .unwrap()
795 .header
796 .continuity_counter;
797
798 let pkts2 = mux.poll(Duration::from_millis(100));
799 assert!(!pkts2.is_empty(), "second poll must emit");
800
801 let first_cc_pkts2 = TsPacket::parse(&pkts2[0])
802 .unwrap()
803 .header
804 .continuity_counter;
805
806 assert_eq!(
807 first_cc_pkts2,
808 (last_cc_pkts1 + 1) & 0x0F,
809 "CC must continue across poll cycles"
810 );
811 }
812
813 #[test]
816 fn poll_into_clears_out_before_appending() {
817 let s = build_section(0x42, &[0xAA; 5]);
818 let mut mux = SiMux::new();
819 mux.upsert(0x0100, s, Duration::from_millis(100));
820
821 let mut out = vec![[0u8; TS_PACKET_SIZE]; 42];
822 let n = mux.poll_into(Duration::ZERO, &mut out);
823 assert_eq!(n, out.len(), "out must contain only new packets");
824 }
825
826 #[test]
829 #[cfg(debug_assertions)]
830 #[should_panic(expected = "25 ms")]
831 fn upsert_rejects_sub_25ms_interval_in_debug() {
832 let s = build_section(0x42, &[0xAA; 5]);
833 let mut mux = SiMux::new();
834 mux.upsert(0x0100, s, Duration::from_millis(10));
835 }
836
837 #[test]
838 fn upsert_accepts_25ms_interval() {
839 let s = build_section(0x42, &[0xAA; 5]);
840 let mut mux = SiMux::new();
841 mux.upsert(0x0100, s, MIN_SECTION_INTERVAL);
842 let pkts = mux.poll(Duration::ZERO);
843 assert!(!pkts.is_empty());
844 }
845
846 #[test]
849 fn round_trip_single_short_section() {
850 let s = build_section(0x42, &[0xAA; 10]);
851 assert_round_trip(&[s]);
852 }
853
854 #[test]
855 fn round_trip_one_byte_body() {
856 let s = build_section(0x46, &[0xBB]); assert_round_trip(&[s]);
858 }
859
860 #[test]
861 fn round_trip_section_exactly_pusi_cap_boundary() {
862 let body = vec![0xCC; PUSI_PAYLOAD_CAP - 3];
864 let s = build_section(0x50, &body);
865 assert_eq!(s.len(), PUSI_PAYLOAD_CAP);
866 assert_round_trip(&[s]);
867 }
868
869 #[test]
870 fn round_trip_section_just_over_pusi_cap() {
871 let body = vec![0xDD; PUSI_PAYLOAD_CAP - 3 + 1];
873 let s = build_section(0x52, &body);
874 assert_eq!(s.len(), PUSI_PAYLOAD_CAP + 1);
875 assert_round_trip(&[s]);
876 }
877
878 #[test]
879 fn round_trip_section_spans_many_packets() {
880 let body = vec![0xEE; 2000 - 3];
882 let s = build_section(0x60, &body);
883 assert_round_trip(&[s]);
884 }
885
886 #[test]
887 fn round_trip_section_at_max_size() {
888 let body = vec![0x11; 4096 - 3];
893 let s = build_section(0x80, &body);
894 assert_eq!(s.len(), 4096);
895 assert_round_trip(&[s]);
896 }
897
898 #[test]
899 fn round_trip_multiple_short_sections_in_one_batch() {
900 let s1 = build_section(0x42, &[0x01, 0x02]); let s2 = build_section(0x46, &[0x03]); let s3 = build_section(0x4A, &[0x04, 0x05, 0x06]); assert_round_trip(&[s1, s2, s3]);
904 }
905
906 #[test]
907 fn round_trip_section_ends_exactly_at_boundary() {
908 let body1 = vec![0xA1; PUSI_PAYLOAD_CAP - 3];
912 let s1 = build_section(0x50, &body1);
913 assert_eq!(s1.len(), PUSI_PAYLOAD_CAP);
914
915 let s2 = build_section(0x52, &[0xB1, 0xB2]);
916 assert_round_trip(&[s1, s2]);
917 }
918
919 #[test]
920 fn round_trip_mix_small_large_sections() {
921 let s1 = build_section(0x10, &[0xAA; 5]);
924 let body2 = vec![0xBB; 200];
925 let s2 = build_section(0x20, &body2);
926 let s3 = build_section(0x30, &[0xCC; 50]);
927 let body4 = vec![0xDD; 800];
928 let s4 = build_section(0x40, &body4);
929 let s5 = build_section(0x50, &[0xEE]); assert_round_trip(&[s1, s2, s3, s4, s5]);
931 }
932
933 #[test]
936 fn continuity_counter_increments_per_packet() {
937 let body = vec![0xAA; 500];
939 let section = build_section(0x42, &body);
940 let mut p = SectionPacketizer::new(0x0100);
941
942 let packets = p.packetize(&[§ion]);
943 assert!(packets.len() >= 3, "need multiple packets to test CC");
944
945 let mut last_cc: Option<u8> = None;
946 for pkt_raw in &packets {
947 let pkt = TsPacket::parse(pkt_raw).unwrap();
948 let cc = pkt.header.continuity_counter;
949 if let Some(last) = last_cc {
950 assert_eq!(cc, (last + 1) & 0x0F, "CC must increment per packet");
951 }
952 last_cc = Some(cc);
953 }
954 }
955
956 #[test]
957 fn continuity_counter_wraps_and_continues_across_calls() {
958 let mut p = SectionPacketizer::with_continuity(0x0100, 14);
959 let body = vec![0xBB; 500];
961 let s = build_section(0x42, &body);
962
963 let pkts1 = p.packetize(&[&s]);
965 assert!(pkts1.len() >= 3, "section must span ≥3 packets");
966 let ccs1: Vec<u8> = pkts1
967 .iter()
968 .map(|b| TsPacket::parse(b).unwrap().header.continuity_counter)
969 .collect();
970 assert_eq!(ccs1[0], 14);
971 assert_eq!(ccs1[1], 15);
972 assert_eq!(ccs1[2], 0);
973
974 let pkts2 = p.packetize(&[&s]);
976 let cc_first_pkt2 = TsPacket::parse(&pkts2[0])
977 .unwrap()
978 .header
979 .continuity_counter;
980 assert_eq!(cc_first_pkt2, ccs1.last().map(|c| (c + 1) & 0x0F).unwrap());
981 }
982
983 #[test]
986 fn pusi_set_when_section_starts() {
987 let s = build_section(0x42, &[0xAA; 10]);
988 let mut p = SectionPacketizer::new(0x0100);
989 let packets = p.packetize(&[&s]);
990 assert!(!packets.is_empty());
991 let pkt = TsPacket::parse(&packets[0]).unwrap();
992 assert!(pkt.header.pusi, "first packet must have PUSI=1");
993 }
994
995 #[test]
996 fn pusi_not_set_on_mid_section_continuation() {
997 let body = vec![0xAA; 500];
998 let s = build_section(0x42, &body);
999 let mut p = SectionPacketizer::new(0x0100);
1000 let packets = p.packetize(&[&s]);
1001 assert!(packets.len() >= 2);
1002 let pkt1 = TsPacket::parse(&packets[0]).unwrap();
1003 let pkt2 = TsPacket::parse(&packets[1]).unwrap();
1004 assert!(pkt1.header.pusi, "first packet must have PUSI=1");
1005 assert!(
1006 !pkt2.header.pusi,
1007 "second packet is continuation, must have PUSI=0"
1008 );
1009 }
1010
1011 #[test]
1012 fn pointer_field_equals_tail_length_before_new_section() {
1013 let body1 = vec![0xA1; 197]; let s1 = build_section(0x52, &body1);
1018 assert_eq!(s1.len(), 200);
1019 let s2 = build_section(0x54, &[0xB1; 47]); assert_eq!(s2.len(), 50);
1021
1022 let mut p = SectionPacketizer::new(0x0100);
1023 let packets = p.packetize(&[&s1, &s2]);
1024
1025 let pkt_with_pointer = packets
1027 .iter()
1028 .map(|raw| TsPacket::parse(raw).unwrap())
1029 .find(|pkt| pkt.header.pusi && pkt.payload.is_some_and(|pl| pl.first() != Some(&0)))
1030 .expect("must have a PUSI packet with non-zero pointer");
1031
1032 let payload = pkt_with_pointer.payload.unwrap();
1033 let pointer = payload[0] as usize;
1034 assert!(pointer > 0, "pointer must be non-zero");
1035 let tail_start = s1.len() - pointer;
1037 assert_eq!(&payload[1..1 + pointer], &s1[tail_start..]);
1038 }
1039
1040 #[test]
1043 fn final_packet_unused_tail_is_stuffing() {
1044 let s = build_section(0x42, &[0xAA; 5]); let mut p = SectionPacketizer::new(0x0100);
1046 let packets = p.packetize(&[&s]);
1047
1048 let pkt = TsPacket::parse(&packets[0]).unwrap();
1049 let payload = pkt.payload.unwrap();
1050 assert_eq!(payload[0], 0, "pointer_field should be 0");
1051
1052 let section_end = 1 + s.len(); assert!(
1054 section_end < payload.len(),
1055 "must have stuffing after section"
1056 );
1057 for &b in &payload[section_end..] {
1058 assert_eq!(b, STUFFING_BYTE, "all trailing bytes must be 0xFF");
1059 }
1060 }
1061
1062 #[test]
1063 fn reassembler_discards_stuffing() {
1064 let s1 = build_section(0x42, &[0xAA; 10]);
1065 let s2 = build_section(0x46, &[0xBB; 5]);
1066
1067 let mut p = SectionPacketizer::new(0x0100);
1068 let packets = p.packetize(&[&s1, &s2]);
1069
1070 let mut reasm = SectionReassembler::default();
1071 for pkt_raw in &packets {
1072 let pkt = TsPacket::parse(pkt_raw).unwrap();
1073 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
1074 }
1075
1076 let got: Vec<_> = core::iter::from_fn(|| reasm.pop_section()).collect();
1077 assert_eq!(got.len(), 2);
1078 assert!(
1079 reasm.is_empty(),
1080 "stuffing tail must be discarded, not buffered"
1081 );
1082 }
1083
1084 #[test]
1087 fn empty_batch_produces_no_packets() {
1088 let mut p = SectionPacketizer::new(0x0100);
1089 let packets: Vec<[u8; TS_PACKET_SIZE]> = p.packetize(&[]);
1090 assert!(packets.is_empty());
1091 }
1092
1093 #[test]
1094 fn packetize_into_clears_out_first() {
1095 let s = build_section(0x42, &[0xAA; 5]);
1096 let mut p = SectionPacketizer::new(0x0100);
1097
1098 let mut out = vec![[0u8; TS_PACKET_SIZE]; 99]; let n = p.packetize_into(&[&s], &mut out);
1100 assert_eq!(n, out.len(), "out must contain only the new packets");
1101 let mut reasm = SectionReassembler::default();
1103 for pkt_raw in &out {
1104 let pkt = TsPacket::parse(pkt_raw).unwrap();
1105 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
1106 }
1107 let got = reasm.pop_section().unwrap();
1108 assert_eq!(got.as_ref(), s.as_slice());
1109 }
1110
1111 #[test]
1112 fn pid_is_correct() {
1113 let p = SectionPacketizer::new(0x1234);
1114 assert_eq!(p.pid(), 0x1234);
1115 }
1116
1117 #[test]
1118 fn with_continuity_masks_to_4_bits() {
1119 let p = SectionPacketizer::with_continuity(0x0100, 0xFE);
1120 assert_eq!(p.continuity_counter(), 0x0E);
1121 }
1122
1123 #[test]
1124 fn has_payload_always_true_no_adaptation() {
1125 let s = build_section(0x42, &[0xAA; 50]);
1126 let mut p = SectionPacketizer::new(0x0100);
1127 let packets = p.packetize(&[&s]);
1128 for pkt_raw in &packets {
1129 let pkt = TsPacket::parse(pkt_raw).unwrap();
1130 assert!(pkt.header.has_payload, "every packet must carry payload");
1131 assert!(!pkt.header.has_adaptation, "no adaptation field is emitted");
1132 assert!(!pkt.header.tei, "TEI must be false");
1133 assert_eq!(pkt.header.scrambling, 0, "scrambling must be 0");
1134 }
1135 }
1136}