Skip to main content

dvb_si/
ts.rs

1//! MPEG-TS packet parser + section reassembler. Feature-gated under `ts`.
2
3use crate::error::{Error, Result};
4
/// Length in bytes of a single MPEG-TS packet (ETSI EN 300 468 §3.2,
/// ISO/IEC 13818-1 §2.4.3.2).
pub const TS_PACKET_SIZE: usize = 188;
/// Value of the sync byte that opens every TS packet
/// (ISO/IEC 13818-1 §2.4.3.2).
pub const TS_SYNC_BYTE: u8 = 0x47;
/// Largest section the reassembler is willing to buffer: the 3-byte section
/// header plus the maximum value of the 12-bit `section_length` field
/// (0x0FFF = 4095), i.e. 4098 bytes. Long-form SI limits `section_length` to
/// 4093 (4096 bytes in total); the reassembler deliberately accepts the
/// absolute ceiling the field can express.
const MAX_SECTION_SIZE: usize = 3 + 0x0FFF;
14
/// Transport Error Indicator: bit 7 of header byte 1
/// (ISO/IEC 13818-1 §2.4.3.3).
const TEI_MASK: u8 = 0b1000_0000;
/// Payload Unit Start Indicator: bit 6 of header byte 1
/// (ISO/IEC 13818-1 §2.4.3.3).
const PUSI_MASK: u8 = 0b0100_0000;
/// Upper 5 bits of the 13-bit PID: bits `[4:0]` of header byte 1
/// (ISO/IEC 13818-1 §2.4.3.3).
pub const PID_MASK_HI: u8 = 0b0001_1111;
/// 2-bit transport_scrambling_control: bits `[7:6]` of header byte 3
/// (ISO/IEC 13818-1 §2.4.3.3).
pub const SCRAMBLING_MASK: u8 = 0b1100_0000;
/// adaptation_field_control, "adaptation field present" bit: bit 5 of header
/// byte 3 (ISO/IEC 13818-1 §2.4.3.3).
pub const ADAPTATION_FLAG: u8 = 0b0010_0000;
/// adaptation_field_control, "payload present" bit: bit 4 of header byte 3
/// (ISO/IEC 13818-1 §2.4.3.3).
pub const PAYLOAD_FLAG: u8 = 0b0001_0000;
/// 4-bit continuity_counter: bits `[3:0]` of header byte 3
/// (ISO/IEC 13818-1 §2.4.3.3).
pub const CC_MASK: u8 = 0b0000_1111;
29
/// The fields of the 4-byte MPEG-TS transport header, already decoded.
///
/// The sync byte (header byte 0) is not stored; it is a constant.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct TsHeader {
    /// Transport Error Indicator: the demodulator flags an uncorrectable
    /// error somewhere in this packet.
    pub tei: bool,
    /// Payload Unit Start Indicator: when set, the payload begins with a new
    /// PES packet or (after a `pointer_field`) a PSI section.
    pub pusi: bool,
    /// Packet Identifier, 13 bits wide.
    pub pid: u16,
    /// transport_scrambling_control, 2 bits wide (0 means not scrambled).
    pub scrambling: u8,
    /// High bit of adaptation_field_control: an adaptation field follows
    /// the header.
    pub has_adaptation: bool,
    /// Low bit of adaptation_field_control: the packet carries payload.
    pub has_payload: bool,
    /// continuity_counter, 4 bits wide (cycles through 0..=15 per PID).
    pub continuity_counter: u8,
}
51
52/// Borrowed view into one 188-byte TS packet.
53///
54/// Serde: Serialize-only (re-parse from wire bytes to reconstruct). `raw` is
55/// excluded from the serialized form because it is redundant once the header
56/// has been parsed.
57#[derive(Clone, Debug)]
58#[cfg_attr(feature = "serde", derive(serde::Serialize))]
59pub struct TsPacket<'a> {
60    /// Parsed header fields.
61    pub header: TsHeader,
62    /// Slice into the packet's payload, or `None` when `has_payload == false`
63    /// or the adaptation field consumed the whole packet body.
64    pub payload: Option<&'a [u8]>,
65    /// The adaptation-field bytes (after the length byte). Internal capture
66    /// feeding [`adaptation_field`](Self::adaptation_field); not public.
67    #[cfg_attr(feature = "serde", serde(skip))]
68    adaptation: Option<&'a [u8]>,
69    /// The raw 188 bytes of the packet — kept for cheap forwarding.
70    #[cfg_attr(feature = "serde", serde(skip))]
71    pub raw: &'a [u8; TS_PACKET_SIZE],
72}
73
74impl TsHeader {
75    /// Parse a 4-byte TS transport header.
76    pub fn parse(raw4: &[u8]) -> Result<Self> {
77        if raw4.len() < 4 {
78            return Err(Error::BufferTooShort {
79                need: 4,
80                have: raw4.len(),
81                what: "TsHeader",
82            });
83        }
84        let b1 = raw4[1];
85        let b2 = raw4[2];
86        let b3 = raw4[3];
87
88        let tei = (b1 & TEI_MASK) != 0;
89        let pusi = (b1 & PUSI_MASK) != 0;
90        let pid = (((b1 & PID_MASK_HI) as u16) << 8) | (b2 as u16);
91        let scrambling = (b3 & SCRAMBLING_MASK) >> 6;
92        let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
93        let has_payload = (b3 & PAYLOAD_FLAG) != 0;
94        let continuity_counter = b3 & CC_MASK;
95
96        Ok(Self {
97            tei,
98            pusi,
99            pid,
100            scrambling,
101            has_adaptation,
102            has_payload,
103            continuity_counter,
104        })
105    }
106
107    /// Number of bytes written by [`serialize_into`](Self::serialize_into).
108    pub const fn serialized_len() -> usize {
109        4
110    }
111
112    /// Serialize this header into the first 4 bytes of `buf`.
113    pub fn serialize_into(&self, buf: &mut [u8]) -> Result<usize> {
114        if buf.len() < 4 {
115            return Err(Error::OutputBufferTooSmall {
116                need: 4,
117                have: buf.len(),
118            });
119        }
120        buf[0] = TS_SYNC_BYTE;
121        buf[1] = 0;
122        if self.tei {
123            buf[1] |= TEI_MASK;
124        }
125        if self.pusi {
126            buf[1] |= PUSI_MASK;
127        }
128        buf[1] |= ((self.pid >> 8) as u8) & PID_MASK_HI;
129        buf[2] = (self.pid & 0xFF) as u8;
130        buf[3] = (self.scrambling << 6) & SCRAMBLING_MASK;
131        if self.has_adaptation {
132            buf[3] |= ADAPTATION_FLAG;
133        }
134        if self.has_payload {
135            buf[3] |= PAYLOAD_FLAG;
136        }
137        buf[3] |= self.continuity_counter & CC_MASK;
138        Ok(4)
139    }
140}
141
142impl<'a> dvb_common::Parse<'a> for TsHeader {
143    type Error = Error;
144
145    fn parse(bytes: &'a [u8]) -> Result<Self> {
146        TsHeader::parse(bytes)
147    }
148}
149
150impl dvb_common::Serialize for TsHeader {
151    type Error = Error;
152
153    fn serialized_len(&self) -> usize {
154        TsHeader::serialized_len()
155    }
156
157    fn serialize_into(&self, buf: &mut [u8]) -> Result<usize> {
158        TsHeader::serialize_into(self, buf)
159    }
160}
161
162impl<'a> TsPacket<'a> {
163    /// Parse a single 188-byte TS packet from a buffer.
164    ///
165    /// Returns `Err(Error::InvalidSyncByte)` if the first byte is not `0x47`,
166    /// `Err(Error::BufferTooShort)` if fewer than 188 bytes, or `Ok` with
167    /// the parsed packet otherwise.
168    pub fn parse(buf: &'a [u8]) -> Result<Self> {
169        if buf.len() < TS_PACKET_SIZE {
170            return Err(Error::BufferTooShort {
171                need: TS_PACKET_SIZE,
172                have: buf.len(),
173                what: "TsPacket",
174            });
175        }
176        if buf[0] != TS_SYNC_BYTE {
177            return Err(Error::InvalidSyncByte { found: buf[0] });
178        }
179
180        let raw: &[u8; TS_PACKET_SIZE] =
181            buf[..TS_PACKET_SIZE]
182                .try_into()
183                .map_err(|_| Error::BufferTooShort {
184                    need: TS_PACKET_SIZE,
185                    have: buf.len(),
186                    what: "TsPacket::parse (array conversion)",
187                })?;
188
189        let header = TsHeader::parse(&raw[..4])?;
190
191        let mut cursor = 4usize;
192        let mut payload = None;
193        let mut adaptation = None;
194
195        // Capture the adaptation field if present, then skip it (the section
196        // path does not need it; decode lazily via `adaptation_field`).
197        if header.has_adaptation && cursor < TS_PACKET_SIZE {
198            let af_len = raw[cursor] as usize;
199            let af_start = cursor + 1;
200            if af_len > 0 && af_start < TS_PACKET_SIZE {
201                let af_end = (af_start + af_len).min(TS_PACKET_SIZE);
202                adaptation = Some(&raw[af_start..af_end]);
203            }
204            cursor += 1 + af_len;
205        }
206
207        if header.has_payload && cursor < TS_PACKET_SIZE {
208            payload = Some(&raw[cursor..]);
209        }
210
211        Ok(TsPacket {
212            header,
213            payload,
214            adaptation,
215            raw,
216        })
217    }
218
219    /// Decode the adaptation field, if present.
220    ///
221    /// Returns `None` when the packet carries no adaptation field, and
222    /// `Some(Err(..))` when a present field is truncated. Layout per
223    /// ISO/IEC 13818-1:2007 §2.4.3.4 (`docs/iso_13818_1_systems.md`).
224    pub fn adaptation_field(&self) -> Option<crate::Result<AdaptationField>> {
225        self.adaptation.map(AdaptationField::parse)
226    }
227}
228
// Flag bits in the first adaptation-field byte, i.e. the byte right after
// adaptation_field_length (ISO/IEC 13818-1:2007 §2.4.3.4).
const AF_DISCONTINUITY: u8 = 1 << 7;
const AF_RANDOM_ACCESS: u8 = 1 << 6;
const AF_ES_PRIORITY: u8 = 1 << 5;
const AF_PCR_FLAG: u8 = 1 << 4;
const AF_OPCR_FLAG: u8 = 1 << 3;
const AF_SPLICING_FLAG: u8 = 1 << 2;
/// Size in bytes of an encoded PCR or OPCR: 33-bit base + 6 reserved bits +
/// 9-bit extension = 48 bits.
const PCR_FIELD_LEN: usize = (33 + 6 + 9) / 8;
238
239/// Program Clock Reference (ISO/IEC 13818-1:2007 §2.4.3.5): a 33-bit base on a
240/// 90 kHz clock plus a 9-bit extension on a 27 MHz clock.
241#[derive(Clone, Copy, Debug, PartialEq, Eq)]
242#[cfg_attr(feature = "serde", derive(serde::Serialize))]
243pub struct Pcr {
244    /// 33-bit base (90 kHz units).
245    pub base: u64,
246    /// 9-bit extension (27 MHz units).
247    pub extension: u16,
248}
249
250impl Pcr {
251    /// Full PCR value on the 27 MHz clock: `base * 300 + extension`.
252    #[must_use]
253    pub fn as_27mhz(self) -> u64 {
254        self.base * 300 + self.extension as u64
255    }
256
257    /// Decode the 6-byte PCR/OPCR field starting at `at` within `af`.
258    fn parse(af: &[u8], at: usize) -> Result<Self> {
259        let b: &[u8; PCR_FIELD_LEN] = af
260            .get(at..at + PCR_FIELD_LEN)
261            .and_then(|s| s.try_into().ok())
262            .ok_or(Error::BufferTooShort {
263                need: at + PCR_FIELD_LEN,
264                have: af.len(),
265                what: "adaptation_field PCR",
266            })?;
267        let base = ((b[0] as u64) << 25)
268            | ((b[1] as u64) << 17)
269            | ((b[2] as u64) << 9)
270            | ((b[3] as u64) << 1)
271            | ((b[4] as u64) >> 7);
272        let extension = (((b[4] & 0x01) as u16) << 8) | (b[5] as u16);
273        Ok(Self { base, extension })
274    }
275}
276
277/// Decoded adaptation field — flags plus PCR/OPCR and splice point per
278/// ISO/IEC 13818-1:2007 §2.4.3.4. Transport-private data and the
279/// adaptation-field extension are not yet surfaced; more fields may be
280/// added in future releases.
281#[non_exhaustive]
282#[derive(Clone, Copy, Debug, PartialEq, Eq)]
283#[cfg_attr(feature = "serde", derive(serde::Serialize))]
284pub struct AdaptationField {
285    /// A timing/continuity discontinuity starts at this packet.
286    pub discontinuity_indicator: bool,
287    /// This packet is a random-access point.
288    pub random_access_indicator: bool,
289    /// Elementary-stream priority hint.
290    pub elementary_stream_priority_indicator: bool,
291    /// Program Clock Reference, present iff the PCR flag is set.
292    pub pcr: Option<Pcr>,
293    /// Original PCR, present iff the OPCR flag is set.
294    pub opcr: Option<Pcr>,
295    /// Splice countdown (packets until the splice point), iff the flag is set.
296    pub splice_countdown: Option<i8>,
297}
298
299impl AdaptationField {
300    /// Parse the adaptation-field bytes (those following the length byte).
301    fn parse(af: &[u8]) -> Result<Self> {
302        let flags = *af.first().ok_or(Error::BufferTooShort {
303            need: 1,
304            have: 0,
305            what: "adaptation_field flags",
306        })?;
307        let mut cursor = 1usize;
308
309        let pcr = if flags & AF_PCR_FLAG != 0 {
310            let p = Pcr::parse(af, cursor)?;
311            cursor += PCR_FIELD_LEN;
312            Some(p)
313        } else {
314            None
315        };
316        let opcr = if flags & AF_OPCR_FLAG != 0 {
317            let p = Pcr::parse(af, cursor)?;
318            cursor += PCR_FIELD_LEN;
319            Some(p)
320        } else {
321            None
322        };
323        let splice_countdown = if flags & AF_SPLICING_FLAG != 0 {
324            let b = *af.get(cursor).ok_or(Error::BufferTooShort {
325                need: cursor + 1,
326                have: af.len(),
327                what: "adaptation_field splice_countdown",
328            })?;
329            Some(b as i8)
330        } else {
331            None
332        };
333
334        Ok(AdaptationField {
335            discontinuity_indicator: flags & AF_DISCONTINUITY != 0,
336            random_access_indicator: flags & AF_RANDOM_ACCESS != 0,
337            elementary_stream_priority_indicator: flags & AF_ES_PRIORITY != 0,
338            pcr,
339            opcr,
340            splice_countdown,
341        })
342    }
343}
344
345/// Reassembles PSI/SI sections from TS packets on a single PID.
346///
347/// Feed each TS packet's payload with `feed`. Complete sections are
348/// appended to an internal queue; drain them with `pop_section`.
349#[derive(Default)]
350pub struct SectionReassembler {
351    buf: bytes::BytesMut,
352    ready: std::collections::VecDeque<bytes::Bytes>,
353}
354
355impl SectionReassembler {
356    /// Feed a TS payload and whether its packet had PUSI set.
357    ///
358    /// Extracts complete SI sections into the internal queue. A single call
359    /// can produce zero, one, or **several** sections — a payload may
360    /// concatenate multiple complete sections after the `pointer_field`
361    /// (EN 300 468 §5.1.4; common on EMM PIDs). Drain with a
362    /// `while let Some(s) = r.pop_section()` loop, not a single `if let`.
363    pub fn feed(&mut self, payload: &[u8], pusi: bool) {
364        if pusi {
365            // A PUSI packet whose adaptation field consumed the whole body is
366            // malformed but constructible — drop sync rather than panic.
367            if payload.is_empty() {
368                self.buf.clear();
369                return;
370            }
371            let pointer = payload[0] as usize;
372
373            // The `pointer_field` counts bytes that belong to a section still
374            // in progress from a previous packet (ISO/IEC 13818-1 §2.4.4): the
375            // `pointer` bytes immediately after it are that section's tail and
376            // must complete it BEFORE new sections begin at `1 + pointer`.
377            // Skipping them (or clearing `buf` first) drops any section that
378            // spans into a PUSI packet — silent loss biased toward whichever
379            // section happens to straddle a packet boundary.
380            if !self.buf.is_empty() && pointer > 0 {
381                let avail = payload.len() - 1;
382                let tail_len = pointer.min(avail);
383                if self.buf.len() + tail_len > MAX_SECTION_SIZE {
384                    self.buf.clear();
385                } else {
386                    self.buf.extend_from_slice(&payload[1..1 + tail_len]);
387                    self.drain_complete_sections();
388                }
389            }
390
391            // New sections start at `1 + pointer`; anything still buffered is
392            // an incomplete (corrupt / lost-packet) section — discard it.
393            self.buf.clear();
394
395            let start = 1 + pointer;
396            if start >= payload.len() {
397                // Pointer spans to (or past) the end — no new section here.
398                return;
399            }
400            let new_data = &payload[start..];
401            if new_data.len() > MAX_SECTION_SIZE {
402                return;
403            }
404            self.buf.extend_from_slice(new_data);
405        } else {
406            if self.buf.is_empty() {
407                return;
408            }
409            if self.buf.len() + payload.len() > MAX_SECTION_SIZE {
410                self.buf.clear();
411                return;
412            }
413            self.buf.extend_from_slice(payload);
414        }
415
416        self.drain_complete_sections();
417    }
418
419    /// Queue every complete section the buffer currently holds.
420    ///
421    /// A single TS payload may concatenate multiple complete sections after
422    /// the `pointer_field` (legal per ETSI EN 300 468 §5.1.4 and common on
423    /// EMM PIDs, which pack several short messages into one payload). We must
424    /// keep extracting until the buffer holds only a partial (multi-packet
425    /// spanning) section, whose bytes stay buffered for the next packet to
426    /// continue (the expected length is recomputed from the section header on
427    /// each drain). A `0xFF` where a `table_id` is expected marks the rest of
428    /// the payload as stuffing.
429    fn drain_complete_sections(&mut self) {
430        loop {
431            if self.buf.len() < 3 {
432                // Not enough for a section header yet; keep the partial bytes
433                // and wait for the next packet to complete the header.
434                break;
435            }
436            if self.buf[0] == 0xFF {
437                // Stuffing where a table_id is expected — payload tail is fill.
438                self.buf.clear();
439                break;
440            }
441            let exp = 3 + (((self.buf[1] & 0x0F) as usize) << 8 | self.buf[2] as usize);
442            if self.buf.len() >= exp {
443                // split_to returns the first `exp` bytes as an owned BytesMut,
444                // leaving the remainder in self.buf — cheap (shifts pointers).
445                let section = self.buf.split_to(exp).freeze();
446                self.ready.push_back(section);
447            } else {
448                // Partial section spanning into later packets.
449                break;
450            }
451        }
452    }
453
454    /// Pop one complete section. Returns `None` when the queue is empty.
455    pub fn pop_section(&mut self) -> Option<bytes::Bytes> {
456        self.ready.pop_front()
457    }
458
459    /// Number of bytes currently buffered (incomplete section).
460    pub fn len(&self) -> usize {
461        self.buf.len()
462    }
463
464    /// True if no bytes are currently buffered.
465    pub fn is_empty(&self) -> bool {
466        self.buf.is_empty()
467    }
468}
469
470#[cfg(test)]
471mod tests {
472    use super::*;
473
474    /// Helper: construct a minimal 188-byte TS packet buffer with given header flags and payload.
475    fn make_packet(b1: u8, b2: u8, b3: u8, payload_data: &[u8]) -> [u8; TS_PACKET_SIZE] {
476        let mut pkt = [0u8; TS_PACKET_SIZE];
477        pkt[0] = TS_SYNC_BYTE;
478        pkt[1] = b1;
479        pkt[2] = b2;
480        pkt[3] = b3;
481        let payload_start = 4;
482        let end = (payload_start + payload_data.len()).min(TS_PACKET_SIZE);
483        let len = (end - payload_start).min(payload_data.len());
484        pkt[payload_start..payload_start + len].copy_from_slice(&payload_data[..len]);
485        pkt
486    }
487
488    #[test]
489    fn parse_rejects_non_0x47_sync_byte() {
490        let mut pkt = [0u8; TS_PACKET_SIZE];
491        pkt[0] = 0x46; // wrong sync byte
492        let err = TsPacket::parse(&pkt).unwrap_err();
493        match err {
494            Error::InvalidSyncByte { found } => assert_eq!(found, 0x46),
495            other => panic!("expected InvalidSyncByte, got {other:?}"),
496        }
497    }
498
499    #[test]
500    fn parse_extracts_pid_and_continuity_counter() {
501        // PID = 0x1234 → upper 5 bits = 0x12, lower 8 bits = 0x34
502        // CC = 5 → 0x05
503        // b1 bits: [tei:1][pusi:1][pid_hi:5]
504        // pid_hi = 0x12 = 0b00100_10 → bits 5..=1 = 0x12
505        // b1 = 0b00_010010 = 0x12 (no tei, no pusi)
506        let pkt = make_packet(0x12, 0x34, 0x05, &[]);
507        let pkt = TsPacket::parse(&pkt).unwrap();
508        assert_eq!(pkt.header.pid, 0x1234);
509        assert_eq!(pkt.header.continuity_counter, 5);
510    }
511
512    #[test]
513    fn payload_unit_start_indicator_flag_extracted() {
514        // b1 = 0x40 → pusi = true (bit 6 set, no tei, no pid bits)
515        let pkt1 = make_packet(0x40, 0x00, 0x00, &[]);
516        let pkt1 = TsPacket::parse(&pkt1).unwrap();
517        assert!(pkt1.header.pusi);
518
519        // b1 = 0x00 → pusi = false
520        let pkt2 = make_packet(0x00, 0x00, 0x00, &[]);
521        let pkt2 = TsPacket::parse(&pkt2).unwrap();
522        assert!(!pkt2.header.pusi);
523    }
524
525    /// Build a PSI-carrying TS payload: `pointer_field` byte followed by
526    /// (optionally) some tail of a previous section, followed by a fresh
527    /// section. `pointer_field` is the number of bytes of the previous
528    /// section that precede the new one (per ETSI EN 300 468 §5.1.4).
529    fn build_pusi_payload(pointer_field: u8, previous_tail: &[u8], section: &[u8]) -> Vec<u8> {
530        assert_eq!(pointer_field as usize, previous_tail.len());
531        let mut v = Vec::with_capacity(1 + previous_tail.len() + section.len());
532        v.push(pointer_field);
533        v.extend_from_slice(previous_tail);
534        v.extend_from_slice(section);
535        v
536    }
537
538    /// Build a long-form section with the given table_id and body bytes.
539    /// Returns the full section including its 3-byte + 5-byte header and a
540    /// placeholder CRC — for reassembler testing we don't validate the CRC.
541    fn build_section(table_id: u8, body_after_length: &[u8]) -> Vec<u8> {
542        let section_length = body_after_length.len() as u16;
543        let mut v = Vec::with_capacity(3 + section_length as usize);
544        v.push(table_id);
545        // ssi=1, pi=0, reserved=11, length hi 4 bits
546        v.push(0xB0 | ((section_length >> 8) as u8 & 0x0F));
547        v.push((section_length & 0xFF) as u8);
548        v.extend_from_slice(body_after_length);
549        v
550    }
551
552    // The reassembler tests below feed raw payload slices directly to
553    // `feed()` rather than wrapping them in 188-byte TS packets. This avoids
554    // the TS stuffing-byte tail (0xFF padding) bleeding into the reassembled
555    // section and keeps the assertions exact.
556
557    #[test]
558    fn reassembler_accumulates_multi_packet_section() {
559        // 200-byte section that spans two payload slices.
560        let body = vec![0xAAu8; 197];
561        let section = build_section(0x02, &body);
562        assert_eq!(section.len(), 200);
563
564        let first_chunk = 100;
565        let payload1 = build_pusi_payload(0, &[], &section[..first_chunk]);
566        let payload2 = section[first_chunk..].to_vec();
567
568        let mut reasm = SectionReassembler::default();
569        reasm.feed(&payload1, true);
570        reasm.feed(&payload2, false);
571
572        let out = reasm.pop_section().expect("section should be ready");
573        assert_eq!(out.len(), 200);
574        assert_eq!(out.as_ref(), &section[..]);
575    }
576
577    #[test]
578    fn reassembler_yields_complete_section_once_length_satisfied() {
579        // 1-byte-body section: table_id=0x42, section_length=1, total=4 bytes.
580        let section = build_section(0x42, &[0xAA]);
581        assert_eq!(section.len(), 4);
582        let payload = build_pusi_payload(0, &[], &section);
583
584        let mut reasm = SectionReassembler::default();
585        reasm.feed(&payload, true);
586
587        let out = reasm
588            .pop_section()
589            .expect("single-packet section should pop");
590        assert_eq!(out.as_ref(), &section[..]);
591    }
592
593    #[test]
594    fn reassembler_extracts_all_concatenated_sections_in_one_payload() {
595        // Issue #29: a single PUSI payload packing three complete short
596        // sections after the pointer_field. All three must be queued — the
597        // old `feed` stopped after the first and the rest were silently lost
598        // (the CAS/EMM data-loss bug: SHARED EMMs landing as the 2nd+ section).
599        let s1 = build_section(0x42, &[0x11, 0x22]); // 5 bytes
600        let s2 = build_section(0x46, &[0x33]); // 4 bytes
601        let s3 = build_section(0x4A, &[0x44, 0x55, 0x66]); // 6 bytes
602
603        let mut concat = Vec::new();
604        concat.extend_from_slice(&s1);
605        concat.extend_from_slice(&s2);
606        concat.extend_from_slice(&s3);
607        let payload = build_pusi_payload(0, &[], &concat);
608
609        let mut reasm = SectionReassembler::default();
610        reasm.feed(&payload, true);
611
612        // Consumers must drain with a loop, not a single `if let`.
613        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
614        assert_eq!(got.len(), 3, "all three concatenated sections must pop");
615        assert_eq!(got[0].as_ref(), &s1[..]);
616        assert_eq!(got[1].as_ref(), &s2[..]);
617        assert_eq!(got[2].as_ref(), &s3[..]);
618    }
619
620    #[test]
621    fn reassembler_stops_at_stuffing_after_concatenated_sections() {
622        // Two sections then 0xFF stuffing fill — the stuffing must not be
623        // mistaken for a section header (0xFF table_id) nor leak into a
624        // section; both real sections still pop.
625        let s1 = build_section(0x42, &[0xAA]); // 4 bytes
626        let s2 = build_section(0x46, &[0xBB, 0xCC]); // 5 bytes
627        let mut concat = Vec::new();
628        concat.extend_from_slice(&s1);
629        concat.extend_from_slice(&s2);
630        concat.extend_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // stuffing tail
631        let payload = build_pusi_payload(0, &[], &concat);
632
633        let mut reasm = SectionReassembler::default();
634        reasm.feed(&payload, true);
635
636        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
637        assert_eq!(got.len(), 2);
638        assert_eq!(got[0].as_ref(), &s1[..]);
639        assert_eq!(got[1].as_ref(), &s2[..]);
640        assert!(
641            reasm.is_empty(),
642            "stuffing tail must be discarded, not buffered"
643        );
644    }
645
646    #[test]
647    fn reassembler_concatenated_then_spanning_tail() {
648        // One complete section followed by the head of a second that spans
649        // into a continuation packet: first pops immediately, second pops
650        // once the continuation arrives.
651        let s1 = build_section(0x42, &[0x01, 0x02]); // 5 bytes
652        let s2 = build_section(0x46, &[0x09u8; 60]); // 63 bytes
653        let split = 30;
654
655        let mut head = Vec::new();
656        head.extend_from_slice(&s1);
657        head.extend_from_slice(&s2[..split]);
658        let payload1 = build_pusi_payload(0, &[], &head);
659        let payload2 = s2[split..].to_vec();
660
661        let mut reasm = SectionReassembler::default();
662        reasm.feed(&payload1, true);
663        let first = reasm.pop_section().expect("first section pops at once");
664        assert_eq!(first.as_ref(), &s1[..]);
665        assert!(reasm.pop_section().is_none(), "second is still partial");
666
667        reasm.feed(&payload2, false);
668        let second = reasm.pop_section().expect("second pops after continuation");
669        assert_eq!(second.as_ref(), &s2[..]);
670    }
671
672    #[test]
673    fn reassembler_completes_section_spanning_into_pusi_packet() {
674        // Issue #29 (second case): a section starts late in packet A and spills
675        // into packet B, but B is itself PUSI=1 because new sections begin in it.
676        // B's pointer_field = the count of leading tail bytes belonging to the
677        // section from A. Those bytes MUST complete A's section before new
678        // sections start. 3.1.1 cleared buf + skipped them → the spanning
679        // section was lost (the SHARED EMM the smartcard needed).
680        let spanning = build_section(0x42, &[0x5Au8; 62]); // 65 bytes
681        let head = 41;
682        let tail = &spanning[head..]; // 24 bytes — lands in packet B
683        assert_eq!(tail.len(), 24);
684
685        // New section that begins in packet B after the spanning tail.
686        let next = build_section(0x46, &[0x77, 0x88]); // 5 bytes
687
688        // Packet A (PUSI): pointer 0, then the 41-byte head (incomplete).
689        let payload_a = build_pusi_payload(0, &[], &spanning[..head]);
690        // Packet B (PUSI): pointer = 24 (tail of A's section), then `next`.
691        let payload_b = build_pusi_payload(24, tail, &next);
692
693        let mut reasm = SectionReassembler::default();
694        reasm.feed(&payload_a, true);
695        assert!(reasm.pop_section().is_none(), "head alone is incomplete");
696
697        reasm.feed(&payload_b, true);
698        let got: Vec<_> = std::iter::from_fn(|| reasm.pop_section()).collect();
699        assert_eq!(got.len(), 2, "spanning section + new section must both pop");
700        assert_eq!(
701            got[0].as_ref(),
702            &spanning[..],
703            "spanning section completed from B's pointer tail"
704        );
705        assert_eq!(got[1].as_ref(), &next[..]);
706    }
707
708    #[test]
709    fn reassembler_pusi_pointer_spans_whole_payload() {
710        // A section spans into a PUSI packet whose pointer covers the ENTIRE
711        // remaining payload (no new section starts here) — the tail must be
712        // appended and the section completed once the count is satisfied.
713        let spanning = build_section(0x42, &[0x33u8; 40]); // 43 bytes
714        let head = 20;
715        let payload_a = build_pusi_payload(0, &[], &spanning[..head]);
716        let tail = &spanning[head..]; // 23 bytes — exactly the rest of payload B
717
718        let mut reasm = SectionReassembler::default();
719        reasm.feed(&payload_a, true);
720        // Packet B: pointer = 23 = all remaining bytes; no new section follows.
721        reasm.feed(&build_pusi_payload_pointer_spanning_all(tail), true);
722
723        let out = reasm.pop_section().expect("spanning section completes");
724        assert_eq!(out.as_ref(), &spanning[..]);
725        assert!(reasm.pop_section().is_none());
726    }
727
728    /// Build a PUSI payload whose `pointer_field` equals the whole tail (so the
729    /// pointer spans to the end of the payload and no new section starts).
730    fn build_pusi_payload_pointer_spanning_all(tail: &[u8]) -> Vec<u8> {
731        let mut v = Vec::with_capacity(1 + tail.len());
732        v.push(tail.len() as u8);
733        v.extend_from_slice(tail);
734        v
735    }
736
737    #[test]
738    fn reassembler_discards_on_buffer_overflow() {
739        // Declare section_length larger than a single payload can carry. No
740        // pop happens until continuations arrive; if continuations push the
741        // buffer past MAX_SECTION_SIZE the reassembler must reset, not panic.
742        let mut section = Vec::with_capacity(3 + 4095);
743        section.push(0x00); // table_id
744        section.push(0xB0 | ((4095u16 >> 8) as u8 & 0x0F));
745        section.push(0xFF);
746        section.extend_from_slice(&[0u8; 160]);
747        let payload1 = build_pusi_payload(0, &[], &section);
748
749        let mut reasm = SectionReassembler::default();
750        reasm.feed(&payload1, true);
751        assert!(reasm.pop_section().is_none());
752
753        // Push enough continuation data to cross MAX_SECTION_SIZE.
754        let filler = vec![0u8; 180];
755        for _ in 0..(MAX_SECTION_SIZE / 180 + 1) {
756            reasm.feed(&filler, false);
757        }
758        assert!(
759            reasm.pop_section().is_none(),
760            "no section should pop after overflow reset"
761        );
762
763        // State must be resettable — a fresh valid PUSI section works.
764        let valid_section = build_section(0x00, &[0xAA]);
765        let payload2 = build_pusi_payload(0, &[], &valid_section);
766        reasm.feed(&payload2, true);
767        let out = reasm
768            .pop_section()
769            .expect("fresh section should pop after reset");
770        assert_eq!(out.as_ref(), &valid_section[..]);
771    }
772
773    #[test]
774    fn reassembler_handles_pusi_with_nonzero_pointer_field() {
775        // payload = pointer_field=3, 3 bytes of prior-section tail, then new section.
776        let prior_tail = vec![0x11, 0x22, 0x33];
777        let new_section = build_section(0x02, &[0xBB]);
778        assert_eq!(new_section.len(), 4);
779        let payload = build_pusi_payload(3, &prior_tail, &new_section);
780
781        let mut reasm = SectionReassembler::default();
782        reasm.feed(&payload, true);
783
784        let out = reasm
785            .pop_section()
786            .expect("section after pointer_field skip should pop");
787        assert_eq!(out.as_ref(), &new_section[..]);
788    }
789
790    #[test]
791    fn reassembler_ignores_continuation_before_pusi() {
792        // Feed a non-PUSI payload first (no prior PUSI seen).
793        // SectionReassembler should discard it and stay empty.
794        let pkt = make_packet(0x00, 0x00, PAYLOAD_FLAG, &[0xAA, 0xBB, 0xCC]);
795
796        let mut reasm = SectionReassembler::default();
797        reasm.feed(&pkt[4..], false); // no PUSI
798
799        assert!(
800            reasm.pop_section().is_none(),
801            "no section should appear without prior PUSI"
802        );
803        assert!(
804            reasm.pop_section().is_none(),
805            "second pop should also be none"
806        );
807    }
808
809    /// A PUSI packet with an empty payload (adaptation field ate the body)
810    /// is malformed but must not panic — it drops sync.
811    #[test]
812    fn reassembler_empty_pusi_payload_does_not_panic() {
813        let mut reasm = SectionReassembler::default();
814        reasm.feed(&[], true);
815        assert!(reasm.pop_section().is_none());
816        // Recovers on the next clean PUSI.
817        let payload = vec![0x00u8, 0x72, 0x70, 0x01, 0x00];
818        reasm.feed(&payload, true);
819        assert!(reasm.pop_section().is_some());
820    }
821
822    /// A maximal short-form private section (section_length 0xFFF, total
823    /// 4098 bytes) reassembles — the ceiling is 12-bit length + 3-byte
824    /// header, not 4096.
825    #[test]
826    fn reassembler_accepts_maximal_private_section() {
827        let mut section = vec![0x80u8, 0x7F, 0xFF]; // user-private tid, SSI=0, len 0xFFF
828        section.resize(3 + 0xFFF, 0xAB);
829
830        let mut reasm = SectionReassembler::default();
831        // First TS payload: pointer_field 0 then the section start.
832        let mut first = vec![0x00];
833        first.extend_from_slice(&section[..183]);
834        reasm.feed(&first, true);
835        for chunk in section[183..].chunks(184) {
836            reasm.feed(chunk, false);
837        }
838        let out = reasm.pop_section().expect("4098-byte section should pop");
839        assert_eq!(out.len(), 4098);
840        assert_eq!(out.as_ref(), &section[..]);
841    }
842
843    // ── adaptation field / PCR (ISO/IEC 13818-1 §2.4.3.4–2.4.3.5) ──
844
845    #[test]
846    fn pcr_as_27mhz_known_value() {
847        assert_eq!(
848            Pcr {
849                base: 10_000,
850                extension: 0
851            }
852            .as_27mhz(),
853            3_000_000
854        );
855        // base*300 + extension: 1*300 + 100 = 400.
856        assert_eq!(
857            Pcr {
858                base: 1,
859                extension: 100
860            }
861            .as_27mhz(),
862            400
863        );
864    }
865
866    #[test]
867    fn pcr_decode_from_bytes() {
868        // 6-byte PCR encoding base=10000, extension=0 (reserved bits set).
869        let af = [0x10u8, 0x00, 0x00, 0x13, 0x88, 0x7E, 0x00];
870        let pcr = Pcr::parse(&af, 1).expect("6 bytes present");
871        assert_eq!(
872            pcr,
873            Pcr {
874                base: 10_000,
875                extension: 0
876            }
877        );
878        assert_eq!(pcr.as_27mhz(), 3_000_000);
879    }
880
881    #[test]
882    fn adaptation_field_flags_and_pcr() {
883        let mut raw = [0xAAu8; TS_PACKET_SIZE];
884        raw[0] = TS_SYNC_BYTE;
885        raw[1] = 0x01; // pid 0x0100
886        raw[2] = 0x00;
887        raw[3] = ADAPTATION_FLAG | PAYLOAD_FLAG;
888        raw[4] = 7; // adaptation_field_length: 1 flags + 6 PCR
889        raw[5] = AF_DISCONTINUITY | AF_PCR_FLAG;
890        raw[6..12].copy_from_slice(&[0x00, 0x00, 0x13, 0x88, 0x7E, 0x00]);
891        // raw[12..] stays 0xAA = payload.
892
893        let pkt = TsPacket::parse(&raw).expect("valid packet");
894        let af = pkt
895            .adaptation_field()
896            .expect("has adaptation field")
897            .expect("adaptation field parses");
898        assert!(af.discontinuity_indicator);
899        assert!(!af.random_access_indicator);
900        assert_eq!(
901            af.pcr,
902            Some(Pcr {
903                base: 10_000,
904                extension: 0
905            })
906        );
907        assert_eq!(af.pcr.unwrap().as_27mhz(), 3_000_000);
908        assert!(af.opcr.is_none());
909        assert!(af.splice_countdown.is_none());
910        // Payload begins right after the adaptation field (cursor 4+1+7=12).
911        let payload = pkt.payload.expect("payload present");
912        assert_eq!(payload.len(), TS_PACKET_SIZE - 12);
913        assert_eq!(payload[0], 0xAA);
914    }
915
916    #[test]
917    fn no_adaptation_returns_none() {
918        let mut raw = [0x00u8; TS_PACKET_SIZE];
919        raw[0] = TS_SYNC_BYTE;
920        raw[1] = 0x01;
921        raw[3] = PAYLOAD_FLAG; // payload only
922        let pkt = TsPacket::parse(&raw).expect("valid");
923        assert!(pkt.adaptation_field().is_none());
924        assert!(pkt.adaptation.is_none());
925    }
926
927    #[test]
928    fn adaptation_field_splice_countdown_negative() {
929        let mut raw = [0xAAu8; TS_PACKET_SIZE];
930        raw[0] = TS_SYNC_BYTE;
931        raw[1] = 0x01;
932        raw[2] = 0x00;
933        raw[3] = ADAPTATION_FLAG | PAYLOAD_FLAG;
934        raw[4] = 2; // 1 flags + 1 splice_countdown
935        raw[5] = AF_SPLICING_FLAG;
936        raw[6] = 0xFB; // -5 as i8
937        let pkt = TsPacket::parse(&raw).expect("valid");
938        let af = pkt.adaptation_field().unwrap().unwrap();
939        assert_eq!(af.splice_countdown, Some(-5));
940        assert!(af.pcr.is_none());
941    }
942}