Skip to main content

dvb_ule/
ts.rs

1//! SNDU → MPEG-2 TS packet mapping and reassembly (RFC 4326 §3, §4.3, §6, §7).
2//!
3//! A ULE Encapsulator maps SNDUs into the 184-byte payload of MPEG-2 TS
4//! packets on a single PID (TS Logical Channel). An SNDU may be carried whole,
5//! fragmented across several packets, or packed two-or-more-to-a-packet. This
6//! module provides:
7//!
8//! - [`UleReceiver`] — a de-fragmenting reassembler. Feed it the 184-byte
9//!   payload of each TS packet on the ULE PID (with that packet's PUSI flag)
10//!   and it yields each complete SNDU's bytes, validated to length and ready to
11//!   hand to [`crate::Sndu::parse`].
12//!
13//! The receiver follows the §7 reassembly rules: it idles until a PUSI=1
14//! packet, locates the first SNDU via the 1-byte Payload Pointer (§6.1),
15//! accumulates bytes across PUSI=0 continuations, and stops a packet's walk at
16//! an End Indicator / 0xFF padding (§4.3).
17
18use alloc::vec::Vec;
19
20use crate::sndu::{
21    is_end_indicator, BASE_HEADER_LEN, D_BIT_MASK, END_INDICATOR_LENGTH, LENGTH_MASK, PADDING_BYTE,
22};
23
24/// The number of TS-packet payload bytes when AFC = `01` (payload only): the
25/// 188-byte packet minus its 4-byte header (RFC 4326 §3).
26pub const TS_PAYLOAD_LEN: usize = 184;
27
28/// A de-fragmenting ULE receiver (RFC 4326 §7).
29///
30/// Stateful across TS packets on one PID. Hand it each packet's payload via
31/// [`UleReceiver::push`]; it returns the complete SNDUs that finished in that
32/// packet. The receiver owns a reassembly buffer for the partial SNDU spanning
33/// packet boundaries.
34#[derive(Debug, Default, Clone)]
35pub struct UleReceiver {
36    /// Bytes of an SNDU accumulated so far (header-first); empty when idle.
37    partial: Vec<u8>,
38    /// Total expected SNDU length once known (header + Length region), else 0.
39    expected: usize,
40    /// `true` once we have seen a valid SNDU start (not in the Idle State).
41    started: bool,
42}
43
44impl UleReceiver {
45    /// Create an idle receiver.
46    pub fn new() -> Self {
47        Self::default()
48    }
49
50    /// Reset to the Idle State (e.g. after a CC discontinuity, §7.3).
51    pub fn reset(&mut self) {
52        self.partial.clear();
53        self.expected = 0;
54        self.started = false;
55    }
56
57    /// `true` if the receiver is mid-SNDU (a fragment is buffered).
58    pub fn in_reassembly(&self) -> bool {
59        self.started && !self.partial.is_empty()
60    }
61
62    /// Feed one TS packet's payload (`payload` = the bytes after the 4-byte TS
63    /// header, length [`TS_PAYLOAD_LEN`] in practice) and its `pusi` flag.
64    /// Returns every SNDU that completed within this packet, as owned byte
65    /// vectors (header..CRC inclusive).
66    ///
67    /// On a malformed Payload Pointer or an inconsistent length the partial
68    /// SNDU is dropped and the receiver re-enters the Idle State (§7), but the
69    /// already-completed SNDUs from this packet are still returned.
70    pub fn push(&mut self, payload: &[u8], pusi: bool) -> Vec<Vec<u8>> {
71        let mut out = Vec::new();
72
73        if pusi {
74            // PUSI=1: a 1-byte Payload Pointer follows the TS header. It counts
75            // the bytes (excluding itself) up to the first new SNDU start.
76            if payload.is_empty() {
77                return out;
78            }
79            let pp = payload[0] as usize;
80            let pp_region = &payload[1..];
81            if pp > pp_region.len() {
82                // Bad PP: discard any partial and idle.
83                self.reset();
84                return out;
85            }
86            // RFC 4326 §7.2.1 payload-pointer consistency check: when we are
87            // mid-reassembly and a PUSI=1 packet arrives, the PP must equal
88            // exactly the number of bytes remaining in the partial SNDU. If we
89            // already know `expected`, compute `remaining`; if we don't (header
90            // is still partial), accept any PP so the first bytes can complete
91            // the header and let `maybe_finish` sort it out. A mismatch means
92            // the stream is corrupt — discard the partial and start fresh.
93            if self.in_reassembly() {
94                if self.expected != 0 {
95                    let remaining = self.expected.saturating_sub(self.partial.len());
96                    if remaining != 0 && pp != remaining {
97                        // PP inconsistency: corrupt stream. Discard partial (§7.2.1).
98                        self.reset();
99                        // Fall through: still process the new SNDUs after pp_region[pp..].
100                    } else {
101                        self.feed_continuation(&pp_region[..pp], &mut out);
102                    }
103                } else {
104                    self.feed_continuation(&pp_region[..pp], &mut out);
105                }
106            } else {
107                // Idle: bytes before the pointer belong to no SNDU; skip them.
108                self.partial.clear();
109                self.expected = 0;
110            }
111            self.started = true;
112            // Walk packed SNDUs starting at the pointer.
113            self.walk_new_sndus(&pp_region[pp..], &mut out);
114        } else {
115            // PUSI=0: pure continuation of the SNDU in progress.
116            if self.in_reassembly() {
117                self.feed_continuation(payload, &mut out);
118            }
119            // else: a continuation with nothing in progress — discard (idle).
120        }
121        out
122    }
123
124    /// Append continuation bytes to the partial SNDU, emitting it if it
125    /// completes. `chunk` is consumed fully (a continuation never starts a new
126    /// SNDU — packing only happens at a PUSI=1 pointer or right after a
127    /// completed SNDU within the same packet, handled by `walk_new_sndus`).
128    fn feed_continuation(&mut self, chunk: &[u8], out: &mut Vec<Vec<u8>>) {
129        self.partial.extend_from_slice(chunk);
130        self.maybe_finish(out);
131    }
132
133    /// Walk a region that begins at an SNDU start (a packing region): parse the
134    /// Length, consume whole SNDUs, and buffer the trailing partial for the
135    /// next packet. Stops at an End Indicator or 0xFF padding (§4.3).
136    fn walk_new_sndus(&mut self, mut region: &[u8], out: &mut Vec<Vec<u8>>) {
137        loop {
138            if region.is_empty() {
139                return;
140            }
141            // End Indicator / padding: no more SNDUs in this packet.
142            if region[0] == PADDING_BYTE {
143                // Either a 0xFFFF End Indicator or stray 0xFF stuffing.
144                if is_end_indicator(region) {
145                    // remainder is padding; nothing buffered.
146                }
147                return;
148            }
149            if region.len() < BASE_HEADER_LEN {
150                // Header straddles the packet boundary — buffer it.
151                self.partial.clear();
152                self.partial.extend_from_slice(region);
153                self.expected = 0;
154                return;
155            }
156            let first = u16::from_be_bytes([region[0], region[1]]);
157            let length = (first & LENGTH_MASK) as usize;
158            if (first & LENGTH_MASK) == END_INDICATOR_LENGTH && (first & D_BIT_MASK) != 0 {
159                // Explicit End Indicator caught even if not 0xFF-leading.
160                return;
161            }
162            let total = BASE_HEADER_LEN + length;
163            if region.len() >= total {
164                // A whole SNDU fits — emit it and continue packing.
165                out.push(region[..total].to_vec());
166                region = &region[total..];
167            } else {
168                // SNDU continues into the next packet — buffer the head.
169                self.partial.clear();
170                self.partial.extend_from_slice(region);
171                self.expected = total;
172                return;
173            }
174        }
175    }
176
177    /// If the buffered partial now holds a complete SNDU, emit it and clear.
178    fn maybe_finish(&mut self, out: &mut Vec<Vec<u8>>) {
179        if self.expected == 0 && self.partial.len() >= BASE_HEADER_LEN {
180            // We had buffered only a fragment of the header; now compute length.
181            let first = u16::from_be_bytes([self.partial[0], self.partial[1]]);
182            let length = (first & LENGTH_MASK) as usize;
183            self.expected = BASE_HEADER_LEN + length;
184        }
185        if self.expected != 0 && self.partial.len() >= self.expected {
186            let total = self.expected;
187            out.push(self.partial[..total].to_vec());
188            let rest: Vec<u8> = self.partial[total..].to_vec();
189            self.partial.clear();
190            self.expected = 0;
191            // Any bytes past the completed SNDU are a packed follow-on SNDU.
192            if !rest.is_empty() {
193                self.walk_new_sndus(&rest, out);
194            }
195        }
196    }
197}
198
199#[cfg(test)]
200mod tests {
201    use super::*;
202    use crate::sndu::Sndu;
203    use crate::type_field::TypeField;
204
205    fn make_sndu(pdu: &[u8]) -> Vec<u8> {
206        let s = Sndu::new(TypeField::EtherType(0x0800), None, pdu);
207        let mut b = alloc::vec![0u8; s.serialized_len()];
208        s.serialize_into(&mut b).unwrap();
209        b
210    }
211
212    // An SNDU fragmented across two TS packet payloads reassembles correctly.
213    #[test]
214    fn fragmented_across_two_packets() {
215        let pdu: Vec<u8> = (0u8..40).collect();
216        let sndu = make_sndu(&pdu);
217        assert!(sndu.len() > 20);
218
219        let mut rx = UleReceiver::new();
220        // Packet 1: PUSI=1, PP=0, carries the first 20 SNDU bytes.
221        let mut p1 = alloc::vec![0x00u8];
222        p1.extend_from_slice(&sndu[..20]);
223        let done = rx.push(&p1, true);
224        assert!(done.is_empty(), "still reassembling");
225        assert!(rx.in_reassembly());
226
227        // Packet 2: PUSI=0 continuation, rest of the SNDU + padding.
228        let mut p2 = sndu[20..].to_vec();
229        p2.extend_from_slice(&[0xFF, 0xFF]);
230        let done = rx.push(&p2, false);
231        assert_eq!(done.len(), 1);
232        assert_eq!(done[0], sndu);
233        // And it parses.
234        assert_eq!(Sndu::parse(&done[0]).unwrap().pdu(), &pdu[..]);
235    }
236
237    // Two SNDUs packed into one TS packet are both extracted.
238    #[test]
239    fn two_packed_sndus_one_packet() {
240        let a = make_sndu(&[0xAA; 5]);
241        let b = make_sndu(&[0xBB; 7]);
242
243        let mut payload = alloc::vec![0x00u8]; // PP = 0
244        payload.extend_from_slice(&a);
245        payload.extend_from_slice(&b);
246        payload.extend_from_slice(&[0xFF, 0xFF, 0xFF]); // End Indicator + pad
247
248        let mut rx = UleReceiver::new();
249        let done = rx.push(&payload, true);
250        assert_eq!(done.len(), 2, "both packed SNDUs extracted");
251        assert_eq!(done[0], a);
252        assert_eq!(done[1], b);
253    }
254
255    // A continuing SNDU completes mid-packet, then a packed SNDU starts (PP>0).
256    #[test]
257    fn continuation_then_packed_with_pp() {
258        let a = make_sndu(&[0x11; 30]); // will be fragmented
259        let b = make_sndu(&[0x22; 4]); // packed after a completes
260
261        let mut rx = UleReceiver::new();
262        // Packet 1: PUSI=1 PP=0, first 15 bytes of `a`.
263        let mut p1 = alloc::vec![0x00u8];
264        p1.extend_from_slice(&a[..15]);
265        assert!(rx.push(&p1, true).is_empty());
266
267        // Packet 2: PUSI=1, PP = (rest of a) so the pointer lands on `b`.
268        let rest_a = a.len() - 15;
269        let mut p2 = alloc::vec![rest_a as u8];
270        p2.extend_from_slice(&a[15..]); // completes a
271        p2.extend_from_slice(&b); // packed b
272        p2.push(0xFF);
273        let done = rx.push(&p2, true);
274        assert_eq!(done.len(), 2);
275        assert_eq!(done[0], a);
276        assert_eq!(done[1], b);
277    }
278
279    #[test]
280    fn idle_until_pusi() {
281        let mut rx = UleReceiver::new();
282        // A continuation arriving while idle is discarded.
283        let done = rx.push(&[0x01, 0x02, 0x03], false);
284        assert!(done.is_empty());
285        assert!(!rx.in_reassembly());
286    }
287
288    #[test]
289    fn bad_payload_pointer_resets() {
290        let mut rx = UleReceiver::new();
291        // PP claims 200 but only a few bytes follow.
292        let done = rx.push(&[200u8, 0x00, 0x10, 0x08, 0x00], true);
293        assert!(done.is_empty());
294        assert!(!rx.in_reassembly());
295    }
296
297    // RFC 4326 §7.2.1 payload-pointer consistency check: a PUSI=1 packet that
298    // arrives mid-reassembly with a PP that does NOT equal the remaining bytes
299    // in the partial SNDU must discard the partial and NOT emit garbage.
300    //
301    // Regression: without the PP consistency check the old code fed
302    // `pp_region[..wrong_pp]` into `feed_continuation` regardless of mismatch,
303    // silently contaminating the partial buffer with bytes from the wrong offset.
304    // A subsequent continuation then "completed" the SNDU from the corrupted
305    // partial, yielding a CRC-invalid blob.
306    #[test]
307    fn pusi_mid_reassembly_wrong_pp_discards_partial() {
308        // Build a 54-byte SNDU (4-byte header + 46-byte PDU + 4-byte CRC).
309        let pdu: Vec<u8> = (0u8..46).collect();
310        let sndu = make_sndu(&pdu);
311        assert_eq!(
312            sndu.len(),
313            54,
314            "expected 54 bytes: 4 header + 46 PDU + 4 CRC"
315        );
316
317        let mut rx = UleReceiver::new();
318
319        // Packet 1: PUSI=1, PP=0, start the SNDU (first 20 bytes).
320        // After this: partial=sndu[..20], expected=54, remaining=34.
321        let mut p1 = alloc::vec![0x00u8]; // PP=0
322        p1.extend_from_slice(&sndu[..20]);
323        let done = rx.push(&p1, true);
324        assert!(done.is_empty());
325        assert!(rx.in_reassembly(), "should be mid-reassembly after p1");
326
327        // Packet 2: PUSI=1 with WRONG PP=5 (real remaining is 34).
328        // Before the fix: feed_continuation appends 5 bytes of junk to partial
329        // (partial now = sndu[..20] + [0xDE;5], len=25), leaving expected=54.
330        // After the fix: the receiver detects PP≠remaining, resets, and walks
331        // fresh SNDUs from pp_region[WRONG_PP..].
332        const WRONG_PP: usize = 5;
333        let mut p2 = alloc::vec![WRONG_PP as u8];
334        // Junk in the pre-pointer region; p2 then ends with an End Indicator,
335        // so no fresh SNDU should complete from this packet.
336        p2.extend_from_slice(&[0xDEu8; WRONG_PP]);
337        p2.extend_from_slice(&[0xFF, 0xFF]);
338
339        let done2 = rx.push(&p2, true);
340        assert!(done2.is_empty(), "no SNDUs should complete in p2");
341
342        // Packet 3: PUSI=0 continuation.
343        // Pre-fix: receiver still has the corrupted partial (len=25, expected=54);
344        //   it feeds the correct tail bytes and "completes" a 54-byte blob whose
345        //   content is garbage — sndu[..20] + 0xDE*5 + sndu[20..49] — and the
346        //   CRC will NOT match.
347        // Post-fix: receiver reset after p2, so this continuation is idle-discarded.
348        let tail = &sndu[20..]; // correct continuation
349        let done3 = rx.push(tail, false);
350
351        for emitted in &done3 {
352            // Any blob emitted from a corrupted partial will have a bad CRC.
353            assert!(
354                crate::sndu::Sndu::parse(emitted).is_ok(),
355                "emitted SNDU has invalid CRC — corrupt partial was not discarded"
356            );
357        }
358
359        // Post-fix: receiver reset after WRONG_PP mismatch; partial is gone.
360        // The continuation (p3) was discarded because receiver is idle.
361        // So done3 must be empty — if it's non-empty AND any item parses, that
362        // means we got lucky with a coincidental valid CRC, which is astronomically
363        // unlikely.  Assert it's empty for clarity.
364        assert!(
365            done3.is_empty(),
366            "post-fix: idle receiver must discard the continuation (partial was reset)"
367        );
368    }
369}