1use alloc::vec::Vec;
10use core::time::Duration;
11
12use crate::pid::well_known;
13use crate::ts::{TsHeader, CC_MASK, SECTION_LENGTH_HI_MASK, TS_PACKET_SIZE};
14
15const PUSI_PAYLOAD_CAP: usize = 183;
17const PAYLOAD_CAP: usize = 184;
19const STUFFING_BYTE: u8 = 0xFF;
21
22pub struct SectionPacketizer {
31 pid: u16,
32 continuity_counter: u8,
33}
34
35impl SectionPacketizer {
36 pub fn new(pid: u16) -> Self {
38 Self {
39 pid,
40 continuity_counter: 0,
41 }
42 }
43
44 pub fn with_continuity(pid: u16, cc: u8) -> Self {
46 Self {
47 pid,
48 continuity_counter: cc & CC_MASK,
49 }
50 }
51
52 pub fn pid(&self) -> u16 {
54 self.pid
55 }
56
57 pub fn continuity_counter(&self) -> u8 {
59 self.continuity_counter
60 }
61
62 pub fn packetize_into(
67 &mut self,
68 sections: &[&[u8]],
69 out: &mut Vec<[u8; TS_PACKET_SIZE]>,
70 ) -> usize {
71 out.clear();
72
73 if sections.is_empty() {
74 return 0;
75 }
76
77 let total_len: usize = sections.iter().map(|s| s.len()).sum();
79 if total_len == 0 {
80 return 0;
81 }
82 let mut data = Vec::with_capacity(total_len);
83 let mut starts = Vec::with_capacity(sections.len());
84 for s in sections {
85 starts.push(data.len());
86 data.extend_from_slice(s);
87 }
88
89 let count_before = out.len();
90 let mut pos = 0usize;
91
92 while pos < data.len() {
93 let next_start = starts.iter().copied().find(|&s| s >= pos);
95
96 let pusi: bool;
97 let pointer_field: u8;
98 let cap: usize;
99
100 if let Some(ns) = next_start {
101 let diff = ns.saturating_sub(pos);
102 if diff <= PUSI_PAYLOAD_CAP {
103 pusi = true;
104 pointer_field = diff as u8;
105 cap = PUSI_PAYLOAD_CAP;
106 } else {
107 pusi = false;
108 pointer_field = 0;
109 cap = PAYLOAD_CAP;
110 }
111 } else {
112 pusi = false;
113 pointer_field = 0;
114 cap = PAYLOAD_CAP;
115 }
116
117 let mut pkt = [0u8; TS_PACKET_SIZE];
118
119 let header = TsHeader {
120 tei: false,
121 pusi,
122 pid: self.pid,
123 scrambling: 0,
124 has_adaptation: false,
125 has_payload: true,
126 continuity_counter: self.continuity_counter,
127 };
128 header
129 .serialize_into(&mut pkt[..4])
130 .expect("4-byte header buffer");
131
132 self.continuity_counter = (self.continuity_counter + 1) & CC_MASK;
133
134 let mut write_pos = 4usize;
135
136 if pusi {
137 pkt[write_pos] = pointer_field;
138 write_pos += 1;
139 }
140
141 let remaining = data.len() - pos;
142 let to_copy = remaining.min(cap);
143 pkt[write_pos..write_pos + to_copy].copy_from_slice(&data[pos..pos + to_copy]);
144 pos += to_copy;
145 write_pos += to_copy;
146
147 for b in &mut pkt[write_pos..] {
149 *b = STUFFING_BYTE;
150 }
151
152 out.push(pkt);
153 }
154
155 out.len() - count_before
156 }
157
158 pub fn packetize(&mut self, sections: &[&[u8]]) -> Vec<[u8; TS_PACKET_SIZE]> {
160 let mut out = Vec::new();
161 self.packetize_into(sections, &mut out);
162 out
163 }
164}
165
166pub const NIT_MAX_INTERVAL: Duration = Duration::from_secs(10);
170pub const BAT_MAX_INTERVAL: Duration = Duration::from_secs(10);
172pub const SDT_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
174pub const SDT_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
176pub const EIT_PF_ACTUAL_MAX_INTERVAL: Duration = Duration::from_secs(2);
178pub const EIT_PF_OTHER_MAX_INTERVAL: Duration = Duration::from_secs(10);
181pub const EIT_SCHED_MAX_INTERVAL: Duration = Duration::from_secs(10);
183pub const EIT_SCHED_EXT_MAX_INTERVAL: Duration = Duration::from_secs(30);
185pub const TDT_MAX_INTERVAL: Duration = Duration::from_secs(30);
187pub const TOT_MAX_INTERVAL: Duration = Duration::from_secs(30);
189
190pub const PAT_MAX_INTERVAL: Duration = Duration::from_millis(100);
192pub const PMT_MAX_INTERVAL: Duration = Duration::from_millis(100);
194
195pub const MIN_SECTION_INTERVAL: Duration = Duration::from_millis(25);
198
199pub struct SiMux {
219 entries: Vec<Entry>,
220}
221
222struct Entry {
223 pid: u16,
224 sections: Vec<u8>,
225 interval: Duration,
226 last_emit: Option<Duration>,
227 packetizer: SectionPacketizer,
228}
229
230impl SiMux {
231 pub fn new() -> Self {
233 Self {
234 entries: Vec::new(),
235 }
236 }
237
238 pub fn upsert(&mut self, pid: u16, sections: Vec<u8>, interval: Duration) {
249 debug_assert!(
250 interval >= MIN_SECTION_INTERVAL,
251 "interval {interval:?} is below the 25 ms minimum (EN 300 468 §5.1.4.1)"
252 );
253
254 if let Some(entry) = self.entries.iter_mut().find(|e| e.pid == pid) {
255 entry.sections = sections;
256 entry.interval = interval;
257 } else {
258 self.entries.push(Entry {
259 pid,
260 sections,
261 interval,
262 last_emit: None,
263 packetizer: SectionPacketizer::new(pid),
264 });
265 }
266 }
267
268 pub fn upsert_pat(&mut self, sections: Vec<u8>) {
272 self.upsert(well_known::PAT.value(), sections, PAT_MAX_INTERVAL);
273 }
274
275 pub fn upsert_pmt(&mut self, pid: u16, sections: Vec<u8>) {
279 self.upsert(pid, sections, PMT_MAX_INTERVAL);
280 }
281
282 pub fn upsert_sdt_actual(&mut self, sections: Vec<u8>) {
286 self.upsert(
287 well_known::SDT_BAT.value(),
288 sections,
289 SDT_ACTUAL_MAX_INTERVAL,
290 );
291 }
292
293 pub fn upsert_nit(&mut self, sections: Vec<u8>) {
297 self.upsert(well_known::NIT.value(), sections, NIT_MAX_INTERVAL);
298 }
299
300 pub fn upsert_tdt(&mut self, sections: Vec<u8>) {
304 self.upsert(well_known::TDT_TOT.value(), sections, TDT_MAX_INTERVAL);
305 }
306
307 pub fn upsert_tot(&mut self, sections: Vec<u8>) {
311 self.upsert(well_known::TDT_TOT.value(), sections, TOT_MAX_INTERVAL);
312 }
313
314 pub fn poll_into(&mut self, now: Duration, out: &mut Vec<[u8; TS_PACKET_SIZE]>) -> usize {
321 out.clear();
322 let before = out.len();
323
324 let mut tmp = Vec::new();
325 for entry in &mut self.entries {
326 let due = match entry.last_emit {
327 None => true,
328 Some(last) => now.saturating_sub(last) >= entry.interval,
329 };
330 if due {
331 let refs = split_sections(&entry.sections);
332 if !refs.is_empty() {
333 entry.packetizer.packetize_into(&refs, &mut tmp);
334 out.append(&mut tmp);
335 }
336 entry.last_emit = Some(now);
337 }
338 }
339
340 out.len() - before
341 }
342
343 pub fn poll(&mut self, now: Duration) -> Vec<[u8; TS_PACKET_SIZE]> {
345 let mut out = Vec::new();
346 self.poll_into(now, &mut out);
347 out
348 }
349}
350
351impl Default for SiMux {
352 fn default() -> Self {
353 Self::new()
354 }
355}
356
357fn split_sections(data: &[u8]) -> Vec<&[u8]> {
363 let mut result = Vec::new();
364 let mut pos = 0;
365 while pos + 3 <= data.len() {
366 let section_length =
367 (((data[pos + 1] & SECTION_LENGTH_HI_MASK) as usize) << 8) | (data[pos + 2] as usize);
368 let end = pos + 3 + section_length;
369 if end > data.len() {
370 break;
371 }
372 result.push(&data[pos..end]);
373 pos = end;
374 }
375 result
376}
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381 use crate::ts::{SectionReassembler, TsPacket};
382
383 fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
389 let section_length = body_after_length.len() as u16;
390 let mut v = Vec::with_capacity(3 + section_length as usize);
391 v.push(table_id);
392 v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
394 v.push((section_length & 0xFF) as u8);
395 v.extend_from_slice(body_after_length);
396 v
397 }
398
399 fn concat_sections(sections: &[Vec<u8>]) -> Vec<u8> {
400 let total: usize = sections.iter().map(|s| s.len()).sum();
401 let mut out = Vec::with_capacity(total);
402 for s in sections {
403 out.extend_from_slice(s);
404 }
405 out
406 }
407
408 fn assert_round_trip(sections: &[Vec<u8>]) {
411 let mut packetizer = SectionPacketizer::new(0x0100);
412 let refs: Vec<&[u8]> = sections.iter().map(|s| s.as_slice()).collect();
413 let packets = packetizer.packetize(&refs);
414
415 let mut reasm = SectionReassembler::default();
416 for pkt_raw in &packets {
417 let pkt = TsPacket::parse(pkt_raw).expect("parse generated packet");
418 let payload = pkt.payload.expect("payload present");
419 let pusi = pkt.header.pusi;
420 reasm.feed(payload, pusi);
421 }
422
423 let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
424 assert_eq!(
425 got.len(),
426 sections.len(),
427 "section count mismatch: expected {}, got {}",
428 sections.len(),
429 got.len()
430 );
431 for (i, (orig, round)) in sections.iter().zip(got.iter()).enumerate() {
432 assert_eq!(
433 round.as_ref(),
434 orig.as_slice(),
435 "section {i} round-trip mismatch"
436 );
437 }
438 assert!(reasm.is_empty(), "reassembler should be empty after drain");
439 }
440
441 #[test]
444 fn split_single_section() {
445 let s = build_section(0x42, &[0xAA; 5]);
446 let refs = split_sections(&s);
447 assert_eq!(refs.len(), 1);
448 assert_eq!(refs[0], s.as_slice());
449 }
450
451 #[test]
452 fn split_two_sections() {
453 let s1 = build_section(0x42, &[0x01, 0x02]);
454 let s2 = build_section(0x46, &[0x03, 0x04, 0x05]);
455 let both = concat_sections(&[s1.clone(), s2.clone()]);
456 let refs = split_sections(&both);
457 assert_eq!(refs.len(), 2);
458 assert_eq!(refs[0], s1.as_slice());
459 assert_eq!(refs[1], s2.as_slice());
460 }
461
462 #[test]
463 fn split_empty_input() {
464 let refs = split_sections(&[]);
465 assert!(refs.is_empty());
466 }
467
468 #[test]
469 fn split_trailing_garbage_ignored() {
470 let s = build_section(0x42, &[0xAA; 3]);
471 let mut data = s.clone();
472 data.push(0xFF); let refs = split_sections(&data);
474 assert_eq!(refs.len(), 1);
475 assert_eq!(refs[0], s.as_slice());
476 }
477
478 #[test]
481 fn interval_constants_match_spec() {
482 assert_eq!(NIT_MAX_INTERVAL, Duration::from_secs(10));
484 assert_eq!(BAT_MAX_INTERVAL, Duration::from_secs(10));
485 assert_eq!(SDT_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
486 assert_eq!(SDT_OTHER_MAX_INTERVAL, Duration::from_secs(10));
487 assert_eq!(EIT_PF_ACTUAL_MAX_INTERVAL, Duration::from_secs(2));
488 assert_eq!(EIT_PF_OTHER_MAX_INTERVAL, Duration::from_secs(10));
489 assert_eq!(EIT_SCHED_MAX_INTERVAL, Duration::from_secs(10));
490 assert_eq!(EIT_SCHED_EXT_MAX_INTERVAL, Duration::from_secs(30));
491 assert_eq!(TDT_MAX_INTERVAL, Duration::from_secs(30));
492 assert_eq!(TOT_MAX_INTERVAL, Duration::from_secs(30));
493
494 assert_eq!(PAT_MAX_INTERVAL, Duration::from_millis(100));
496 assert_eq!(PMT_MAX_INTERVAL, Duration::from_millis(100));
497
498 assert_eq!(MIN_SECTION_INTERVAL, Duration::from_millis(25));
500 }
501
502 #[test]
505 fn new_simux_is_empty() {
506 let mut mux = SiMux::new();
507 let pkts = mux.poll(Duration::ZERO);
508 assert!(pkts.is_empty());
509 }
510
511 #[test]
512 fn simux_default_is_empty() {
513 let mut mux = SiMux::default();
514 let pkts = mux.poll(Duration::ZERO);
515 assert!(pkts.is_empty());
516 }
517
518 #[test]
519 fn first_poll_always_emits() {
520 let s = build_section(0x42, &[0xAA; 10]);
521 let mut mux = SiMux::new();
522 mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
523 let pkts = mux.poll(Duration::ZERO);
524 assert!(!pkts.is_empty(), "first poll must emit");
525 }
526
527 #[test]
528 fn first_poll_emits_all_entries() {
529 let s1 = build_section(0x42, &[0x01; 5]);
530 let s2 = build_section(0x46, &[0x02; 5]);
531 let mut mux = SiMux::new();
532 mux.upsert(0x0100, s1.clone(), Duration::from_millis(100));
533 mux.upsert(0x0200, s2.clone(), Duration::from_millis(200));
534 let pkts = mux.poll(Duration::ZERO);
535 let pids: Vec<u16> = pkts
537 .iter()
538 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
539 .collect();
540 assert!(pids.contains(&0x0100));
541 assert!(pids.contains(&0x0200));
542 }
543
544 #[test]
547 fn entry_emits_only_when_due() {
548 let s = build_section(0x42, &[0xAA; 10]);
549 let mut mux = SiMux::new();
550 mux.upsert(0x0100, s.clone(), Duration::from_millis(100));
551
552 let pkts0 = mux.poll(Duration::ZERO);
554 assert!(!pkts0.is_empty());
555
556 let pkts50 = mux.poll(Duration::from_millis(50));
558 assert!(pkts50.is_empty(), "should not emit at t=50");
559
560 let pkts99 = mux.poll(Duration::from_millis(99));
562 assert!(pkts99.is_empty(), "should not emit at t=99");
563
564 let pkts100 = mux.poll(Duration::from_millis(100));
566 assert!(!pkts100.is_empty(), "should emit at t=100");
567 }
568
569 #[test]
570 fn two_entries_different_cadence() {
571 let s1 = build_section(0x42, &[0x01; 5]);
572 let s2 = build_section(0x46, &[0x02; 5]);
573 let mut mux = SiMux::new();
574 mux.upsert(0x0100, concat_sections(&[s1]), Duration::from_millis(100));
575 mux.upsert(0x0200, concat_sections(&[s2]), Duration::from_millis(200));
576
577 let pkts = mux.poll(Duration::ZERO);
579 assert!(!pkts.is_empty());
580
581 let pkts100 = mux.poll(Duration::from_millis(100));
583 assert!(!pkts100.is_empty());
584 let pids100: Vec<u16> = pkts100
585 .iter()
586 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
587 .collect();
588 assert!(pids100.contains(&0x0100), "PID 0x0100 should emit at t=100");
589 assert!(
590 !pids100.contains(&0x0200),
591 "PID 0x0200 should NOT emit at t=100"
592 );
593
594 let pkts150 = mux.poll(Duration::from_millis(150));
596 assert!(pkts150.is_empty(), "neither should emit at t=150");
597
598 let pkts200 = mux.poll(Duration::from_millis(200));
600 let pids200: Vec<u16> = pkts200
601 .iter()
602 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
603 .collect();
604 assert!(pids200.contains(&0x0100), "PID 0x0100 should emit at t=200");
605 assert!(pids200.contains(&0x0200), "PID 0x0200 should emit at t=200");
606 }
607
608 #[test]
609 fn entry_emits_again_after_full_interval() {
610 let s = build_section(0x42, &[0xAA; 20]);
611 let mut mux = SiMux::new();
612 mux.upsert(0x0100, s, Duration::from_secs(1));
613
614 let n0 = mux.poll_into(Duration::ZERO, &mut Vec::new());
616 assert!(n0 > 0);
617
618 let mut buf = Vec::new();
620 let n999 = mux.poll_into(Duration::from_millis(999), &mut buf);
621 assert_eq!(n999, 0);
622
623 let mut buf2 = Vec::new();
625 let n1000 = mux.poll_into(Duration::from_millis(1000), &mut buf2);
626 assert!(n1000 > 0);
627 }
628
629 #[test]
632 fn upsert_updates_existing_entry() {
633 let s1 = build_section(0x42, &[0xAA; 5]);
634 let s2 = build_section(0x46, &[0xBB; 10]);
635 let mut mux = SiMux::new();
636
637 mux.upsert(0x0100, s1, Duration::from_millis(100));
638 mux.upsert(0x0100, s2.clone(), Duration::from_millis(200));
640
641 let pkts = mux.poll(Duration::ZERO);
642
643 let mut reasm = SectionReassembler::default();
645 for raw in &pkts {
646 let pkt = TsPacket::parse(raw).unwrap();
647 if pkt.header.pid == 0x0100 {
648 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
649 }
650 }
651 let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
652 assert_eq!(got.len(), 1);
653 assert_eq!(got[0].as_ref(), s2.as_slice());
654 }
655
656 #[test]
659 fn upsert_pat_uses_correct_pid() {
660 let s = build_section(0x00, &[0x01, 0x02]);
661 let mut mux = SiMux::new();
662 mux.upsert_pat(s);
663 let pkts = mux.poll(Duration::ZERO);
664 let pids: Vec<u16> = pkts
665 .iter()
666 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
667 .collect();
668 assert!(pids.iter().all(|&p| p == well_known::PAT.value()));
669 }
670
671 #[test]
672 fn upsert_sdt_actual_uses_correct_pid() {
673 let s = build_section(0x42, &[0x01]);
674 let mut mux = SiMux::new();
675 mux.upsert_sdt_actual(s);
676 let pkts = mux.poll(Duration::ZERO);
677 let pids: Vec<u16> = pkts
678 .iter()
679 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
680 .collect();
681 assert!(pids.iter().all(|&p| p == well_known::SDT_BAT.value()));
682 }
683
684 #[test]
685 fn upsert_nit_uses_correct_pid() {
686 let s = build_section(0x40, &[0x01]);
687 let mut mux = SiMux::new();
688 mux.upsert_nit(s);
689 let pkts = mux.poll(Duration::ZERO);
690 let pids: Vec<u16> = pkts
691 .iter()
692 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
693 .collect();
694 assert!(pids.iter().all(|&p| p == well_known::NIT.value()));
695 }
696
697 #[test]
698 fn upsert_tdt_uses_correct_pid() {
699 let s = build_section(0x70, &[0x01]);
700 let mut mux = SiMux::new();
701 mux.upsert_tdt(s);
702 let pkts = mux.poll(Duration::ZERO);
703 let pids: Vec<u16> = pkts
704 .iter()
705 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
706 .collect();
707 assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
708 }
709
710 #[test]
711 fn upsert_tot_uses_correct_pid() {
712 let s = build_section(0x73, &[0x01]);
713 let mut mux = SiMux::new();
714 mux.upsert_tot(s);
715 let pkts = mux.poll(Duration::ZERO);
716 let pids: Vec<u16> = pkts
717 .iter()
718 .map(|raw| TsPacket::parse(raw).unwrap().header.pid)
719 .collect();
720 assert!(pids.iter().all(|&p| p == well_known::TDT_TOT.value()));
721 }
722
723 #[test]
726 fn simux_round_trip_single_entry() {
727 let s1 = build_section(0x42, &[0xAA; 20]);
728 let s2 = build_section(0x46, &[0xBB; 15]);
729 let mut mux = SiMux::new();
730 mux.upsert(
731 0x0100,
732 concat_sections(&[s1.clone(), s2.clone()]),
733 Duration::from_millis(100),
734 );
735
736 let pkts = mux.poll(Duration::ZERO);
737
738 let mut reasm = SectionReassembler::default();
739 for raw in &pkts {
740 let pkt = TsPacket::parse(raw).unwrap();
741 if pkt.header.pid == 0x0100 {
742 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
743 }
744 }
745 let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
746 assert_eq!(got.len(), 2, "round-trip must recover both sections");
747 assert_eq!(got[0].as_ref(), s1.as_slice());
748 assert_eq!(got[1].as_ref(), s2.as_slice());
749 }
750
751 #[test]
752 fn simux_round_trip_multi_pid() {
753 let s_a = build_section(0x42, &[0xA0; 10]);
754 let s_b = build_section(0x46, &[0xB0; 10]);
755 let mut mux = SiMux::new();
756 mux.upsert(0x0100, s_a.clone(), Duration::from_millis(100));
757 mux.upsert(0x0200, s_b.clone(), Duration::from_millis(100));
758
759 let pkts = mux.poll(Duration::ZERO);
760
761 let mut reasm_a = SectionReassembler::default();
762 let mut reasm_b = SectionReassembler::default();
763 for raw in &pkts {
764 let pkt = TsPacket::parse(raw).unwrap();
765 match pkt.header.pid {
766 0x0100 => reasm_a.feed(pkt.payload.unwrap(), pkt.header.pusi),
767 0x0200 => reasm_b.feed(pkt.payload.unwrap(), pkt.header.pusi),
768 _ => {}
769 }
770 }
771 let got_a: Vec<_> = std::iter::from_fn(|| reasm_a.pop_section()).collect();
772 let got_b: Vec<_> = std::iter::from_fn(|| reasm_b.pop_section()).collect();
773 assert_eq!(got_a.len(), 1);
774 assert_eq!(got_b.len(), 1);
775 assert_eq!(got_a[0].as_ref(), s_a.as_slice());
776 assert_eq!(got_b[0].as_ref(), s_b.as_slice());
777 }
778
779 #[test]
782 fn continuity_counter_continuous_across_polls() {
783 let s = build_section(0x42, &[0xAA; 250]);
785 let mut mux = SiMux::new();
786 mux.upsert(0x0100, s, Duration::from_millis(100));
787
788 let pkts1 = mux.poll(Duration::ZERO);
789 assert!(pkts1.len() >= 2, "need ≥2 packets to test CC continuity");
790
791 let last_cc_pkts1 = TsPacket::parse(&pkts1[pkts1.len() - 1])
792 .unwrap()
793 .header
794 .continuity_counter;
795
796 let pkts2 = mux.poll(Duration::from_millis(100));
797 assert!(!pkts2.is_empty(), "second poll must emit");
798
799 let first_cc_pkts2 = TsPacket::parse(&pkts2[0])
800 .unwrap()
801 .header
802 .continuity_counter;
803
804 assert_eq!(
805 first_cc_pkts2,
806 (last_cc_pkts1 + 1) & 0x0F,
807 "CC must continue across poll cycles"
808 );
809 }
810
811 #[test]
814 fn poll_into_clears_out_before_appending() {
815 let s = build_section(0x42, &[0xAA; 5]);
816 let mut mux = SiMux::new();
817 mux.upsert(0x0100, s, Duration::from_millis(100));
818
819 let mut out = vec![[0u8; TS_PACKET_SIZE]; 42];
820 let n = mux.poll_into(Duration::ZERO, &mut out);
821 assert_eq!(n, out.len(), "out must contain only new packets");
822 }
823
824 #[test]
827 #[cfg(debug_assertions)]
828 #[should_panic(expected = "25 ms")]
829 fn upsert_rejects_sub_25ms_interval_in_debug() {
830 let s = build_section(0x42, &[0xAA; 5]);
831 let mut mux = SiMux::new();
832 mux.upsert(0x0100, s, Duration::from_millis(10));
833 }
834
835 #[test]
836 fn upsert_accepts_25ms_interval() {
837 let s = build_section(0x42, &[0xAA; 5]);
838 let mut mux = SiMux::new();
839 mux.upsert(0x0100, s, MIN_SECTION_INTERVAL);
840 let pkts = mux.poll(Duration::ZERO);
841 assert!(!pkts.is_empty());
842 }
843
844 #[test]
847 fn round_trip_single_short_section() {
848 let s = build_section(0x42, &[0xAA; 10]);
849 assert_round_trip(&[s]);
850 }
851
852 #[test]
853 fn round_trip_one_byte_body() {
854 let s = build_section(0x46, &[0xBB]); assert_round_trip(&[s]);
856 }
857
858 #[test]
859 fn round_trip_section_exactly_pusi_cap_boundary() {
860 let body = vec![0xCC; PUSI_PAYLOAD_CAP - 3];
862 let s = build_section(0x50, &body);
863 assert_eq!(s.len(), PUSI_PAYLOAD_CAP);
864 assert_round_trip(&[s]);
865 }
866
867 #[test]
868 fn round_trip_section_just_over_pusi_cap() {
869 let body = vec![0xDD; PUSI_PAYLOAD_CAP - 3 + 1];
871 let s = build_section(0x52, &body);
872 assert_eq!(s.len(), PUSI_PAYLOAD_CAP + 1);
873 assert_round_trip(&[s]);
874 }
875
876 #[test]
877 fn round_trip_section_spans_many_packets() {
878 let body = vec![0xEE; 2000 - 3];
880 let s = build_section(0x60, &body);
881 assert_round_trip(&[s]);
882 }
883
884 #[test]
885 fn round_trip_section_at_max_size() {
886 let body = vec![0x11; 4096 - 3];
891 let s = build_section(0x80, &body);
892 assert_eq!(s.len(), 4096);
893 assert_round_trip(&[s]);
894 }
895
896 #[test]
897 fn round_trip_multiple_short_sections_in_one_batch() {
898 let s1 = build_section(0x42, &[0x01, 0x02]); let s2 = build_section(0x46, &[0x03]); let s3 = build_section(0x4A, &[0x04, 0x05, 0x06]); assert_round_trip(&[s1, s2, s3]);
902 }
903
904 #[test]
905 fn round_trip_section_ends_exactly_at_boundary() {
906 let body1 = vec![0xA1; PUSI_PAYLOAD_CAP - 3];
910 let s1 = build_section(0x50, &body1);
911 assert_eq!(s1.len(), PUSI_PAYLOAD_CAP);
912
913 let s2 = build_section(0x52, &[0xB1, 0xB2]);
914 assert_round_trip(&[s1, s2]);
915 }
916
917 #[test]
918 fn round_trip_mix_small_large_sections() {
919 let s1 = build_section(0x10, &[0xAA; 5]);
922 let body2 = vec![0xBB; 200];
923 let s2 = build_section(0x20, &body2);
924 let s3 = build_section(0x30, &[0xCC; 50]);
925 let body4 = vec![0xDD; 800];
926 let s4 = build_section(0x40, &body4);
927 let s5 = build_section(0x50, &[0xEE]); assert_round_trip(&[s1, s2, s3, s4, s5]);
929 }
930
931 #[test]
934 fn continuity_counter_increments_per_packet() {
935 let body = vec![0xAA; 500];
937 let section = build_section(0x42, &body);
938 let mut p = SectionPacketizer::new(0x0100);
939
940 let packets = p.packetize(&[§ion]);
941 assert!(packets.len() >= 3, "need multiple packets to test CC");
942
943 let mut last_cc: Option<u8> = None;
944 for pkt_raw in &packets {
945 let pkt = TsPacket::parse(pkt_raw).unwrap();
946 let cc = pkt.header.continuity_counter;
947 if let Some(last) = last_cc {
948 assert_eq!(cc, (last + 1) & 0x0F, "CC must increment per packet");
949 }
950 last_cc = Some(cc);
951 }
952 }
953
954 #[test]
955 fn continuity_counter_wraps_and_continues_across_calls() {
956 let mut p = SectionPacketizer::with_continuity(0x0100, 14);
957 let body = vec![0xBB; 500];
959 let s = build_section(0x42, &body);
960
961 let pkts1 = p.packetize(&[&s]);
963 assert!(pkts1.len() >= 3, "section must span ≥3 packets");
964 let ccs1: Vec<u8> = pkts1
965 .iter()
966 .map(|b| TsPacket::parse(b).unwrap().header.continuity_counter)
967 .collect();
968 assert_eq!(ccs1[0], 14);
969 assert_eq!(ccs1[1], 15);
970 assert_eq!(ccs1[2], 0);
971
972 let pkts2 = p.packetize(&[&s]);
974 let cc_first_pkt2 = TsPacket::parse(&pkts2[0])
975 .unwrap()
976 .header
977 .continuity_counter;
978 assert_eq!(cc_first_pkt2, ccs1.last().map(|c| (c + 1) & 0x0F).unwrap());
979 }
980
981 #[test]
984 fn pusi_set_when_section_starts() {
985 let s = build_section(0x42, &[0xAA; 10]);
986 let mut p = SectionPacketizer::new(0x0100);
987 let packets = p.packetize(&[&s]);
988 assert!(!packets.is_empty());
989 let pkt = TsPacket::parse(&packets[0]).unwrap();
990 assert!(pkt.header.pusi, "first packet must have PUSI=1");
991 }
992
993 #[test]
994 fn pusi_not_set_on_mid_section_continuation() {
995 let body = vec![0xAA; 500];
996 let s = build_section(0x42, &body);
997 let mut p = SectionPacketizer::new(0x0100);
998 let packets = p.packetize(&[&s]);
999 assert!(packets.len() >= 2);
1000 let pkt1 = TsPacket::parse(&packets[0]).unwrap();
1001 let pkt2 = TsPacket::parse(&packets[1]).unwrap();
1002 assert!(pkt1.header.pusi, "first packet must have PUSI=1");
1003 assert!(
1004 !pkt2.header.pusi,
1005 "second packet is continuation, must have PUSI=0"
1006 );
1007 }
1008
1009 #[test]
1010 fn pointer_field_equals_tail_length_before_new_section() {
1011 let body1 = vec![0xA1; 197]; let s1 = build_section(0x52, &body1);
1016 assert_eq!(s1.len(), 200);
1017 let s2 = build_section(0x54, &[0xB1; 47]); assert_eq!(s2.len(), 50);
1019
1020 let mut p = SectionPacketizer::new(0x0100);
1021 let packets = p.packetize(&[&s1, &s2]);
1022
1023 let pkt_with_pointer = packets
1025 .iter()
1026 .map(|raw| TsPacket::parse(raw).unwrap())
1027 .find(|pkt| pkt.header.pusi && pkt.payload.is_some_and(|pl| pl.first() != Some(&0)))
1028 .expect("must have a PUSI packet with non-zero pointer");
1029
1030 let payload = pkt_with_pointer.payload.unwrap();
1031 let pointer = payload[0] as usize;
1032 assert!(pointer > 0, "pointer must be non-zero");
1033 let tail_start = s1.len() - pointer;
1035 assert_eq!(&payload[1..1 + pointer], &s1[tail_start..]);
1036 }
1037
1038 #[test]
1041 fn final_packet_unused_tail_is_stuffing() {
1042 let s = build_section(0x42, &[0xAA; 5]); let mut p = SectionPacketizer::new(0x0100);
1044 let packets = p.packetize(&[&s]);
1045
1046 let pkt = TsPacket::parse(&packets[0]).unwrap();
1047 let payload = pkt.payload.unwrap();
1048 assert_eq!(payload[0], 0, "pointer_field should be 0");
1049
1050 let section_end = 1 + s.len(); assert!(
1052 section_end < payload.len(),
1053 "must have stuffing after section"
1054 );
1055 for &b in &payload[section_end..] {
1056 assert_eq!(b, STUFFING_BYTE, "all trailing bytes must be 0xFF");
1057 }
1058 }
1059
1060 #[test]
1061 fn reassembler_discards_stuffing() {
1062 let s1 = build_section(0x42, &[0xAA; 10]);
1063 let s2 = build_section(0x46, &[0xBB; 5]);
1064
1065 let mut p = SectionPacketizer::new(0x0100);
1066 let packets = p.packetize(&[&s1, &s2]);
1067
1068 let mut reasm = SectionReassembler::default();
1069 for pkt_raw in &packets {
1070 let pkt = TsPacket::parse(pkt_raw).unwrap();
1071 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
1072 }
1073
1074 let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
1075 assert_eq!(got.len(), 2);
1076 assert!(
1077 reasm.is_empty(),
1078 "stuffing tail must be discarded, not buffered"
1079 );
1080 }
1081
1082 #[test]
1085 fn empty_batch_produces_no_packets() {
1086 let mut p = SectionPacketizer::new(0x0100);
1087 let packets: Vec<[u8; TS_PACKET_SIZE]> = p.packetize(&[]);
1088 assert!(packets.is_empty());
1089 }
1090
1091 #[test]
1092 fn packetize_into_clears_out_first() {
1093 let s = build_section(0x42, &[0xAA; 5]);
1094 let mut p = SectionPacketizer::new(0x0100);
1095
1096 let mut out = vec![[0u8; TS_PACKET_SIZE]; 99]; let n = p.packetize_into(&[&s], &mut out);
1098 assert_eq!(n, out.len(), "out must contain only the new packets");
1099 let mut reasm = SectionReassembler::default();
1101 for pkt_raw in &out {
1102 let pkt = TsPacket::parse(pkt_raw).unwrap();
1103 reasm.feed(pkt.payload.unwrap(), pkt.header.pusi);
1104 }
1105 let got = reasm.pop_section().unwrap();
1106 assert_eq!(got.as_ref(), s.as_slice());
1107 }
1108
1109 #[test]
1110 fn pid_is_correct() {
1111 let p = SectionPacketizer::new(0x1234);
1112 assert_eq!(p.pid(), 0x1234);
1113 }
1114
1115 #[test]
1116 fn with_continuity_masks_to_4_bits() {
1117 let p = SectionPacketizer::with_continuity(0x0100, 0xFE);
1118 assert_eq!(p.continuity_counter(), 0x0E);
1119 }
1120
1121 #[test]
1122 fn has_payload_always_true_no_adaptation() {
1123 let s = build_section(0x42, &[0xAA; 50]);
1124 let mut p = SectionPacketizer::new(0x0100);
1125 let packets = p.packetize(&[&s]);
1126 for pkt_raw in &packets {
1127 let pkt = TsPacket::parse(pkt_raw).unwrap();
1128 assert!(pkt.header.has_payload, "every packet must carry payload");
1129 assert!(!pkt.header.has_adaptation, "no adaptation field is emitted");
1130 assert!(!pkt.header.tei, "TEI must be false");
1131 assert_eq!(pkt.header.scrambling, 0, "scrambling must be 0");
1132 }
1133 }
1134}