Skip to main content

dvb_t2mi/
pump.rs

1//! [`T2miPump`] — owning-[`Bytes`] feed-and-iterate T2-MI pump.
2//!
3//! Feed raw bytes (TS-encapsulated or bare T2-MI stream) in; get back an
4//! iterator of [`T2miEvent`]s — one per **CRC-valid** complete T2-MI packet.
5//! Lazy zero-copy: events own their [`bytes::Bytes`] slice and expose typed
6//! views ([`T2miEvent::header`], [`T2miEvent::payload`]) that borrow from it
7//! on demand.
8//!
9//! ```no_run
10//! use dvb_t2mi::pump::T2miPump;
11//! use dvb_t2mi::payload::AnyPayload;
12//!
13//! let mut pump = T2miPump::new(0x0006); // T2-MI PID from the PMT
14//! let ts_packet = [0u8; 188]; // a real TS packet from your source
15//! for event in pump.feed_ts(&ts_packet) {
16//!     if let Ok(AnyPayload::Bbframe(bb)) = event.payload() {
17//!         println!("BBFrame plp_id={}", bb.plp_id);
18//!     }
19//! }
20//! ```
21//!
22//! # CRC policy
23//!
24//! Every complete packet is validated against its 4-byte CRC-32 trailer
25//! (ETSI TS 102 773 Annex A / [`crate::crc::validate_crc`]) before being
26//! emitted.  Packets that fail CRC are silently dropped and counted in
27//! [`Stats::crc_failures`].  The caller never sees a corrupted packet.
28//!
29//! # TS header parsing
30//!
31//! [`T2miPump::feed_ts`] extracts the MPEG-TS payload in-place — sync byte
32//! 0x47, PID, PUSI flag, and adaptation-field skip per ISO/IEC 13818-1
33//! §2.4.3.2 — and passes it to [`crate::ts::PacketReassembler`].  No
34//! `dvb-si` dependency is introduced; the TS header reader is a private
35//! helper below.
36
37use bytes::Bytes;
38
39use crate::crc;
40use crate::packet::Header;
41use crate::payload::AnyPayload;
42use crate::ts::PacketReassembler;
43
44// ── TS header constants (ISO/IEC 13818-1 §2.4.3.2) ──────────────────────────
45
46/// TS sync byte.
47const TS_SYNC_BYTE: u8 = 0x47;
48/// Expected size of one MPEG-TS packet.
49const TS_PACKET_SIZE: usize = 188;
50/// Byte 1 bit 6 = PUSI (Payload Unit Start Indicator).
51const PUSI_MASK: u8 = 0x40;
52/// Byte 1 bits 4..=0 = PID upper 5 bits.
53const PID_MASK_HI: u8 = 0x1F;
54/// Byte 3 bit 5 = adaptation_field_control bit 1 (adaptation field present).
55const ADAPTATION_FLAG: u8 = 0x20;
56/// Byte 3 bit 4 = adaptation_field_control bit 0 (payload present).
57const PAYLOAD_FLAG: u8 = 0x10;
58
59/// Minimal result of TS header parsing needed by the pump.
60struct TsInfo {
61    pid: u16,
62    pusi: bool,
63    /// Byte offset within the 188-byte packet where the payload starts.
64    payload_start: usize,
65}
66
67/// Parse the 4-byte MPEG-TS header and skip any adaptation field.
68///
69/// Returns `None` when:
70/// - `buf` is shorter than [`TS_PACKET_SIZE`],
71/// - the sync byte is not `0x47`,
72/// - the payload-present flag is clear, or
73/// - the adaptation field length overflows the packet.
74///
75/// Citation: ISO/IEC 13818-1:2019 §2.4.3.2 (transport_packet header) and
76/// §2.4.3.5 (adaptation_field length).
77fn parse_ts_header(buf: &[u8]) -> Option<TsInfo> {
78    if buf.len() < TS_PACKET_SIZE || buf[0] != TS_SYNC_BYTE {
79        return None;
80    }
81    let b1 = buf[1];
82    let b3 = buf[3];
83
84    let pusi = (b1 & PUSI_MASK) != 0;
85    let pid = (((b1 & PID_MASK_HI) as u16) << 8) | (buf[2] as u16);
86    let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
87    let has_payload = (b3 & PAYLOAD_FLAG) != 0;
88
89    if !has_payload {
90        return None;
91    }
92
93    let mut cursor: usize = 4;
94    if has_adaptation {
95        let af_len = buf[cursor] as usize;
96        cursor += 1 + af_len;
97        if cursor > TS_PACKET_SIZE {
98            return None;
99        }
100    }
101
102    Some(TsInfo {
103        pid,
104        pusi,
105        payload_start: cursor,
106    })
107}
108
109// ── T2miEvent ─────────────────────────────────────────────────────────────────
110
111/// One complete, CRC-valid T2-MI packet. Owns its bytes — `'static`, cheap clone.
112///
113/// Only constructed after CRC-32 validation (ETSI TS 102 773 Annex A).
114/// [`T2miEvent::header`] and [`T2miEvent::payload`] are lazy: they borrow from
115/// the owned [`Bytes`] on demand.
116#[derive(Debug, Clone)]
117pub struct T2miEvent {
118    bytes: Bytes,
119}
120
121impl T2miEvent {
122    /// The full packet bytes (header + payload + CRC trailer).
123    #[must_use]
124    pub fn bytes(&self) -> &Bytes {
125        &self.bytes
126    }
127
128    /// The raw `packet_type` byte (byte 0 of the T2-MI header per §5.1).
129    ///
130    /// Never panics — events are only built for CRC-valid packets which are at
131    /// least `6` (header) + `4` (CRC) = 10 bytes.
132    #[must_use]
133    pub fn packet_type(&self) -> u8 {
134        self.bytes[0]
135    }
136
137    /// Parse the 6-byte T2-MI packet header (lazy, borrows this event's bytes).
138    ///
139    /// # Errors
140    ///
141    /// Propagates [`crate::Error`] from [`dvb_common::Parse::parse`] on [`Header`].
142    pub fn header(&self) -> crate::Result<Header> {
143        use dvb_common::Parse;
144        Header::parse(&self.bytes)
145    }
146
147    /// Extract the `packet_type` byte and payload slice from this event's
148    /// bytes — shared logic for [`payload`](Self::payload) and
149    /// [`payload_with`](Self::payload_with).
150    ///
151    /// Uses [`Header::raw_payload_bytes`] so that genuinely-private
152    /// `packet_type` values (not in [`PacketType`](crate::packet::PacketType))
153    /// are not rejected.  The packet is already CRC-validated by the pump.
154    fn payload_parts(&self) -> crate::Result<(u8, &[u8])> {
155        let payload_bytes = Header::raw_payload_bytes(&self.bytes)?;
156        let packet_type = self.bytes[0];
157        Ok((packet_type, payload_bytes))
158    }
159
160    /// Parse the payload by dispatching on `packet_type`.
161    ///
162    /// Extracts the payload slice via [`Header::raw_payload_bytes`] (no
163    /// `packet_type` enum conversion), then calls
164    /// [`AnyPayload::dispatch`].  Unrecognised packet types produce
165    /// [`AnyPayload::Unknown`] with the raw payload bytes.
166    ///
167    /// # Errors
168    ///
169    /// Returns [`crate::Error`] from extracting the payload slice or from the
170    /// typed payload parser.
171    pub fn payload(&self) -> crate::Result<AnyPayload<'_>> {
172        let (packet_type, payload_bytes) = self.payload_parts()?;
173        Ok(match AnyPayload::dispatch(packet_type, payload_bytes) {
174            Some(result) => result?,
175            None => AnyPayload::Unknown {
176                packet_type,
177                body: payload_bytes,
178            },
179        })
180    }
181
182    /// Parse the payload by dispatching on `packet_type`, preferring the
183    /// registry's custom parsers over the built-in dispatch.
184    ///
185    /// Like [`payload`](Self::payload), but calls
186    /// [`AnyPayload::dispatch_with`] so that runtime-registered custom
187    /// packet types are resolved to [`AnyPayload::Other`].  Unrecognised
188    /// packet types produce [`AnyPayload::Unknown`] with the raw payload
189    /// bytes, exactly as [`payload`](Self::payload) does.
190    ///
191    /// # Errors
192    ///
193    /// Returns [`crate::Error`] from extracting the payload slice or from the
194    /// typed payload parser (built-in or custom).
195    pub fn payload_with(
196        &self,
197        registry: &crate::payload::PayloadRegistry,
198    ) -> crate::Result<AnyPayload<'_>> {
199        let (packet_type, payload_bytes) = self.payload_parts()?;
200        Ok(
201            match AnyPayload::dispatch_with(registry, packet_type, payload_bytes) {
202                Some(result) => result?,
203                None => AnyPayload::Unknown {
204                    packet_type,
205                    body: payload_bytes,
206                },
207            },
208        )
209    }
210}
211
212// ── Stats ─────────────────────────────────────────────────────────────────────
213
214/// Accumulated pump statistics (monotonically growing across all `feed` calls).
215///
216/// New counter fields may be added in a future release; construction is via
217/// [`Default`] only.
218#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
219#[non_exhaustive]
220pub struct Stats {
221    /// TS packets fed via [`T2miPump::feed_ts`].
222    pub ts_packets: u64,
223    /// Complete T2-MI packets produced by the reassembler (pre-CRC check).
224    pub t2mi_packets: u64,
225    /// Packets dropped due to CRC-32 mismatch (ETSI TS 102 773 Annex A).
226    pub crc_failures: u64,
227    /// Malformed inputs: bad TS sync byte, truncated TS packet, overflowed
228    /// adaptation field, or `feed_ts` called on a raw-mode pump.
229    pub malformed_packets: u64,
230}
231
232// ── T2miPump ──────────────────────────────────────────────────────────────────
233
234/// Feed-and-iterate T2-MI pump.
235///
236/// Supports two operating modes:
237///
238/// - **TS-encapsulated** (most common): construct with [`T2miPump::new`],
239///   passing the 13-bit PID carrying T2-MI (from the PMT).  Feed 188-byte
240///   MPEG-TS packets with [`T2miPump::feed_ts`].  The pump filters by PID,
241///   strips the TS header per ISO/IEC 13818-1 §2.4.3.2, and forwards the
242///   payload to the internal [`PacketReassembler`] (ETSI TS 102 773 §6.1.1).
243///
244/// - **Raw** (un-encapsulated): construct with [`T2miPump::raw`].  Feed
245///   arbitrary byte slices with [`T2miPump::feed_raw`].  The pump buffers bytes
246///   and emits events once a full packet (determined by the header's
247///   `payload_len_bits`) is available.
248///
249/// # PID note
250///
251/// PIDs are 13-bit values (0x0000–0x1FFF per ISO/IEC 13818-1 §2.4.3.2).
252/// This type uses `u16` directly; no newtype is introduced.  Values above
253/// 0x1FFF are accepted without error — the PID filter simply never matches.
254pub struct T2miPump {
255    mode: PumpMode,
256    reasm: PacketReassembler,
257    stats: Stats,
258    scratch: Vec<T2miEvent>,
259    /// Raw-mode sync flag: true once the first raw feed has initialised the
260    /// reassembler via a PUSI=true, pointer=0 signal.
261    raw_started: bool,
262}
263
264enum PumpMode {
265    /// TS-encapsulated: filter packets to this PID.
266    Ts { pid: u16 },
267    /// Un-encapsulated raw byte stream.
268    Raw,
269}
270
271impl T2miPump {
272    /// Create a TS-encapsulated pump that filters to `pid`.
273    ///
274    /// `pid` is the 13-bit T2-MI PID from the PMT (e.g. 0x0006 for data
275    /// piping).
276    ///
277    /// # PID range
278    ///
279    /// Valid MPEG-TS PIDs are 13-bit (0x0000–0x1FFF); this parameter is `u16`.
280    /// No newtype is introduced to keep the API lightweight.
281    #[must_use]
282    pub fn new(pid: u16) -> Self {
283        Self {
284            mode: PumpMode::Ts { pid },
285            reasm: PacketReassembler::new(),
286            stats: Stats::default(),
287            scratch: Vec::new(),
288            raw_started: false,
289        }
290    }
291
292    /// Create an un-encapsulated raw-stream pump.
293    ///
294    /// Use [`T2miPump::feed_raw`] to supply bytes.  The pump buffers internally
295    /// and emits events by packet boundary, not by call boundary — a packet
296    /// split across two `feed_raw` calls produces exactly one event.
297    #[must_use]
298    pub fn raw() -> Self {
299        Self {
300            mode: PumpMode::Raw,
301            reasm: PacketReassembler::new(),
302            stats: Stats::default(),
303            scratch: Vec::new(),
304            raw_started: false,
305        }
306    }
307
308    /// Accumulated statistics.
309    #[must_use]
310    pub fn stats(&self) -> Stats {
311        self.stats
312    }
313
314    /// Feed one 188-byte MPEG-TS packet. Infallible: malformed packets are
315    /// counted in [`Stats::malformed_packets`] and discarded.
316    ///
317    /// Packets on the wrong PID are silently ignored (only [`Stats::ts_packets`]
318    /// is incremented).
319    ///
320    /// Returns a draining iterator over any T2-MI events completed by this feed.
321    pub fn feed_ts(&mut self, packet: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
322        self.scratch.clear();
323
324        match self.mode {
325            PumpMode::Raw => {
326                // feed_ts on a raw-mode pump is a caller error.
327                self.stats.malformed_packets += 1;
328            }
329            PumpMode::Ts { pid: filter_pid } => {
330                self.stats.ts_packets += 1;
331                match parse_ts_header(packet) {
332                    None => {
333                        self.stats.malformed_packets += 1;
334                    }
335                    Some(info) => {
336                        if info.pid == filter_pid {
337                            let payload = &packet[info.payload_start..TS_PACKET_SIZE];
338                            self.reasm.feed(payload, info.pusi);
339                            Self::drain_reasm_into(
340                                &mut self.reasm,
341                                &mut self.stats,
342                                &mut self.scratch,
343                            );
344                        }
345                        // Wrong PID: ignored cheaply — no stats beyond ts_packets.
346                    }
347                }
348            }
349        }
350
351        self.scratch.drain(..)
352    }
353
354    /// Feed raw T2-MI bytes (un-encapsulated mode).
355    ///
356    /// The slice may contain a partial packet; bytes are buffered internally.
357    /// A packet split across two `feed_raw` calls produces exactly one event.
358    ///
359    /// Returns a draining iterator over any T2-MI events completed by this feed.
360    pub fn feed_raw(&mut self, data: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
361        self.scratch.clear();
362
363        match self.mode {
364            PumpMode::Ts { .. } => {
365                // feed_raw on a TS-mode pump is a caller error.
366                self.stats.malformed_packets += 1;
367            }
368            PumpMode::Raw => {
369                if !self.raw_started {
370                    // First call: initialise the reassembler with PUSI=true and
371                    // pointer_field=0.  PacketReassembler::feed interprets the
372                    // first byte of the payload as the pointer_field when PUSI is
373                    // set (ETSI TS 102 773 §6.1.1).  We prepend a 0x00 byte so the
374                    // reassembler sees pointer=0 and treats the rest as the start
375                    // of a new T2-MI packet.
376                    let mut buf = Vec::with_capacity(1 + data.len());
377                    buf.push(0x00); // pointer_field = 0
378                    buf.extend_from_slice(data);
379                    self.reasm.feed(&buf, true);
380                    self.raw_started = true;
381                } else {
382                    // Continuation: feed without PUSI — bytes extend the
383                    // current T2-MI packet in progress.
384                    self.reasm.feed(data, false);
385                }
386                Self::drain_reasm_into(&mut self.reasm, &mut self.stats, &mut self.scratch);
387            }
388        }
389
390        self.scratch.drain(..)
391    }
392
393    /// Drain all pending packets from the reassembler, CRC-validate each one,
394    /// and push valid packets to `scratch`.
395    fn drain_reasm_into(
396        reasm: &mut PacketReassembler,
397        stats: &mut Stats,
398        scratch: &mut Vec<T2miEvent>,
399    ) {
400        for raw in reasm.drain_packets() {
401            stats.t2mi_packets += 1;
402            match crc::validate_crc(&raw) {
403                Ok(()) => scratch.push(T2miEvent { bytes: raw }),
404                Err(_) => stats.crc_failures += 1,
405            }
406        }
407    }
408}
409
410// ── Tests ─────────────────────────────────────────────────────────────────────
411
412#[cfg(test)]
413mod tests {
414    use super::*;
415    use dvb_common::crc32_mpeg2;
416
417    // ── Test helpers ─────────────────────────────────────────────────────────
418
419    /// Build a syntactically valid T2-MI packet (header + payload + CRC-32).
420    ///
421    /// `packet_type` is the raw byte (Table 1 of TS 102 773).
422    /// `payload` is the post-header, pre-CRC data.
423    /// Returns the full byte vector including the 4-byte CRC trailer.
424    fn make_t2mi_packet(packet_type: u8, payload: &[u8]) -> Vec<u8> {
425        let payload_len_bits = (payload.len() * 8) as u16;
426        let mut pkt = Vec::with_capacity(6 + payload.len() + 4);
427        pkt.push(packet_type);
428        pkt.push(0x01); // packet_count
429        pkt.push(0x00); // superframe_idx=0, rfu=0, t2mi_stream_id=0
430        pkt.push(0x00); // rfu byte = 0
431        pkt.extend_from_slice(&payload_len_bits.to_be_bytes());
432        pkt.extend_from_slice(payload);
433        let crc = crc32_mpeg2::compute(&pkt);
434        pkt.extend_from_slice(&crc.to_be_bytes());
435        pkt
436    }
437
438    /// Wrap a T2-MI payload slice in a single 188-byte MPEG-TS packet.
439    ///
440    /// Sets PUSI=true and pointer_field=0 so the reassembler treats
441    /// the T2-MI data as starting at byte 0 of the payload.
442    /// The T2-MI bytes must fit in 183 bytes (188 − 4 header − 1 pointer).
443    fn ts_packet(pid: u16, t2mi_data: &[u8], pusi: bool, pointer_field: u8) -> [u8; 188] {
444        let mut pkt = [0xFFu8; 188];
445        pkt[0] = TS_SYNC_BYTE;
446        pkt[1] = if pusi { PUSI_MASK } else { 0 };
447        pkt[1] |= ((pid >> 8) as u8) & PID_MASK_HI;
448        pkt[2] = (pid & 0xFF) as u8;
449        pkt[3] = PAYLOAD_FLAG; // payload present, no adaptation field
450        if pusi {
451            pkt[4] = pointer_field;
452            let start = 5 + pointer_field as usize;
453            assert!(
454                start + t2mi_data.len() <= 188,
455                "T2-MI data too large for one TS packet"
456            );
457            pkt[start..start + t2mi_data.len()].copy_from_slice(t2mi_data);
458        } else {
459            let start = 4;
460            assert!(
461                start + t2mi_data.len() <= 188,
462                "T2-MI data too large for one TS packet"
463            );
464            pkt[start..start + t2mi_data.len()].copy_from_slice(t2mi_data);
465        }
466        pkt
467    }
468
469    // ── (a) valid T2-MI packet in TS → one event, typed payload ──────────────
470
471    #[test]
472    fn ts_packet_emits_one_event_with_typed_payload() {
473        // Build a valid BBFrame T2-MI packet.
474        // BbframePayload minimum: frame_idx(1) + plp_id(1) + flags(1) = 3 bytes.
475        let bbframe_payload = [0x01u8, 0x02, 0x00];
476        let t2mi = make_t2mi_packet(0x00, &bbframe_payload);
477
478        let pkt = ts_packet(0x0006, &t2mi, true, 0);
479        let mut pump = T2miPump::new(0x0006);
480        let events: Vec<_> = pump.feed_ts(&pkt).collect();
481
482        assert_eq!(events.len(), 1, "expected exactly one event");
483        assert_eq!(events[0].packet_type(), 0x00);
484
485        let payload = events[0].payload().expect("payload parse should succeed");
486        assert!(
487            matches!(payload, AnyPayload::Bbframe(_)),
488            "expected Bbframe, got {payload:?}"
489        );
490
491        let stats = pump.stats();
492        assert_eq!(stats.ts_packets, 1);
493        assert_eq!(stats.t2mi_packets, 1);
494        assert_eq!(stats.crc_failures, 0);
495        assert_eq!(stats.malformed_packets, 0);
496    }
497
498    // ── (b) corrupted CRC → zero events, crc_failures=1 ─────────────────────
499
500    #[test]
501    fn corrupted_crc_drops_packet_and_counts() {
502        let payload = [0x00u8, 0x00, 0x00]; // minimal BBFrame payload
503        let mut t2mi = make_t2mi_packet(0x00, &payload);
504        // Corrupt the last CRC byte.
505        *t2mi.last_mut().unwrap() ^= 0xFF;
506
507        let pkt = ts_packet(0x0006, &t2mi, true, 0);
508        let mut pump = T2miPump::new(0x0006);
509        let events: Vec<_> = pump.feed_ts(&pkt).collect();
510
511        assert_eq!(events.len(), 0, "corrupted packet must not emit");
512        let stats = pump.stats();
513        assert_eq!(stats.crc_failures, 1);
514        assert_eq!(stats.t2mi_packets, 1); // reassembler produced it, CRC gate dropped it
515    }
516
517    // ── (c) feed_raw with packet split across two calls → one event ──────────
518
519    #[test]
520    fn feed_raw_split_across_two_calls_emits_one_event() {
521        // Use a timestamp payload (11 bytes, all zeros), packet_type=0x20.
522        let ts_payload = [0x00u8; 11];
523        let t2mi = make_t2mi_packet(0x20, &ts_payload);
524
525        // Split at an arbitrary boundary (e.g. after the header).
526        let split = 6;
527        let first = &t2mi[..split];
528        let second = &t2mi[split..];
529
530        let mut pump = T2miPump::raw();
531
532        let ev1: Vec<_> = pump.feed_raw(first).collect();
533        assert_eq!(ev1.len(), 0, "no complete packet yet after first chunk");
534
535        let ev2: Vec<_> = pump.feed_raw(second).collect();
536        assert_eq!(
537            ev2.len(),
538            1,
539            "one event after second chunk completes the packet"
540        );
541
542        let stats = pump.stats();
543        assert_eq!(stats.t2mi_packets, 1);
544        assert_eq!(stats.crc_failures, 0);
545    }
546
547    // ── (d) garbage TS packet → malformed counted, no panic ──────────────────
548
549    #[test]
550    fn garbage_ts_packet_counted_no_panic() {
551        let mut pump = T2miPump::new(0x0006);
552        let garbage = [0x00u8; 188]; // bad sync byte
553        let events: Vec<_> = pump.feed_ts(&garbage).collect();
554        assert_eq!(events.len(), 0);
555        assert_eq!(pump.stats().malformed_packets, 1);
556        assert_eq!(pump.stats().ts_packets, 1);
557    }
558
559    // ── (e) wrong-PID TS packet → ignored cheaply ────────────────────────────
560
561    #[test]
562    fn wrong_pid_ts_packet_ignored() {
563        let payload = [0x00u8, 0x00, 0x00];
564        let t2mi = make_t2mi_packet(0x00, &payload);
565        let pkt = ts_packet(0x0100, &t2mi, true, 0); // PID 0x0100, pump listens on 0x0006
566
567        let mut pump = T2miPump::new(0x0006);
568        let events: Vec<_> = pump.feed_ts(&pkt).collect();
569
570        assert_eq!(events.len(), 0, "wrong-PID packet must not emit");
571        // ts_packets incremented, but nothing else moves.
572        let stats = pump.stats();
573        assert_eq!(stats.ts_packets, 1);
574        assert_eq!(stats.t2mi_packets, 0);
575        assert_eq!(stats.crc_failures, 0);
576        assert_eq!(stats.malformed_packets, 0);
577    }
578
579    // ── additional: header() lazy parse ──────────────────────────────────────
580
581    #[test]
582    fn event_header_lazy_parse_matches_packet_type() {
583        let payload = [0x00u8; 11]; // Timestamp payload
584        let t2mi = make_t2mi_packet(0x20, &payload);
585        let pkt = ts_packet(0x0010, &t2mi, true, 0);
586
587        let mut pump = T2miPump::new(0x0010);
588        let events: Vec<_> = pump.feed_ts(&pkt).collect();
589        assert_eq!(events.len(), 1);
590
591        let hdr = events[0].header().expect("header parse should succeed");
592        assert_eq!(hdr.packet_type as u8, 0x20);
593        assert_eq!(hdr.packet_count, 0x01);
594    }
595
596    // ── additional: stats() method ───────────────────────────────────────────
597
598    #[test]
599    fn stats_accumulate_across_feeds() {
600        let payload = [0x00u8, 0x00, 0x00];
601        let t2mi = make_t2mi_packet(0x00, &payload);
602        let pkt = ts_packet(0x0006, &t2mi, true, 0);
603
604        let mut pump = T2miPump::new(0x0006);
605        pump.feed_ts(&pkt).for_each(drop);
606        pump.feed_ts(&pkt).for_each(drop);
607
608        let stats = pump.stats();
609        assert_eq!(stats.ts_packets, 2);
610        // The reassembler resets on PUSI so we get 2 complete packets.
611        assert_eq!(stats.t2mi_packets, 2);
612    }
613
614    // ── payload_with registry seam ───────────────────────────────────────────
615
616    #[test]
617    fn payload_with_dispatches_custom_registered_type() {
618        use crate::payload::registry::PayloadRegistry;
619        use crate::traits::PayloadDef;
620        use dvb_common::Parse;
621
622        #[derive(Debug)]
623        #[cfg_attr(feature = "serde", derive(serde::Serialize))]
624        struct TestPrivatePayload {
625            val: u8,
626        }
627
628        impl<'a> Parse<'a> for TestPrivatePayload {
629            type Error = crate::Error;
630            fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
631                if bytes.is_empty() {
632                    return Err(crate::Error::BufferTooShort {
633                        need: 1,
634                        have: 0,
635                        what: "TestPrivatePayload",
636                    });
637                }
638                Ok(Self { val: bytes[0] })
639            }
640        }
641
642        impl<'a> PayloadDef<'a> for TestPrivatePayload {
643            const PACKET_TYPE: u8 = 0x00;
644            const NAME: &'static str = "TEST_PRIVATE";
645        }
646
647        let mut reg = PayloadRegistry::new();
648        reg.register::<TestPrivatePayload>();
649
650        let private_payload = [0x42u8, 0x02, 0x00];
651        let t2mi = make_t2mi_packet(0x00, &private_payload);
652        let pkt = ts_packet(0x0006, &t2mi, true, 0);
653
654        let mut pump = T2miPump::new(0x0006);
655        let events: Vec<_> = pump.feed_ts(&pkt).collect();
656        assert_eq!(events.len(), 1, "expected one event");
657
658        let result = events[0].payload_with(&reg).expect("payload_with parse");
659        match result {
660            AnyPayload::Other {
661                packet_type,
662                ref value,
663            } => {
664                assert_eq!(packet_type, 0x00);
665                let downcast = value.downcast_ref::<TestPrivatePayload>().unwrap();
666                assert_eq!(downcast.val, 0x42);
667            }
668            other => panic!("expected Other, got {other:?}"),
669        }
670
671        let built_in = events[0].payload().expect("payload parse");
672        assert!(
673            matches!(built_in, AnyPayload::Bbframe(_)),
674            "expected Bbframe via built-in dispatch, got {built_in:?}"
675        );
676    }
677
678    // ── payload_with with genuinely-private packet type (not in PacketType) ──
679
680    #[test]
681    fn payload_with_dispatches_genuinely_private_packet_type() {
682        use crate::payload::registry::PayloadRegistry;
683        use crate::traits::PayloadDef;
684        use dvb_common::Parse;
685
686        #[derive(Debug)]
687        #[cfg_attr(feature = "serde", derive(serde::Serialize))]
688        struct PrivatePayload {
689            val: u8,
690        }
691
692        impl<'a> Parse<'a> for PrivatePayload {
693            type Error = crate::Error;
694            fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
695                if bytes.is_empty() {
696                    return Err(crate::Error::BufferTooShort {
697                        need: 1,
698                        have: 0,
699                        what: "PrivatePayload",
700                    });
701                }
702                Ok(Self { val: bytes[0] })
703            }
704        }
705
706        impl<'a> PayloadDef<'a> for PrivatePayload {
707            const PACKET_TYPE: u8 = 0x42;
708            const NAME: &'static str = "PRIVATE_0X42";
709        }
710
711        let mut reg = PayloadRegistry::new();
712        reg.register::<PrivatePayload>();
713
714        let private_body = [0xABu8];
715        let t2mi = make_t2mi_packet(0x42, &private_body);
716        let pkt = ts_packet(0x0006, &t2mi, true, 0);
717
718        let mut pump = T2miPump::new(0x0006);
719        let events: Vec<_> = pump.feed_ts(&pkt).collect();
720        assert_eq!(events.len(), 1, "expected one event");
721
722        let result = events[0].payload_with(&reg).expect("payload_with parse");
723        match result {
724            AnyPayload::Other {
725                packet_type,
726                ref value,
727            } => {
728                assert_eq!(packet_type, 0x42);
729                let downcast = value.downcast_ref::<PrivatePayload>().unwrap();
730                assert_eq!(downcast.val, 0xAB);
731            }
732            other => panic!("expected Other, got {other:?}"),
733        }
734
735        let no_reg = events[0].payload().expect("payload without registry");
736        match no_reg {
737            AnyPayload::Unknown {
738                packet_type,
739                body: _,
740            } => {
741                assert_eq!(packet_type, 0x42);
742            }
743            other => panic!("expected Unknown, got {other:?}"),
744        }
745    }
746}