1use std::collections::HashMap;
20
21const TS_PACKET_SIZE: usize = 188;
22const SYNC_BYTE: u8 = 0x47;
23const PAT_PID: u16 = 0;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub enum StreamType {
28 H264,
29 H265,
30 Aac,
31 Scte35,
35 Unknown(u8),
36}
37
38impl StreamType {
39 fn from_byte(b: u8) -> Self {
40 match b {
41 0x1B => Self::H264,
42 0x24 => Self::H265,
43 0x0F | 0x11 => Self::Aac,
44 0x86 => Self::Scte35,
45 other => Self::Unknown(other),
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
56pub struct Scte35Section {
57 pub pid: u16,
61 pub raw: Vec<u8>,
63}
64
65#[derive(Debug, Clone)]
67pub struct PesPacket {
68 pub pid: u16,
69 pub stream_type: StreamType,
70 pub pts: Option<u64>,
73 pub dts: Option<u64>,
76 pub payload: Vec<u8>,
79}
80
81#[derive(Debug)]
83struct PesBuffer {
84 stream_type: StreamType,
85 buf: Vec<u8>,
86 started: bool,
87}
88
89#[derive(Debug, Default)]
98struct SectionBuffer {
99 buf: Vec<u8>,
100 expected_len: Option<usize>,
101}
102
103#[derive(Debug)]
105pub struct TsDemuxer {
106 remainder: Vec<u8>,
109 pmt_pid: Option<u16>,
111 streams: HashMap<u16, StreamType>,
113 pes_bufs: HashMap<u16, PesBuffer>,
115 section_bufs: HashMap<u16, SectionBuffer>,
120 pending_scte35: Vec<Scte35Section>,
123}
124
125impl Default for TsDemuxer {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
131impl TsDemuxer {
132 pub fn new() -> Self {
133 Self {
134 remainder: Vec::new(),
135 pmt_pid: None,
136 streams: HashMap::new(),
137 pes_bufs: HashMap::new(),
138 section_bufs: HashMap::new(),
139 pending_scte35: Vec::new(),
140 }
141 }
142
143 pub fn take_scte35_sections(&mut self) -> Vec<Scte35Section> {
155 std::mem::take(&mut self.pending_scte35)
156 }
157
158 pub fn feed(&mut self, data: &[u8]) -> Vec<PesPacket> {
163 let mut out = Vec::new();
164
165 let input = if self.remainder.is_empty() {
171 data
172 } else {
173 self.remainder.extend_from_slice(data);
174 self.process_buf(&mut out);
177 &[]
178 };
179
180 let mut pos = 0;
182 while pos < input.len() {
183 let sync_off = match input[pos..].iter().position(|&b| b == SYNC_BYTE) {
184 Some(p) => p,
185 None => break,
186 };
187 pos += sync_off;
188 if pos + TS_PACKET_SIZE > input.len() {
189 break;
190 }
191 let pkt: &[u8; TS_PACKET_SIZE] = input[pos..pos + TS_PACKET_SIZE].try_into().unwrap();
192 self.process_packet(pkt, &mut out);
193 pos += TS_PACKET_SIZE;
194 }
195
196 if pos < input.len() {
198 self.remainder.extend_from_slice(&input[pos..]);
199 }
200
201 out
202 }
203
204 fn process_buf(&mut self, out: &mut Vec<PesPacket>) {
206 let mut pos = 0;
207 while pos < self.remainder.len() {
208 let sync_off = match self.remainder[pos..].iter().position(|&b| b == SYNC_BYTE) {
209 Some(p) => p,
210 None => {
211 self.remainder.clear();
212 return;
213 }
214 };
215 pos += sync_off;
216 if pos + TS_PACKET_SIZE > self.remainder.len() {
217 break;
218 }
219 let pkt: [u8; TS_PACKET_SIZE] = self.remainder[pos..pos + TS_PACKET_SIZE].try_into().unwrap();
220 self.process_packet(&pkt, out);
221 pos += TS_PACKET_SIZE;
222 }
223 if pos > 0 {
225 self.remainder.drain(..pos);
226 }
227 }
228
229 fn process_packet(&mut self, pkt: &[u8; TS_PACKET_SIZE], out: &mut Vec<PesPacket>) {
230 let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
231 let pusi = pkt[1] & 0x40 != 0;
232 let afc = (pkt[3] >> 4) & 0x03;
233
234 let payload_offset = match afc {
235 0b01 => 4,
236 0b11 => {
237 let af_len = pkt[4] as usize;
238 5 + af_len
239 }
240 _ => return,
241 };
242 if payload_offset >= TS_PACKET_SIZE {
243 return;
244 }
245 let payload = &pkt[payload_offset..];
246
247 if pid == PAT_PID {
248 self.parse_pat(payload, pusi);
249 } else if Some(pid) == self.pmt_pid {
250 self.parse_pmt(payload, pusi);
251 } else if let Some(&st) = self.streams.get(&pid) {
252 if st == StreamType::Scte35 {
253 self.push_section(pid, payload, pusi);
254 } else {
255 self.push_pes(pid, payload, pusi, out);
256 }
257 }
258 }
259
260 fn push_section(&mut self, pid: u16, payload: &[u8], pusi: bool) {
271 let buf = self.section_bufs.entry(pid).or_default();
272 if pusi {
273 buf.buf.clear();
277 buf.expected_len = None;
278 if payload.is_empty() {
279 return;
280 }
281 let pointer = payload[0] as usize;
282 let start = 1 + pointer;
283 if start >= payload.len() {
284 return;
285 }
286 buf.buf.extend_from_slice(&payload[start..]);
287 } else {
288 if buf.buf.is_empty() && buf.expected_len.is_none() {
289 return;
292 }
293 buf.buf.extend_from_slice(payload);
294 }
295
296 if buf.expected_len.is_none() && buf.buf.len() >= 3 {
298 let section_length = (((buf.buf[1] & 0x0F) as usize) << 8) | buf.buf[2] as usize;
299 buf.expected_len = Some(3 + section_length);
300 }
301
302 while let Some(expected) = buf.expected_len {
307 if buf.buf.len() < expected {
308 break;
309 }
310 let section_bytes = buf.buf.drain(..expected).collect::<Vec<_>>();
311 self.pending_scte35.push(Scte35Section {
312 pid,
313 raw: section_bytes,
314 });
315 buf.expected_len = None;
316 if buf.buf.len() >= 3 {
317 let section_length = (((buf.buf[1] & 0x0F) as usize) << 8) | buf.buf[2] as usize;
318 buf.expected_len = Some(3 + section_length);
319 } else if buf.buf.iter().all(|&b| b == 0xFF) {
320 buf.buf.clear();
322 break;
323 }
324 }
325 }
326
327 fn parse_pat(&mut self, payload: &[u8], pusi: bool) {
328 let data = if pusi && !payload.is_empty() {
329 let pointer = payload[0] as usize;
330 if 1 + pointer >= payload.len() {
331 return;
332 }
333 &payload[1 + pointer..]
334 } else {
335 payload
336 };
337 if data.len() < 12 {
340 return;
341 }
342 let section_length = (((data[1] & 0x0F) as usize) << 8) | data[2] as usize;
343 let table_end = 3 + section_length;
344 if table_end > data.len() || section_length < 9 {
345 return;
346 }
347 let loop_end = table_end.saturating_sub(4);
349 let mut i = 8;
350 while i + 4 <= loop_end {
351 let prog_num = ((data[i] as u16) << 8) | data[i + 1] as u16;
352 let map_pid = (((data[i + 2] & 0x1F) as u16) << 8) | data[i + 3] as u16;
353 if prog_num != 0 {
354 self.pmt_pid = Some(map_pid);
355 break;
356 }
357 i += 4;
358 }
359 }
360
361 fn parse_pmt(&mut self, payload: &[u8], pusi: bool) {
362 let data = if pusi && !payload.is_empty() {
363 let pointer = payload[0] as usize;
364 if 1 + pointer >= payload.len() {
365 return;
366 }
367 &payload[1 + pointer..]
368 } else {
369 payload
370 };
371 if data.len() < 16 {
372 return;
373 }
374 let section_length = (((data[1] & 0x0F) as usize) << 8) | data[2] as usize;
375 let table_end = 3 + section_length;
376 if table_end > data.len() || section_length < 13 {
377 return;
378 }
379 let prog_info_len = (((data[10] & 0x0F) as usize) << 8) | data[11] as usize;
380 let mut i = 12 + prog_info_len;
381 let loop_end = table_end.saturating_sub(4);
382 self.streams.clear();
383 while i + 5 <= loop_end {
384 let st = data[i];
385 let es_pid = (((data[i + 1] & 0x1F) as u16) << 8) | data[i + 2] as u16;
386 let es_info_len = (((data[i + 3] & 0x0F) as usize) << 8) | data[i + 4] as usize;
387 self.streams.insert(es_pid, StreamType::from_byte(st));
388 i += 5 + es_info_len;
389 }
390 }
391
392 fn push_pes(&mut self, pid: u16, payload: &[u8], pusi: bool, out: &mut Vec<PesPacket>) {
393 let stream_type = *self.streams.get(&pid).unwrap_or(&StreamType::Unknown(0));
394
395 if pusi {
396 if let Some(buf) = self.pes_bufs.get_mut(&pid) {
397 if buf.started && !buf.buf.is_empty() {
398 if let Some(pkt) = Self::finish_pes(pid, buf) {
399 out.push(pkt);
400 }
401 }
402 }
403 let entry = self.pes_bufs.entry(pid).or_insert_with(|| PesBuffer {
404 stream_type,
405 buf: Vec::with_capacity(64 * 1024),
406 started: false,
407 });
408 entry.buf.clear();
409 entry.buf.extend_from_slice(payload);
410 entry.started = true;
411 entry.stream_type = stream_type;
412 } else if let Some(buf) = self.pes_bufs.get_mut(&pid) {
413 if buf.started {
414 buf.extend(payload);
415 }
416 }
417 }
418
419 fn finish_pes(pid: u16, buf: &mut PesBuffer) -> Option<PesPacket> {
420 let data = &buf.buf;
421 if data.len() < 9 || data[0] != 0 || data[1] != 0 || data[2] != 1 {
422 return None;
423 }
424 let pes_packet_length = ((data[4] as usize) << 8) | data[5] as usize;
425 let header_data_len = data[8] as usize;
426 let es_start = 9 + header_data_len;
427 if es_start > data.len() {
428 return None;
429 }
430 let flags = data[7];
431 let pts_flag = flags & 0x80 != 0;
432 let dts_flag = flags & 0x40 != 0;
433
434 let pts = if pts_flag && header_data_len >= 5 {
435 Some(parse_ts_timestamp(&data[9..14]))
436 } else {
437 None
438 };
439 let dts = if dts_flag && header_data_len >= 10 {
440 Some(parse_ts_timestamp(&data[14..19]))
441 } else {
442 None
443 };
444
445 let es_end = if pes_packet_length > 0 {
450 (6 + pes_packet_length).min(data.len())
451 } else {
452 data.len()
453 };
454 let payload = data[es_start..es_end].to_vec();
455 if payload.is_empty() {
456 return None;
457 }
458
459 Some(PesPacket {
460 pid,
461 stream_type: buf.stream_type,
462 pts,
463 dts,
464 payload,
465 })
466 }
467}
468
469impl PesBuffer {
470 fn extend(&mut self, data: &[u8]) {
471 self.buf.extend_from_slice(data);
472 }
473}
474
475fn parse_ts_timestamp(b: &[u8]) -> u64 {
480 let a = ((b[0] as u64 >> 1) & 0x07) << 30;
481 let bc = ((b[1] as u64) << 7 | (b[2] as u64 >> 1)) << 15;
482 let de = (b[3] as u64) << 7 | (b[4] as u64 >> 1);
483 a | bc | de
484}
485
486#[cfg(test)]
487mod tests {
488 use super::*;
489
490 fn make_ts_packet(pid: u16, pusi: bool, payload: &[u8]) -> [u8; 188] {
491 let mut pkt = [0xFFu8; 188];
492 pkt[0] = SYNC_BYTE;
493 pkt[1] = if pusi { 0x40 } else { 0x00 } | ((pid >> 8) as u8 & 0x1F);
494 pkt[2] = pid as u8;
495 pkt[3] = 0x10; let copy_len = payload.len().min(184);
497 pkt[4..4 + copy_len].copy_from_slice(&payload[..copy_len]);
498 pkt
500 }
501
502 fn minimal_pat(pmt_pid: u16) -> Vec<u8> {
503 let mut data = vec![
507 0x00, 0x00, 0xB0, 0x0D, 0x00, 0x01, 0xC1, 0x00, 0x00, 0x00, 0x01, ];
515 data.push(0xE0 | ((pmt_pid >> 8) as u8 & 0x1F));
516 data.push(pmt_pid as u8);
517 data.extend_from_slice(&[0x00; 4]); data
519 }
520
521 fn minimal_pmt(video_pid: u16, audio_pid: u16) -> Vec<u8> {
522 let mut data = vec![
526 0x00, 0x02, 0xB0, 0x17, 0x00, 0x01, 0xC1, 0x00, 0x00, 0xE1, 0x00, 0xF0, 0x00, ];
535 data.push(0x1B); data.push(0xE0 | ((video_pid >> 8) as u8 & 0x1F));
538 data.push(video_pid as u8);
539 data.push(0xF0);
540 data.push(0x00); data.push(0x0F); data.push(0xE0 | ((audio_pid >> 8) as u8 & 0x1F));
544 data.push(audio_pid as u8);
545 data.push(0xF0);
546 data.push(0x00); data.extend_from_slice(&[0x00; 4]); data
549 }
550
551 fn minimal_pes(pts_90k: u64, es_payload: &[u8]) -> Vec<u8> {
552 let pes_len = (3 + 5 + es_payload.len()) as u16;
555 let mut data = vec![
556 0x00,
557 0x00,
558 0x01, 0xE0, (pes_len >> 8) as u8,
561 pes_len as u8,
562 0x80, 0x80, 0x05, ];
566 let pts = pts_90k & 0x1_FFFF_FFFF;
568 data.push(0x21 | ((pts >> 29) as u8 & 0x0E));
569 data.push((pts >> 22) as u8);
570 data.push(0x01 | ((pts >> 14) as u8 & 0xFE));
571 data.push((pts >> 7) as u8);
572 data.push(0x01 | ((pts << 1) as u8 & 0xFE));
573 data.extend_from_slice(es_payload);
574 data
575 }
576
577 #[test]
578 fn demux_discovers_streams_and_yields_pes() {
579 let mut demux = TsDemuxer::new();
580 let video_pid = 0x100;
581 let audio_pid = 0x101;
582 let pmt_pid = 0x1000;
583
584 let pat = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));
586 assert!(demux.feed(&pat).is_empty());
587 assert_eq!(demux.pmt_pid, Some(pmt_pid));
588
589 let pmt = make_ts_packet(pmt_pid, true, &minimal_pmt(video_pid, audio_pid));
591 assert!(demux.feed(&pmt).is_empty());
592 assert_eq!(demux.streams.len(), 2);
593 assert_eq!(demux.streams[&video_pid], StreamType::H264);
594 assert_eq!(demux.streams[&audio_pid], StreamType::Aac);
595
596 let pes = minimal_pes(90_000, b"nalunalunalu");
598 let pkt = make_ts_packet(video_pid, true, &pes);
599 assert!(demux.feed(&pkt).is_empty());
601
602 let pes2 = minimal_pes(180_000, b"nalu2");
604 let pkt2 = make_ts_packet(video_pid, true, &pes2);
605 let packets = demux.feed(&pkt2);
606 assert_eq!(packets.len(), 1);
607 assert_eq!(packets[0].pid, video_pid);
608 assert_eq!(packets[0].stream_type, StreamType::H264);
609 assert_eq!(packets[0].pts, Some(90_000));
610 assert_eq!(packets[0].payload, b"nalunalunalu");
611 }
612
613 #[test]
614 fn sync_recovery_skips_garbage() {
615 let mut demux = TsDemuxer::new();
616 let pmt_pid = 0x1000;
617
618 let mut data = vec![0xDE, 0xAD, 0xBE, 0xEF];
620 data.extend_from_slice(&make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid)));
621 demux.feed(&data);
622 assert_eq!(demux.pmt_pid, Some(pmt_pid));
623 }
624
625 #[test]
626 fn cross_call_buffering_handles_partial_packets() {
627 let mut demux = TsDemuxer::new();
628 let pmt_pid = 0x1000;
629 let full = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));
630
631 demux.feed(&full[..100]);
633 assert_eq!(demux.pmt_pid, None);
634
635 demux.feed(&full[100..]);
637 assert_eq!(demux.pmt_pid, Some(pmt_pid));
638 }
639
640 #[test]
641 fn pmt_with_scte35_pid_routes_to_section_drain() {
642 let mut demux = TsDemuxer::new();
643 let pmt_pid = 0x1000;
644 let scte35_pid = 0x1FFB;
645
646 let pat = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));
648 demux.feed(&pat);
649
650 let mut pmt_payload = vec![
652 0x00, 0x02, 0xB0, 0x12, 0x00, 0x01, 0xC1, 0x00, 0x00, 0xE1, 0x00, 0xF0, 0x00,
656 ];
657 pmt_payload.push(0x86); pmt_payload.push(0xE0 | ((scte35_pid >> 8) as u8 & 0x1F));
659 pmt_payload.push(scte35_pid as u8);
660 pmt_payload.push(0xF0);
661 pmt_payload.push(0x00);
662 pmt_payload.extend_from_slice(&[0x00; 4]); let pmt = make_ts_packet(pmt_pid, true, &pmt_payload);
664 demux.feed(&pmt);
665
666 assert_eq!(demux.streams.get(&scte35_pid), Some(&StreamType::Scte35));
667
668 let section_body_len: usize = 17; let mut section = vec![
673 0xFCu8,
674 0x30 | ((section_body_len >> 8) as u8 & 0x0F),
675 section_body_len as u8,
676 ];
677 section.extend_from_slice(&vec![0x00u8; section_body_len]);
678
679 let mut payload = vec![0u8]; payload.extend_from_slice(§ion);
683 let pkt = make_ts_packet(scte35_pid, true, &payload);
684 let pes = demux.feed(&pkt);
685 assert!(pes.is_empty(), "SCTE-35 PIDs do not yield PES packets");
686
687 let drained = demux.take_scte35_sections();
688 assert_eq!(drained.len(), 1, "one section drained");
689 assert_eq!(drained[0].pid, scte35_pid);
690 assert_eq!(&drained[0].raw[..], §ion[..]);
691
692 assert!(demux.take_scte35_sections().is_empty());
694 }
695
696 #[test]
697 fn parse_ts_timestamp_round_trips() {
698 let pts: u64 = 123_456_789;
699 let mut buf = [0u8; 5];
700 buf[0] = 0x21 | ((pts >> 29) as u8 & 0x0E);
701 buf[1] = (pts >> 22) as u8;
702 buf[2] = 0x01 | ((pts >> 14) as u8 & 0xFE);
703 buf[3] = (pts >> 7) as u8;
704 buf[4] = 0x01 | ((pts << 1) as u8 & 0xFE);
705 assert_eq!(parse_ts_timestamp(&buf), pts);
706 }
707}