1use crate::CodecId;
15use bytes::Bytes;
16
/// Kind of elementary stream a demuxed payload belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TsTrackKind {
    /// Payload comes from the video track.
    Video,
    /// Payload comes from the audio track.
    Audio,
}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct TsPayload {
29 pub data: Bytes,
32 pub codec: CodecId,
34 pub kind: TsTrackKind,
36 pub pts_ms: i64,
38 pub keyframe: bool,
40}
41
42#[derive(Debug)]
44struct Track {
45 pid: u16,
47 codec: CodecId,
49 kind: TsTrackKind,
51 pes: Vec<u8>,
53 pts: i64,
55 open: bool,
57}
58
59impl Track {
60 fn new(pid: u16, codec: CodecId, kind: TsTrackKind) -> Self {
61 Self {
62 pid,
63 codec,
64 kind,
65 pes: Vec::new(),
66 pts: 0,
67 open: false,
68 }
69 }
70
71 fn feed(&mut self, payload: &[u8], pusi: bool, out: &mut Vec<TsPayload>) {
73 if pusi {
74 self.flush(out);
75 if let Some((pts, es_offset)) = parse_pes_header(payload) {
76 self.pts = pts;
77 self.open = true;
78 self.pes.extend_from_slice(&payload[es_offset..]);
79 }
80 } else if self.open {
81 self.pes.extend_from_slice(payload);
82 }
83 }
84
85 fn flush(&mut self, out: &mut Vec<TsPayload>) {
87 if !self.open || self.pes.is_empty() {
88 self.pes.clear();
89 self.open = false;
90 return;
91 }
92 let es = std::mem::take(&mut self.pes);
93 let keyframe = matches!(self.kind, TsTrackKind::Video) && is_keyframe(&es, self.codec);
94 out.push(TsPayload {
95 data: Bytes::from(es),
96 codec: self.codec,
97 kind: self.kind,
98 pts_ms: self.pts / 90,
99 keyframe,
100 });
101 self.open = false;
102 }
103}
104
/// Size in bytes of one transport-stream packet.
const TS_PACKET_LEN: usize = 188;
/// Value of the first byte of every transport-stream packet.
const TS_SYNC: u8 = 0x47;
107
108#[derive(Debug)]
110pub struct TsDemuxer {
111 pmt_pid: Option<u16>,
113 video: Option<Track>,
115 audio: Option<Track>,
117 carry: Vec<u8>,
119}
120
121impl Default for TsDemuxer {
122 fn default() -> Self {
123 Self::new()
124 }
125}
126
127impl TsDemuxer {
128 pub fn new() -> Self {
130 Self {
131 pmt_pid: None,
132 video: None,
133 audio: None,
134 carry: Vec::new(),
135 }
136 }
137
138 pub fn push(&mut self, bytes: &[u8]) -> Vec<TsPayload> {
140 let mut out = Vec::new();
141 let mut data = std::mem::take(&mut self.carry);
143 data.extend_from_slice(bytes);
144
145 let mut i = 0;
146 while i + TS_PACKET_LEN <= data.len() {
147 let pkt = &data[i..i + TS_PACKET_LEN];
148 if pkt[0] == TS_SYNC {
149 self.handle_packet(pkt, &mut out);
150 i += TS_PACKET_LEN;
151 } else {
152 i += 1;
154 }
155 }
156 self.carry = data[i..].to_vec();
158 out
159 }
160
161 fn handle_packet(&mut self, pkt: &[u8], out: &mut Vec<TsPayload>) {
163 let pusi = pkt[1] & 0x40 != 0;
164 let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
165 let adaptation = (pkt[3] >> 4) & 0x03;
166 let has_payload = adaptation == 1 || adaptation == 3;
167 if !has_payload {
168 return;
169 }
170 let mut payload_start = 4;
172 if adaptation == 3 {
173 let af_len = pkt[4] as usize;
174 payload_start = 5 + af_len;
175 }
176 if payload_start >= TS_PACKET_LEN {
177 return;
178 }
179 let payload = &pkt[payload_start..];
180
181 if pid == 0 {
182 self.parse_pat(payload, pusi);
183 } else if Some(pid) == self.pmt_pid {
184 self.parse_pmt(payload, pusi);
185 } else if let Some(track) = self
186 .video
187 .as_mut()
188 .filter(|t| t.pid == pid)
189 .or_else(|| self.audio.as_mut().filter(|t| t.pid == pid))
190 {
191 track.feed(payload, pusi, out);
192 }
193 }
194
195 fn parse_pat(&mut self, payload: &[u8], pusi: bool) {
197 let section = section_body(payload, pusi);
198 let Some(section) = section else { return };
199 let mut i = 8;
201 while i + 4 <= section.len().saturating_sub(4) {
202 let program = u16::from_be_bytes([section[i], section[i + 1]]);
203 let pid = (((section[i + 2] & 0x1F) as u16) << 8) | section[i + 3] as u16;
204 if program != 0 {
205 self.pmt_pid = Some(pid);
206 return;
207 }
208 i += 4;
209 }
210 }
211
212 fn parse_pmt(&mut self, payload: &[u8], pusi: bool) {
214 let Some(section) = section_body(payload, pusi) else {
215 return;
216 };
217 if section.len() < 12 {
218 return;
219 }
220 let program_info_len = (((section[10] & 0x0F) as usize) << 8) | section[11] as usize;
221 let mut i = 12 + program_info_len;
222 while i + 5 <= section.len().saturating_sub(4) {
223 let stream_type = section[i];
224 let pid = (((section[i + 1] & 0x1F) as u16) << 8) | section[i + 2] as u16;
225 let es_info_len = (((section[i + 3] & 0x0F) as usize) << 8) | section[i + 4] as usize;
226 match stream_type_to_track(stream_type) {
227 Some((codec, TsTrackKind::Video)) if self.video.is_none() => {
228 self.video = Some(Track::new(pid, codec, TsTrackKind::Video));
229 }
230 Some((codec, TsTrackKind::Audio)) if self.audio.is_none() => {
231 self.audio = Some(Track::new(pid, codec, TsTrackKind::Audio));
232 }
233 _ => {}
234 }
235 i += 5 + es_info_len;
236 }
237 }
238}
239
240fn stream_type_to_track(stream_type: u8) -> Option<(CodecId, TsTrackKind)> {
242 match stream_type {
243 0x1B => Some((CodecId::H264, TsTrackKind::Video)),
244 0x24 => Some((CodecId::H265, TsTrackKind::Video)),
245 0x0F | 0x11 => Some((CodecId::AAC, TsTrackKind::Audio)), 0x03 | 0x04 => Some((CodecId::MP3, TsTrackKind::Audio)),
247 _ => None,
248 }
249}
250
/// Returns the part of a PSI packet payload at which section data begins.
///
/// When `pusi` is set, the first payload byte is a pointer field giving the
/// number of bytes to skip after it; the slice following those bytes is
/// returned. `None` is returned if the payload is empty or the pointer runs
/// past its end. When `pusi` is clear, the payload is returned unchanged.
fn section_body(payload: &[u8], pusi: bool) -> Option<&[u8]> {
    if !pusi {
        return Some(payload);
    }
    let (&pointer, after_pointer) = payload.split_first()?;
    after_pointer.get(usize::from(pointer)..)
}
261
/// Parses the PES packet header at the start of `p`.
///
/// Returns `None` unless `p` holds at least nine bytes and begins with the
/// `00 00 01` start-code prefix. On success the tuple is `(pts, es_offset)`:
///
/// * `pts` is the 33-bit timestamp decoded from the five bytes after the
///   header-length byte. It is `0` when the PTS flag (top bit of byte 7) is
///   clear or when fewer than 14 bytes are available.
/// * `es_offset` is the index of the first elementary-stream byte, i.e.
///   `9 + header_data_length`, clamped to `p.len()` so that
///   `&p[es_offset..]` never panics.
fn parse_pes_header(p: &[u8]) -> Option<(i64, usize)> {
    let [0, 0, 1, _, _, _, _, flags, header_data_len, optional @ ..] = p else {
        return None;
    };

    let es_offset = (9 + usize::from(*header_data_len)).min(p.len());

    let pts = match optional {
        [b0, b1, b2, b3, b4, ..] if *flags & 0x80 != 0 => {
            // 3 + 15 + 15 payload bits, each group followed by a marker bit.
            let high = i64::from((*b0 >> 1) & 0x07);
            let middle = (i64::from(*b1) << 7) | i64::from(*b2 >> 1);
            let low = (i64::from(*b3) << 7) | i64::from(*b4 >> 1);
            (high << 30) | (middle << 15) | low
        }
        _ => 0,
    };

    Some((pts, es_offset))
}
287
288fn is_keyframe(es: &[u8], codec: CodecId) -> bool {
290 let mut i = 0;
291 while i + 4 < es.len() {
292 let sc3 = es[i] == 0 && es[i + 1] == 0 && es[i + 2] == 1;
294 let sc4 = es[i] == 0 && es[i + 1] == 0 && es[i + 2] == 0 && es[i + 3] == 1;
295 if sc3 || sc4 {
296 let nal_off = if sc4 { i + 4 } else { i + 3 };
297 if let Some(&hdr) = es.get(nal_off) {
298 match codec {
299 CodecId::H264 if hdr & 0x1F == 5 => return true,
300 CodecId::H265 if (16..=21).contains(&((hdr >> 1) & 0x3F)) => return true,
301 _ => {}
302 }
303 }
304 i = nal_off;
305 } else {
306 i += 1;
307 }
308 }
309 false
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// PID on which the fixture PMT is sent (announced by the fixture PAT).
    const PMT_PID: u16 = 0x1000;
    /// PID of the fixture video stream.
    const VIDEO_PID: u16 = 0x0100;
    /// PID of the fixture audio stream.
    const AUDIO_PID: u16 = 0x0101;

    /// Builds one payload-only transport packet; the payload is truncated to
    /// fit and the remainder of the packet is zero-filled.
    fn ts_packet(pid: u16, pusi: bool, payload: &[u8]) -> Vec<u8> {
        let pusi_bit: u8 = if pusi { 0x40 } else { 0 };
        let mut pkt = vec![0u8; TS_PACKET_LEN];
        pkt[0] = TS_SYNC;
        pkt[1] = pusi_bit | ((pid >> 8) as u8 & 0x1F);
        pkt[2] = (pid & 0xFF) as u8;
        pkt[3] = 0x10; // payload only, continuity counter 0
        let n = payload.len().min(TS_PACKET_LEN - 4);
        pkt[4..4 + n].copy_from_slice(&payload[..n]);
        pkt
    }

    /// Wraps `body` in a PSI section (pointer field, header, zeroed CRC) and
    /// sends it as a single PUSI packet on `pid`. `section_length` is derived
    /// from the actual body size so fixtures cannot drift out of sync.
    fn psi_packet(pid: u16, table_id: u8, table_id_ext: u16, body: &[u8]) -> Vec<u8> {
        // Bytes after the length field: 5 header bytes + body + 4 CRC bytes.
        let section_length = 5 + body.len() + 4;
        let mut sec = vec![0u8]; // pointer_field
        sec.push(table_id);
        sec.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
        sec.push((section_length & 0xFF) as u8);
        sec.extend_from_slice(&table_id_ext.to_be_bytes());
        sec.extend_from_slice(&[0xC1, 0, 0]); // version/current, section 0 of 0
        sec.extend_from_slice(body);
        sec.extend_from_slice(&[0, 0, 0, 0]); // CRC placeholder
        ts_packet(pid, true, &sec)
    }

    /// PAT announcing program 1 with its PMT on `PMT_PID`.
    fn pat() -> Vec<u8> {
        let body = [
            0x00,
            0x01,
            0xE0 | (PMT_PID >> 8) as u8,
            (PMT_PID & 0xFF) as u8,
        ];
        psi_packet(0, 0x00, 0, &body)
    }

    /// PMT with an H.264 stream on `VIDEO_PID` and, optionally, an AAC stream
    /// on `AUDIO_PID`.
    fn pmt_with(audio: bool) -> Vec<u8> {
        let mut body = vec![0xE1, 0x00]; // PCR PID
        body.extend_from_slice(&[0xF0, 0x00]); // program_info_length = 0
        body.extend_from_slice(&[0x1B, 0xE1, 0x00, 0xF0, 0x00]);
        if audio {
            body.extend_from_slice(&[0x0F, 0xE1, 0x01, 0xF0, 0x00]);
        }
        psi_packet(PMT_PID, 0x02, 1, &body)
    }

    fn pmt() -> Vec<u8> {
        pmt_with(false)
    }

    /// Single-packet PES start on `pid` carrying `es` with the given PTS.
    fn pes_on(pid: u16, stream_id: u8, es: &[u8], pts: i64) -> Vec<u8> {
        let mut p = vec![0x00, 0x00, 0x01, stream_id, 0x00, 0x00, 0x80, 0x80, 0x05];
        let pts = pts as u64;
        p.push((0x21 | (((pts >> 30) & 0x07) << 1)) as u8);
        p.push(((pts >> 22) & 0xFF) as u8);
        p.push((0x01 | (((pts >> 15) & 0x7F) << 1)) as u8);
        p.push(((pts >> 7) & 0xFF) as u8);
        p.push((0x01 | ((pts & 0x7F) << 1)) as u8);
        p.extend_from_slice(es);
        ts_packet(pid, true, &p)
    }

    fn video_pes(es: &[u8], pts: i64) -> Vec<u8> {
        pes_on(VIDEO_PID, 0xE0, es, pts)
    }

    fn audio_pes(es: &[u8], pts: i64) -> Vec<u8> {
        pes_on(AUDIO_PID, 0xC0, es, pts)
    }

    /// Reads the 12-bit `section_length` out of a PSI fixture packet.
    fn declared_section_length(pkt: &[u8]) -> usize {
        // pkt[4] is the pointer_field and pkt[5] the table_id.
        (usize::from(pkt[6] & 0x0F) << 8) | usize::from(pkt[7])
    }

    #[test]
    fn psi_fixtures_declare_their_real_section_length() {
        assert_eq!(declared_section_length(&pat()), 5 + 4 + 4);
        assert_eq!(declared_section_length(&pmt_with(false)), 5 + 9 + 4);
        assert_eq!(declared_section_length(&pmt_with(true)), 5 + 14 + 4);
    }

    #[test]
    fn pes_header_decodes_pts() {
        let pes = video_pes(&[], 90_000);
        let (pts, _off) = parse_pes_header(&pes[4..]).unwrap();
        assert_eq!(pts, 90_000);
    }

    #[test]
    fn keyframe_detection_h264_idr() {
        let idr = [0, 0, 0, 1, 0x65, 0xAA];
        assert!(is_keyframe(&idr, CodecId::H264));
        let non_idr = [0, 0, 0, 1, 0x41, 0xAA];
        assert!(!is_keyframe(&non_idr, CodecId::H264));
    }

    #[test]
    fn full_chain_pat_pmt_pes_emits_access_unit() {
        let mut d = TsDemuxer::new();
        assert!(d.push(&pat()).is_empty());
        assert!(d.push(&pmt()).is_empty());
        assert_eq!(d.video.as_ref().unwrap().pid, VIDEO_PID);
        assert_eq!(d.video.as_ref().unwrap().codec, CodecId::H264);

        // The first PES is only emitted once the next one starts.
        let idr = [0, 0, 0, 1, 0x65, 0x11, 0x22];
        assert!(d.push(&video_pes(&idr, 9000)).is_empty());
        let delta = [0, 0, 0, 1, 0x41, 0x33];
        let out = d.push(&video_pes(&delta, 12000));
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].codec, CodecId::H264);
        assert_eq!(out[0].kind, TsTrackKind::Video);
        assert_eq!(out[0].pts_ms, 100);
        assert!(out[0].keyframe);
        assert!(out[0].data.starts_with(&idr));
    }

    #[test]
    fn carries_partial_packet_across_pushes() {
        let mut d = TsDemuxer::new();
        let p = pat();
        assert!(d.push(&p[..100]).is_empty());
        assert!(d.push(&p[100..]).is_empty());
        d.push(&pmt());
        assert_eq!(d.video.as_ref().unwrap().pid, VIDEO_PID);
    }

    #[test]
    fn demuxes_audio_track_alongside_video() {
        let mut d = TsDemuxer::new();
        d.push(&pat());
        d.push(&pmt_with(true));
        assert_eq!(d.audio.as_ref().unwrap().pid, AUDIO_PID);
        assert_eq!(d.audio.as_ref().unwrap().codec, CodecId::AAC);

        let adts = [0xFF, 0xF1, 0x4C, 0x80, 0x01, 0x23];
        assert!(d.push(&audio_pes(&adts, 18000)).is_empty());
        let out = d.push(&audio_pes(&[0xFF, 0xF1, 0x00], 19000));
        assert_eq!(out.len(), 1);
        let au = &out[0];
        assert_eq!(au.kind, TsTrackKind::Audio);
        assert_eq!(au.codec, CodecId::AAC);
        assert!(!au.keyframe, "audio access units are never keyframes");
        assert_eq!(au.pts_ms, 200);
        assert!(au.data.starts_with(&adts));
    }

    #[test]
    fn pes_header_with_oversized_declared_length_is_clamped() {
        let p = [0x00, 0x00, 0x01, 0xE0, 0x00, 0x00, 0x80, 0x00, 0xFF, 0xAA];
        let (_pts, es_offset) = parse_pes_header(&p).unwrap();
        assert_eq!(es_offset, p.len(), "offset clamped to payload length");
        // Must not panic.
        let _ = &p[es_offset..];
    }

    #[test]
    fn demuxer_survives_oversized_pes_header() {
        let mut d = TsDemuxer::new();
        d.push(&pat());
        d.push(&pmt());
        let mut pes = vec![0x00, 0x00, 0x01, 0xE0, 0x00, 0x00, 0x80, 0x00, 0xFF];
        pes.extend_from_slice(&[0x11, 0x22]);
        // Must not panic.
        let _ = d.push(&ts_packet(VIDEO_PID, true, &pes));
    }

    #[test]
    fn audio_only_stream_type_maps_to_aac() {
        assert_eq!(
            stream_type_to_track(0x0F),
            Some((CodecId::AAC, TsTrackKind::Audio))
        );
        assert_eq!(
            stream_type_to_track(0x03),
            Some((CodecId::MP3, TsTrackKind::Audio))
        );
        assert!(stream_type_to_track(0x99).is_none());
    }
}