Skip to main content

dvb_si/
ts.rs

1//! MPEG-TS packet parser + section reassembler. Feature-gated under `ts`.
2
3use crate::error::{Error, Result};
4
5/// Size of one MPEG-TS packet (ETSI EN 300 468 §3.2, ISO/IEC 13818-1 §2.4.3.2).
6pub const TS_PACKET_SIZE: usize = 188;
7/// Sync byte that every TS packet starts with (ISO/IEC 13818-1 §2.4.3.2).
8pub const TS_SYNC_BYTE: u8 = 0x47;
9/// Upper bound on a single section: `section_length` is 12 bits (max 4095)
10/// plus the 3-byte header = 4098. (Long-form SI caps `section_length` at
11/// 4093 → total 4096, but maximal short-form private sections may reach
12/// 4098; the reassembler accepts the absolute ceiling.)
13const MAX_SECTION_SIZE: usize = 4098;
14/// Bytes before the `section_length` payload: `table_id` (1) + the two bytes
15/// carrying the syntax/RFU flags and the 12-bit `section_length`
16/// (ISO/IEC 13818-1 §2.4.4.1).
17const SECTION_HEADER_LEN: usize = 3;
18/// Mask for the 4 most-significant `section_length` bits in a section's second
19/// byte (ISO/IEC 13818-1 §2.4.4.1 — `section_length` is 12 bits). Shared with
20/// the packetizer in `mux.rs`.
21pub(crate) const SECTION_LENGTH_HI_MASK: u8 = 0x0F;
22
23/// ISO/IEC 13818-1 §2.4.3.3: transport header byte 1 bit 7 = tei (Transport Error Indicator).
24const TEI_MASK: u8 = 0x80;
25/// ISO/IEC 13818-1 §2.4.3.3: byte 1 bit 6 = pusi (Payload Unit Start Indicator).
26const PUSI_MASK: u8 = 0x40;
27/// ISO/IEC 13818-1 §2.4.3.3: byte 1 bits `[4:0]` = 13-bit PID (upper 5 bits).
28pub const PID_MASK_HI: u8 = 0x1F;
29/// ISO/IEC 13818-1 §2.4.3.3: byte 3 bits `[7:6]` = 2-bit scrambling control.
30pub const SCRAMBLING_MASK: u8 = 0xC0;
31/// ISO/IEC 13818-1 §2.4.3.3: byte 3 bit 5 = adaptation_field_control (bit 5 = 1 means adaptation present).
32pub const ADAPTATION_FLAG: u8 = 0x20;
33/// ISO/IEC 13818-1 §2.4.3.3: byte 3 bit 4 = adaptation_field_control (bit 4 = 1 means payload present).
34pub const PAYLOAD_FLAG: u8 = 0x10;
35/// ISO/IEC 13818-1 §2.4.3.3: byte 3 bits `[3:0]` = 4-bit continuity_counter.
36pub const CC_MASK: u8 = 0x0F;
37
38/// Parsed TS header — the 4-byte transport header fields.
39#[derive(Clone, Debug, PartialEq, Eq)]
40#[cfg_attr(feature = "serde", derive(serde::Serialize))]
41pub struct TsHeader {
42    /// Transport Error Indicator — set by the demodulator when an
43    /// uncorrectable error is present in the packet.
44    pub tei: bool,
45    /// Payload Unit Start Indicator — first byte of the payload is a new
46    /// PES packet or PSI section header when set.
47    pub pusi: bool,
48    /// 13-bit Packet Identifier.
49    pub pid: u16,
50    /// 2-bit transport_scrambling_control (0 = not scrambled).
51    pub scrambling: u8,
52    /// Adaptation field present flag (adaptation_field_control bit 1).
53    pub has_adaptation: bool,
54    /// Payload present flag (adaptation_field_control bit 0).
55    pub has_payload: bool,
56    /// 4-bit continuity_counter (wraps 0..=15 per PID).
57    pub continuity_counter: u8,
58}
59
60/// Borrowed view into one 188-byte TS packet.
61///
62/// Serde: Serialize-only (re-parse from wire bytes to reconstruct). `raw` is
63/// excluded from the serialized form because it is redundant once the header
64/// has been parsed.
65#[derive(Clone, Debug)]
66#[cfg_attr(feature = "serde", derive(serde::Serialize))]
67pub struct TsPacket<'a> {
68    /// Parsed header fields.
69    pub header: TsHeader,
70    /// Slice into the packet's payload, or `None` when `has_payload == false`
71    /// or the adaptation field consumed the whole packet body.
72    pub payload: Option<&'a [u8]>,
73    /// The adaptation-field bytes (after the length byte). Internal capture
74    /// feeding [`adaptation_field`](Self::adaptation_field); not public.
75    #[cfg_attr(feature = "serde", serde(skip))]
76    adaptation: Option<&'a [u8]>,
77    /// The raw 188 bytes of the packet — kept for cheap forwarding.
78    #[cfg_attr(feature = "serde", serde(skip))]
79    pub raw: &'a [u8; TS_PACKET_SIZE],
80}
81
82impl TsHeader {
83    /// Parse a 4-byte TS transport header.
84    pub fn parse(raw4: &[u8]) -> Result<Self> {
85        if raw4.len() < 4 {
86            return Err(Error::BufferTooShort {
87                need: 4,
88                have: raw4.len(),
89                what: "TsHeader",
90            });
91        }
92        let b1 = raw4[1];
93        let b2 = raw4[2];
94        let b3 = raw4[3];
95
96        let tei = (b1 & TEI_MASK) != 0;
97        let pusi = (b1 & PUSI_MASK) != 0;
98        let pid = (((b1 & PID_MASK_HI) as u16) << 8) | (b2 as u16);
99        let scrambling = (b3 & SCRAMBLING_MASK) >> 6;
100        let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
101        let has_payload = (b3 & PAYLOAD_FLAG) != 0;
102        let continuity_counter = b3 & CC_MASK;
103
104        Ok(Self {
105            tei,
106            pusi,
107            pid,
108            scrambling,
109            has_adaptation,
110            has_payload,
111            continuity_counter,
112        })
113    }
114
115    /// Number of bytes written by [`serialize_into`](Self::serialize_into).
116    pub const fn serialized_len() -> usize {
117        4
118    }
119
120    /// Serialize this header into the first 4 bytes of `buf`.
121    pub fn serialize_into(&self, buf: &mut [u8]) -> Result<usize> {
122        if buf.len() < 4 {
123            return Err(Error::OutputBufferTooSmall {
124                need: 4,
125                have: buf.len(),
126            });
127        }
128        buf[0] = TS_SYNC_BYTE;
129        buf[1] = 0;
130        if self.tei {
131            buf[1] |= TEI_MASK;
132        }
133        if self.pusi {
134            buf[1] |= PUSI_MASK;
135        }
136        buf[1] |= ((self.pid >> 8) as u8) & PID_MASK_HI;
137        buf[2] = (self.pid & 0xFF) as u8;
138        buf[3] = (self.scrambling << 6) & SCRAMBLING_MASK;
139        if self.has_adaptation {
140            buf[3] |= ADAPTATION_FLAG;
141        }
142        if self.has_payload {
143            buf[3] |= PAYLOAD_FLAG;
144        }
145        buf[3] |= self.continuity_counter & CC_MASK;
146        Ok(4)
147    }
148}
149
150impl<'a> dvb_common::Parse<'a> for TsHeader {
151    type Error = Error;
152
153    fn parse(bytes: &'a [u8]) -> Result<Self> {
154        TsHeader::parse(bytes)
155    }
156}
157
158impl dvb_common::Serialize for TsHeader {
159    type Error = Error;
160
161    fn serialized_len(&self) -> usize {
162        TsHeader::serialized_len()
163    }
164
165    fn serialize_into(&self, buf: &mut [u8]) -> Result<usize> {
166        TsHeader::serialize_into(self, buf)
167    }
168}
169
170impl<'a> TsPacket<'a> {
171    /// Parse a single 188-byte TS packet from a buffer.
172    ///
173    /// Returns `Err(Error::InvalidSyncByte)` if the first byte is not `0x47`,
174    /// `Err(Error::BufferTooShort)` if fewer than 188 bytes, or `Ok` with
175    /// the parsed packet otherwise.
176    pub fn parse(buf: &'a [u8]) -> Result<Self> {
177        if buf.len() < TS_PACKET_SIZE {
178            return Err(Error::BufferTooShort {
179                need: TS_PACKET_SIZE,
180                have: buf.len(),
181                what: "TsPacket",
182            });
183        }
184        if buf[0] != TS_SYNC_BYTE {
185            return Err(Error::InvalidSyncByte { found: buf[0] });
186        }
187
188        let raw: &[u8; TS_PACKET_SIZE] =
189            buf[..TS_PACKET_SIZE]
190                .try_into()
191                .map_err(|_| Error::BufferTooShort {
192                    need: TS_PACKET_SIZE,
193                    have: buf.len(),
194                    what: "TsPacket::parse (array conversion)",
195                })?;
196
197        let header = TsHeader::parse(&raw[..4])?;
198
199        let mut cursor = 4usize;
200        let mut payload = None;
201        let mut adaptation = None;
202
203        // Capture the adaptation field if present, then skip it (the section
204        // path does not need it; decode lazily via `adaptation_field`).
205        if header.has_adaptation && cursor < TS_PACKET_SIZE {
206            let af_len = raw[cursor] as usize;
207            let af_start = cursor + 1;
208            if af_len > 0 && af_start < TS_PACKET_SIZE {
209                let af_end = (af_start + af_len).min(TS_PACKET_SIZE);
210                adaptation = Some(&raw[af_start..af_end]);
211            }
212            cursor += 1 + af_len;
213        }
214
215        if header.has_payload && cursor < TS_PACKET_SIZE {
216            payload = Some(&raw[cursor..]);
217        }
218
219        Ok(TsPacket {
220            header,
221            payload,
222            adaptation,
223            raw,
224        })
225    }
226
227    /// Decode the adaptation field, if present.
228    ///
229    /// Returns `None` when the packet carries no adaptation field, and
230    /// `Some(Err(..))` when a present field is truncated. Layout per
231    /// ISO/IEC 13818-1:2007 §2.4.3.4 (`docs/iso_13818_1_systems.md`).
232    pub fn adaptation_field(&self) -> Option<crate::Result<AdaptationField>> {
233        self.adaptation.map(AdaptationField::parse)
234    }
235}
236
237// Adaptation-field flag bits, byte 0 (ISO/IEC 13818-1:2007 §2.4.3.4).
238const AF_DISCONTINUITY: u8 = 0x80;
239const AF_RANDOM_ACCESS: u8 = 0x40;
240const AF_ES_PRIORITY: u8 = 0x20;
241const AF_PCR_FLAG: u8 = 0x10;
242const AF_OPCR_FLAG: u8 = 0x08;
243const AF_SPLICING_FLAG: u8 = 0x04;
244/// Encoded PCR / OPCR field width: 33-bit base + 6 reserved + 9-bit extension.
245const PCR_FIELD_LEN: usize = 6;
246
247/// Program Clock Reference (ISO/IEC 13818-1:2007 §2.4.3.5): a 33-bit base on a
248/// 90 kHz clock plus a 9-bit extension on a 27 MHz clock.
249#[derive(Clone, Copy, Debug, PartialEq, Eq)]
250#[cfg_attr(feature = "serde", derive(serde::Serialize))]
251pub struct Pcr {
252    /// 33-bit base (90 kHz units).
253    pub base: u64,
254    /// 9-bit extension (27 MHz units).
255    pub extension: u16,
256}
257
258impl Pcr {
259    /// Full PCR value on the 27 MHz clock: `base * 300 + extension`.
260    #[must_use]
261    pub fn as_27mhz(self) -> u64 {
262        self.base * 300 + self.extension as u64
263    }
264
265    /// Decode the 6-byte PCR/OPCR field starting at `at` within `af`.
266    fn parse(af: &[u8], at: usize) -> Result<Self> {
267        let b: &[u8; PCR_FIELD_LEN] = af
268            .get(at..at + PCR_FIELD_LEN)
269            .and_then(|s| s.try_into().ok())
270            .ok_or(Error::BufferTooShort {
271                need: at + PCR_FIELD_LEN,
272                have: af.len(),
273                what: "adaptation_field PCR",
274            })?;
275        let base = ((b[0] as u64) << 25)
276            | ((b[1] as u64) << 17)
277            | ((b[2] as u64) << 9)
278            | ((b[3] as u64) << 1)
279            | ((b[4] as u64) >> 7);
280        let extension = (((b[4] & 0x01) as u16) << 8) | (b[5] as u16);
281        Ok(Self { base, extension })
282    }
283}
284
285/// Decoded adaptation field — flags plus PCR/OPCR and splice point per
286/// ISO/IEC 13818-1:2007 §2.4.3.4. Transport-private data and the
287/// adaptation-field extension are not yet surfaced; more fields may be
288/// added in future releases.
289#[non_exhaustive]
290#[derive(Clone, Copy, Debug, PartialEq, Eq)]
291#[cfg_attr(feature = "serde", derive(serde::Serialize))]
292pub struct AdaptationField {
293    /// A timing/continuity discontinuity starts at this packet.
294    pub discontinuity_indicator: bool,
295    /// This packet is a random-access point.
296    pub random_access_indicator: bool,
297    /// Elementary-stream priority hint.
298    pub elementary_stream_priority_indicator: bool,
299    /// Program Clock Reference, present iff the PCR flag is set.
300    pub pcr: Option<Pcr>,
301    /// Original PCR, present iff the OPCR flag is set.
302    pub opcr: Option<Pcr>,
303    /// Splice countdown (packets until the splice point), iff the flag is set.
304    pub splice_countdown: Option<i8>,
305}
306
307impl AdaptationField {
308    /// Parse the adaptation-field bytes (those following the length byte).
309    fn parse(af: &[u8]) -> Result<Self> {
310        let flags = *af.first().ok_or(Error::BufferTooShort {
311            need: 1,
312            have: 0,
313            what: "adaptation_field flags",
314        })?;
315        let mut cursor = 1usize;
316
317        let pcr = if flags & AF_PCR_FLAG != 0 {
318            let p = Pcr::parse(af, cursor)?;
319            cursor += PCR_FIELD_LEN;
320            Some(p)
321        } else {
322            None
323        };
324        let opcr = if flags & AF_OPCR_FLAG != 0 {
325            let p = Pcr::parse(af, cursor)?;
326            cursor += PCR_FIELD_LEN;
327            Some(p)
328        } else {
329            None
330        };
331        let splice_countdown = if flags & AF_SPLICING_FLAG != 0 {
332            let b = *af.get(cursor).ok_or(Error::BufferTooShort {
333                need: cursor + 1,
334                have: af.len(),
335                what: "adaptation_field splice_countdown",
336            })?;
337            Some(b as i8)
338        } else {
339            None
340        };
341
342        Ok(AdaptationField {
343            discontinuity_indicator: flags & AF_DISCONTINUITY != 0,
344            random_access_indicator: flags & AF_RANDOM_ACCESS != 0,
345            elementary_stream_priority_indicator: flags & AF_ES_PRIORITY != 0,
346            pcr,
347            opcr,
348            splice_countdown,
349        })
350    }
351}
352
353/// Reassembles PSI/SI sections from TS packets on a single PID.
354///
355/// Feed each TS packet's payload with `feed`. Complete sections are
356/// appended to an internal queue; drain them with `pop_section`.
357#[derive(Default)]
358pub struct SectionReassembler {
359    buf: bytes::BytesMut,
360    ready: std::collections::VecDeque<bytes::Bytes>,
361}
362
363impl SectionReassembler {
364    /// Feed a TS payload and whether its packet had PUSI set.
365    ///
366    /// Extracts complete SI sections into the internal queue. A single call
367    /// can produce zero, one, or **several** sections — a payload may
368    /// concatenate multiple complete sections after the `pointer_field`
369    /// (EN 300 468 §5.1.4; common on EMM PIDs). Drain with a
370    /// `while let Some(s) = r.pop_section()` loop, not a single `if let`.
371    pub fn feed(&mut self, payload: &[u8], pusi: bool) {
372        if pusi {
373            // A PUSI packet whose adaptation field consumed the whole body is
374            // malformed but constructible — drop sync rather than panic.
375            if payload.is_empty() {
376                self.buf.clear();
377                return;
378            }
379            let pointer = payload[0] as usize;
380
381            // The `pointer_field` counts bytes that belong to a section still
382            // in progress from a previous packet (ISO/IEC 13818-1 §2.4.4): the
383            // `pointer` bytes immediately after it are that section's tail and
384            // must complete it BEFORE new sections begin at `1 + pointer`.
385            // Skipping them (or clearing `buf` first) drops any section that
386            // spans into a PUSI packet — silent loss biased toward whichever
387            // section happens to straddle a packet boundary.
388            if !self.buf.is_empty() && pointer > 0 {
389                let avail = payload.len() - 1;
390                let tail_len = pointer.min(avail);
391                if self.buf.len() + tail_len > MAX_SECTION_SIZE {
392                    self.buf.clear();
393                } else {
394                    self.buf.extend_from_slice(&payload[1..1 + tail_len]);
395                    self.drain_complete_sections();
396                }
397            }
398
399            // New sections start at `1 + pointer`; anything still buffered is
400            // an incomplete (corrupt / lost-packet) section — discard it.
401            self.buf.clear();
402
403            let start = 1 + pointer;
404            if start >= payload.len() {
405                // Pointer spans to (or past) the end — no new section here.
406                return;
407            }
408            let new_data = &payload[start..];
409            if new_data.len() > MAX_SECTION_SIZE {
410                return;
411            }
412            self.buf.extend_from_slice(new_data);
413        } else {
414            if self.buf.is_empty() {
415                return;
416            }
417            // Append only the bytes the in-progress section still needs. A new
418            // section cannot start in a continuation (non-PUSI) packet
419            // (ISO/IEC 13818-1 §2.4.4), so once the section's declared length
420            // is satisfied the remaining payload bytes are 0xFF stuffing and
421            // are ignored. Counting that stuffing toward `MAX_SECTION_SIZE`
422            // previously dropped valid near-maximal sections (#148). Because
423            // the 12-bit `section_length` caps a section at `MAX_SECTION_SIZE`,
424            // `take` is inherently bounded and the buffer cannot grow without
425            // limit.
426            let take = if self.buf.len() >= SECTION_HEADER_LEN {
427                let exp = SECTION_HEADER_LEN
428                    + (((self.buf[1] & SECTION_LENGTH_HI_MASK) as usize) << 8
429                        | self.buf[2] as usize);
430                exp.saturating_sub(self.buf.len()).min(payload.len())
431            } else {
432                // Header not yet complete (split across the packet boundary) —
433                // take enough to read `section_length` on the next drain,
434                // bounded by the maximum possible section size.
435                payload.len().min(MAX_SECTION_SIZE - self.buf.len())
436            };
437            self.buf.extend_from_slice(&payload[..take]);
438        }
439
440        self.drain_complete_sections();
441    }
442
443    /// Queue every complete section the buffer currently holds.
444    ///
445    /// A single TS payload may concatenate multiple complete sections after
446    /// the `pointer_field` (legal per ETSI EN 300 468 §5.1.4 and common on
447    /// EMM PIDs, which pack several short messages into one payload). We must
448    /// keep extracting until the buffer holds only a partial (multi-packet
449    /// spanning) section, whose bytes stay buffered for the next packet to
450    /// continue (the expected length is recomputed from the section header on
451    /// each drain). A `0xFF` where a `table_id` is expected marks the rest of
452    /// the payload as stuffing.
453    fn drain_complete_sections(&mut self) {
454        loop {
455            if self.buf.len() < SECTION_HEADER_LEN {
456                // Not enough for a section header yet; keep the partial bytes
457                // and wait for the next packet to complete the header.
458                break;
459            }
460            if self.buf[0] == 0xFF {
461                // Stuffing where a table_id is expected — payload tail is fill.
462                self.buf.clear();
463                break;
464            }
465            let exp = SECTION_HEADER_LEN
466                + (((self.buf[1] & SECTION_LENGTH_HI_MASK) as usize) << 8 | self.buf[2] as usize);
467            if self.buf.len() >= exp {
468                // split_to returns the first `exp` bytes as an owned BytesMut,
469                // leaving the remainder in self.buf — cheap (shifts pointers).
470                let section = self.buf.split_to(exp).freeze();
471                self.ready.push_back(section);
472            } else {
473                // Partial section spanning into later packets.
474                break;
475            }
476        }
477    }
478
479    /// Pop one complete section. Returns `None` when the queue is empty.
480    pub fn pop_section(&mut self) -> Option<bytes::Bytes> {
481        self.ready.pop_front()
482    }
483
484    /// Number of bytes currently buffered (incomplete section).
485    pub fn len(&self) -> usize {
486        self.buf.len()
487    }
488
489    /// True if no bytes are currently buffered.
490    pub fn is_empty(&self) -> bool {
491        self.buf.is_empty()
492    }
493}
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498
499    /// Helper: construct a minimal 188-byte TS packet buffer with given header flags and payload.
500    fn make_packet(b1: u8, b2: u8, b3: u8, payload_data: &[u8]) -> [u8; TS_PACKET_SIZE] {
501        let mut pkt = [0u8; TS_PACKET_SIZE];
502        pkt[0] = TS_SYNC_BYTE;
503        pkt[1] = b1;
504        pkt[2] = b2;
505        pkt[3] = b3;
506        let payload_start = 4;
507        let end = (payload_start + payload_data.len()).min(TS_PACKET_SIZE);
508        let len = (end - payload_start).min(payload_data.len());
509        pkt[payload_start..payload_start + len].copy_from_slice(&payload_data[..len]);
510        pkt
511    }
512
513    #[test]
514    fn parse_rejects_non_0x47_sync_byte() {
515        let mut pkt = [0u8; TS_PACKET_SIZE];
516        pkt[0] = 0x46; // wrong sync byte
517        let err = TsPacket::parse(&pkt).unwrap_err();
518        match err {
519            Error::InvalidSyncByte { found } => assert_eq!(found, 0x46),
520            other => panic!("expected InvalidSyncByte, got {other:?}"),
521        }
522    }
523
524    #[test]
525    fn ts_header_round_trip() {
526        // struct → serialize → parse must reproduce the header (the project's
527        // symmetric Parse/Serialize invariant) across flag/field combinations.
528        let cases = [
529            TsHeader {
530                tei: false,
531                pusi: true,
532                pid: 0x0000,
533                scrambling: 0,
534                has_adaptation: false,
535                has_payload: true,
536                continuity_counter: 0,
537            },
538            TsHeader {
539                tei: true,
540                pusi: false,
541                pid: 0x1FFF,
542                scrambling: 0b11,
543                has_adaptation: true,
544                has_payload: true,
545                continuity_counter: 0x0F,
546            },
547            TsHeader {
548                tei: false,
549                pusi: false,
550                pid: 0x0100,
551                scrambling: 0b10,
552                has_adaptation: true,
553                has_payload: false,
554                continuity_counter: 7,
555            },
556        ];
557        for h in cases {
558            let mut buf = [0u8; 4];
559            assert_eq!(h.serialize_into(&mut buf).unwrap(), 4);
560            assert_eq!(TsHeader::parse(&buf).unwrap(), h, "round-trip mismatch");
561        }
562    }
563
564    #[test]
565    fn parse_extracts_pid_and_continuity_counter() {
566        // PID = 0x1234 → upper 5 bits = 0x12, lower 8 bits = 0x34
567        // CC = 5 → 0x05
568        // b1 bits: [tei:1][pusi:1][pid_hi:5]
569        // pid_hi = 0x12 = 0b00100_10 → bits 5..=1 = 0x12
570        // b1 = 0b00_010010 = 0x12 (no tei, no pusi)
571        let pkt = make_packet(0x12, 0x34, 0x05, &[]);
572        let pkt = TsPacket::parse(&pkt).unwrap();
573        assert_eq!(pkt.header.pid, 0x1234);
574        assert_eq!(pkt.header.continuity_counter, 5);
575    }
576
577    #[test]
578    fn payload_unit_start_indicator_flag_extracted() {
579        // b1 = 0x40 → pusi = true (bit 6 set, no tei, no pid bits)
580        let pkt1 = make_packet(0x40, 0x00, 0x00, &[]);
581        let pkt1 = TsPacket::parse(&pkt1).unwrap();
582        assert!(pkt1.header.pusi);
583
584        // b1 = 0x00 → pusi = false
585        let pkt2 = make_packet(0x00, 0x00, 0x00, &[]);
586        let pkt2 = TsPacket::parse(&pkt2).unwrap();
587        assert!(!pkt2.header.pusi);
588    }
589
590    /// Build a PSI-carrying TS payload: `pointer_field` byte followed by
591    /// (optionally) some tail of a previous section, followed by a fresh
592    /// section. `pointer_field` is the number of bytes of the previous
593    /// section that precede the new one (per ETSI EN 300 468 §5.1.4).
594    fn build_pusi_payload(pointer_field: u8, previous_tail: &[u8], section: &[u8]) -> Vec<u8> {
595        assert_eq!(pointer_field as usize, previous_tail.len());
596        let mut v = Vec::with_capacity(1 + previous_tail.len() + section.len());
597        v.push(pointer_field);
598        v.extend_from_slice(previous_tail);
599        v.extend_from_slice(section);
600        v
601    }
602
603    /// Build a long-form section with the given table_id and body bytes.
604    /// Returns the full section including its 3-byte + 5-byte header and a
605    /// placeholder CRC — for reassembler testing we don't validate the CRC.
606    fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
607        let section_length = body_after_length.len() as u16;
608        let mut v = Vec::with_capacity(3 + section_length as usize);
609        v.push(table_id);
610        // ssi=1, pi=0, reserved=11, length hi 4 bits
611        v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
612        v.push((section_length & 0xFF) as u8);
613        v.extend_from_slice(body_after_length);
614        v
615    }
616
617    // The reassembler tests below feed raw payload slices directly to
618    // `feed()` rather than wrapping them in 188-byte TS packets. This avoids
619    // the TS stuffing-byte tail (0xFF padding) bleeding into the reassembled
620    // section and keeps the assertions exact.
621
622    #[test]
623    fn reassembler_accumulates_multi_packet_section() {
624        // 200-byte section that spans two payload slices.
625        let body = vec![0xAAu8; 197];
626        let section = build_section(0x02, &body);
627        assert_eq!(section.len(), 200);
628
629        let first_chunk = 100;
630        let payload1 = build_pusi_payload(0, &[], &section[..first_chunk]);
631        let payload2 = section[first_chunk..].to_vec();
632
633        let mut reasm = SectionReassembler::default();
634        reasm.feed(&payload1, true);
635        reasm.feed(&payload2, false);
636
637        let out = reasm.pop_section().expect("section should be ready");
638        assert_eq!(out.len(), 200);
639        assert_eq!(out.as_ref(), &section[..]);
640    }
641
642    #[test]
643    fn reassembler_yields_complete_section_once_length_satisfied() {
644        // 1-byte-body section: table_id=0x42, section_length=1, total=4 bytes.
645        let section = build_section(0x42, &[0xAA]);
646        assert_eq!(section.len(), 4);
647        let payload = build_pusi_payload(0, &[], &section);
648
649        let mut reasm = SectionReassembler::default();
650        reasm.feed(&payload, true);
651
652        let out = reasm
653            .pop_section()
654            .expect("single-packet section should pop");
655        assert_eq!(out.as_ref(), &section[..]);
656    }
657
658    #[test]
659    fn reassembler_extracts_all_concatenated_sections_in_one_payload() {
660        // Issue #29: a single PUSI payload packing three complete short
661        // sections after the pointer_field. All three must be queued — the
662        // old `feed` stopped after the first and the rest were silently lost
663        // (the CAS/EMM data-loss bug: SHARED EMMs landing as the 2nd+ section).
664        let s1 = build_section(0x42, &[0x11, 0x22]); // 5 bytes
665        let s2 = build_section(0x46, &[0x33]); // 4 bytes
666        let s3 = build_section(0x4A, &[0x44, 0x55, 0x66]); // 6 bytes
667
668        let mut concat = Vec::new();
669        concat.extend_from_slice(&s1);
670        concat.extend_from_slice(&s2);
671        concat.extend_from_slice(&s3);
672        let payload = build_pusi_payload(0, &[], &concat);
673
674        let mut reasm = SectionReassembler::default();
675        reasm.feed(&payload, true);
676
677        // Consumers must drain with a loop, not a single `if let`.
678        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
679        assert_eq!(got.len(), 3, "all three concatenated sections must pop");
680        assert_eq!(got[0].as_ref(), &s1[..]);
681        assert_eq!(got[1].as_ref(), &s2[..]);
682        assert_eq!(got[2].as_ref(), &s3[..]);
683    }
684
685    #[test]
686    fn reassembler_stops_at_stuffing_after_concatenated_sections() {
687        // Two sections then 0xFF stuffing fill — the stuffing must not be
688        // mistaken for a section header (0xFF table_id) nor leak into a
689        // section; both real sections still pop.
690        let s1 = build_section(0x42, &[0xAA]); // 4 bytes
691        let s2 = build_section(0x46, &[0xBB, 0xCC]); // 5 bytes
692        let mut concat = Vec::new();
693        concat.extend_from_slice(&s1);
694        concat.extend_from_slice(&s2);
695        concat.extend_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // stuffing tail
696        let payload = build_pusi_payload(0, &[], &concat);
697
698        let mut reasm = SectionReassembler::default();
699        reasm.feed(&payload, true);
700
701        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
702        assert_eq!(got.len(), 2);
703        assert_eq!(got[0].as_ref(), &s1[..]);
704        assert_eq!(got[1].as_ref(), &s2[..]);
705        assert!(
706            reasm.is_empty(),
707            "stuffing tail must be discarded, not buffered"
708        );
709    }
710
711    #[test]
712    fn reassembler_concatenated_then_spanning_tail() {
713        // One complete section followed by the head of a second that spans
714        // into a continuation packet: first pops immediately, second pops
715        // once the continuation arrives.
716        let s1 = build_section(0x42, &[0x01, 0x02]); // 5 bytes
717        let s2 = build_section(0x46, &[0x09u8; 60]); // 63 bytes
718        let split = 30;
719
720        let mut head = Vec::new();
721        head.extend_from_slice(&s1);
722        head.extend_from_slice(&s2[..split]);
723        let payload1 = build_pusi_payload(0, &[], &head);
724        let payload2 = s2[split..].to_vec();
725
726        let mut reasm = SectionReassembler::default();
727        reasm.feed(&payload1, true);
728        let first = reasm.pop_section().expect("first section pops at once");
729        assert_eq!(first.as_ref(), &s1[..]);
730        assert!(reasm.pop_section().is_none(), "second is still partial");
731
732        reasm.feed(&payload2, false);
733        let second = reasm.pop_section().expect("second pops after continuation");
734        assert_eq!(second.as_ref(), &s2[..]);
735    }
736
737    #[test]
738    fn reassembler_completes_section_spanning_into_pusi_packet() {
739        // Issue #29 (second case): a section starts late in packet A and spills
740        // into packet B, but B is itself PUSI=1 because new sections begin in it.
741        // B's pointer_field = the count of leading tail bytes belonging to the
742        // section from A. Those bytes MUST complete A's section before new
743        // sections start. 3.1.1 cleared buf + skipped them → the spanning
744        // section was lost (the SHARED EMM the smartcard needed).
745        let spanning = build_section(0x42, &[0x5Au8; 62]); // 65 bytes
746        let head = 41;
747        let tail = &spanning[head..]; // 24 bytes — lands in packet B
748        assert_eq!(tail.len(), 24);
749
750        // New section that begins in packet B after the spanning tail.
751        let next = build_section(0x46, &[0x77, 0x88]); // 5 bytes
752
753        // Packet A (PUSI): pointer 0, then the 41-byte head (incomplete).
754        let payload_a = build_pusi_payload(0, &[], &spanning[..head]);
755        // Packet B (PUSI): pointer = 24 (tail of A's section), then `next`.
756        let payload_b = build_pusi_payload(24, tail, &next);
757
758        let mut reasm = SectionReassembler::default();
759        reasm.feed(&payload_a, true);
760        assert!(reasm.pop_section().is_none(), "head alone is incomplete");
761
762        reasm.feed(&payload_b, true);
763        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
764        assert_eq!(got.len(), 2, "spanning section + new section must both pop");
765        assert_eq!(
766            got[0].as_ref(),
767            &spanning[..],
768            "spanning section completed from B's pointer tail"
769        );
770        assert_eq!(got[1].as_ref(), &next[..]);
771    }
772
773    #[test]
774    fn reassembler_pusi_pointer_spans_whole_payload() {
775        // A section spans into a PUSI packet whose pointer covers the ENTIRE
776        // remaining payload (no new section starts here) — the tail must be
777        // appended and the section completed once the count is satisfied.
778        let spanning = build_section(0x42, &[0x33u8; 40]); // 43 bytes
779        let head = 20;
780        let payload_a = build_pusi_payload(0, &[], &spanning[..head]);
781        let tail = &spanning[head..]; // 23 bytes — exactly the rest of payload B
782
783        let mut reasm = SectionReassembler::default();
784        reasm.feed(&payload_a, true);
785        // Packet B: pointer = 23 = all remaining bytes; no new section follows.
786        reasm.feed(&build_pusi_payload_pointer_spanning_all(tail), true);
787
788        let out = reasm.pop_section().expect("spanning section completes");
789        assert_eq!(out.as_ref(), &spanning[..]);
790        assert!(reasm.pop_section().is_none());
791    }
792
793    /// Build a PUSI payload whose `pointer_field` equals the whole tail (so the
794    /// pointer spans to the end of the payload and no new section starts).
795    fn build_pusi_payload_pointer_spanning_all(tail: &[u8]) -> Vec<u8> {
796        let mut v = Vec::with_capacity(1 + tail.len());
797        v.push(tail.len() as u8);
798        v.extend_from_slice(tail);
799        v
800    }
801
802    #[test]
803    fn reassembler_completes_max_length_section_and_stays_usable() {
804        // A section declaring the maximum `section_length` (0xFFF → 4098 bytes
805        // total). The 12-bit length structurally caps the buffer at
806        // MAX_SECTION_SIZE, so there is no unbounded growth — and (unlike the
807        // pre-#148 guard, which discarded once buf+payload crossed the cap) a
808        // valid max-length section completes at its declared length.
809        let mut section = Vec::with_capacity(MAX_SECTION_SIZE);
810        section.push(0x00); // table_id
811        section.push(0xB0 | ((4095u16 >> 8) as u8 & 0x0F));
812        section.push(0xFF); // section_length = 0xFFF
813        section.resize(MAX_SECTION_SIZE, 0u8);
814        assert_eq!(section.len(), MAX_SECTION_SIZE);
815
816        let mut reasm = SectionReassembler::default();
817        let mut first = vec![0x00u8]; // pointer_field 0
818        first.extend_from_slice(&section[..183]);
819        reasm.feed(&first, true);
820        assert!(
821            reasm.pop_section().is_none(),
822            "incomplete until the declared length arrives"
823        );
824
825        for chunk in section[183..].chunks(184) {
826            reasm.feed(chunk, false);
827        }
828        let out = reasm
829            .pop_section()
830            .expect("max-length section completes at its declared length");
831        assert_eq!(out.len(), MAX_SECTION_SIZE);
832        assert_eq!(out.as_ref(), &section[..]);
833        assert!(reasm.is_empty());
834
835        // Extra trailing continuation data after completion is ignored (the
836        // buffer is empty, so a non-PUSI payload is dropped) — no panic, no
837        // spurious section.
838        reasm.feed(&[0u8; 184], false);
839        assert!(reasm.pop_section().is_none());
840
841        // State must be resettable — a fresh valid PUSI section works.
842        let valid_section = build_section(0x00, &[0xAA]);
843        let payload2 = build_pusi_payload(0, &[], &valid_section);
844        reasm.feed(&payload2, true);
845        let out = reasm
846            .pop_section()
847            .expect("fresh section should pop after reset");
848        assert_eq!(out.as_ref(), &valid_section[..]);
849    }
850
851    #[test]
852    fn reassembler_handles_pusi_with_nonzero_pointer_field() {
853        // payload = pointer_field=3, 3 bytes of prior-section tail, then new section.
854        let prior_tail = vec![0x11, 0x22, 0x33];
855        let new_section = build_section(0x02, &[0xBB]);
856        assert_eq!(new_section.len(), 4);
857        let payload = build_pusi_payload(3, &prior_tail, &new_section);
858
859        let mut reasm = SectionReassembler::default();
860        reasm.feed(&payload, true);
861
862        let out = reasm
863            .pop_section()
864            .expect("section after pointer_field skip should pop");
865        assert_eq!(out.as_ref(), &new_section[..]);
866    }
867
868    #[test]
869    fn reassembler_ignores_continuation_before_pusi() {
870        // Feed a non-PUSI payload first (no prior PUSI seen).
871        // SectionReassembler should discard it and stay empty.
872        let pkt = make_packet(0x00, 0x00, PAYLOAD_FLAG, &[0xAA, 0xBB, 0xCC]);
873
874        let mut reasm = SectionReassembler::default();
875        reasm.feed(&pkt[4..], false); // no PUSI
876
877        assert!(
878            reasm.pop_section().is_none(),
879            "no section should appear without prior PUSI"
880        );
881        assert!(
882            reasm.pop_section().is_none(),
883            "second pop should also be none"
884        );
885    }
886
887    /// A PUSI packet with an empty payload (adaptation field ate the body)
888    /// is malformed but must not panic — it drops sync.
889    #[test]
890    fn reassembler_empty_pusi_payload_does_not_panic() {
891        let mut reasm = SectionReassembler::default();
892        reasm.feed(&[], true);
893        assert!(reasm.pop_section().is_none());
894        // Recovers on the next clean PUSI.
895        let payload = vec![0x00u8, 0x72, 0x70, 0x01, 0x00];
896        reasm.feed(&payload, true);
897        assert!(reasm.pop_section().is_some());
898    }
899
900    /// A maximal short-form private section (section_length 0xFFF, total
901    /// 4098 bytes) reassembles — the ceiling is 12-bit length + 3-byte
902    /// header, not 4096.
903    #[test]
904    fn reassembler_accepts_maximal_private_section() {
905        let mut section = vec![0x80u8, 0x7F, 0xFF]; // user-private tid, SSI=0, len 0xFFF
906        section.resize(3 + 0xFFF, 0xAB);
907
908        let mut reasm = SectionReassembler::default();
909        // First TS payload: pointer_field 0 then the section start.
910        let mut first = vec![0x00];
911        first.extend_from_slice(&section[..183]);
912        reasm.feed(&first, true);
913        for chunk in section[183..].chunks(184) {
914            reasm.feed(chunk, false);
915        }
916        let out = reasm.pop_section().expect("4098-byte section should pop");
917        assert_eq!(out.len(), 4098);
918        assert_eq!(out.as_ref(), &section[..]);
919    }
920
921    /// Issue #148: a near-maximal section whose final continuation packet
922    /// carries the section tail followed by `0xFF` **stuffing** must still
923    /// complete. The old overflow guard counted the trailing stuffing toward
924    /// `MAX_SECTION_SIZE` and dropped the section.
925    #[test]
926    fn reassembler_completes_large_section_with_trailing_stuffing() {
927        let body = vec![0x5Au8; 4096 - 3];
928        let section = build_section(0x50, &body); // 4096 bytes total
929        assert_eq!(section.len(), 4096);
930
931        let mut reasm = SectionReassembler::default();
932        // First payload (PUSI): pointer_field 0 + first 183 section bytes.
933        let mut first = vec![0x00u8];
934        first.extend_from_slice(&section[..183]);
935        reasm.feed(&first, true);
936
937        // Continuation payloads of a full 184 bytes each; the final one is
938        // padded with 0xFF stuffing to a complete 184-byte payload, exactly as
939        // a real TS packet would carry it.
940        let mut pos = 183usize;
941        while pos < section.len() {
942            let take = (section.len() - pos).min(184);
943            let mut payload = section[pos..pos + take].to_vec();
944            if take < 184 {
945                payload.resize(184, 0xFF); // stuffing
946            }
947            reasm.feed(&payload, false);
948            pos += take;
949        }
950
951        let out = reasm
952            .pop_section()
953            .expect("4096-byte section must complete despite trailing stuffing (#148)");
954        assert_eq!(out.len(), 4096);
955        assert_eq!(out.as_ref(), &section[..]);
956        assert!(reasm.is_empty(), "stuffing tail must be discarded");
957    }
958
959    // ── adaptation field / PCR (ISO/IEC 13818-1 §2.4.3.4–2.4.3.5) ──
960
961    #[test]
962    fn pcr_as_27mhz_known_value() {
963        assert_eq!(
964            Pcr {
965                base: 10_000,
966                extension: 0
967            }
968            .as_27mhz(),
969            3_000_000
970        );
971        // base*300 + extension: 1*300 + 100 = 400.
972        assert_eq!(
973            Pcr {
974                base: 1,
975                extension: 100
976            }
977            .as_27mhz(),
978            400
979        );
980    }
981
982    #[test]
983    fn pcr_decode_from_bytes() {
984        // 6-byte PCR encoding base=10000, extension=0 (reserved bits set).
985        let af = [0x10u8, 0x00, 0x00, 0x13, 0x88, 0x7E, 0x00];
986        let pcr = Pcr::parse(&af, 1).expect("6 bytes present");
987        assert_eq!(
988            pcr,
989            Pcr {
990                base: 10_000,
991                extension: 0
992            }
993        );
994        assert_eq!(pcr.as_27mhz(), 3_000_000);
995    }
996
997    #[test]
998    fn adaptation_field_flags_and_pcr() {
999        let mut raw = [0xAAu8; TS_PACKET_SIZE];
1000        raw[0] = TS_SYNC_BYTE;
1001        raw[1] = 0x01; // pid 0x0100
1002        raw[2] = 0x00;
1003        raw[3] = ADAPTATION_FLAG | PAYLOAD_FLAG;
1004        raw[4] = 7; // adaptation_field_length: 1 flags + 6 PCR
1005        raw[5] = AF_DISCONTINUITY | AF_PCR_FLAG;
1006        raw[6..12].copy_from_slice(&[0x00, 0x00, 0x13, 0x88, 0x7E, 0x00]);
1007        // raw[12..] stays 0xAA = payload.
1008
1009        let pkt = TsPacket::parse(&raw).expect("valid packet");
1010        let af = pkt
1011            .adaptation_field()
1012            .expect("has adaptation field")
1013            .expect("adaptation field parses");
1014        assert!(af.discontinuity_indicator);
1015        assert!(!af.random_access_indicator);
1016        assert_eq!(
1017            af.pcr,
1018            Some(Pcr {
1019                base: 10_000,
1020                extension: 0
1021            })
1022        );
1023        assert_eq!(af.pcr.unwrap().as_27mhz(), 3_000_000);
1024        assert!(af.opcr.is_none());
1025        assert!(af.splice_countdown.is_none());
1026        // Payload begins right after the adaptation field (cursor 4+1+7=12).
1027        let payload = pkt.payload.expect("payload present");
1028        assert_eq!(payload.len(), TS_PACKET_SIZE - 12);
1029        assert_eq!(payload[0], 0xAA);
1030    }
1031
1032    #[test]
1033    fn no_adaptation_returns_none() {
1034        let mut raw = [0x00u8; TS_PACKET_SIZE];
1035        raw[0] = TS_SYNC_BYTE;
1036        raw[1] = 0x01;
1037        raw[3] = PAYLOAD_FLAG; // payload only
1038        let pkt = TsPacket::parse(&raw).expect("valid");
1039        assert!(pkt.adaptation_field().is_none());
1040        assert!(pkt.adaptation.is_none());
1041    }
1042
1043    #[test]
1044    fn adaptation_field_splice_countdown_negative() {
1045        let mut raw = [0xAAu8; TS_PACKET_SIZE];
1046        raw[0] = TS_SYNC_BYTE;
1047        raw[1] = 0x01;
1048        raw[2] = 0x00;
1049        raw[3] = ADAPTATION_FLAG | PAYLOAD_FLAG;
1050        raw[4] = 2; // 1 flags + 1 splice_countdown
1051        raw[5] = AF_SPLICING_FLAG;
1052        raw[6] = 0xFB; // -5 as i8
1053        let pkt = TsPacket::parse(&raw).expect("valid");
1054        let af = pkt.adaptation_field().unwrap().unwrap();
1055        assert_eq!(af.splice_countdown, Some(-5));
1056        assert!(af.pcr.is_none());
1057    }
1058}