mpeg_ts/pusi.rs
1//! PUSI-delimited payload reassembler — generic non-PSI PID payload accumulation.
2//!
3//! In MPEG-2 TS, a PID carrying packetised data that is **not** PSI section
4//! data uses `payload_unit_start_indicator` (PUSI) to delimit units: the
5//! packet whose PUSI == `1` starts a new unit. This is the mechanism used by
6//! ISO/IEC 23009-1 §5.10.3.3.5 to carry a DASH `emsg` box on reserved PID
7//! `0x0004` — the `emsg` box begins in the PUSI packet's payload, continues
8//! across zero or more non-PUSI packets on the same PID, and its last packet
9//! is padded with **adaptation-field stuffing** (not payload stuffing), so
10//! the concatenation of the PID's payload bytes across a PUSI-delimited run
11//! equals the box bytes exactly.
12//!
//! [`PusiReassembler`] implements this generic reassembly: feed it each TS
//! packet's PID, PUSI flag, and payload bytes. `push` returns `Some(unit)`
//! when a new PUSI packet closes the unit in progress, and `flush` drains the
//! final unit at end of data. This type is not `emsg`-specific — it works for
//! any PUSI-delimited non-PSI PID payload.
17
18use alloc::vec::Vec;
19
/// Generic PUSI-delimited payload reassembler.
///
/// Accumulates payload bytes across consecutive TS packets sharing the same
/// PID, using `payload_unit_start_indicator` to delimit unit boundaries.
///
/// Continuation (non-PUSI) payloads are only accepted while a unit is open,
/// i.e. after a PUSI packet has been pushed. Continuations that arrive with
/// no open unit (for example before the first PUSI, or after
/// [`flush`](Self::flush) returned a unit) are discarded so they can never
/// be glued in front of the next unit.
///
/// # Example
///
/// ```ignore
/// let mut reassembler = PusiReassembler::new(0x0004);
///
/// // First PUSI packet: opens a unit, nothing to emit yet.
/// assert!(reassembler.push(0x0004, true, &first_payload).is_none());
///
/// // Continuation packet: appended to the open unit.
/// assert!(reassembler.push(0x0004, false, &cont_payload).is_none());
///
/// // Next PUSI packet: closes the previous unit and opens a new one.
/// if let Some(unit) = reassembler.push(0x0004, true, &next_payload) {
///     // process complete unit
/// }
///
/// // Drain the final unit once the PID's data has ended.
/// if let Some(unit) = reassembler.flush() {
///     // process complete unit
/// }
/// ```
#[derive(Debug, Clone)]
pub struct PusiReassembler {
    /// The PID we are listening to. Packets with a different PID are ignored.
    pid: u16,
    /// Accumulated payload bytes for the current in-progress unit.
    ///
    /// Invariant: empty whenever `started` is `false`.
    buf: Vec<u8>,
    /// `true` while a unit is open: a PUSI packet has been pushed since
    /// construction or since the last `flush` that returned a unit.
    started: bool,
}

impl PusiReassembler {
    /// Create a new reassembler for the given `pid`.
    ///
    /// The reassembler starts with no open unit and an empty buffer.
    #[inline]
    pub fn new(pid: u16) -> Self {
        Self {
            pid,
            buf: Vec::new(),
            started: false,
        }
    }

    /// Feed one TS packet's (pid, payload_unit_start_indicator, payload bytes).
    ///
    /// Packets whose `pid != self.pid` are silently ignored (returns `None`).
    ///
    /// **PUSI semantics**: When `pusi == true`, the packet marks the start of a
    /// *new* unit. If a unit was already open, that unit is **complete** — it
    /// is returned as `Some(unit_bytes)` — and a fresh unit begins with this
    /// packet's payload. The returned unit is empty if the packets of the
    /// closed unit carried no payload bytes.
    ///
    /// When `pusi == false`, the payload is appended to the open unit. If no
    /// unit is open, the payload is discarded: it is the tail of a unit whose
    /// start was never seen and must not be prepended to the next unit.
    ///
    /// Returns `Some(Vec<u8>)` when a completed unit is emitted; `None`
    /// otherwise.
    pub fn push(&mut self, pid: u16, pusi: bool, payload: &[u8]) -> Option<Vec<u8>> {
        if pid != self.pid {
            return None;
        }

        if !pusi {
            // A continuation only belongs to a unit that a PUSI packet opened.
            // Dropping it otherwise keeps `buf` empty while `started` is false.
            if self.started {
                self.buf.extend_from_slice(payload);
            }
            return None;
        }

        // PUSI: close whatever unit was open, then start the new one.
        let completed = if self.started {
            Some(core::mem::take(&mut self.buf))
        } else {
            self.started = true;
            None
        };
        self.buf.extend_from_slice(payload);
        completed
    }

    /// Return any in-progress (final) unit; the caller drains this at end of
    /// the PID's data (the last real box has no following PUSI to close it).
    ///
    /// On success the reassembler goes back to the "no open unit" state, so a
    /// new PUSI packet is required before further payload is accepted.
    ///
    /// Returns `None` when no unit is open, or when the open unit has not
    /// accumulated any bytes yet (in that case the unit stays open).
    pub fn flush(&mut self) -> Option<Vec<u8>> {
        if !self.started || self.buf.is_empty() {
            return None;
        }
        self.started = false;
        Some(core::mem::take(&mut self.buf))
    }
}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// Size of the synthetic "box" used by the spanning test. It is larger
    /// than a single TS payload (184 bytes) and is written into the box as a
    /// valid big-endian size prefix so the data looks realistic.
    const BIG_BOX_SIZE: u32 = 400;

    /// Build a synthetic box of `size` bytes: a 4-byte big-endian size, the
    /// 4-byte type `test`, then a counter pattern from offset 8 onwards.
    fn synthetic_box_bytes(size: u32) -> Vec<u8> {
        let total = size as usize;
        let mut out = Vec::with_capacity(total);
        out.extend_from_slice(&size.to_be_bytes());
        out.extend_from_slice(b"test");
        // Each filler byte is the low 8 bits of its own offset.
        out.extend((8..total).map(|offset| (offset & 0xFF) as u8));
        debug_assert_eq!(out.len(), total);
        out
    }

    /// A unit split over a PUSI packet and one continuation is reassembled
    /// byte-for-byte by `flush`.
    #[test]
    fn spanning_across_two_packets() {
        let pid = 0x0004u16;
        let expected = synthetic_box_bytes(BIG_BOX_SIZE);

        // The head is exactly one full TS payload; the tail is the remainder.
        let (head, tail) = expected.split_at(184);

        let mut reasm = PusiReassembler::new(pid);

        // Neither the opening packet nor the continuation completes a unit.
        assert_eq!(reasm.push(pid, true, head), None);
        assert_eq!(reasm.push(pid, false, tail), None);

        // Only the final drain hands the unit back.
        let reassembled = reasm.flush().expect("flush should return the unit");
        assert_eq!(reassembled, expected);
    }

    /// A second PUSI packet closes the first unit; `flush` yields the second.
    #[test]
    fn boundary_two_units() {
        let pid = 0x0004u16;
        let first: &[u8] = b"AAAA-first-unit-data";
        let second: &[u8] = b"BBBB-second-unit-data";

        let mut reasm = PusiReassembler::new(pid);

        // Opening unit A emits nothing.
        assert_eq!(reasm.push(pid, true, first), None);

        // Opening unit B hands back unit A.
        let closed = reasm.push(pid, true, second);
        assert_eq!(closed.as_deref(), Some(first));

        // Unit B is still pending until drained.
        let flushed = reasm.flush();
        assert_eq!(flushed.as_deref(), Some(second));
    }

    /// Packets on another PID leave the reassembler untouched.
    #[test]
    fn different_pid_ignored() {
        let pid = 0x0004u16;
        let mut reasm = PusiReassembler::new(pid);

        // Foreign-PID traffic is dropped, with or without PUSI.
        assert_eq!(reasm.push(0x0100, true, b"ignored"), None);
        assert_eq!(reasm.push(0x0100, false, b"more-ignored"), None);
        assert_eq!(reasm.flush(), None);

        // Traffic on the configured PID is accumulated as usual.
        assert_eq!(reasm.push(pid, true, b"real-data"), None);
        let flushed = reasm.flush();
        assert_eq!(flushed.as_deref(), Some(&b"real-data"[..]));
    }

    /// Draining a fresh reassembler yields nothing.
    #[test]
    fn flush_empty_returns_none() {
        let mut reasm = PusiReassembler::new(0x0004);
        assert_eq!(reasm.flush(), None);
    }

    /// A unit that fits in one PUSI packet is returned by `flush`.
    #[test]
    fn single_packet_unit() {
        let pid = 0x0004u16;
        let mut reasm = PusiReassembler::new(pid);

        assert_eq!(reasm.push(pid, true, b"single-packet-emsg"), None);
        let flushed = reasm.flush();
        assert_eq!(flushed.as_deref(), Some(&b"single-packet-emsg"[..]));
    }
}