1use crate::error::{Error, Result};
4
/// Size in bytes of one transport-stream packet.
pub const TS_PACKET_SIZE: usize = 188;
/// Value required in byte 0 of every packet.
pub const TS_SYNC_BYTE: u8 = 0x47;
/// Largest section the reassembler will buffer: a 3-byte section header plus
/// the biggest value a 12-bit length field can hold.
const MAX_SECTION_SIZE: usize = 3 + 0x0FFF;

/// Header byte 1, bit 7: transport error indicator.
const TEI_MASK: u8 = 0b1000_0000;
/// Header byte 1, bit 6: payload unit start indicator.
const PUSI_MASK: u8 = 0b0100_0000;
/// Header byte 1, low five bits: the high bits of the 13-bit PID.
pub const PID_MASK_HI: u8 = 0b0001_1111;
/// Header byte 3, top two bits: scrambling control.
pub const SCRAMBLING_MASK: u8 = 0b1100_0000;
/// Header byte 3, bit 5: an adaptation field follows the header.
pub const ADAPTATION_FLAG: u8 = 0b0010_0000;
/// Header byte 3, bit 4: the packet carries payload bytes.
pub const PAYLOAD_FLAG: u8 = 0b0001_0000;
/// Header byte 3, low nibble: continuity counter.
pub const CC_MASK: u8 = 0b0000_1111;
29
/// Decoded form of the 4-byte header that starts every transport-stream packet.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct TsHeader {
    /// Transport error indicator (byte 1, bit 7).
    pub tei: bool,
    /// Payload unit start indicator (byte 1, bit 6).
    pub pusi: bool,
    /// 13-bit packet identifier, assembled from bytes 1 and 2.
    pub pid: u16,
    /// Two-bit scrambling control value, already shifted down to `0..=3`.
    pub scrambling: u8,
    /// Whether the adaptation-field flag (byte 3, bit 5) is set.
    pub has_adaptation: bool,
    /// Whether the payload flag (byte 3, bit 4) is set.
    pub has_payload: bool,
    /// Four-bit continuity counter (byte 3, low nibble).
    pub continuity_counter: u8,
}
51
52#[derive(Clone, Debug)]
58#[cfg_attr(feature = "serde", derive(serde::Serialize))]
59pub struct TsPacket<'a> {
60 pub header: TsHeader,
62 pub payload: Option<&'a [u8]>,
65 #[cfg_attr(feature = "serde", serde(skip))]
67 pub raw: &'a [u8; TS_PACKET_SIZE],
68}
69
70impl TsHeader {
71 pub fn parse(raw4: &[u8]) -> Option<Self> {
75 if raw4.len() < 4 {
76 return None;
77 }
78 let b1 = raw4[1];
79 let b2 = raw4[2];
80 let b3 = raw4[3];
81
82 let tei = (b1 & TEI_MASK) != 0;
83 let pusi = (b1 & PUSI_MASK) != 0;
84 let pid = (((b1 & PID_MASK_HI) as u16) << 8) | (b2 as u16);
85 let scrambling = (b3 & SCRAMBLING_MASK) >> 6;
86 let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
87 let has_payload = (b3 & PAYLOAD_FLAG) != 0;
88 let continuity_counter = b3 & CC_MASK;
89
90 Some(Self {
91 tei,
92 pusi,
93 pid,
94 scrambling,
95 has_adaptation,
96 has_payload,
97 continuity_counter,
98 })
99 }
100
101 pub fn serialize_into(&self, buf: &mut [u8]) {
105 assert!(
106 buf.len() >= 4,
107 "buffer must have at least 4 bytes for TS header"
108 );
109 buf[0] = TS_SYNC_BYTE;
110 buf[1] = 0;
111 if self.tei {
112 buf[1] |= TEI_MASK;
113 }
114 if self.pusi {
115 buf[1] |= PUSI_MASK;
116 }
117 buf[1] |= ((self.pid >> 8) as u8) & PID_MASK_HI;
118 buf[2] = (self.pid & 0xFF) as u8;
119 buf[3] = (self.scrambling << 6) & SCRAMBLING_MASK;
120 if self.has_adaptation {
121 buf[3] |= ADAPTATION_FLAG;
122 }
123 if self.has_payload {
124 buf[3] |= PAYLOAD_FLAG;
125 }
126 buf[3] |= self.continuity_counter & CC_MASK;
127 }
128}
129
130impl<'a> TsPacket<'a> {
131 pub fn parse(buf: &'a [u8]) -> Result<Self> {
137 if buf.len() < TS_PACKET_SIZE {
138 return Err(Error::BufferTooShort {
139 need: TS_PACKET_SIZE,
140 have: buf.len(),
141 what: "TsPacket::parse",
142 });
143 }
144 if buf[0] != TS_SYNC_BYTE {
145 return Err(Error::InvalidSyncByte { found: buf[0] });
146 }
147
148 let raw: &[u8; TS_PACKET_SIZE] =
149 buf[..TS_PACKET_SIZE]
150 .try_into()
151 .map_err(|_| Error::BufferTooShort {
152 need: TS_PACKET_SIZE,
153 have: buf.len(),
154 what: "TsPacket::parse (array conversion)",
155 })?;
156
157 let header = TsHeader::parse(&raw[..4])
158 .expect("raw is 188 bytes so first 4 bytes are always present");
159
160 let mut cursor = 4usize;
161 let mut payload = None;
162
163 if header.has_adaptation && cursor < TS_PACKET_SIZE {
165 let af_len = raw[cursor] as usize;
166 cursor += 1 + af_len;
167 }
168
169 if header.has_payload && cursor < TS_PACKET_SIZE {
170 payload = Some(&raw[cursor..]);
171 }
172
173 Ok(TsPacket {
174 header,
175 payload,
176 raw,
177 })
178 }
179}
180
/// Rebuilds complete sections from the payloads of successive packets.
///
/// Payloads go in through `feed`; finished sections come out of
/// `pop_section` in the order they were completed.
#[derive(Default)]
pub struct SectionReassembler {
    /// Bytes of the section currently being assembled (possibly partial).
    buf: bytes::BytesMut,
    /// Total size of the pending section once its 3-byte header has been
    /// read, or 0 when unknown / nothing pending. Only written in the code
    /// visible here.
    expected: usize,
    /// Completed sections waiting to be popped, oldest first.
    ready: std::collections::VecDeque<bytes::Bytes>,
}
191
192impl SectionReassembler {
193 pub fn feed(&mut self, payload: &[u8], pusi: bool) {
201 if pusi {
202 if payload.is_empty() {
205 self.buf.clear();
206 self.expected = 0;
207 return;
208 }
209 let pointer = payload[0] as usize;
210
211 if !self.buf.is_empty() && pointer > 0 {
219 let avail = payload.len() - 1;
220 let tail_len = pointer.min(avail);
221 if self.buf.len() + tail_len > MAX_SECTION_SIZE {
222 self.buf.clear();
223 self.expected = 0;
224 } else {
225 self.buf.extend_from_slice(&payload[1..1 + tail_len]);
226 self.drain_complete_sections();
227 }
228 }
229
230 self.buf.clear();
233 self.expected = 0;
234
235 let start = 1 + pointer;
236 if start >= payload.len() {
237 return;
239 }
240 let new_data = &payload[start..];
241 if new_data.len() > MAX_SECTION_SIZE {
242 return;
243 }
244 self.buf.extend_from_slice(new_data);
245 } else {
246 if self.buf.is_empty() {
247 return;
248 }
249 if self.buf.len() + payload.len() > MAX_SECTION_SIZE {
250 self.buf.clear();
251 self.expected = 0;
252 return;
253 }
254 self.buf.extend_from_slice(payload);
255 }
256
257 self.drain_complete_sections();
258 }
259
260 fn drain_complete_sections(&mut self) {
270 loop {
271 if self.buf.len() < 3 {
272 self.expected = 0;
275 break;
276 }
277 if self.buf[0] == 0xFF {
278 self.buf.clear();
280 self.expected = 0;
281 break;
282 }
283 let exp = 3 + (((self.buf[1] & 0x0F) as usize) << 8 | self.buf[2] as usize);
284 if self.buf.len() >= exp {
285 let section = self.buf.split_to(exp).freeze();
288 self.ready.push_back(section);
289 self.expected = 0;
290 } else {
291 self.expected = exp;
293 break;
294 }
295 }
296 }
297
298 pub fn pop_section(&mut self) -> Option<bytes::Bytes> {
300 self.ready.pop_front()
301 }
302
303 pub fn len(&self) -> usize {
305 self.buf.len()
306 }
307
308 pub fn is_empty(&self) -> bool {
310 self.buf.is_empty()
311 }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 188-byte packet: sync byte, the three given header bytes, then
    /// `payload_data` (truncated to fit), with the remainder left as zeros.
    fn make_packet(b1: u8, b2: u8, b3: u8, payload_data: &[u8]) -> [u8; TS_PACKET_SIZE] {
        let mut pkt = [0u8; TS_PACKET_SIZE];
        pkt[0] = TS_SYNC_BYTE;
        pkt[1] = b1;
        pkt[2] = b2;
        pkt[3] = b3;
        let payload_start = 4;
        let end = (payload_start + payload_data.len()).min(TS_PACKET_SIZE);
        let len = (end - payload_start).min(payload_data.len());
        pkt[payload_start..payload_start + len].copy_from_slice(&payload_data[..len]);
        pkt
    }

    #[test]
    fn parse_rejects_non_0x47_sync_byte() {
        let mut pkt = [0u8; TS_PACKET_SIZE];
        pkt[0] = 0x46;
        let err = TsPacket::parse(&pkt).unwrap_err();
        match err {
            Error::InvalidSyncByte { found } => assert_eq!(found, 0x46),
            other => panic!("expected InvalidSyncByte, got {other:?}"),
        }
    }

    #[test]
    fn parse_extracts_pid_and_continuity_counter() {
        let pkt = make_packet(0x12, 0x34, 0x05, &[]);
        let pkt = TsPacket::parse(&pkt).unwrap();
        assert_eq!(pkt.header.pid, 0x1234);
        assert_eq!(pkt.header.continuity_counter, 5);
    }

    #[test]
    fn payload_unit_start_indicator_flag_extracted() {
        let pkt1 = make_packet(0x40, 0x00, 0x00, &[]);
        let pkt1 = TsPacket::parse(&pkt1).unwrap();
        assert!(pkt1.header.pusi);

        let pkt2 = make_packet(0x00, 0x00, 0x00, &[]);
        let pkt2 = TsPacket::parse(&pkt2).unwrap();
        assert!(!pkt2.header.pusi);
    }

    /// Builds a PUSI payload: pointer byte, then `previous_tail` (whose length
    /// must equal the pointer), then the bytes of the new section(s).
    fn build_pusi_payload(pointer_field: u8, previous_tail: &[u8], section: &[u8]) -> Vec<u8> {
        assert_eq!(pointer_field as usize, previous_tail.len());
        let mut v = Vec::with_capacity(1 + previous_tail.len() + section.len());
        v.push(pointer_field);
        v.extend_from_slice(previous_tail);
        v.extend_from_slice(section);
        v
    }

    /// Builds a section: table id, two bytes carrying the 12-bit length of
    /// `body_after_length`, then the body itself.
    fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
        let section_length = body_after_length.len() as u16;
        let mut v = Vec::with_capacity(3 + section_length as usize);
        v.push(table_id);
        v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
        v.push((section_length & 0xFF) as u8);
        v.extend_from_slice(body_after_length);
        v
    }

    #[test]
    fn reassembler_accumulates_multi_packet_section() {
        let body = vec![0xAAu8; 197];
        let section = build_section(0x02, &body);
        assert_eq!(section.len(), 200);

        let first_chunk = 100;
        let payload1 = build_pusi_payload(0, &[], &section[..first_chunk]);
        let payload2 = section[first_chunk..].to_vec();

        let mut reasm = SectionReassembler::default();
        reasm.feed(&payload1, true);
        reasm.feed(&payload2, false);

        let out = reasm.pop_section().expect("section should be ready");
        assert_eq!(out.len(), 200);
        assert_eq!(out.as_ref(), &section[..]);
    }

    #[test]
    fn reassembler_yields_complete_section_once_length_satisfied() {
        let section = build_section(0x42, &[0xAA]);
        assert_eq!(section.len(), 4);
        let payload = build_pusi_payload(0, &[], &section);

        let mut reasm = SectionReassembler::default();
        reasm.feed(&payload, true);

        let out = reasm
            .pop_section()
            .expect("single-packet section should pop");
        assert_eq!(out.as_ref(), &section[..]);
    }

    #[test]
    fn reassembler_extracts_all_concatenated_sections_in_one_payload() {
        let s1 = build_section(0x42, &[0x11, 0x22]);
        let s2 = build_section(0x46, &[0x33]);
        let s3 = build_section(0x4A, &[0x44, 0x55, 0x66]);

        let mut concat = Vec::new();
        concat.extend_from_slice(&s1);
        concat.extend_from_slice(&s2);
        concat.extend_from_slice(&s3);
        let payload = build_pusi_payload(0, &[], &concat);

        let mut reasm = SectionReassembler::default();
        reasm.feed(&payload, true);

        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
        assert_eq!(got.len(), 3, "all three concatenated sections must pop");
        assert_eq!(got[0].as_ref(), &s1[..]);
        assert_eq!(got[1].as_ref(), &s2[..]);
        assert_eq!(got[2].as_ref(), &s3[..]);
    }

    #[test]
    fn reassembler_stops_at_stuffing_after_concatenated_sections() {
        let s1 = build_section(0x42, &[0xAA]);
        let s2 = build_section(0x46, &[0xBB, 0xCC]);

        let mut concat = Vec::new();
        concat.extend_from_slice(&s1);
        concat.extend_from_slice(&s2);
        concat.extend_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]);
        let payload = build_pusi_payload(0, &[], &concat);

        let mut reasm = SectionReassembler::default();
        reasm.feed(&payload, true);

        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].as_ref(), &s1[..]);
        assert_eq!(got[1].as_ref(), &s2[..]);
        assert!(
            reasm.is_empty(),
            "stuffing tail must be discarded, not buffered"
        );
    }

    #[test]
    fn reassembler_concatenated_then_spanning_tail() {
        let s1 = build_section(0x42, &[0x01, 0x02]);
        let s2 = build_section(0x46, &[0x09u8; 60]);
        let split = 30;

        let mut head = Vec::new();
        head.extend_from_slice(&s1);
        head.extend_from_slice(&s2[..split]);
        let payload1 = build_pusi_payload(0, &[], &head);
        let payload2 = s2[split..].to_vec();

        let mut reasm = SectionReassembler::default();
        reasm.feed(&payload1, true);
        let first = reasm.pop_section().expect("first section pops at once");
        assert_eq!(first.as_ref(), &s1[..]);
        assert!(reasm.pop_section().is_none(), "second is still partial");

        reasm.feed(&payload2, false);
        let second = reasm.pop_section().expect("second pops after continuation");
        assert_eq!(second.as_ref(), &s2[..]);
    }

    #[test]
    fn reassembler_completes_section_spanning_into_pusi_packet() {
        let spanning = build_section(0x42, &[0x5Au8; 62]);
        let head = 41;
        let tail = &spanning[head..];
        assert_eq!(tail.len(), 24);

        let next = build_section(0x46, &[0x77, 0x88]);
        let payload_a = build_pusi_payload(0, &[], &spanning[..head]);
        let payload_b = build_pusi_payload(24, tail, &next);

        let mut reasm = SectionReassembler::default();
        reasm.feed(&payload_a, true);
        assert!(reasm.pop_section().is_none(), "head alone is incomplete");

        reasm.feed(&payload_b, true);
        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
        assert_eq!(got.len(), 2, "spanning section + new section must both pop");
        assert_eq!(
            got[0].as_ref(),
            &spanning[..],
            "spanning section completed from B's pointer tail"
        );
        assert_eq!(got[1].as_ref(), &next[..]);
    }

    #[test]
    fn reassembler_pusi_pointer_spans_whole_payload() {
        let spanning = build_section(0x42, &[0x33u8; 40]);
        let head = 20;
        let payload_a = build_pusi_payload(0, &[], &spanning[..head]);
        let tail = &spanning[head..];

        let mut reasm = SectionReassembler::default();
        reasm.feed(&payload_a, true);
        reasm.feed(&payload_b_pointer_only(tail), true);

        let out = reasm.pop_section().expect("spanning section completes");
        assert_eq!(out.as_ref(), &spanning[..]);
        assert!(reasm.pop_section().is_none());
    }

    /// Builds a PUSI payload made only of a pointer byte and the tail it
    /// points across; no new section follows.
    fn payload_b_pointer_only(tail: &[u8]) -> Vec<u8> {
        let mut v = Vec::with_capacity(1 + tail.len());
        v.push(tail.len() as u8);
        v.extend_from_slice(tail);
        v
    }

    #[test]
    fn reassembler_discards_on_buffer_overflow() {
        // Header declares the maximum length (4095) but only 160 body bytes
        // are supplied up front.
        let mut section = Vec::with_capacity(3 + 4095);
        section.push(0x00);
        section.push(0xB0 | ((4095u16 >> 8) as u8 & 0x0F));
        section.push(0xFF);
        section.extend_from_slice(&[0u8; 160]);
        let payload1 = build_pusi_payload(0, &[], &section);

        let mut reasm = SectionReassembler::default();
        reasm.feed(&payload1, true);
        assert!(reasm.pop_section().is_none());

        let filler = vec![0u8; 180];
        for _ in 0..(MAX_SECTION_SIZE / 180 + 1) {
            reasm.feed(&filler, false);
        }
        assert!(
            reasm.pop_section().is_none(),
            "no section should pop after overflow reset"
        );

        let valid_section = build_section(0x00, &[0xAA]);
        let payload2 = build_pusi_payload(0, &[], &valid_section);
        reasm.feed(&payload2, true);
        let out = reasm
            .pop_section()
            .expect("fresh section should pop after reset");
        assert_eq!(out.as_ref(), &valid_section[..]);
    }

    #[test]
    fn reassembler_handles_pusi_with_nonzero_pointer_field() {
        let prior_tail = vec![0x11, 0x22, 0x33];
        let new_section = build_section(0x02, &[0xBB]);
        assert_eq!(new_section.len(), 4);
        let payload = build_pusi_payload(3, &prior_tail, &new_section);

        let mut reasm = SectionReassembler::default();
        reasm.feed(&payload, true);

        let out = reasm
            .pop_section()
            .expect("section after pointer_field skip should pop");
        assert_eq!(out.as_ref(), &new_section[..]);
    }

    #[test]
    fn reassembler_ignores_continuation_before_pusi() {
        let pkt = make_packet(0x00, 0x00, PAYLOAD_FLAG, &[0xAA, 0xBB, 0xCC]);

        let mut reasm = SectionReassembler::default();
        reasm.feed(&pkt[4..], false);
        assert!(
            reasm.pop_section().is_none(),
            "no section should appear without prior PUSI"
        );
        assert!(
            reasm.pop_section().is_none(),
            "second pop should also be none"
        );
    }

    #[test]
    fn reassembler_empty_pusi_payload_does_not_panic() {
        let mut reasm = SectionReassembler::default();
        reasm.feed(&[], true);
        assert!(reasm.pop_section().is_none());

        // Pointer 0, then a 4-byte section whose length field is 1.
        let payload = [0x00u8, 0x72, 0x70, 0x01, 0x00];
        reasm.feed(&payload, true);
        assert!(reasm.pop_section().is_some());
    }

    #[test]
    fn reassembler_accepts_maximal_private_section() {
        // Length field 0xFFF: 3 header bytes + 4095 body bytes = 4098 total.
        let mut section = vec![0x80u8, 0x7F, 0xFF];
        section.resize(3 + 0xFFF, 0xAB);

        let mut reasm = SectionReassembler::default();
        let mut first = vec![0x00];
        first.extend_from_slice(&section[..183]);
        reasm.feed(&first, true);
        for chunk in section[183..].chunks(184) {
            reasm.feed(chunk, false);
        }
        let out = reasm.pop_section().expect("4098-byte section should pop");
        assert_eq!(out.len(), 4098);
        assert_eq!(out.as_ref(), &section[..]);
    }
}