Skip to main content

dvb_si/
ts.rs

1//! MPEG-TS packet parser + section reassembler. Feature-gated under `ts`.
2
3use crate::error::{Error, Result};
4
5/// Size of one MPEG-TS packet (ETSI EN 300 468 §3.2, ISO/IEC 13818-1 §2.4.3.2).
6pub const TS_PACKET_SIZE: usize = 188;
7/// Sync byte that every TS packet starts with (ISO/IEC 13818-1 §2.4.3.2).
8pub const TS_SYNC_BYTE: u8 = 0x47;
9/// Upper bound on a single section: `section_length` is 12 bits (max 4095)
10/// plus the 3-byte header = 4098. (Long-form SI caps `section_length` at
11/// 4093 → total 4096, but maximal short-form private sections may reach
12/// 4098; the reassembler accepts the absolute ceiling.)
13const MAX_SECTION_SIZE: usize = 4098;
14
15/// ETSI EN 300 468 §3.2.3: transport header byte 1 bits 7 = tei (Transport Error Indicator).
16const TEI_MASK: u8 = 0x80;
17/// ETSI EN 300 468 §3.2.3: byte 1 bits 6 = pusi (Payload Unit Start Indicator).
18const PUSI_MASK: u8 = 0x40;
19/// ETSI EN 300 468 §3.2.3: byte 1 bits 5..=1 = 13-bit PID (upper 5 bits).
20pub const PID_MASK_HI: u8 = 0x1F;
21/// ETSI EN 300 468 §3.2.3: byte 3 bits 7..=6 = 2-bit scrambling control.
22pub const SCRAMBLING_MASK: u8 = 0xC0;
23/// ETSI EN 300 468 §3.2.3: byte 3 bit 4 = adaptation_field_control (bit 4 = 1 means adaptation present).
24pub const ADAPTATION_FLAG: u8 = 0x20;
25/// ETSI EN 300 468 §3.2.3: byte 3 bit 3 = adaptation_field_control (bit 3 = 1 means payload present).
26pub const PAYLOAD_FLAG: u8 = 0x10;
27/// ETSI EN 300 468 §3.2.3: byte 3 bits 3..=0 = 4-bit continuity_counter.
28pub const CC_MASK: u8 = 0x0F;
29
30/// Parsed TS header — the 4-byte transport header fields.
31#[derive(Clone, Debug, PartialEq, Eq)]
32#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
33pub struct TsHeader {
34    /// Transport Error Indicator — set by the demodulator when an
35    /// uncorrectable error is present in the packet.
36    pub tei: bool,
37    /// Payload Unit Start Indicator — first byte of the payload is a new
38    /// PES packet or PSI section header when set.
39    pub pusi: bool,
40    /// 13-bit Packet Identifier.
41    pub pid: u16,
42    /// 2-bit transport_scrambling_control (0 = not scrambled).
43    pub scrambling: u8,
44    /// Adaptation field present flag (adaptation_field_control bit 1).
45    pub has_adaptation: bool,
46    /// Payload present flag (adaptation_field_control bit 0).
47    pub has_payload: bool,
48    /// 4-bit continuity_counter (wraps 0..=15 per PID).
49    pub continuity_counter: u8,
50}
51
52/// Borrowed view into one 188-byte TS packet.
53///
54/// Serde: Serialize-only. Deserialize is omitted because `raw` is a
55/// reference to fixed-length 188-byte storage that cannot be reconstructed
56/// from a deserializer's lifetime budget; the field is also redundant once
57/// the header has been parsed. `raw` is excluded from the serialized form.
58#[derive(Clone, Debug)]
59#[cfg_attr(feature = "serde", derive(serde::Serialize))]
60pub struct TsPacket<'a> {
61    /// Parsed header fields.
62    pub header: TsHeader,
63    /// Slice into the packet's payload, or `None` when `has_payload == false`
64    /// or the adaptation field consumed the whole packet body.
65    pub payload: Option<&'a [u8]>,
66    /// The raw 188 bytes of the packet — kept for cheap forwarding.
67    #[cfg_attr(feature = "serde", serde(skip))]
68    pub raw: &'a [u8; TS_PACKET_SIZE],
69}
70
71impl TsHeader {
72    /// Parse a 4-byte TS transport header.
73    ///
74    /// Returns `None` if `raw4` is shorter than 4 bytes.
75    pub fn parse(raw4: &[u8]) -> Option<Self> {
76        if raw4.len() < 4 {
77            return None;
78        }
79        let b1 = raw4[1];
80        let b2 = raw4[2];
81        let b3 = raw4[3];
82
83        let tei = (b1 & TEI_MASK) != 0;
84        let pusi = (b1 & PUSI_MASK) != 0;
85        let pid = (((b1 & PID_MASK_HI) as u16) << 8) | (b2 as u16);
86        let scrambling = (b3 & SCRAMBLING_MASK) >> 6;
87        let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
88        let has_payload = (b3 & PAYLOAD_FLAG) != 0;
89        let continuity_counter = b3 & CC_MASK;
90
91        Some(Self {
92            tei,
93            pusi,
94            pid,
95            scrambling,
96            has_adaptation,
97            has_payload,
98            continuity_counter,
99        })
100    }
101
102    /// Serialize this header into the first 4 bytes of `buf`.
103    ///
104    /// Panics if `buf` is shorter than 4 bytes.
105    pub fn serialize_into(&self, buf: &mut [u8]) {
106        assert!(
107            buf.len() >= 4,
108            "buffer must have at least 4 bytes for TS header"
109        );
110        buf[0] = TS_SYNC_BYTE;
111        buf[1] = 0;
112        if self.tei {
113            buf[1] |= TEI_MASK;
114        }
115        if self.pusi {
116            buf[1] |= PUSI_MASK;
117        }
118        buf[1] |= ((self.pid >> 8) as u8) & PID_MASK_HI;
119        buf[2] = (self.pid & 0xFF) as u8;
120        buf[3] = (self.scrambling << 6) & SCRAMBLING_MASK;
121        if self.has_adaptation {
122            buf[3] |= ADAPTATION_FLAG;
123        }
124        if self.has_payload {
125            buf[3] |= PAYLOAD_FLAG;
126        }
127        buf[3] |= self.continuity_counter & CC_MASK;
128    }
129}
130
131impl<'a> TsPacket<'a> {
132    /// Parse a single 188-byte TS packet from a buffer.
133    ///
134    /// Returns `Err(Error::InvalidSyncByte)` if the first byte is not `0x47`,
135    /// `Err(Error::BufferTooShort)` if fewer than 188 bytes, or `Ok` with
136    /// the parsed packet otherwise.
137    pub fn parse(buf: &'a [u8]) -> Result<Self> {
138        if buf.len() < TS_PACKET_SIZE {
139            return Err(Error::BufferTooShort {
140                need: TS_PACKET_SIZE,
141                have: buf.len(),
142                what: "TsPacket::parse",
143            });
144        }
145        if buf[0] != TS_SYNC_BYTE {
146            return Err(Error::InvalidSyncByte { found: buf[0] });
147        }
148
149        let raw: &[u8; TS_PACKET_SIZE] =
150            buf[..TS_PACKET_SIZE]
151                .try_into()
152                .map_err(|_| Error::BufferTooShort {
153                    need: TS_PACKET_SIZE,
154                    have: buf.len(),
155                    what: "TsPacket::parse (array conversion)",
156                })?;
157
158        let header = TsHeader::parse(&raw[..4])
159            .expect("raw is 188 bytes so first 4 bytes are always present");
160
161        let mut cursor = 4usize;
162        let mut payload = None;
163
164        // Skip adaptation field if present (not parsed in detail — not needed for sections).
165        if header.has_adaptation && cursor < TS_PACKET_SIZE {
166            let af_len = raw[cursor] as usize;
167            cursor += 1 + af_len;
168        }
169
170        if header.has_payload && cursor < TS_PACKET_SIZE {
171            payload = Some(&raw[cursor..]);
172        }
173
174        Ok(TsPacket {
175            header,
176            payload,
177            raw,
178        })
179    }
180}
181
182/// Reassembles PSI/SI sections from TS packets on a single PID.
183///
184/// Feed each TS packet's payload with `feed`. Complete sections are
185/// appended to an internal queue; drain them with `pop_section`.
186#[derive(Default)]
187pub struct SectionReassembler {
188    buf: bytes::BytesMut,
189    expected: usize,
190    ready: std::collections::VecDeque<bytes::Bytes>,
191}
192
193impl SectionReassembler {
194    /// Feed a TS payload and whether its packet had PUSI set.
195    ///
196    /// Extracts complete SI sections into the internal queue. A single call
197    /// can produce zero or one section (the queue is for future-proofing
198    /// where one feed might yield multiple sections).
199    pub fn feed(&mut self, payload: &[u8], pusi: bool) {
200        if pusi {
201            // A PUSI packet whose adaptation field consumed the whole body is
202            // malformed but constructible — drop sync rather than panic.
203            if payload.is_empty() {
204                self.buf.clear();
205                self.expected = 0;
206                return;
207            }
208            let pointer = payload[0] as usize;
209            let start = 1 + pointer;
210            if start >= payload.len() {
211                self.buf.clear();
212                return;
213            }
214            self.buf.clear();
215            let new_data = &payload[start..];
216            if self.buf.len() + new_data.len() > MAX_SECTION_SIZE {
217                self.buf.clear();
218                self.expected = 0;
219                return;
220            }
221            self.buf.extend_from_slice(new_data);
222            if self.buf.len() >= 3 {
223                self.expected = 3 + (((self.buf[1] & 0x0F) as usize) << 8 | self.buf[2] as usize);
224            }
225        } else {
226            if self.buf.is_empty() {
227                return;
228            }
229            if self.buf.len() + payload.len() > MAX_SECTION_SIZE {
230                self.buf.clear();
231                self.expected = 0;
232                return;
233            }
234            self.buf.extend_from_slice(payload);
235        }
236
237        if self.expected > 0 && self.buf.len() >= self.expected {
238            // split_to returns the first `expected` bytes as an owned BytesMut,
239            // leaving the remaining bytes in self.buf — cheap (shifts pointers).
240            let section = self.buf.split_to(self.expected).freeze();
241            self.ready.push_back(section);
242            self.expected = 0;
243        }
244    }
245
246    /// Pop one complete section. Returns `None` when the queue is empty.
247    pub fn pop_section(&mut self) -> Option<bytes::Bytes> {
248        self.ready.pop_front()
249    }
250
251    /// Number of bytes currently buffered (incomplete section).
252    pub fn len(&self) -> usize {
253        self.buf.len()
254    }
255
256    /// True if no bytes are currently buffered.
257    pub fn is_empty(&self) -> bool {
258        self.buf.is_empty()
259    }
260}
261
262#[cfg(test)]
263mod tests {
264    use super::*;
265
266    /// Helper: construct a minimal 188-byte TS packet buffer with given header flags and payload.
267    fn make_packet(b1: u8, b2: u8, b3: u8, payload_data: &[u8]) -> [u8; TS_PACKET_SIZE] {
268        let mut pkt = [0u8; TS_PACKET_SIZE];
269        pkt[0] = TS_SYNC_BYTE;
270        pkt[1] = b1;
271        pkt[2] = b2;
272        pkt[3] = b3;
273        let payload_start = 4;
274        let end = (payload_start + payload_data.len()).min(TS_PACKET_SIZE);
275        let len = (end - payload_start).min(payload_data.len());
276        pkt[payload_start..payload_start + len].copy_from_slice(&payload_data[..len]);
277        pkt
278    }
279
280    #[test]
281    fn parse_rejects_non_0x47_sync_byte() {
282        let mut pkt = [0u8; TS_PACKET_SIZE];
283        pkt[0] = 0x46; // wrong sync byte
284        let err = TsPacket::parse(&pkt).unwrap_err();
285        match err {
286            Error::InvalidSyncByte { found } => assert_eq!(found, 0x46),
287            other => panic!("expected InvalidSyncByte, got {other:?}"),
288        }
289    }
290
291    #[test]
292    fn parse_extracts_pid_and_continuity_counter() {
293        // PID = 0x1234 → upper 5 bits = 0x12, lower 8 bits = 0x34
294        // CC = 5 → 0x05
295        // b1 = 0x47 (sync=0, tei=0, pusi=0) | (0x12) = 0x47 & 0xE0 | 0x12 = 0x47 & 0xE0 = 0x40 | 0x12 = 0x52
296        // Actually: b1 bits: [tei:1][pusi:1][pid_hi:5]
297        // pid_hi = 0x12 = 0b00100_10 → bits 5..=1 = 0x12
298        // b1 = 0b00_010010 = 0x12 (no tei, no pusi)
299        let pkt = make_packet(0x12, 0x34, 0x05, &[]);
300        let pkt = TsPacket::parse(&pkt).unwrap();
301        assert_eq!(pkt.header.pid, 0x1234);
302        assert_eq!(pkt.header.continuity_counter, 5);
303    }
304
305    #[test]
306    fn payload_unit_start_indicator_flag_extracted() {
307        // b1 = 0x40 → pusi = true (bit 6 set, no tei, no pid bits)
308        let pkt1 = make_packet(0x40, 0x00, 0x00, &[]);
309        let pkt1 = TsPacket::parse(&pkt1).unwrap();
310        assert!(pkt1.header.pusi);
311
312        // b1 = 0x00 → pusi = false
313        let pkt2 = make_packet(0x00, 0x00, 0x00, &[]);
314        let pkt2 = TsPacket::parse(&pkt2).unwrap();
315        assert!(!pkt2.header.pusi);
316    }
317
318    /// Build a PSI-carrying TS payload: `pointer_field` byte followed by
319    /// (optionally) some tail of a previous section, followed by a fresh
320    /// section. `pointer_field` is the number of bytes of the previous
321    /// section that precede the new one (per ETSI EN 300 468 §5.1.4).
322    fn build_pusi_payload(pointer_field: u8, previous_tail: &[u8], section: &[u8]) -> Vec<u8> {
323        assert_eq!(pointer_field as usize, previous_tail.len());
324        let mut v = Vec::with_capacity(1 + previous_tail.len() + section.len());
325        v.push(pointer_field);
326        v.extend_from_slice(previous_tail);
327        v.extend_from_slice(section);
328        v
329    }
330
331    /// Build a long-form section with the given table_id and body bytes.
332    /// Returns the full section including its 3-byte + 5-byte header and a
333    /// placeholder CRC — for reassembler testing we don't validate the CRC.
334    fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
335        let section_length = body_after_length.len() as u16;
336        let mut v = Vec::with_capacity(3 + section_length as usize);
337        v.push(table_id);
338        // ssi=1, pi=0, reserved=11, length hi 4 bits
339        v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
340        v.push((section_length & 0xFF) as u8);
341        v.extend_from_slice(body_after_length);
342        v
343    }
344
345    // The reassembler tests below feed raw payload slices directly to
346    // `feed()` rather than wrapping them in 188-byte TS packets. This avoids
347    // the TS stuffing-byte tail (0xFF padding) bleeding into the reassembled
348    // section and keeps the assertions exact.
349
350    #[test]
351    fn reassembler_accumulates_multi_packet_section() {
352        // 200-byte section that spans two payload slices.
353        let body = vec![0xAAu8; 197];
354        let section = build_section(0x02, &body);
355        assert_eq!(section.len(), 200);
356
357        let first_chunk = 100;
358        let payload1 = build_pusi_payload(0, &[], &section[..first_chunk]);
359        let payload2 = section[first_chunk..].to_vec();
360
361        let mut reasm = SectionReassembler::default();
362        reasm.feed(&payload1, true);
363        reasm.feed(&payload2, false);
364
365        let out = reasm.pop_section().expect("section should be ready");
366        assert_eq!(out.len(), 200);
367        assert_eq!(out.as_ref(), &section[..]);
368    }
369
370    #[test]
371    fn reassembler_yields_complete_section_once_length_satisfied() {
372        // 1-byte-body section: table_id=0x42, section_length=1, total=4 bytes.
373        let section = build_section(0x42, &[0xAA]);
374        assert_eq!(section.len(), 4);
375        let payload = build_pusi_payload(0, &[], &section);
376
377        let mut reasm = SectionReassembler::default();
378        reasm.feed(&payload, true);
379
380        let out = reasm
381            .pop_section()
382            .expect("single-packet section should pop");
383        assert_eq!(out.as_ref(), &section[..]);
384    }
385
386    #[test]
387    fn reassembler_discards_on_buffer_overflow() {
388        // Declare section_length larger than a single payload can carry. No
389        // pop happens until continuations arrive; if continuations push the
390        // buffer past MAX_SECTION_SIZE the reassembler must reset, not panic.
391        let mut section = Vec::with_capacity(3 + 4095);
392        section.push(0x00); // table_id
393        section.push(0xB0 | ((4095u16 >> 8) as u8 & 0x0F));
394        section.push(0xFF);
395        section.extend_from_slice(&[0u8; 160]);
396        let payload1 = build_pusi_payload(0, &[], &section);
397
398        let mut reasm = SectionReassembler::default();
399        reasm.feed(&payload1, true);
400        assert!(reasm.pop_section().is_none());
401
402        // Push enough continuation data to cross MAX_SECTION_SIZE.
403        let filler = vec![0u8; 180];
404        for _ in 0..(MAX_SECTION_SIZE / 180 + 1) {
405            reasm.feed(&filler, false);
406        }
407        assert!(
408            reasm.pop_section().is_none(),
409            "no section should pop after overflow reset"
410        );
411
412        // State must be resettable — a fresh valid PUSI section works.
413        let valid_section = build_section(0x00, &[0xAA]);
414        let payload2 = build_pusi_payload(0, &[], &valid_section);
415        reasm.feed(&payload2, true);
416        let out = reasm
417            .pop_section()
418            .expect("fresh section should pop after reset");
419        assert_eq!(out.as_ref(), &valid_section[..]);
420    }
421
422    #[test]
423    fn reassembler_handles_pusi_with_nonzero_pointer_field() {
424        // payload = pointer_field=3, 3 bytes of prior-section tail, then new section.
425        let prior_tail = vec![0x11, 0x22, 0x33];
426        let new_section = build_section(0x02, &[0xBB]);
427        assert_eq!(new_section.len(), 4);
428        let payload = build_pusi_payload(3, &prior_tail, &new_section);
429
430        let mut reasm = SectionReassembler::default();
431        reasm.feed(&payload, true);
432
433        let out = reasm
434            .pop_section()
435            .expect("section after pointer_field skip should pop");
436        assert_eq!(out.as_ref(), &new_section[..]);
437    }
438
439    #[test]
440    fn reassembler_ignores_continuation_before_pusi() {
441        // Feed a non-PUSI payload first (no prior PUSI seen).
442        // SectionReassembler should discard it and stay empty.
443        let pkt = make_packet(0x00, 0x00, PAYLOAD_FLAG, &[0xAA, 0xBB, 0xCC]);
444
445        let mut reasm = SectionReassembler::default();
446        reasm.feed(&pkt[4..], false); // no PUSI
447
448        assert!(
449            reasm.pop_section().is_none(),
450            "no section should appear without prior PUSI"
451        );
452        assert!(
453            reasm.pop_section().is_none(),
454            "second pop should also be none"
455        );
456    }
457
458    /// A PUSI packet with an empty payload (adaptation field ate the body)
459    /// is malformed but must not panic — it drops sync.
460    #[test]
461    fn reassembler_empty_pusi_payload_does_not_panic() {
462        let mut reasm = SectionReassembler::default();
463        reasm.feed(&[], true);
464        assert!(reasm.pop_section().is_none());
465        // Recovers on the next clean PUSI.
466        let mut payload = vec![0x00u8, 0x72, 0x70, 0x01, 0x00];
467        payload.resize(5, 0);
468        reasm.feed(&payload, true);
469        assert!(reasm.pop_section().is_some());
470    }
471
472    /// A maximal short-form private section (section_length 0xFFF, total
473    /// 4098 bytes) reassembles — the ceiling is 12-bit length + 3-byte
474    /// header, not 4096.
475    #[test]
476    fn reassembler_accepts_maximal_private_section() {
477        let mut section = vec![0x80u8, 0x7F, 0xFF]; // user-private tid, SSI=0, len 0xFFF
478        section.resize(3 + 0xFFF, 0xAB);
479
480        let mut reasm = SectionReassembler::default();
481        // First TS payload: pointer_field 0 then the section start.
482        let mut first = vec![0x00];
483        first.extend_from_slice(&section[..183]);
484        reasm.feed(&first, true);
485        for chunk in section[183..].chunks(184) {
486            reasm.feed(chunk, false);
487        }
488        let out = reasm.pop_section().expect("4098-byte section should pop");
489        assert_eq!(out.len(), 4098);
490        assert_eq!(out.as_ref(), &section[..]);
491    }
492}