1use std::collections::HashMap;
20
21const TS_PACKET_SIZE: usize = 188;
22const SYNC_BYTE: u8 = 0x47;
23const PAT_PID: u16 = 0;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub enum StreamType {
28 H264,
29 H265,
30 Aac,
31 Scte35,
35 Unknown(u8),
36}
37
38impl StreamType {
39 fn from_byte(b: u8) -> Self {
40 match b {
41 0x1B => Self::H264,
42 0x24 => Self::H265,
43 0x0F | 0x11 => Self::Aac,
44 0x86 => Self::Scte35,
45 other => Self::Unknown(other),
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
56pub struct Scte35Section {
57 pub pid: u16,
61 pub raw: Vec<u8>,
63}
64
65#[derive(Debug, Clone)]
67pub struct PesPacket {
68 pub pid: u16,
69 pub stream_type: StreamType,
70 pub pts: Option<u64>,
73 pub dts: Option<u64>,
76 pub payload: Vec<u8>,
79}
80
81#[derive(Debug)]
83struct PesBuffer {
84 stream_type: StreamType,
85 buf: Vec<u8>,
86 started: bool,
87}
88
89#[derive(Debug, Default)]
98struct SectionBuffer {
99 buf: Vec<u8>,
100 expected_len: Option<usize>,
101}
102
103#[derive(Debug)]
105pub struct TsDemuxer {
106 remainder: Vec<u8>,
109 pmt_pid: Option<u16>,
111 streams: HashMap<u16, StreamType>,
113 pes_bufs: HashMap<u16, PesBuffer>,
115 section_bufs: HashMap<u16, SectionBuffer>,
120 pending_scte35: Vec<Scte35Section>,
123}
124
125impl Default for TsDemuxer {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
131impl TsDemuxer {
132 pub fn new() -> Self {
133 Self {
134 remainder: Vec::new(),
135 pmt_pid: None,
136 streams: HashMap::new(),
137 pes_bufs: HashMap::new(),
138 section_bufs: HashMap::new(),
139 pending_scte35: Vec::new(),
140 }
141 }
142
143 pub fn take_scte35_sections(&mut self) -> Vec<Scte35Section> {
155 std::mem::take(&mut self.pending_scte35)
156 }
157
158 pub fn feed(&mut self, data: &[u8]) -> Vec<PesPacket> {
163 let mut out = Vec::new();
164
165 let input = if self.remainder.is_empty() {
171 data
172 } else {
173 self.remainder.extend_from_slice(data);
174 self.process_buf(&mut out);
177 &[]
178 };
179
180 let mut pos = 0;
182 while pos < input.len() {
183 let sync_off = match input[pos..].iter().position(|&b| b == SYNC_BYTE) {
184 Some(p) => p,
185 None => break,
186 };
187 pos += sync_off;
188 if pos + TS_PACKET_SIZE > input.len() {
189 break;
190 }
191 let pkt: &[u8; TS_PACKET_SIZE] = input[pos..pos + TS_PACKET_SIZE].try_into().unwrap();
192 self.process_packet(pkt, &mut out);
193 pos += TS_PACKET_SIZE;
194 }
195
196 if pos < input.len() {
198 self.remainder.extend_from_slice(&input[pos..]);
199 }
200
201 out
202 }
203
204 fn process_buf(&mut self, out: &mut Vec<PesPacket>) {
206 let mut pos = 0;
207 while pos < self.remainder.len() {
208 let sync_off = match self.remainder[pos..].iter().position(|&b| b == SYNC_BYTE) {
209 Some(p) => p,
210 None => {
211 self.remainder.clear();
212 return;
213 }
214 };
215 pos += sync_off;
216 if pos + TS_PACKET_SIZE > self.remainder.len() {
217 break;
218 }
219 let pkt: [u8; TS_PACKET_SIZE] = self.remainder[pos..pos + TS_PACKET_SIZE].try_into().unwrap();
220 self.process_packet(&pkt, out);
221 pos += TS_PACKET_SIZE;
222 }
223 if pos > 0 {
225 self.remainder.drain(..pos);
226 }
227 }
228
229 fn process_packet(&mut self, pkt: &[u8; TS_PACKET_SIZE], out: &mut Vec<PesPacket>) {
230 let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
231 let pusi = pkt[1] & 0x40 != 0;
232 let afc = (pkt[3] >> 4) & 0x03;
233
234 let payload_offset = match afc {
235 0b01 => 4,
236 0b11 => {
237 let af_len = pkt[4] as usize;
238 5 + af_len
239 }
240 _ => return,
241 };
242 if payload_offset >= TS_PACKET_SIZE {
243 return;
244 }
245 let payload = &pkt[payload_offset..];
246
247 if pid == PAT_PID {
248 self.parse_pat(payload, pusi);
249 } else if Some(pid) == self.pmt_pid {
250 self.parse_pmt(payload, pusi);
251 } else if let Some(&st) = self.streams.get(&pid) {
252 if st == StreamType::Scte35 {
253 self.push_section(pid, payload, pusi);
254 } else {
255 self.push_pes(pid, payload, pusi, out);
256 }
257 }
258 }
259
260 fn push_section(&mut self, pid: u16, payload: &[u8], pusi: bool) {
271 let buf = self.section_bufs.entry(pid).or_default();
272 if pusi {
273 buf.buf.clear();
277 buf.expected_len = None;
278 if payload.is_empty() {
279 return;
280 }
281 let pointer = payload[0] as usize;
282 let start = 1 + pointer;
283 if start >= payload.len() {
284 return;
285 }
286 buf.buf.extend_from_slice(&payload[start..]);
287 } else {
288 if buf.buf.is_empty() && buf.expected_len.is_none() {
289 return;
292 }
293 buf.buf.extend_from_slice(payload);
294 }
295
296 if buf.expected_len.is_none() && buf.buf.len() >= 3 {
298 let section_length = (((buf.buf[1] & 0x0F) as usize) << 8) | buf.buf[2] as usize;
299 buf.expected_len = Some(3 + section_length);
300 }
301
302 while let Some(expected) = buf.expected_len {
307 if buf.buf.len() < expected {
308 break;
309 }
310 let section_bytes = buf.buf.drain(..expected).collect::<Vec<_>>();
311 self.pending_scte35.push(Scte35Section {
312 pid,
313 raw: section_bytes,
314 });
315 buf.expected_len = None;
316 if buf.buf.len() >= 3 {
317 let section_length = (((buf.buf[1] & 0x0F) as usize) << 8) | buf.buf[2] as usize;
318 buf.expected_len = Some(3 + section_length);
319 } else if buf.buf.iter().all(|&b| b == 0xFF) {
320 buf.buf.clear();
322 break;
323 }
324 }
325 }
326
327 fn parse_pat(&mut self, payload: &[u8], pusi: bool) {
328 let data = if pusi && !payload.is_empty() {
329 let pointer = payload[0] as usize;
330 if 1 + pointer >= payload.len() {
331 return;
332 }
333 &payload[1 + pointer..]
334 } else {
335 payload
336 };
337 if data.len() < 12 {
340 return;
341 }
342 let section_length = (((data[1] & 0x0F) as usize) << 8) | data[2] as usize;
343 let table_end = 3 + section_length;
344 if table_end > data.len() || section_length < 9 {
345 return;
346 }
347 let loop_end = table_end.saturating_sub(4);
349 let mut i = 8;
350 while i + 4 <= loop_end {
351 let prog_num = ((data[i] as u16) << 8) | data[i + 1] as u16;
352 let map_pid = (((data[i + 2] & 0x1F) as u16) << 8) | data[i + 3] as u16;
353 if prog_num != 0 {
354 self.pmt_pid = Some(map_pid);
355 break;
356 }
357 i += 4;
358 }
359 }
360
361 fn parse_pmt(&mut self, payload: &[u8], pusi: bool) {
362 let data = if pusi && !payload.is_empty() {
363 let pointer = payload[0] as usize;
364 if 1 + pointer >= payload.len() {
365 return;
366 }
367 &payload[1 + pointer..]
368 } else {
369 payload
370 };
371 if data.len() < 16 {
372 return;
373 }
374 let section_length = (((data[1] & 0x0F) as usize) << 8) | data[2] as usize;
375 let table_end = 3 + section_length;
376 if table_end > data.len() || section_length < 13 {
377 return;
378 }
379 let prog_info_len = (((data[10] & 0x0F) as usize) << 8) | data[11] as usize;
380 let mut i = 12 + prog_info_len;
381 let loop_end = table_end.saturating_sub(4);
382 self.streams.clear();
383 while i + 5 <= loop_end {
384 let st = data[i];
385 let es_pid = (((data[i + 1] & 0x1F) as u16) << 8) | data[i + 2] as u16;
386 let es_info_len = (((data[i + 3] & 0x0F) as usize) << 8) | data[i + 4] as usize;
387 self.streams.insert(es_pid, StreamType::from_byte(st));
388 i += 5 + es_info_len;
389 }
390 }
391
392 fn push_pes(&mut self, pid: u16, payload: &[u8], pusi: bool, out: &mut Vec<PesPacket>) {
393 let stream_type = *self.streams.get(&pid).unwrap_or(&StreamType::Unknown(0));
394
395 if pusi {
396 if let Some(buf) = self.pes_bufs.get_mut(&pid) {
397 if buf.started && !buf.buf.is_empty() {
398 if let Some(pkt) = Self::finish_pes(pid, buf) {
399 out.push(pkt);
400 }
401 }
402 }
403 let entry = self.pes_bufs.entry(pid).or_insert_with(|| PesBuffer {
404 stream_type,
405 buf: Vec::with_capacity(64 * 1024),
406 started: false,
407 });
408 entry.buf.clear();
409 entry.buf.extend_from_slice(payload);
410 entry.started = true;
411 entry.stream_type = stream_type;
412 } else if let Some(buf) = self.pes_bufs.get_mut(&pid) {
413 if buf.started {
414 buf.extend(payload);
415 }
416 }
417 }
418
419 fn finish_pes(pid: u16, buf: &mut PesBuffer) -> Option<PesPacket> {
420 let data = &buf.buf;
421 if data.len() < 9 || data[0] != 0 || data[1] != 0 || data[2] != 1 {
422 return None;
423 }
424 let pes_packet_length = ((data[4] as usize) << 8) | data[5] as usize;
425 let header_data_len = data[8] as usize;
426 let es_start = 9 + header_data_len;
427 if es_start > data.len() {
428 return None;
429 }
430 let flags = data[7];
431 let pts_flag = flags & 0x80 != 0;
432 let dts_flag = flags & 0x40 != 0;
433
434 let pts = if pts_flag && header_data_len >= 5 {
435 Some(parse_ts_timestamp(&data[9..14]))
436 } else {
437 None
438 };
439 let dts = if dts_flag && header_data_len >= 10 {
440 Some(parse_ts_timestamp(&data[14..19]))
441 } else {
442 None
443 };
444
445 let es_end = if pes_packet_length > 0 {
450 (6 + pes_packet_length).min(data.len())
451 } else {
452 data.len()
453 };
454 if es_start > es_end {
459 return None;
460 }
461 let payload = data[es_start..es_end].to_vec();
462 if payload.is_empty() {
463 return None;
464 }
465
466 Some(PesPacket {
467 pid,
468 stream_type: buf.stream_type,
469 pts,
470 dts,
471 payload,
472 })
473 }
474}
475
476impl PesBuffer {
477 fn extend(&mut self, data: &[u8]) {
478 self.buf.extend_from_slice(data);
479 }
480}
481
482fn parse_ts_timestamp(b: &[u8]) -> u64 {
487 let a = ((b[0] as u64 >> 1) & 0x07) << 30;
488 let bc = ((b[1] as u64) << 7 | (b[2] as u64 >> 1)) << 15;
489 let de = (b[3] as u64) << 7 | (b[4] as u64 >> 1);
490 a | bc | de
491}
492
493#[cfg(test)]
494mod tests {
495 use super::*;
496
497 fn make_ts_packet(pid: u16, pusi: bool, payload: &[u8]) -> [u8; 188] {
498 let mut pkt = [0xFFu8; 188];
499 pkt[0] = SYNC_BYTE;
500 pkt[1] = if pusi { 0x40 } else { 0x00 } | ((pid >> 8) as u8 & 0x1F);
501 pkt[2] = pid as u8;
502 pkt[3] = 0x10; let copy_len = payload.len().min(184);
504 pkt[4..4 + copy_len].copy_from_slice(&payload[..copy_len]);
505 pkt
507 }
508
509 fn minimal_pat(pmt_pid: u16) -> Vec<u8> {
510 let mut data = vec![
514 0x00, 0x00, 0xB0, 0x0D, 0x00, 0x01, 0xC1, 0x00, 0x00, 0x00, 0x01, ];
522 data.push(0xE0 | ((pmt_pid >> 8) as u8 & 0x1F));
523 data.push(pmt_pid as u8);
524 data.extend_from_slice(&[0x00; 4]); data
526 }
527
528 fn minimal_pmt(video_pid: u16, audio_pid: u16) -> Vec<u8> {
529 let mut data = vec![
533 0x00, 0x02, 0xB0, 0x17, 0x00, 0x01, 0xC1, 0x00, 0x00, 0xE1, 0x00, 0xF0, 0x00, ];
542 data.push(0x1B); data.push(0xE0 | ((video_pid >> 8) as u8 & 0x1F));
545 data.push(video_pid as u8);
546 data.push(0xF0);
547 data.push(0x00); data.push(0x0F); data.push(0xE0 | ((audio_pid >> 8) as u8 & 0x1F));
551 data.push(audio_pid as u8);
552 data.push(0xF0);
553 data.push(0x00); data.extend_from_slice(&[0x00; 4]); data
556 }
557
558 fn minimal_pes(pts_90k: u64, es_payload: &[u8]) -> Vec<u8> {
559 let pes_len = (3 + 5 + es_payload.len()) as u16;
562 let mut data = vec![
563 0x00,
564 0x00,
565 0x01, 0xE0, (pes_len >> 8) as u8,
568 pes_len as u8,
569 0x80, 0x80, 0x05, ];
573 let pts = pts_90k & 0x1_FFFF_FFFF;
575 data.push(0x21 | ((pts >> 29) as u8 & 0x0E));
576 data.push((pts >> 22) as u8);
577 data.push(0x01 | ((pts >> 14) as u8 & 0xFE));
578 data.push((pts >> 7) as u8);
579 data.push(0x01 | ((pts << 1) as u8 & 0xFE));
580 data.extend_from_slice(es_payload);
581 data
582 }
583
584 #[test]
585 fn demux_discovers_streams_and_yields_pes() {
586 let mut demux = TsDemuxer::new();
587 let video_pid = 0x100;
588 let audio_pid = 0x101;
589 let pmt_pid = 0x1000;
590
591 let pat = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));
593 assert!(demux.feed(&pat).is_empty());
594 assert_eq!(demux.pmt_pid, Some(pmt_pid));
595
596 let pmt = make_ts_packet(pmt_pid, true, &minimal_pmt(video_pid, audio_pid));
598 assert!(demux.feed(&pmt).is_empty());
599 assert_eq!(demux.streams.len(), 2);
600 assert_eq!(demux.streams[&video_pid], StreamType::H264);
601 assert_eq!(demux.streams[&audio_pid], StreamType::Aac);
602
603 let pes = minimal_pes(90_000, b"nalunalunalu");
605 let pkt = make_ts_packet(video_pid, true, &pes);
606 assert!(demux.feed(&pkt).is_empty());
608
609 let pes2 = minimal_pes(180_000, b"nalu2");
611 let pkt2 = make_ts_packet(video_pid, true, &pes2);
612 let packets = demux.feed(&pkt2);
613 assert_eq!(packets.len(), 1);
614 assert_eq!(packets[0].pid, video_pid);
615 assert_eq!(packets[0].stream_type, StreamType::H264);
616 assert_eq!(packets[0].pts, Some(90_000));
617 assert_eq!(packets[0].payload, b"nalunalunalu");
618 }
619
620 #[test]
621 fn malformed_pes_packet_length_does_not_panic() {
622 let mut demux = TsDemuxer::new();
627 let video_pid = 0x100;
628 let pmt_pid = 0x1000;
629 demux.feed(&make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid)));
630 demux.feed(&make_ts_packet(pmt_pid, true, &minimal_pmt(video_pid, 0x101)));
631
632 let mut pes = vec![
635 0x00, 0x00, 0x01, 0xE0, 0x00, 0x23, 0x80, 0x00, 0x23, ];
642 pes.resize(44, 0xFF); assert!(demux.feed(&make_ts_packet(video_pid, true, &pes)).is_empty());
644
645 let flushed = demux.feed(&make_ts_packet(video_pid, true, &minimal_pes(90_000, b"ok")));
648 assert!(flushed.is_empty(), "malformed PES must be dropped, not yielded");
649 }
650
651 #[test]
652 fn sync_recovery_skips_garbage() {
653 let mut demux = TsDemuxer::new();
654 let pmt_pid = 0x1000;
655
656 let mut data = vec![0xDE, 0xAD, 0xBE, 0xEF];
658 data.extend_from_slice(&make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid)));
659 demux.feed(&data);
660 assert_eq!(demux.pmt_pid, Some(pmt_pid));
661 }
662
663 #[test]
664 fn cross_call_buffering_handles_partial_packets() {
665 let mut demux = TsDemuxer::new();
666 let pmt_pid = 0x1000;
667 let full = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));
668
669 demux.feed(&full[..100]);
671 assert_eq!(demux.pmt_pid, None);
672
673 demux.feed(&full[100..]);
675 assert_eq!(demux.pmt_pid, Some(pmt_pid));
676 }
677
678 #[test]
679 fn pmt_with_scte35_pid_routes_to_section_drain() {
680 let mut demux = TsDemuxer::new();
681 let pmt_pid = 0x1000;
682 let scte35_pid = 0x1FFB;
683
684 let pat = make_ts_packet(PAT_PID, true, &minimal_pat(pmt_pid));
686 demux.feed(&pat);
687
688 let mut pmt_payload = vec![
690 0x00, 0x02, 0xB0, 0x12, 0x00, 0x01, 0xC1, 0x00, 0x00, 0xE1, 0x00, 0xF0, 0x00,
694 ];
695 pmt_payload.push(0x86); pmt_payload.push(0xE0 | ((scte35_pid >> 8) as u8 & 0x1F));
697 pmt_payload.push(scte35_pid as u8);
698 pmt_payload.push(0xF0);
699 pmt_payload.push(0x00);
700 pmt_payload.extend_from_slice(&[0x00; 4]); let pmt = make_ts_packet(pmt_pid, true, &pmt_payload);
702 demux.feed(&pmt);
703
704 assert_eq!(demux.streams.get(&scte35_pid), Some(&StreamType::Scte35));
705
706 let section_body_len: usize = 17; let mut section = vec![
711 0xFCu8,
712 0x30 | ((section_body_len >> 8) as u8 & 0x0F),
713 section_body_len as u8,
714 ];
715 section.extend_from_slice(&vec![0x00u8; section_body_len]);
716
717 let mut payload = vec![0u8]; payload.extend_from_slice(§ion);
721 let pkt = make_ts_packet(scte35_pid, true, &payload);
722 let pes = demux.feed(&pkt);
723 assert!(pes.is_empty(), "SCTE-35 PIDs do not yield PES packets");
724
725 let drained = demux.take_scte35_sections();
726 assert_eq!(drained.len(), 1, "one section drained");
727 assert_eq!(drained[0].pid, scte35_pid);
728 assert_eq!(&drained[0].raw[..], §ion[..]);
729
730 assert!(demux.take_scte35_sections().is_empty());
732 }
733
734 #[test]
735 fn parse_ts_timestamp_round_trips() {
736 let pts: u64 = 123_456_789;
737 let mut buf = [0u8; 5];
738 buf[0] = 0x21 | ((pts >> 29) as u8 & 0x0E);
739 buf[1] = (pts >> 22) as u8;
740 buf[2] = 0x01 | ((pts >> 14) as u8 & 0xFE);
741 buf[3] = (pts >> 7) as u8;
742 buf[4] = 0x01 | ((pts << 1) as u8 & 0xFE);
743 assert_eq!(parse_ts_timestamp(&buf), pts);
744 }
745}