Skip to main content

container/ts/
streaming.rs

1//! Streaming MPEG-TS demuxer (`TsStreamingDemuxer`).
2//!
3//! Holds the PES reassembly buffer for exactly one in-flight access unit;
4//! yields a sample whenever a PUSI=1 packet closes the current one (or at
5//! EOF for the final pending sample).
6
7use anyhow::{Context, Result, bail};
8use codec::frame::{ColorSpace, PixelFormat, StreamInfo};
9
10use crate::demux::AudioTrack;
11use crate::streaming::{DemuxHeader, Sample, StreamingDemuxer};
12
13use super::{
14    ProgramInfo, PatProgram,
15    STREAM_TYPE_H264, STREAM_TYPE_HEVC, STREAM_TYPE_MPEG2_VIDEO,
16    TS_PACKET, TS_SYNC,
17};
18use super::audio::extract_ts_audio;
19use super::framerate::estimate_frame_rate_from_ptses;
20use super::pat_pmt::{parse_pat_all_programs, parse_pmt_streams};
21use super::pes::{parse_pes_header, scan_first_video_au};
22
23/// Streaming MPEG-TS demuxer. Holds the PES reassembly buffer for one
24/// in-flight access unit only — yields whenever a PUSI=1 packet
25/// closes the current sample (or at EOF for the final pending sample).
26///
27/// Squad-37 added:
28/// - **Multi-program awareness**: `programs()` returns every program
29///   the PAT advertised plus their PMT contents; `select_program()`
30///   switches the active video PID + audio extraction to a different
31///   program. Default behaviour is unchanged (first program with a
32///   recognised video stream wins).
33/// - **Encrypted-stream guard**: if any packet on the active video PID
34///   carries `transport_scrambling_control != 0`, we log a one-time
35///   typed warn ("encrypted TS stream; we don't carry CA tables —
36///   drop video output") and switch to a "drop everything" mode where
37///   `next_video_sample` returns `Ok(None)` without further parsing.
38pub struct TsStreamingDemuxer {
39    data: Vec<u8>,
40    header: DemuxHeader,
41    audio: Option<AudioTrack>,
42    packets: usize,
43    packet_stride: usize,
44    prefix_len: usize,
45    /// Every program the PAT advertised, in PAT order. Each entry's
46    /// PMT was walked at init to populate its video/audio stream lists.
47    /// Programs whose PMT we couldn't parse are still listed (with
48    /// empty video_streams/audio_streams) so callers see them.
49    programs: Vec<ProgramInfo>,
50    /// Index into `programs` of the currently active program. Default:
51    /// the first program with a recognised video stream.
52    active_program_idx: usize,
53    /// Active video PID (mirrors `programs[active_program_idx].video_streams[0].pid`).
54    video_pid: u16,
55    /// Index of the next packet to scan.
56    next_pkt: usize,
57    /// In-flight PES payload — emptied & yielded on next PUSI.
58    pending: Vec<u8>,
59    /// PTS attached to `pending` (PTS lives in the PES header that
60    /// opened the AU).
61    pending_pts: Option<u64>,
62    /// True once we've seen the first PUSI for our PID. Bytes before
63    /// the first PUSI are dropped (mid-stream join semantics).
64    have_first_start: bool,
65    /// True after we've returned `Ok(None)` once — guards against
66    /// repeated drains.
67    eof: bool,
68    /// Lazily set on first emitted sample — `pixel_format::detect` is
69    /// one-shot against `samples[0]` so we patch `header.info.pixel_format`
70    /// in place once and skip the probe thereafter.
71    pixel_format_detected: bool,
72    /// Encrypted-stream guard (Squad-37). Latches `true` the first time
73    /// we see `transport_scrambling_control != 0` on the active video
74    /// PID; warning is logged exactly once and `next_video_sample`
75    /// returns `Ok(None)` from that point on.
76    encrypted_drop: bool,
77}
78
79pub(crate) fn demux_ts_streaming_init(data: &[u8]) -> Result<TsStreamingDemuxer> {
80    let owned = data.to_vec();
81    let (packets, packet_stride, prefix_len) = super::detect_packet_layout(&owned)?;
82    if packets == 0 {
83        bail!("TS: file contains no TS packets");
84    }
85
86    // Phase 1: walk the PAT and collect every program + its PMT PID.
87    let mut pat_programs: Vec<PatProgram> = Vec::new();
88    for i in 0..packets {
89        let start = i * packet_stride + prefix_len;
90        let pkt = &owned[start..start + TS_PACKET];
91        if pkt[0] != TS_SYNC {
92            continue;
93        }
94        let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
95        if pid == 0
96            && let Some(payload) = super::ts_psi_payload(pkt)
97        {
98            let progs = parse_pat_all_programs(payload);
99            if !progs.is_empty() {
100                pat_programs = progs;
101                break;
102            }
103        }
104    }
105    if pat_programs.is_empty() {
106        bail!("TS: no PAT entries found");
107    }
108
109    // Phase 2: walk every PMT and resolve its video+audio streams. We
110    // remember the FIRST PMT section we see per PID — later versions
111    // (table_id 0x02 with a higher `version_number`) would update an
112    // active session in a real-world receiver but our demuxer is
113    // start-of-file-only, so first-section semantics are correct.
114    let mut programs: Vec<ProgramInfo> = pat_programs
115        .iter()
116        .map(|p| ProgramInfo {
117            program_number: p.program_number,
118            pmt_pid: p.pmt_pid,
119            video_streams: Vec::new(),
120            audio_streams: Vec::new(),
121        })
122        .collect();
123    // Track which programs still need their PMT parsed.
124    let mut need: std::collections::HashSet<u16> =
125        pat_programs.iter().map(|p| p.pmt_pid).collect();
126    for i in 0..packets {
127        if need.is_empty() {
128            break;
129        }
130        let start = i * packet_stride + prefix_len;
131        let pkt = &owned[start..start + TS_PACKET];
132        if pkt[0] != TS_SYNC {
133            continue;
134        }
135        let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
136        if !need.contains(&pid) {
137            continue;
138        }
139        if let Some(payload) = super::ts_psi_payload(pkt)
140            && let Some((video_streams, audio_streams)) = parse_pmt_streams(payload)
141        {
142            if let Some(prog) = programs.iter_mut().find(|p| p.pmt_pid == pid) {
143                prog.video_streams = video_streams;
144                prog.audio_streams = audio_streams;
145            }
146            need.remove(&pid);
147        }
148    }
149
150    // Phase 3: pick the default active program — first one with a
151    // recognised video stream. Matches legacy "first program wins"
152    // semantics for single-program files.
153    let active_program_idx = programs
154        .iter()
155        .position(|p| !p.video_streams.is_empty())
156        .context("TS: no program advertises a recognised video elementary stream")?;
157    let active = &programs[active_program_idx];
158    let video = active.video_streams[0];
159    let audio = active.audio_streams.first().copied();
160    let codec = match video.stream_type {
161        STREAM_TYPE_MPEG2_VIDEO => "mpeg2",
162        STREAM_TYPE_H264 => "h264",
163        STREAM_TYPE_HEVC => "h265",
164        other => bail!("TS: unsupported stream_type 0x{:02X}", other),
165    }
166    .to_string();
167
168    // total_frames + duration are unknown until drained.
169    //
170    // width/height recovery: TS carries nothing at the container layer,
171    // so we walk just enough packets to capture the first video AU and
172    // parse its SPS (H.264 / HEVC) or sequence header (MPEG-2). This
173    // has to happen during init — `header()` is read by the pipeline
174    // before any `next_video_sample` call, and the rav1e encoder
175    // rejects 0×0 configs outright. Parse failure is non-fatal: we
176    // warn and leave dims at 0 so the failure surfaces in the encoder
177    // config error rather than silently corrupting the output.
178    //
179    // frame_rate: same scan collects a window of video-PID PTSes (up
180    // to 64 PUSIs). Inter-PTS span over (count-1) intervals at the
181    // 90 kHz TS clock gives the source fps. A wrong frame_rate here
182    // causes exactly the kind of "video sped up, audio drags" sync
183    // symptom that the BBB 24 fps sample hit against the previous
184    // hardcoded `30.0` fallback. Falls back to `30.0` only when the
185    // scan can't derive a finite fps in [1.0, 240.0].
186    let scan = scan_first_video_au(&owned, packets, packet_stride, prefix_len, video.pid, 64);
187    let (width, height) = match &scan.first_au {
188        Some(au) => {
189            codec::pixel_format::detect_dims(&codec, std::slice::from_ref(au)).unwrap_or_else(
190                || {
191                    tracing::warn!(
192                        codec = codec.as_str(),
193                        video_pid = video.pid,
194                        "TS streaming demux: first AU SPS parse failed; width/height=0×0"
195                    );
196                    (0, 0)
197                },
198            )
199        }
200        None => {
201            tracing::warn!(
202                codec = codec.as_str(),
203                video_pid = video.pid,
204                "TS streaming demux: could not locate first video AU during init; width/height=0×0"
205            );
206            (0, 0)
207        }
208    };
209    let frame_rate = estimate_frame_rate_from_ptses(&scan.ptses).unwrap_or_else(|| {
210        tracing::warn!(
211            codec = codec.as_str(),
212            video_pid = video.pid,
213            pts_samples = scan.ptses.len(),
214            "TS streaming demux: could not derive frame_rate from PTS window; defaulting to 30.0"
215        );
216        30.0
217    });
218
219    let info = StreamInfo {
220        codec: codec.clone(),
221        width,
222        height,
223        frame_rate,
224        duration: 0.0,
225        pixel_format: PixelFormat::Yuv420p,
226        color_space: ColorSpace::Bt709,
227        total_frames: 0,
228        bitrate: 0,
229        color_metadata: Default::default(),
230    };
231
232    // Audio passthrough still happens up-front (Squad-18 contract).
233    // Squad-37 routes by codec kind (AAC / AC-3 / E-AC-3).
234    let audio_track = audio.and_then(|info| {
235        match extract_ts_audio(&owned, packets, packet_stride, prefix_len, info) {
236            Ok(track) => track,
237            Err(e) => {
238                tracing::warn!(
239                    audio_pid = info.pid,
240                    audio_kind = ?info.kind,
241                    error = %e,
242                    "TS audio extraction failed; emitting video-only"
243                );
244                None
245            }
246        }
247    });
248
249    Ok(TsStreamingDemuxer {
250        data: owned,
251        header: DemuxHeader { codec, info },
252        audio: audio_track,
253        packets,
254        packet_stride,
255        prefix_len,
256        programs,
257        active_program_idx,
258        video_pid: video.pid,
259        next_pkt: 0,
260        pending: Vec::new(),
261        pending_pts: None,
262        have_first_start: false,
263        eof: false,
264        pixel_format_detected: false,
265        encrypted_drop: false,
266    })
267}
268
269impl TsStreamingDemuxer {
270    /// Every program the PAT advertised, in PAT order. Squad-37 multi-
271    /// program API — useful for callers that want to enumerate channels
272    /// in a multi-program transport (DVB / ATSC broadcast capture). For
273    /// single-program files the slice has length 1.
274    pub fn programs(&self) -> &[ProgramInfo] {
275        &self.programs
276    }
277
278    /// Index of the currently active program (within `programs()`).
279    pub fn active_program_index(&self) -> usize {
280        self.active_program_idx
281    }
282
283    /// Switch the active program by PMT-side `program_number`. Resets the
284    /// per-AU walk state (pending PES bytes, PTS, encrypted-drop guard,
285    /// pixel-format detection) so the next `next_video_sample` call
286    /// starts cleanly on the new video PID. Returns `Ok(())` on success
287    /// or an error if `program_number` is not in `programs()` or the
288    /// chosen program has no recognised video stream.
289    ///
290    /// Audio is re-extracted from the new program's first audio stream
291    /// (if any). For single-program files (the common case) callers
292    /// don't need to touch this; the constructor already picked program
293    /// 0 by default.
294    pub fn select_program(&mut self, program_number: u16) -> Result<()> {
295        let new_idx = self
296            .programs
297            .iter()
298            .position(|p| p.program_number == program_number)
299            .with_context(|| format!("TS: program_number {} not found in PAT", program_number))?;
300        if self.programs[new_idx].video_streams.is_empty() {
301            bail!(
302                "TS: program {} has no recognised video stream",
303                program_number
304            );
305        }
306        let video = self.programs[new_idx].video_streams[0];
307        let audio = self.programs[new_idx].audio_streams.first().copied();
308        let codec = match video.stream_type {
309            STREAM_TYPE_MPEG2_VIDEO => "mpeg2",
310            STREAM_TYPE_H264 => "h264",
311            STREAM_TYPE_HEVC => "h265",
312            other => bail!(
313                "TS: program {} video stream_type 0x{:02X} unsupported",
314                program_number,
315                other
316            ),
317        }
318        .to_string();
319        self.active_program_idx = new_idx;
320        self.video_pid = video.pid;
321        // Refresh the codec / pixel-format fields on the cached header
322        // — `info.codec` flows out of `header()` to the pipeline.
323        self.header.codec = codec.clone();
324        self.header.info.codec = codec.clone();
325        self.header.info.pixel_format = PixelFormat::Yuv420p;
326        self.pixel_format_detected = false;
327        // Re-probe width/height + frame_rate from the new program's
328        // video PID. Zero dims / 30 fps fallback on parse failure so
329        // the encoder reports the miss rather than silently using the
330        // previous program's values.
331        let scan = scan_first_video_au(
332            &self.data,
333            self.packets,
334            self.packet_stride,
335            self.prefix_len,
336            video.pid,
337            64,
338        );
339        let (w, h) = match &scan.first_au {
340            Some(au) => {
341                codec::pixel_format::detect_dims(&codec, std::slice::from_ref(au))
342                    .unwrap_or((0, 0))
343            }
344            None => (0, 0),
345        };
346        self.header.info.width = w;
347        self.header.info.height = h;
348        self.header.info.frame_rate =
349            estimate_frame_rate_from_ptses(&scan.ptses).unwrap_or(30.0);
350        // Reset PES walk state.
351        self.next_pkt = 0;
352        self.pending.clear();
353        self.pending_pts = None;
354        self.have_first_start = false;
355        self.eof = false;
356        self.encrypted_drop = false;
357        // Re-extract audio from the new program's first audio stream.
358        self.audio = audio.and_then(|info| {
359            match extract_ts_audio(
360                &self.data,
361                self.packets,
362                self.packet_stride,
363                self.prefix_len,
364                info,
365            ) {
366                Ok(track) => track,
367                Err(e) => {
368                    tracing::warn!(
369                        audio_pid = info.pid,
370                        audio_kind = ?info.kind,
371                        error = %e,
372                        "TS audio extraction failed on program switch; emitting video-only"
373                    );
374                    None
375                }
376            }
377        });
378        Ok(())
379    }
380
381    /// Build a Sample from raw AU bytes, applying the one-shot
382    /// pixel_format detection on the first emission. Centralises the
383    /// three yield sites in `next_video_sample`.
384    fn yield_sample(&mut self, data: Vec<u8>, pts: Option<u64>) -> Sample {
385        if !self.pixel_format_detected {
386            let detected =
387                codec::pixel_format::detect(&self.header.codec, std::slice::from_ref(&data));
388            self.header.info.pixel_format = detected;
389            self.pixel_format_detected = true;
390        }
391        Sample {
392            data,
393            pts_ticks: pts.map(|p| p as i64).unwrap_or(0),
394            duration_ticks: 0,
395        }
396    }
397}
398
399impl StreamingDemuxer for TsStreamingDemuxer {
400    fn header(&self) -> &DemuxHeader {
401        &self.header
402    }
403
404    fn next_video_sample(&mut self) -> Result<Option<Sample>> {
405        if self.eof || self.encrypted_drop {
406            return Ok(None);
407        }
408        loop {
409            if self.next_pkt >= self.packets {
410                // Drain the final pending sample at EOF.
411                self.eof = true;
412                if !self.pending.is_empty() {
413                    let data = std::mem::take(&mut self.pending);
414                    let pts = self.pending_pts.take();
415                    return Ok(Some(self.yield_sample(data, pts)));
416                }
417                return Ok(None);
418            }
419
420            let i = self.next_pkt;
421            self.next_pkt += 1;
422            let start = i * self.packet_stride + self.prefix_len;
423            let pkt = &self.data[start..start + TS_PACKET];
424            if pkt[0] != TS_SYNC {
425                continue;
426            }
427            let pid = (((pkt[1] & 0x1F) as u16) << 8) | pkt[2] as u16;
428            if pid != self.video_pid {
429                continue;
430            }
431            let pusi = pkt[1] & 0x40 != 0;
432            let scramble = (pkt[3] >> 6) & 0x03;
433            if scramble != 0 {
434                // Encrypted-stream guard (Squad-37). The first scrambled
435                // packet on the active video PID triggers a one-time
436                // typed warn and flips us into drop-everything mode —
437                // any further `next_video_sample` calls return
438                // `Ok(None)` immediately. We don't carry CA tables, so
439                // any byte we feed downstream from here is garbage.
440                tracing::warn!(
441                    video_pid = self.video_pid,
442                    transport_scrambling_control = scramble,
443                    error_kind = "encrypted_ts",
444                    "encrypted TS stream; we don't carry CA tables — drop video output"
445                );
446                self.encrypted_drop = true;
447                self.pending.clear();
448                self.pending_pts = None;
449                self.have_first_start = false;
450                self.eof = true;
451                return Ok(None);
452            }
453            let adaptation = (pkt[3] >> 4) & 0x03;
454            let has_payload = adaptation & 0x01 != 0;
455            let has_adaptation = adaptation & 0x02 != 0;
456            if !has_payload {
457                continue;
458            }
459
460            let mut offset = 4usize;
461            if has_adaptation {
462                if offset >= TS_PACKET {
463                    continue;
464                }
465                let adap_len = pkt[offset] as usize;
466                offset += 1 + adap_len;
467                if offset > TS_PACKET {
468                    continue;
469                }
470            }
471            if offset >= TS_PACKET {
472                continue;
473            }
474            let payload = &pkt[offset..];
475
476            if pusi {
477                // PUSI flushes the previous AU. If we already had one
478                // in-flight, return it now and stage the new one for
479                // the next call.
480                let had_pending = self.have_first_start;
481                let prev_data = if had_pending {
482                    std::mem::take(&mut self.pending)
483                } else {
484                    Vec::new()
485                };
486                let prev_pts = self.pending_pts.take();
487                self.have_first_start = true;
488
489                let Some((es_start, pts)) = parse_pes_header(payload) else {
490                    // Malformed PES — drop state, keep walking.
491                    self.have_first_start = false;
492                    self.pending.clear();
493                    if !prev_data.is_empty() {
494                        return Ok(Some(self.yield_sample(prev_data, prev_pts)));
495                    }
496                    continue;
497                };
498                self.pending_pts = pts;
499                if es_start < payload.len() {
500                    self.pending.extend_from_slice(&payload[es_start..]);
501                }
502
503                if !prev_data.is_empty() {
504                    return Ok(Some(self.yield_sample(prev_data, prev_pts)));
505                }
506                // No previous AU to yield — keep walking until the next
507                // PUSI (or EOF).
508            } else if self.have_first_start {
509                self.pending.extend_from_slice(payload);
510            }
511        }
512    }
513
514    fn audio(&self) -> Option<&AudioTrack> {
515        self.audio.as_ref()
516    }
517}