1use crate::error::{Error, Result};
4
5pub const TS_PACKET_SIZE: usize = 188;
7pub const TS_SYNC_BYTE: u8 = 0x47;
9const MAX_SECTION_SIZE: usize = 4098;
14
15const TEI_MASK: u8 = 0x80;
17const PUSI_MASK: u8 = 0x40;
19pub const PID_MASK_HI: u8 = 0x1F;
21pub const SCRAMBLING_MASK: u8 = 0xC0;
23pub const ADAPTATION_FLAG: u8 = 0x20;
25pub const PAYLOAD_FLAG: u8 = 0x10;
27pub const CC_MASK: u8 = 0x0F;
29
30#[derive(Clone, Debug, PartialEq, Eq)]
32#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
33pub struct TsHeader {
34 pub tei: bool,
37 pub pusi: bool,
40 pub pid: u16,
42 pub scrambling: u8,
44 pub has_adaptation: bool,
46 pub has_payload: bool,
48 pub continuity_counter: u8,
50}
51
52#[derive(Clone, Debug)]
59#[cfg_attr(feature = "serde", derive(serde::Serialize))]
60pub struct TsPacket<'a> {
61 pub header: TsHeader,
63 pub payload: Option<&'a [u8]>,
66 #[cfg_attr(feature = "serde", serde(skip))]
68 pub raw: &'a [u8; TS_PACKET_SIZE],
69}
70
71impl TsHeader {
72 pub fn parse(raw4: &[u8]) -> Option<Self> {
76 if raw4.len() < 4 {
77 return None;
78 }
79 let b1 = raw4[1];
80 let b2 = raw4[2];
81 let b3 = raw4[3];
82
83 let tei = (b1 & TEI_MASK) != 0;
84 let pusi = (b1 & PUSI_MASK) != 0;
85 let pid = (((b1 & PID_MASK_HI) as u16) << 8) | (b2 as u16);
86 let scrambling = (b3 & SCRAMBLING_MASK) >> 6;
87 let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
88 let has_payload = (b3 & PAYLOAD_FLAG) != 0;
89 let continuity_counter = b3 & CC_MASK;
90
91 Some(Self {
92 tei,
93 pusi,
94 pid,
95 scrambling,
96 has_adaptation,
97 has_payload,
98 continuity_counter,
99 })
100 }
101
102 pub fn serialize_into(&self, buf: &mut [u8]) {
106 assert!(buf.len() >= 4, "buffer must have at least 4 bytes for TS header");
107 buf[0] = TS_SYNC_BYTE;
108 buf[1] = 0;
109 if self.tei {
110 buf[1] |= TEI_MASK;
111 }
112 if self.pusi {
113 buf[1] |= PUSI_MASK;
114 }
115 buf[1] |= ((self.pid >> 8) as u8) & PID_MASK_HI;
116 buf[2] = (self.pid & 0xFF) as u8;
117 buf[3] = (self.scrambling << 6) & SCRAMBLING_MASK;
118 if self.has_adaptation {
119 buf[3] |= ADAPTATION_FLAG;
120 }
121 if self.has_payload {
122 buf[3] |= PAYLOAD_FLAG;
123 }
124 buf[3] |= self.continuity_counter & CC_MASK;
125 }
126}
127
128impl<'a> TsPacket<'a> {
129 pub fn parse(buf: &'a [u8]) -> Result<Self> {
135 if buf.len() < TS_PACKET_SIZE {
136 return Err(Error::BufferTooShort {
137 need: TS_PACKET_SIZE,
138 have: buf.len(),
139 what: "TsPacket::parse",
140 });
141 }
142 if buf[0] != TS_SYNC_BYTE {
143 return Err(Error::InvalidSyncByte { found: buf[0] });
144 }
145
146 let raw: &[u8; TS_PACKET_SIZE] =
147 buf[..TS_PACKET_SIZE]
148 .try_into()
149 .map_err(|_| Error::BufferTooShort {
150 need: TS_PACKET_SIZE,
151 have: buf.len(),
152 what: "TsPacket::parse (array conversion)",
153 })?;
154
155 let header = TsHeader::parse(&raw[..4])
156 .expect("raw is 188 bytes so first 4 bytes are always present");
157
158 let mut cursor = 4usize;
159 let mut payload = None;
160
161 if header.has_adaptation && cursor < TS_PACKET_SIZE {
163 let af_len = raw[cursor] as usize;
164 cursor += 1 + af_len;
165 }
166
167 if header.has_payload && cursor < TS_PACKET_SIZE {
168 payload = Some(&raw[cursor..]);
169 }
170
171 Ok(TsPacket {
172 header,
173 payload,
174 raw,
175 })
176 }
177}
178
179#[derive(Default)]
184pub struct SectionReassembler {
185 buf: bytes::BytesMut,
186 expected: usize,
187 ready: std::collections::VecDeque<bytes::Bytes>,
188}
189
190impl SectionReassembler {
191 pub fn feed(&mut self, payload: &[u8], pusi: bool) {
197 if pusi {
198 if payload.is_empty() {
201 self.buf.clear();
202 self.expected = 0;
203 return;
204 }
205 let pointer = payload[0] as usize;
206 let start = 1 + pointer;
207 if start >= payload.len() {
208 self.buf.clear();
209 return;
210 }
211 self.buf.clear();
212 let new_data = &payload[start..];
213 if self.buf.len() + new_data.len() > MAX_SECTION_SIZE {
214 self.buf.clear();
215 self.expected = 0;
216 return;
217 }
218 self.buf.extend_from_slice(new_data);
219 if self.buf.len() >= 3 {
220 self.expected = 3 + (((self.buf[1] & 0x0F) as usize) << 8 | self.buf[2] as usize);
221 }
222 } else {
223 if self.buf.is_empty() {
224 return;
225 }
226 if self.buf.len() + payload.len() > MAX_SECTION_SIZE {
227 self.buf.clear();
228 self.expected = 0;
229 return;
230 }
231 self.buf.extend_from_slice(payload);
232 }
233
234 if self.expected > 0 && self.buf.len() >= self.expected {
235 let section = self.buf.split_to(self.expected).freeze();
238 self.ready.push_back(section);
239 self.expected = 0;
240 }
241 }
242
243 pub fn pop_section(&mut self) -> Option<bytes::Bytes> {
245 self.ready.pop_front()
246 }
247
248 pub fn len(&self) -> usize {
250 self.buf.len()
251 }
252
253 pub fn is_empty(&self) -> bool {
255 self.buf.is_empty()
256 }
257}
258
259#[cfg(test)]
260mod tests {
261 use super::*;
262
263 fn make_packet(b1: u8, b2: u8, b3: u8, payload_data: &[u8]) -> [u8; TS_PACKET_SIZE] {
265 let mut pkt = [0u8; TS_PACKET_SIZE];
266 pkt[0] = TS_SYNC_BYTE;
267 pkt[1] = b1;
268 pkt[2] = b2;
269 pkt[3] = b3;
270 let payload_start = 4;
271 let end = (payload_start + payload_data.len()).min(TS_PACKET_SIZE);
272 let len = (end - payload_start).min(payload_data.len());
273 pkt[payload_start..payload_start + len].copy_from_slice(&payload_data[..len]);
274 pkt
275 }
276
277 #[test]
278 fn parse_rejects_non_0x47_sync_byte() {
279 let mut pkt = [0u8; TS_PACKET_SIZE];
280 pkt[0] = 0x46; let err = TsPacket::parse(&pkt).unwrap_err();
282 match err {
283 Error::InvalidSyncByte { found } => assert_eq!(found, 0x46),
284 other => panic!("expected InvalidSyncByte, got {other:?}"),
285 }
286 }
287
288 #[test]
289 fn parse_extracts_pid_and_continuity_counter() {
290 let pkt = make_packet(0x12, 0x34, 0x05, &[]);
297 let pkt = TsPacket::parse(&pkt).unwrap();
298 assert_eq!(pkt.header.pid, 0x1234);
299 assert_eq!(pkt.header.continuity_counter, 5);
300 }
301
302 #[test]
303 fn payload_unit_start_indicator_flag_extracted() {
304 let pkt1 = make_packet(0x40, 0x00, 0x00, &[]);
306 let pkt1 = TsPacket::parse(&pkt1).unwrap();
307 assert!(pkt1.header.pusi);
308
309 let pkt2 = make_packet(0x00, 0x00, 0x00, &[]);
311 let pkt2 = TsPacket::parse(&pkt2).unwrap();
312 assert!(!pkt2.header.pusi);
313 }
314
315 fn build_pusi_payload(pointer_field: u8, previous_tail: &[u8], section: &[u8]) -> Vec<u8> {
320 assert_eq!(pointer_field as usize, previous_tail.len());
321 let mut v = Vec::with_capacity(1 + previous_tail.len() + section.len());
322 v.push(pointer_field);
323 v.extend_from_slice(previous_tail);
324 v.extend_from_slice(section);
325 v
326 }
327
328 fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
332 let section_length = body_after_length.len() as u16;
333 let mut v = Vec::with_capacity(3 + section_length as usize);
334 v.push(table_id);
335 v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
337 v.push((section_length & 0xFF) as u8);
338 v.extend_from_slice(body_after_length);
339 v
340 }
341
342 #[test]
348 fn reassembler_accumulates_multi_packet_section() {
349 let body = vec![0xAAu8; 197];
351 let section = build_section(0x02, &body);
352 assert_eq!(section.len(), 200);
353
354 let first_chunk = 100;
355 let payload1 = build_pusi_payload(0, &[], §ion[..first_chunk]);
356 let payload2 = section[first_chunk..].to_vec();
357
358 let mut reasm = SectionReassembler::default();
359 reasm.feed(&payload1, true);
360 reasm.feed(&payload2, false);
361
362 let out = reasm.pop_section().expect("section should be ready");
363 assert_eq!(out.len(), 200);
364 assert_eq!(out.as_ref(), §ion[..]);
365 }
366
367 #[test]
368 fn reassembler_yields_complete_section_once_length_satisfied() {
369 let section = build_section(0x42, &[0xAA]);
371 assert_eq!(section.len(), 4);
372 let payload = build_pusi_payload(0, &[], §ion);
373
374 let mut reasm = SectionReassembler::default();
375 reasm.feed(&payload, true);
376
377 let out = reasm
378 .pop_section()
379 .expect("single-packet section should pop");
380 assert_eq!(out.as_ref(), §ion[..]);
381 }
382
383 #[test]
384 fn reassembler_discards_on_buffer_overflow() {
385 let mut section = Vec::with_capacity(3 + 4095);
389 section.push(0x00); section.push(0xB0 | ((4095u16 >> 8) as u8 & 0x0F));
391 section.push(0xFF);
392 section.extend_from_slice(&[0u8; 160]);
393 let payload1 = build_pusi_payload(0, &[], §ion);
394
395 let mut reasm = SectionReassembler::default();
396 reasm.feed(&payload1, true);
397 assert!(reasm.pop_section().is_none());
398
399 let filler = vec![0u8; 180];
401 for _ in 0..(MAX_SECTION_SIZE / 180 + 1) {
402 reasm.feed(&filler, false);
403 }
404 assert!(
405 reasm.pop_section().is_none(),
406 "no section should pop after overflow reset"
407 );
408
409 let valid_section = build_section(0x00, &[0xAA]);
411 let payload2 = build_pusi_payload(0, &[], &valid_section);
412 reasm.feed(&payload2, true);
413 let out = reasm
414 .pop_section()
415 .expect("fresh section should pop after reset");
416 assert_eq!(out.as_ref(), &valid_section[..]);
417 }
418
419 #[test]
420 fn reassembler_handles_pusi_with_nonzero_pointer_field() {
421 let prior_tail = vec![0x11, 0x22, 0x33];
423 let new_section = build_section(0x02, &[0xBB]);
424 assert_eq!(new_section.len(), 4);
425 let payload = build_pusi_payload(3, &prior_tail, &new_section);
426
427 let mut reasm = SectionReassembler::default();
428 reasm.feed(&payload, true);
429
430 let out = reasm
431 .pop_section()
432 .expect("section after pointer_field skip should pop");
433 assert_eq!(out.as_ref(), &new_section[..]);
434 }
435
436 #[test]
437 fn reassembler_ignores_continuation_before_pusi() {
438 let pkt = make_packet(0x00, 0x00, PAYLOAD_FLAG, &[0xAA, 0xBB, 0xCC]);
441
442 let mut reasm = SectionReassembler::default();
443 reasm.feed(&pkt[4..], false); assert!(
446 reasm.pop_section().is_none(),
447 "no section should appear without prior PUSI"
448 );
449 assert!(
450 reasm.pop_section().is_none(),
451 "second pop should also be none"
452 );
453 }
454
455 #[test]
458 fn reassembler_empty_pusi_payload_does_not_panic() {
459 let mut reasm = SectionReassembler::default();
460 reasm.feed(&[], true);
461 assert!(reasm.pop_section().is_none());
462 let mut payload = vec![0x00u8, 0x72, 0x70, 0x01, 0x00];
464 payload.resize(5, 0);
465 reasm.feed(&payload, true);
466 assert!(reasm.pop_section().is_some());
467 }
468
469 #[test]
473 fn reassembler_accepts_maximal_private_section() {
474 let mut section = vec![0x80u8, 0x7F, 0xFF]; section.resize(3 + 0xFFF, 0xAB);
476
477 let mut reasm = SectionReassembler::default();
478 let mut first = vec![0x00];
480 first.extend_from_slice(§ion[..183]);
481 reasm.feed(&first, true);
482 for chunk in section[183..].chunks(184) {
483 reasm.feed(chunk, false);
484 }
485 let out = reasm.pop_section().expect("4098-byte section should pop");
486 assert_eq!(out.len(), 4098);
487 assert_eq!(out.as_ref(), §ion[..]);
488 }
489}