Skip to main content

mpeg_ts/
pusi.rs

1//! PUSI-delimited payload reassembler — generic non-PSI PID payload accumulation.
2//!
3//! In MPEG-2 TS, a PID carrying packetised data that is **not** PSI section
4//! data uses `payload_unit_start_indicator` (PUSI) to delimit units: the
5//! packet whose PUSI == `1` starts a new unit. This is the mechanism used by
6//! ISO/IEC 23009-1 §5.10.3.3.5 to carry a DASH `emsg` box on reserved PID
7//! `0x0004` — the `emsg` box begins in the PUSI packet's payload, continues
8//! across zero or more non-PUSI packets on the same PID, and its last packet
9//! is padded with **adaptation-field stuffing** (not payload stuffing), so
10//! the concatenation of the PID's payload bytes across a PUSI-delimited run
11//! equals the box bytes exactly.
12//!
13//! [`PusiReassembler`] implements this generic reassembly: feed TS payload
14//! bytes together with the PID and PUSI flag; it returns
15//! `Option<Vec<u8>>` when a unit completes. This type is not `emsg`-specific
16//! — it works for any PUSI-delimited non-PSI PID payload.
17
18use alloc::vec::Vec;
19
/// Generic PUSI-delimited payload reassembler.
///
/// Accumulates payload bytes across consecutive TS packets sharing the same
/// PID, using `payload_unit_start_indicator` to delimit unit boundaries.
///
/// A unit is only collected once its start (a PUSI packet) has been seen.
/// Continuation packets that arrive while no unit is open are discarded,
/// because they belong to a unit whose beginning was never observed. This
/// happens when joining a stream mid-unit, or after [`flush`](Self::flush).
///
/// # Example
///
/// ```ignore
/// let mut reassembler = PusiReassembler::new(0x0004);
///
/// // The first PUSI packet opens a unit; nothing is complete yet.
/// assert!(reassembler.push(0x0004, true, &first_payload).is_none());
///
/// // Continuation packets extend the open unit.
/// assert!(reassembler.push(0x0004, false, &cont_payload).is_none());
///
/// // A following PUSI packet closes the previous unit and opens a new one.
/// if let Some(unit) = reassembler.push(0x0004, true, &next_payload) {
///     // process complete unit
/// }
///
/// // At end of stream, drain the last unit (no PUSI follows to close it).
/// if let Some(unit) = reassembler.flush() {
///     // process complete unit
/// }
/// ```
#[derive(Debug, Clone)]
pub struct PusiReassembler {
    /// The PID we are listening to. Packets with a different PID are ignored.
    pid: u16,
    /// Accumulated payload bytes for the currently open unit.
    /// Always empty while `started` is `false`.
    buf: Vec<u8>,
    /// `true` while a unit is open, i.e. a PUSI packet has been seen since
    /// construction or since the last `flush`.
    started: bool,
}

impl PusiReassembler {
    /// Create a new reassembler for the given `pid`.
    #[inline]
    pub fn new(pid: u16) -> Self {
        Self {
            pid,
            buf: Vec::new(),
            started: false,
        }
    }

    /// Feed one TS packet's PID, `payload_unit_start_indicator` and payload.
    ///
    /// Packets whose `pid` differs from the one passed to [`new`](Self::new)
    /// are ignored and return `None`.
    ///
    /// **PUSI semantics**: when `pusi` is `true`, the packet starts a *new*
    /// unit. A unit may already be open: a previous PUSI not yet closed by
    /// another PUSI or by `flush`. That unit is then complete and is returned
    /// as `Some(unit_bytes)`. The new unit begins with this packet's payload.
    ///
    /// When `pusi` is `false`, the payload is appended to the open unit. If no
    /// unit is open, the bytes are discarded. They continue a unit whose start
    /// was never seen, and keeping them would corrupt the next unit.
    ///
    /// Returns `Some(Vec<u8>)` when a completed unit is emitted, `None`
    /// otherwise. A unit that collected no bytes is still emitted here, as an
    /// empty `Vec`.
    pub fn push(&mut self, pid: u16, pusi: bool, payload: &[u8]) -> Option<Vec<u8>> {
        if pid != self.pid {
            return None;
        }

        if !pusi {
            // Continuation bytes only make sense if we saw this unit's start;
            // otherwise they would be glued onto the front of the next unit.
            if self.started {
                self.buf.extend_from_slice(payload);
            }
            return None;
        }

        // A PUSI packet closes any open unit and opens a fresh one.
        let completed = if self.started {
            Some(core::mem::take(&mut self.buf))
        } else {
            None
        };
        self.started = true;
        self.buf.extend_from_slice(payload);
        completed
    }

    /// Close and return the open (final) unit, if any.
    ///
    /// The caller drains this at the end of the PID's data, because the last
    /// real unit has no following PUSI to close it. After `flush`, the
    /// reassembler is back in its initial state. Continuation packets are then
    /// ignored until the next PUSI packet.
    ///
    /// Returns `None` when no unit is open or the open unit holds no bytes.
    pub fn flush(&mut self) -> Option<Vec<u8>> {
        // Always close the unit, even when it is empty, so stray continuation
        // bytes cannot later be reported as a (bogus) unit.
        let was_started = core::mem::replace(&mut self.started, false);
        let unit = core::mem::take(&mut self.buf);
        if was_started && !unit.is_empty() {
            Some(unit)
        } else {
            None
        }
    }
}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// Size of the synthetic box. It is deliberately larger than one 184-byte
    /// TS payload so that it has to span packets.
    const BIG_BOX_SIZE: u32 = 400;

    /// Build `size` bytes shaped like a box header plus body: a 4-byte
    /// big-endian size, the type tag `test`, then the byte value
    /// `offset & 0xFF` at every offset from 8 onward.
    fn synthetic_box_bytes(size: u32) -> Vec<u8> {
        let total = size as usize;
        let mut out = Vec::with_capacity(total);
        out.extend_from_slice(&size.to_be_bytes());
        out.extend_from_slice(b"test");
        out.extend((8..total).map(|offset| (offset & 0xFF) as u8));
        debug_assert_eq!(out.len(), total);
        out
    }

    #[test]
    fn spanning_across_two_packets() {
        let pid: u16 = 0x0004;
        let expected = synthetic_box_bytes(BIG_BOX_SIZE);

        // The head fills exactly one TS payload (184 bytes); the tail is the
        // continuation packet.
        let (head, tail) = expected.split_at(184);

        let mut reasm = PusiReassembler::new(pid);
        assert_eq!(reasm.push(pid, true, head), None);
        assert_eq!(reasm.push(pid, false, tail), None);

        let got = reasm.flush().expect("flush should return the unit");
        assert_eq!(got, expected);
    }

    #[test]
    fn boundary_two_units() {
        let pid: u16 = 0x0004;
        let first: &[u8] = b"AAAA-first-unit-data";
        let second: &[u8] = b"BBBB-second-unit-data";

        let mut reasm = PusiReassembler::new(pid);
        assert!(reasm.push(pid, true, first).is_none());

        // The second PUSI packet closes the first unit and hands it back.
        assert_eq!(reasm.push(pid, true, second).as_deref(), Some(first));

        // The second unit stays open until it is flushed.
        assert_eq!(reasm.flush().as_deref(), Some(second));
    }

    #[test]
    fn different_pid_ignored() {
        let pid: u16 = 0x0004;
        let foreign: u16 = 0x0100;
        let mut reasm = PusiReassembler::new(pid);

        // Traffic on a foreign PID must leave no trace.
        assert_eq!(reasm.push(foreign, true, b"ignored"), None);
        assert_eq!(reasm.push(foreign, false, b"more-ignored"), None);
        assert_eq!(reasm.flush(), None);

        // Our own PID is still picked up afterwards.
        assert_eq!(reasm.push(pid, true, b"real-data"), None);
        assert_eq!(
            reasm.flush().as_deref(),
            Some(b"real-data".as_slice())
        );
    }

    #[test]
    fn flush_empty_returns_none() {
        assert_eq!(PusiReassembler::new(0x0004).flush(), None);
    }

    #[test]
    fn single_packet_unit() {
        let pid: u16 = 0x0004;
        let payload: &[u8] = b"single-packet-emsg";

        let mut reasm = PusiReassembler::new(pid);
        assert_eq!(reasm.push(pid, true, payload), None);
        assert_eq!(reasm.flush().as_deref(), Some(payload));
    }
}