1use anyhow::{Context, Result, bail};
8use codec::frame::{ColorSpace, PixelFormat, StreamInfo};
9
10use crate::demux::AudioTrack;
11use crate::streaming::{DemuxHeader, Sample, StreamingDemuxer};
12
13use super::{
14 ProgramInfo, PatProgram,
15 STREAM_TYPE_H264, STREAM_TYPE_HEVC, STREAM_TYPE_MPEG2_VIDEO,
16 TS_PACKET, TS_SYNC,
17};
18use super::audio::extract_ts_audio;
19use super::framerate::estimate_frame_rate_from_ptses;
20use super::pat_pmt::{parse_pat_all_programs, parse_pmt_streams};
21use super::pes::{parse_pes_header, scan_first_video_au};
22
23pub struct TsStreamingDemuxer {
39 data: Vec<u8>,
40 header: DemuxHeader,
41 audio: Option<AudioTrack>,
42 packets: usize,
43 packet_stride: usize,
44 prefix_len: usize,
45 programs: Vec<ProgramInfo>,
50 active_program_idx: usize,
53 video_pid: u16,
55 next_pkt: usize,
57 pending: Vec<u8>,
59 pending_pts: Option<u64>,
62 have_first_start: bool,
65 eof: bool,
68 pixel_format_detected: bool,
72 encrypted_drop: bool,
77}
78
79pub(crate) fn demux_ts_streaming_init(data: &[u8]) -> Result<TsStreamingDemuxer> {
80 let owned = data.to_vec();
81 let (packets, packet_stride, prefix_len) = super::detect_packet_layout(&owned)?;
82 if packets == 0 {
83 bail!("TS: file contains no TS packets");
84 }
85
86 let mut pat_programs: Vec<PatProgram> = Vec::new();
88 for i in 0..packets {
89 let start = i * packet_stride + prefix_len;
90 let pkt = &owned[start..start + TS_PACKET];
91 if pkt[0] != TS_SYNC {
92 continue;
93 }
94 let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
95 if pid == 0
96 && let Some(payload) = super::ts_psi_payload(pkt)
97 {
98 let progs = parse_pat_all_programs(payload);
99 if !progs.is_empty() {
100 pat_programs = progs;
101 break;
102 }
103 }
104 }
105 if pat_programs.is_empty() {
106 bail!("TS: no PAT entries found");
107 }
108
109 let mut programs: Vec<ProgramInfo> = pat_programs
115 .iter()
116 .map(|p| ProgramInfo {
117 program_number: p.program_number,
118 pmt_pid: p.pmt_pid,
119 video_streams: Vec::new(),
120 audio_streams: Vec::new(),
121 })
122 .collect();
123 let mut need: std::collections::HashSet<u16> =
125 pat_programs.iter().map(|p| p.pmt_pid).collect();
126 for i in 0..packets {
127 if need.is_empty() {
128 break;
129 }
130 let start = i * packet_stride + prefix_len;
131 let pkt = &owned[start..start + TS_PACKET];
132 if pkt[0] != TS_SYNC {
133 continue;
134 }
135 let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
136 if !need.contains(&pid) {
137 continue;
138 }
139 if let Some(payload) = super::ts_psi_payload(pkt)
140 && let Some((video_streams, audio_streams)) = parse_pmt_streams(payload)
141 {
142 if let Some(prog) = programs.iter_mut().find(|p| p.pmt_pid == pid) {
143 prog.video_streams = video_streams;
144 prog.audio_streams = audio_streams;
145 }
146 need.remove(&pid);
147 }
148 }
149
150 let active_program_idx = programs
154 .iter()
155 .position(|p| !p.video_streams.is_empty())
156 .context("TS: no program advertises a recognised video elementary stream")?;
157 let active = &programs[active_program_idx];
158 let video = active.video_streams[0];
159 let audio = active.audio_streams.first().copied();
160 let codec = match video.stream_type {
161 STREAM_TYPE_MPEG2_VIDEO => "mpeg2",
162 STREAM_TYPE_H264 => "h264",
163 STREAM_TYPE_HEVC => "h265",
164 other => bail!("TS: unsupported stream_type 0x{:02X}", other),
165 }
166 .to_string();
167
168 let scan = scan_first_video_au(&owned, packets, packet_stride, prefix_len, video.pid, 64);
187 let (width, height) = match &scan.first_au {
188 Some(au) => {
189 codec::pixel_format::detect_dims(&codec, std::slice::from_ref(au)).unwrap_or_else(
190 || {
191 tracing::warn!(
192 codec = codec.as_str(),
193 video_pid = video.pid,
194 "TS streaming demux: first AU SPS parse failed; width/height=0×0"
195 );
196 (0, 0)
197 },
198 )
199 }
200 None => {
201 tracing::warn!(
202 codec = codec.as_str(),
203 video_pid = video.pid,
204 "TS streaming demux: could not locate first video AU during init; width/height=0×0"
205 );
206 (0, 0)
207 }
208 };
209 let frame_rate = estimate_frame_rate_from_ptses(&scan.ptses).unwrap_or_else(|| {
210 tracing::warn!(
211 codec = codec.as_str(),
212 video_pid = video.pid,
213 pts_samples = scan.ptses.len(),
214 "TS streaming demux: could not derive frame_rate from PTS window; defaulting to 30.0"
215 );
216 30.0
217 });
218
219 let info = StreamInfo {
220 codec: codec.clone(),
221 width,
222 height,
223 frame_rate,
224 duration: 0.0,
225 pixel_format: PixelFormat::Yuv420p,
226 color_space: ColorSpace::Bt709,
227 total_frames: 0,
228 bitrate: 0,
229 color_metadata: Default::default(),
230 };
231
232 let audio_track = audio.and_then(|info| {
235 match extract_ts_audio(&owned, packets, packet_stride, prefix_len, info) {
236 Ok(track) => track,
237 Err(e) => {
238 tracing::warn!(
239 audio_pid = info.pid,
240 audio_kind = ?info.kind,
241 error = %e,
242 "TS audio extraction failed; emitting video-only"
243 );
244 None
245 }
246 }
247 });
248
249 Ok(TsStreamingDemuxer {
250 data: owned,
251 header: DemuxHeader { codec, info },
252 audio: audio_track,
253 packets,
254 packet_stride,
255 prefix_len,
256 programs,
257 active_program_idx,
258 video_pid: video.pid,
259 next_pkt: 0,
260 pending: Vec::new(),
261 pending_pts: None,
262 have_first_start: false,
263 eof: false,
264 pixel_format_detected: false,
265 encrypted_drop: false,
266 })
267}
268
269impl TsStreamingDemuxer {
270 pub fn programs(&self) -> &[ProgramInfo] {
275 &self.programs
276 }
277
278 pub fn active_program_index(&self) -> usize {
280 self.active_program_idx
281 }
282
283 pub fn select_program(&mut self, program_number: u16) -> Result<()> {
295 let new_idx = self
296 .programs
297 .iter()
298 .position(|p| p.program_number == program_number)
299 .with_context(|| format!("TS: program_number {} not found in PAT", program_number))?;
300 if self.programs[new_idx].video_streams.is_empty() {
301 bail!(
302 "TS: program {} has no recognised video stream",
303 program_number
304 );
305 }
306 let video = self.programs[new_idx].video_streams[0];
307 let audio = self.programs[new_idx].audio_streams.first().copied();
308 let codec = match video.stream_type {
309 STREAM_TYPE_MPEG2_VIDEO => "mpeg2",
310 STREAM_TYPE_H264 => "h264",
311 STREAM_TYPE_HEVC => "h265",
312 other => bail!(
313 "TS: program {} video stream_type 0x{:02X} unsupported",
314 program_number,
315 other
316 ),
317 }
318 .to_string();
319 self.active_program_idx = new_idx;
320 self.video_pid = video.pid;
321 self.header.codec = codec.clone();
324 self.header.info.codec = codec.clone();
325 self.header.info.pixel_format = PixelFormat::Yuv420p;
326 self.pixel_format_detected = false;
327 let scan = scan_first_video_au(
332 &self.data,
333 self.packets,
334 self.packet_stride,
335 self.prefix_len,
336 video.pid,
337 64,
338 );
339 let (w, h) = match &scan.first_au {
340 Some(au) => {
341 codec::pixel_format::detect_dims(&codec, std::slice::from_ref(au))
342 .unwrap_or((0, 0))
343 }
344 None => (0, 0),
345 };
346 self.header.info.width = w;
347 self.header.info.height = h;
348 self.header.info.frame_rate =
349 estimate_frame_rate_from_ptses(&scan.ptses).unwrap_or(30.0);
350 self.next_pkt = 0;
352 self.pending.clear();
353 self.pending_pts = None;
354 self.have_first_start = false;
355 self.eof = false;
356 self.encrypted_drop = false;
357 self.audio = audio.and_then(|info| {
359 match extract_ts_audio(
360 &self.data,
361 self.packets,
362 self.packet_stride,
363 self.prefix_len,
364 info,
365 ) {
366 Ok(track) => track,
367 Err(e) => {
368 tracing::warn!(
369 audio_pid = info.pid,
370 audio_kind = ?info.kind,
371 error = %e,
372 "TS audio extraction failed on program switch; emitting video-only"
373 );
374 None
375 }
376 }
377 });
378 Ok(())
379 }
380
381 fn yield_sample(&mut self, data: Vec<u8>, pts: Option<u64>) -> Sample {
385 if !self.pixel_format_detected {
386 let detected =
387 codec::pixel_format::detect(&self.header.codec, std::slice::from_ref(&data));
388 self.header.info.pixel_format = detected;
389 self.pixel_format_detected = true;
390 }
391 Sample {
392 data,
393 pts_ticks: pts.map(|p| p as i64).unwrap_or(0),
394 duration_ticks: 0,
395 }
396 }
397}
398
399impl StreamingDemuxer for TsStreamingDemuxer {
400 fn header(&self) -> &DemuxHeader {
401 &self.header
402 }
403
404 fn next_video_sample(&mut self) -> Result<Option<Sample>> {
405 if self.eof || self.encrypted_drop {
406 return Ok(None);
407 }
408 loop {
409 if self.next_pkt >= self.packets {
410 self.eof = true;
412 if !self.pending.is_empty() {
413 let data = std::mem::take(&mut self.pending);
414 let pts = self.pending_pts.take();
415 return Ok(Some(self.yield_sample(data, pts)));
416 }
417 return Ok(None);
418 }
419
420 let i = self.next_pkt;
421 self.next_pkt += 1;
422 let start = i * self.packet_stride + self.prefix_len;
423 let pkt = &self.data[start..start + TS_PACKET];
424 if pkt[0] != TS_SYNC {
425 continue;
426 }
427 let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
428 if pid != self.video_pid {
429 continue;
430 }
431 let pusi = pkt[1] & 0x40 != 0;
432 let scramble = (pkt[3] >> 6) & 0x03;
433 if scramble != 0 {
434 tracing::warn!(
441 video_pid = self.video_pid,
442 transport_scrambling_control = scramble,
443 error_kind = "encrypted_ts",
444 "encrypted TS stream; we don't carry CA tables — drop video output"
445 );
446 self.encrypted_drop = true;
447 self.pending.clear();
448 self.pending_pts = None;
449 self.have_first_start = false;
450 self.eof = true;
451 return Ok(None);
452 }
453 let adaptation = (pkt[3] >> 4) & 0x03;
454 let has_payload = adaptation & 0x01 != 0;
455 let has_adaptation = adaptation & 0x02 != 0;
456 if !has_payload {
457 continue;
458 }
459
460 let mut offset = 4usize;
461 if has_adaptation {
462 if offset >= TS_PACKET {
463 continue;
464 }
465 let adap_len = pkt[offset] as usize;
466 offset += 1 + adap_len;
467 if offset > TS_PACKET {
468 continue;
469 }
470 }
471 if offset >= TS_PACKET {
472 continue;
473 }
474 let payload = &pkt[offset..];
475
476 if pusi {
477 let had_pending = self.have_first_start;
481 let prev_data = if had_pending {
482 std::mem::take(&mut self.pending)
483 } else {
484 Vec::new()
485 };
486 let prev_pts = self.pending_pts.take();
487 self.have_first_start = true;
488
489 let Some((es_start, pts)) = parse_pes_header(payload) else {
490 self.have_first_start = false;
492 self.pending.clear();
493 if !prev_data.is_empty() {
494 return Ok(Some(self.yield_sample(prev_data, prev_pts)));
495 }
496 continue;
497 };
498 self.pending_pts = pts;
499 if es_start < payload.len() {
500 self.pending.extend_from_slice(&payload[es_start..]);
501 }
502
503 if !prev_data.is_empty() {
504 return Ok(Some(self.yield_sample(prev_data, prev_pts)));
505 }
506 } else if self.have_first_start {
509 self.pending.extend_from_slice(payload);
510 }
511 }
512 }
513
514 fn audio(&self) -> Option<&AudioTrack> {
515 self.audio.as_ref()
516 }
517}