// mpeg_pes/assembler.rs

1//! Per-PID PES reassembly from TS payloads.
2//!
3//! In PES-over-TS there is no `pointer_field`: a TS packet with
4//! `payload_unit_start_indicator = 1` *begins* a PES packet, and continuation
5//! packets (`PUSI = 0`) append to it. A PES therefore runs from one PUSI to the
6//! next (the unbounded-video case, `PES_packet_length = 0`, is handled the same
7//! way — flushed when the next unit starts or at end of stream).
8
9use alloc::vec::Vec;
10
/// Per-PID accumulator that stitches successive TS payloads back into PES packets.
///
/// Hand every TS payload for the PID to [`feed`](Self::feed) together with the
/// packet's `payload_unit_start_indicator`. A unit start closes the PES that was
/// being collected, so `feed` yields the **previous** packet's bytes at that
/// point. The final PES of a stream has no successor and must be collected with
/// [`flush`](Self::flush). Returned buffers are ready for
/// [`crate::PesPacket::parse`].
#[derive(Debug, Default)]
pub struct PesAssembler {
    /// Bytes gathered so far for the PES currently in progress.
    buf: Vec<u8>,
    /// Set by the first unit start; cleared again by `flush`.
    started: bool,
}

impl PesAssembler {
    /// Create an assembler with nothing buffered and no unit started.
    #[must_use]
    pub fn new() -> Self {
        Self {
            buf: Vec::new(),
            started: false,
        }
    }

    /// Consume the payload of one TS packet belonging to this PID.
    ///
    /// `payload_unit_start` must be the packet's `payload_unit_start_indicator`.
    ///
    /// * On a unit start, the bytes buffered for the previous PES (if there are
    ///   any) are returned and `payload` becomes the start of the next PES.
    /// * On a continuation, `payload` is appended to the PES in progress and
    ///   `None` is returned. Continuations seen before any unit start (or after
    ///   a `flush`) are discarded.
    #[must_use]
    pub fn feed(&mut self, payload_unit_start: bool, payload: &[u8]) -> Option<Vec<u8>> {
        if !payload_unit_start {
            // Without a preceding unit start there is no PES to attach these
            // bytes to (e.g. the stream was joined mid-packet), so drop them.
            if self.started {
                self.buf.extend_from_slice(payload);
            }
            return None;
        }

        // A new unit begins: whatever was collected so far is a finished PES.
        let previous = if self.started {
            self.take_pending()
        } else {
            None
        };
        self.started = true;
        self.buf.extend_from_slice(payload);
        previous
    }

    /// Hand out the PES still being collected when the stream ends, if any.
    ///
    /// Afterwards the assembler is back in its initial state: continuation
    /// payloads are ignored until the next unit start.
    #[must_use]
    pub fn flush(&mut self) -> Option<Vec<u8>> {
        self.started = false;
        self.take_pending()
    }

    /// Move the buffered bytes out, leaving an empty buffer behind; `None` when
    /// nothing has been buffered.
    fn take_pending(&mut self) -> Option<Vec<u8>> {
        (!self.buf.is_empty()).then(|| core::mem::take(&mut self.buf))
    }
}
65
#[cfg(test)]
mod tests {
    use super::*;
    use crate::PesPacket;

    /// A PES spread over several TS payloads is emitted once the next unit
    /// starts, and the trailing PES is only available through `flush`.
    #[test]
    fn reassembles_across_packets_and_flushes() {
        let mut asm = PesAssembler::new();

        // PES #1 arrives in three TS payloads: one unit start followed by two
        // continuations. Nothing is complete while it is still being collected.
        let video_parts: [&[u8]; 3] = [
            &[0x00, 0x00, 0x01, 0xE0, 0x00, 0x00, 0x80],
            &[0x80, 0x05, 0x21, 0x00, 0x01, 0x00, 0x01],
            &[0xAA, 0xBB],
        ];
        for (idx, part) in video_parts.iter().enumerate() {
            assert_eq!(asm.feed(idx == 0, part), None);
        }

        // The unit start of PES #2 releases PES #1.
        let video_bytes = asm
            .feed(
                true,
                &[0x00, 0x00, 0x01, 0xC0, 0x00, 0x00, 0x80, 0x00, 0x00, 0x11],
            )
            .expect("first PES emitted on next unit start");
        let video = PesPacket::parse(&video_bytes).unwrap();
        assert!(video.stream_id.is_video());
        assert_eq!(video.payload, &[0xAA, 0xBB]);

        // End of stream: PES #2 has no successor, so it must be flushed.
        let audio_bytes = asm.flush().expect("second PES flushed");
        let audio = PesPacket::parse(&audio_bytes).unwrap();
        assert!(audio.stream_id.is_audio());

        // Nothing is left after the flush.
        assert!(asm.flush().is_none());
    }

    /// Joining mid-stream: continuation data seen before any unit start is
    /// discarded rather than buffered.
    #[test]
    fn ignores_continuation_before_first_start() {
        let mut asm = PesAssembler::new();
        assert!(asm.feed(false, &[0xDE, 0xAD]).is_none());
        assert!(asm.flush().is_none());
    }
}