dvb_t2mi/pump.rs
1//! [`T2miPump`] — owning-[`Bytes`] feed-and-iterate T2-MI pump.
2//!
3//! Feed raw bytes (TS-encapsulated or bare T2-MI stream) in; get back an
4//! iterator of [`T2miEvent`]s — one per **CRC-valid** complete T2-MI packet.
5//! Lazy zero-copy: events own their [`bytes::Bytes`] slice and expose typed
6//! views ([`T2miEvent::header`], [`T2miEvent::payload`]) that borrow from it
7//! on demand.
8//!
9//! ```no_run
10//! use dvb_t2mi::pump::T2miPump;
11//! use dvb_t2mi::payload::AnyPayload;
12//!
13//! let mut pump = T2miPump::new(0x0006); // T2-MI PID from the PMT
14//! let ts_packet = [0u8; 188]; // a real TS packet from your source
15//! for event in pump.feed_ts(&ts_packet) {
16//! if let Ok(AnyPayload::Bbframe(bb)) = event.payload() {
17//! println!("BBFrame plp_id={}", bb.plp_id);
18//! }
19//! }
20//! ```
21//!
22//! # CRC policy
23//!
24//! Every complete packet is validated against its 4-byte CRC-32 trailer
25//! (ETSI TS 102 773 Annex A / [`crate::crc::validate_crc`]) before being
26//! emitted. Packets that fail CRC are silently dropped and counted in
27//! [`Stats::crc_failures`]. The caller never sees a corrupted packet.
28//!
29//! # TS header parsing
30//!
31//! [`T2miPump::feed_ts`] extracts the MPEG-TS payload in-place — sync byte
32//! 0x47, PID, PUSI flag, and adaptation-field skip per ISO/IEC 13818-1
33//! §2.4.3.2 — and passes it to [`crate::ts::PacketReassembler`]. No
34//! `dvb-si` dependency is introduced; the TS header reader is a private
35//! helper below.
36
37use bytes::Bytes;
38
39use crate::crc;
40use crate::packet::Header;
41use crate::payload::AnyPayload;
42use crate::ts::PacketReassembler;
43
44// ── TS header constants (ISO/IEC 13818-1 §2.4.3.2) ──────────────────────────
45
/// TS sync byte: the value byte 0 of a transport packet must carry for the
/// header parser to accept it.
const TS_SYNC_BYTE: u8 = 0x47;
/// Expected size of one MPEG-TS packet, in bytes. Shorter buffers are rejected
/// by the header parser, and the payload slice always ends at this offset.
const TS_PACKET_SIZE: usize = 188;
/// Byte 1 bit 6 = PUSI (Payload Unit Start Indicator).
const PUSI_MASK: u8 = 0x40;
/// Byte 1 bits 4..=0 = PID upper 5 bits (the lower 8 PID bits are byte 2).
const PID_MASK_HI: u8 = 0x1F;
/// Byte 3 bit 5 = adaptation_field_control bit 1 (adaptation field present).
const ADAPTATION_FLAG: u8 = 0x20;
/// Byte 3 bit 4 = adaptation_field_control bit 0 (payload present).
const PAYLOAD_FLAG: u8 = 0x10;
58
/// Minimal result of TS header parsing needed by the pump.
///
/// Produced only for packets that carry a payload; the pump uses `pid` for
/// filtering and `pusi` / `payload_start` to hand the payload to the
/// reassembler.
struct TsInfo {
    /// 13-bit PID assembled from byte 1 (low 5 bits) and byte 2.
    pid: u16,
    /// Payload Unit Start Indicator (byte 1 bit 6).
    pusi: bool,
    /// Byte offset within the 188-byte packet where the payload starts.
    payload_start: usize,
}
66
67/// Parse the 4-byte MPEG-TS header and skip any adaptation field.
68///
69/// Returns `None` when:
70/// - `buf` is shorter than [`TS_PACKET_SIZE`],
71/// - the sync byte is not `0x47`,
72/// - the payload-present flag is clear, or
73/// - the adaptation field length overflows the packet.
74///
75/// Citation: ISO/IEC 13818-1:2019 §2.4.3.2 (transport_packet header) and
76/// §2.4.3.5 (adaptation_field length).
77fn parse_ts_header(buf: &[u8]) -> Option<TsInfo> {
78 if buf.len() < TS_PACKET_SIZE || buf[0] != TS_SYNC_BYTE {
79 return None;
80 }
81 let b1 = buf[1];
82 let b3 = buf[3];
83
84 let pusi = (b1 & PUSI_MASK) != 0;
85 let pid = (((b1 & PID_MASK_HI) as u16) << 8) | (buf[2] as u16);
86 let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
87 let has_payload = (b3 & PAYLOAD_FLAG) != 0;
88
89 if !has_payload {
90 return None;
91 }
92
93 let mut cursor: usize = 4;
94 if has_adaptation {
95 let af_len = buf[cursor] as usize;
96 cursor += 1 + af_len;
97 if cursor > TS_PACKET_SIZE {
98 return None;
99 }
100 }
101
102 Some(TsInfo {
103 pid,
104 pusi,
105 payload_start: cursor,
106 })
107}
108
109// ── T2miEvent ─────────────────────────────────────────────────────────────────
110
111/// One complete, CRC-valid T2-MI packet. Owns its bytes — `'static`, cheap clone.
112///
113/// Only constructed after CRC-32 validation (ETSI TS 102 773 Annex A).
114/// [`T2miEvent::header`] and [`T2miEvent::payload`] are lazy: they borrow from
115/// the owned [`Bytes`] on demand.
116#[derive(Debug, Clone)]
117pub struct T2miEvent {
118 bytes: Bytes,
119}
120
121impl T2miEvent {
122 /// The full packet bytes (header + payload + CRC trailer).
123 #[must_use]
124 pub fn bytes(&self) -> &Bytes {
125 &self.bytes
126 }
127
128 /// The raw `packet_type` byte (byte 0 of the T2-MI header per §5.1).
129 ///
130 /// Never panics — events are only built for CRC-valid packets which are at
131 /// least `6` (header) + `4` (CRC) = 10 bytes.
132 #[must_use]
133 pub fn packet_type(&self) -> u8 {
134 self.bytes[0]
135 }
136
137 /// Parse the 6-byte T2-MI packet header (lazy, borrows this event's bytes).
138 ///
139 /// # Errors
140 ///
141 /// Propagates [`crate::Error`] from [`dvb_common::Parse::parse`] on [`Header`].
142 pub fn header(&self) -> crate::Result<Header> {
143 use dvb_common::Parse;
144 Header::parse(&self.bytes)
145 }
146
147 /// Extract the `packet_type` byte and payload slice from this event's
148 /// bytes — shared logic for [`payload`](Self::payload) and
149 /// [`payload_with`](Self::payload_with).
150 ///
151 /// Uses [`Header::raw_payload_bytes`] so that genuinely-private
152 /// `packet_type` values (not in [`PacketType`](crate::packet::PacketType))
153 /// are not rejected. The packet is already CRC-validated by the pump.
154 fn payload_parts(&self) -> crate::Result<(u8, &[u8])> {
155 let payload_bytes = Header::raw_payload_bytes(&self.bytes)?;
156 let packet_type = self.bytes[0];
157 Ok((packet_type, payload_bytes))
158 }
159
160 /// Parse the payload by dispatching on `packet_type`.
161 ///
162 /// Extracts the payload slice via [`Header::raw_payload_bytes`] (no
163 /// `packet_type` enum conversion), then calls
164 /// [`AnyPayload::dispatch`]. Unrecognised packet types produce
165 /// [`AnyPayload::Unknown`] with the raw payload bytes.
166 ///
167 /// # Errors
168 ///
169 /// Returns [`crate::Error`] from extracting the payload slice or from the
170 /// typed payload parser.
171 pub fn payload(&self) -> crate::Result<AnyPayload<'_>> {
172 let (packet_type, payload_bytes) = self.payload_parts()?;
173 Ok(match AnyPayload::dispatch(packet_type, payload_bytes) {
174 Some(result) => result?,
175 None => AnyPayload::Unknown {
176 packet_type,
177 body: payload_bytes,
178 },
179 })
180 }
181
182 /// Parse the payload by dispatching on `packet_type`, preferring the
183 /// registry's custom parsers over the built-in dispatch.
184 ///
185 /// Like [`payload`](Self::payload), but calls
186 /// [`AnyPayload::dispatch_with`] so that runtime-registered custom
187 /// packet types are resolved to [`AnyPayload::Other`]. Unrecognised
188 /// packet types produce [`AnyPayload::Unknown`] with the raw payload
189 /// bytes, exactly as [`payload`](Self::payload) does.
190 ///
191 /// # Errors
192 ///
193 /// Returns [`crate::Error`] from extracting the payload slice or from the
194 /// typed payload parser (built-in or custom).
195 pub fn payload_with(
196 &self,
197 registry: &crate::payload::PayloadRegistry,
198 ) -> crate::Result<AnyPayload<'_>> {
199 let (packet_type, payload_bytes) = self.payload_parts()?;
200 Ok(
201 match AnyPayload::dispatch_with(registry, packet_type, payload_bytes) {
202 Some(result) => result?,
203 None => AnyPayload::Unknown {
204 packet_type,
205 body: payload_bytes,
206 },
207 },
208 )
209 }
210}
211
212// ── Stats ─────────────────────────────────────────────────────────────────────
213
/// Accumulated pump statistics (monotonically growing across all `feed` calls).
///
/// New counter fields may be added in a future release; construction is via
/// [`Default`] only.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct Stats {
    /// TS packets fed via [`T2miPump::feed_ts`] on a TS-mode pump. Counted
    /// before header validation and regardless of PID, so malformed and
    /// wrong-PID packets are included.
    pub ts_packets: u64,
    /// Complete T2-MI packets produced by the reassembler (pre-CRC check).
    pub t2mi_packets: u64,
    /// Packets dropped due to CRC-32 mismatch (ETSI TS 102 773 Annex A).
    pub crc_failures: u64,
    /// Inputs the pump rejected: truncated TS packet, bad TS sync byte,
    /// TS packet without a payload (payload-present flag clear), adaptation
    /// field that does not fit in the packet, `feed_ts` called on a raw-mode
    /// pump, or `feed_raw` called on a TS-mode pump.
    pub malformed_packets: u64,
}
231
232// ── T2miPump ──────────────────────────────────────────────────────────────────
233
/// Feed-and-iterate T2-MI pump.
///
/// Supports two operating modes:
///
/// - **TS-encapsulated** (most common): construct with [`T2miPump::new`],
///   passing the 13-bit PID carrying T2-MI (from the PMT). Feed 188-byte
///   MPEG-TS packets with [`T2miPump::feed_ts`]. The pump filters by PID,
///   strips the TS header per ISO/IEC 13818-1 §2.4.3.2, and forwards the
///   payload to the internal [`PacketReassembler`] (ETSI TS 102 773 §6.1.1).
///
/// - **Raw** (un-encapsulated): construct with [`T2miPump::raw`]. Feed
///   arbitrary byte slices with [`T2miPump::feed_raw`]. The pump buffers bytes
///   and emits events once a full packet (determined by the header's
///   `payload_len_bits`) is available.
///
/// Calling the feed method that belongs to the other mode produces no events
/// and increments [`Stats::malformed_packets`].
///
/// # PID note
///
/// PIDs are 13-bit values (0x0000–0x1FFF per ISO/IEC 13818-1 §2.4.3.2).
/// This type uses `u16` directly; no newtype is introduced. Values above
/// 0x1FFF are accepted without error — the PID filter simply never matches.
pub struct T2miPump {
    /// Operating mode fixed at construction (TS-encapsulated or raw).
    mode: PumpMode,
    /// Reassembler that turns fed payload bytes into complete T2-MI packets.
    reasm: PacketReassembler,
    /// Running counters, returned by value from [`T2miPump::stats`].
    stats: Stats,
    /// Events produced by the current feed call. Cleared at the start of
    /// every feed and drained by the iterator that the feed returns.
    scratch: Vec<T2miEvent>,
    /// Raw-mode sync flag: true once the first raw feed has initialised the
    /// reassembler via a PUSI=true, pointer=0 signal.
    raw_started: bool,
}
263
/// Operating mode of a [`T2miPump`], chosen by the constructor and never
/// changed afterwards.
enum PumpMode {
    /// TS-encapsulated: filter packets to this PID.
    Ts { pid: u16 },
    /// Un-encapsulated raw byte stream.
    Raw,
}
270
271impl T2miPump {
272 /// Create a TS-encapsulated pump that filters to `pid`.
273 ///
274 /// `pid` is the 13-bit T2-MI PID from the PMT (e.g. 0x0006 for data
275 /// piping).
276 ///
277 /// # PID range
278 ///
279 /// Valid MPEG-TS PIDs are 13-bit (0x0000–0x1FFF); this parameter is `u16`.
280 /// No newtype is introduced to keep the API lightweight.
281 #[must_use]
282 pub fn new(pid: u16) -> Self {
283 Self {
284 mode: PumpMode::Ts { pid },
285 reasm: PacketReassembler::new(),
286 stats: Stats::default(),
287 scratch: Vec::new(),
288 raw_started: false,
289 }
290 }
291
292 /// Create an un-encapsulated raw-stream pump.
293 ///
294 /// Use [`T2miPump::feed_raw`] to supply bytes. The pump buffers internally
295 /// and emits events by packet boundary, not by call boundary — a packet
296 /// split across two `feed_raw` calls produces exactly one event.
297 #[must_use]
298 pub fn raw() -> Self {
299 Self {
300 mode: PumpMode::Raw,
301 reasm: PacketReassembler::new(),
302 stats: Stats::default(),
303 scratch: Vec::new(),
304 raw_started: false,
305 }
306 }
307
308 /// Accumulated statistics.
309 #[must_use]
310 pub fn stats(&self) -> Stats {
311 self.stats
312 }
313
314 /// Feed one 188-byte MPEG-TS packet. Infallible: malformed packets are
315 /// counted in [`Stats::malformed_packets`] and discarded.
316 ///
317 /// Packets on the wrong PID are silently ignored (only [`Stats::ts_packets`]
318 /// is incremented).
319 ///
320 /// Returns a draining iterator over any T2-MI events completed by this feed.
321 pub fn feed_ts(&mut self, packet: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
322 self.scratch.clear();
323
324 match self.mode {
325 PumpMode::Raw => {
326 // feed_ts on a raw-mode pump is a caller error.
327 self.stats.malformed_packets += 1;
328 }
329 PumpMode::Ts { pid: filter_pid } => {
330 self.stats.ts_packets += 1;
331 match parse_ts_header(packet) {
332 None => {
333 self.stats.malformed_packets += 1;
334 }
335 Some(info) => {
336 if info.pid == filter_pid {
337 let payload = &packet[info.payload_start..TS_PACKET_SIZE];
338 self.reasm.feed(payload, info.pusi);
339 Self::drain_reasm_into(
340 &mut self.reasm,
341 &mut self.stats,
342 &mut self.scratch,
343 );
344 }
345 // Wrong PID: ignored cheaply — no stats beyond ts_packets.
346 }
347 }
348 }
349 }
350
351 self.scratch.drain(..)
352 }
353
354 /// Feed raw T2-MI bytes (un-encapsulated mode).
355 ///
356 /// The slice may contain a partial packet; bytes are buffered internally.
357 /// A packet split across two `feed_raw` calls produces exactly one event.
358 ///
359 /// Returns a draining iterator over any T2-MI events completed by this feed.
360 pub fn feed_raw(&mut self, data: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
361 self.scratch.clear();
362
363 match self.mode {
364 PumpMode::Ts { .. } => {
365 // feed_raw on a TS-mode pump is a caller error.
366 self.stats.malformed_packets += 1;
367 }
368 PumpMode::Raw => {
369 if !self.raw_started {
370 // First call: initialise the reassembler with PUSI=true and
371 // pointer_field=0. PacketReassembler::feed interprets the
372 // first byte of the payload as the pointer_field when PUSI is
373 // set (ETSI TS 102 773 §6.1.1). We prepend a 0x00 byte so the
374 // reassembler sees pointer=0 and treats the rest as the start
375 // of a new T2-MI packet.
376 let mut buf = Vec::with_capacity(1 + data.len());
377 buf.push(0x00); // pointer_field = 0
378 buf.extend_from_slice(data);
379 self.reasm.feed(&buf, true);
380 self.raw_started = true;
381 } else {
382 // Continuation: feed without PUSI — bytes extend the
383 // current T2-MI packet in progress.
384 self.reasm.feed(data, false);
385 }
386 Self::drain_reasm_into(&mut self.reasm, &mut self.stats, &mut self.scratch);
387 }
388 }
389
390 self.scratch.drain(..)
391 }
392
393 /// Drain all pending packets from the reassembler, CRC-validate each one,
394 /// and push valid packets to `scratch`.
395 fn drain_reasm_into(
396 reasm: &mut PacketReassembler,
397 stats: &mut Stats,
398 scratch: &mut Vec<T2miEvent>,
399 ) {
400 for raw in reasm.drain_packets() {
401 stats.t2mi_packets += 1;
402 match crc::validate_crc(&raw) {
403 Ok(()) => scratch.push(T2miEvent { bytes: raw }),
404 Err(_) => stats.crc_failures += 1,
405 }
406 }
407 }
408}
409
410// ── Tests ─────────────────────────────────────────────────────────────────────
411
412#[cfg(test)]
413mod tests {
414 use super::*;
415 use dvb_common::crc32_mpeg2;
416
417 // ── Test helpers ─────────────────────────────────────────────────────────
418
419 /// Build a syntactically valid T2-MI packet (header + payload + CRC-32).
420 ///
421 /// `packet_type` is the raw byte (Table 1 of TS 102 773).
422 /// `payload` is the post-header, pre-CRC data.
423 /// Returns the full byte vector including the 4-byte CRC trailer.
424 fn make_t2mi_packet(packet_type: u8, payload: &[u8]) -> Vec<u8> {
425 let payload_len_bits = (payload.len() * 8) as u16;
426 let mut pkt = Vec::with_capacity(6 + payload.len() + 4);
427 pkt.push(packet_type);
428 pkt.push(0x01); // packet_count
429 pkt.push(0x00); // superframe_idx=0, rfu=0, t2mi_stream_id=0
430 pkt.push(0x00); // rfu byte = 0
431 pkt.extend_from_slice(&payload_len_bits.to_be_bytes());
432 pkt.extend_from_slice(payload);
433 let crc = crc32_mpeg2::compute(&pkt);
434 pkt.extend_from_slice(&crc.to_be_bytes());
435 pkt
436 }
437
438 /// Wrap a T2-MI payload slice in a single 188-byte MPEG-TS packet.
439 ///
440 /// Sets PUSI=true and pointer_field=0 so the reassembler treats
441 /// the T2-MI data as starting at byte 0 of the payload.
442 /// The T2-MI bytes must fit in 183 bytes (188 − 4 header − 1 pointer).
443 fn ts_packet(pid: u16, t2mi_data: &[u8], pusi: bool, pointer_field: u8) -> [u8; 188] {
444 let mut pkt = [0xFFu8; 188];
445 pkt[0] = TS_SYNC_BYTE;
446 pkt[1] = if pusi { PUSI_MASK } else { 0 };
447 pkt[1] |= ((pid >> 8) as u8) & PID_MASK_HI;
448 pkt[2] = (pid & 0xFF) as u8;
449 pkt[3] = PAYLOAD_FLAG; // payload present, no adaptation field
450 if pusi {
451 pkt[4] = pointer_field;
452 let start = 5 + pointer_field as usize;
453 assert!(
454 start + t2mi_data.len() <= 188,
455 "T2-MI data too large for one TS packet"
456 );
457 pkt[start..start + t2mi_data.len()].copy_from_slice(t2mi_data);
458 } else {
459 let start = 4;
460 assert!(
461 start + t2mi_data.len() <= 188,
462 "T2-MI data too large for one TS packet"
463 );
464 pkt[start..start + t2mi_data.len()].copy_from_slice(t2mi_data);
465 }
466 pkt
467 }
468
469 // ── (a) valid T2-MI packet in TS → one event, typed payload ──────────────
470
471 #[test]
472 fn ts_packet_emits_one_event_with_typed_payload() {
473 // Build a valid BBFrame T2-MI packet.
474 // BbframePayload minimum: frame_idx(1) + plp_id(1) + flags(1) = 3 bytes.
475 let bbframe_payload = [0x01u8, 0x02, 0x00];
476 let t2mi = make_t2mi_packet(0x00, &bbframe_payload);
477
478 let pkt = ts_packet(0x0006, &t2mi, true, 0);
479 let mut pump = T2miPump::new(0x0006);
480 let events: Vec<_> = pump.feed_ts(&pkt).collect();
481
482 assert_eq!(events.len(), 1, "expected exactly one event");
483 assert_eq!(events[0].packet_type(), 0x00);
484
485 let payload = events[0].payload().expect("payload parse should succeed");
486 assert!(
487 matches!(payload, AnyPayload::Bbframe(_)),
488 "expected Bbframe, got {payload:?}"
489 );
490
491 let stats = pump.stats();
492 assert_eq!(stats.ts_packets, 1);
493 assert_eq!(stats.t2mi_packets, 1);
494 assert_eq!(stats.crc_failures, 0);
495 assert_eq!(stats.malformed_packets, 0);
496 }
497
498 // ── (b) corrupted CRC → zero events, crc_failures=1 ─────────────────────
499
500 #[test]
501 fn corrupted_crc_drops_packet_and_counts() {
502 let payload = [0x00u8, 0x00, 0x00]; // minimal BBFrame payload
503 let mut t2mi = make_t2mi_packet(0x00, &payload);
504 // Corrupt the last CRC byte.
505 *t2mi.last_mut().unwrap() ^= 0xFF;
506
507 let pkt = ts_packet(0x0006, &t2mi, true, 0);
508 let mut pump = T2miPump::new(0x0006);
509 let events: Vec<_> = pump.feed_ts(&pkt).collect();
510
511 assert_eq!(events.len(), 0, "corrupted packet must not emit");
512 let stats = pump.stats();
513 assert_eq!(stats.crc_failures, 1);
514 assert_eq!(stats.t2mi_packets, 1); // reassembler produced it, CRC gate dropped it
515 }
516
517 // ── (c) feed_raw with packet split across two calls → one event ──────────
518
519 #[test]
520 fn feed_raw_split_across_two_calls_emits_one_event() {
521 // Use a timestamp payload (11 bytes, all zeros), packet_type=0x20.
522 let ts_payload = [0x00u8; 11];
523 let t2mi = make_t2mi_packet(0x20, &ts_payload);
524
525 // Split at an arbitrary boundary (e.g. after the header).
526 let split = 6;
527 let first = &t2mi[..split];
528 let second = &t2mi[split..];
529
530 let mut pump = T2miPump::raw();
531
532 let ev1: Vec<_> = pump.feed_raw(first).collect();
533 assert_eq!(ev1.len(), 0, "no complete packet yet after first chunk");
534
535 let ev2: Vec<_> = pump.feed_raw(second).collect();
536 assert_eq!(
537 ev2.len(),
538 1,
539 "one event after second chunk completes the packet"
540 );
541
542 let stats = pump.stats();
543 assert_eq!(stats.t2mi_packets, 1);
544 assert_eq!(stats.crc_failures, 0);
545 }
546
547 // ── (d) garbage TS packet → malformed counted, no panic ──────────────────
548
549 #[test]
550 fn garbage_ts_packet_counted_no_panic() {
551 let mut pump = T2miPump::new(0x0006);
552 let garbage = [0x00u8; 188]; // bad sync byte
553 let events: Vec<_> = pump.feed_ts(&garbage).collect();
554 assert_eq!(events.len(), 0);
555 assert_eq!(pump.stats().malformed_packets, 1);
556 assert_eq!(pump.stats().ts_packets, 1);
557 }
558
559 // ── (e) wrong-PID TS packet → ignored cheaply ────────────────────────────
560
561 #[test]
562 fn wrong_pid_ts_packet_ignored() {
563 let payload = [0x00u8, 0x00, 0x00];
564 let t2mi = make_t2mi_packet(0x00, &payload);
565 let pkt = ts_packet(0x0100, &t2mi, true, 0); // PID 0x0100, pump listens on 0x0006
566
567 let mut pump = T2miPump::new(0x0006);
568 let events: Vec<_> = pump.feed_ts(&pkt).collect();
569
570 assert_eq!(events.len(), 0, "wrong-PID packet must not emit");
571 // ts_packets incremented, but nothing else moves.
572 let stats = pump.stats();
573 assert_eq!(stats.ts_packets, 1);
574 assert_eq!(stats.t2mi_packets, 0);
575 assert_eq!(stats.crc_failures, 0);
576 assert_eq!(stats.malformed_packets, 0);
577 }
578
579 // ── additional: header() lazy parse ──────────────────────────────────────
580
581 #[test]
582 fn event_header_lazy_parse_matches_packet_type() {
583 let payload = [0x00u8; 11]; // Timestamp payload
584 let t2mi = make_t2mi_packet(0x20, &payload);
585 let pkt = ts_packet(0x0010, &t2mi, true, 0);
586
587 let mut pump = T2miPump::new(0x0010);
588 let events: Vec<_> = pump.feed_ts(&pkt).collect();
589 assert_eq!(events.len(), 1);
590
591 let hdr = events[0].header().expect("header parse should succeed");
592 assert_eq!(hdr.packet_type as u8, 0x20);
593 assert_eq!(hdr.packet_count, 0x01);
594 }
595
596 // ── additional: stats() method ───────────────────────────────────────────
597
598 #[test]
599 fn stats_accumulate_across_feeds() {
600 let payload = [0x00u8, 0x00, 0x00];
601 let t2mi = make_t2mi_packet(0x00, &payload);
602 let pkt = ts_packet(0x0006, &t2mi, true, 0);
603
604 let mut pump = T2miPump::new(0x0006);
605 pump.feed_ts(&pkt).for_each(drop);
606 pump.feed_ts(&pkt).for_each(drop);
607
608 let stats = pump.stats();
609 assert_eq!(stats.ts_packets, 2);
610 // The reassembler resets on PUSI so we get 2 complete packets.
611 assert_eq!(stats.t2mi_packets, 2);
612 }
613
614 // ── payload_with registry seam ───────────────────────────────────────────
615
616 #[test]
617 fn payload_with_dispatches_custom_registered_type() {
618 use crate::payload::registry::PayloadRegistry;
619 use crate::traits::PayloadDef;
620 use dvb_common::Parse;
621
622 #[derive(Debug)]
623 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
624 struct TestPrivatePayload {
625 val: u8,
626 }
627
628 impl<'a> Parse<'a> for TestPrivatePayload {
629 type Error = crate::Error;
630 fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
631 if bytes.is_empty() {
632 return Err(crate::Error::BufferTooShort {
633 need: 1,
634 have: 0,
635 what: "TestPrivatePayload",
636 });
637 }
638 Ok(Self { val: bytes[0] })
639 }
640 }
641
642 impl<'a> PayloadDef<'a> for TestPrivatePayload {
643 const PACKET_TYPE: u8 = 0x00;
644 const NAME: &'static str = "TEST_PRIVATE";
645 }
646
647 let mut reg = PayloadRegistry::new();
648 reg.register::<TestPrivatePayload>();
649
650 let private_payload = [0x42u8, 0x02, 0x00];
651 let t2mi = make_t2mi_packet(0x00, &private_payload);
652 let pkt = ts_packet(0x0006, &t2mi, true, 0);
653
654 let mut pump = T2miPump::new(0x0006);
655 let events: Vec<_> = pump.feed_ts(&pkt).collect();
656 assert_eq!(events.len(), 1, "expected one event");
657
658 let result = events[0].payload_with(®).expect("payload_with parse");
659 match result {
660 AnyPayload::Other {
661 packet_type,
662 ref value,
663 } => {
664 assert_eq!(packet_type, 0x00);
665 let downcast = value.downcast_ref::<TestPrivatePayload>().unwrap();
666 assert_eq!(downcast.val, 0x42);
667 }
668 other => panic!("expected Other, got {other:?}"),
669 }
670
671 let built_in = events[0].payload().expect("payload parse");
672 assert!(
673 matches!(built_in, AnyPayload::Bbframe(_)),
674 "expected Bbframe via built-in dispatch, got {built_in:?}"
675 );
676 }
677
678 // ── payload_with with genuinely-private packet type (not in PacketType) ──
679
680 #[test]
681 fn payload_with_dispatches_genuinely_private_packet_type() {
682 use crate::payload::registry::PayloadRegistry;
683 use crate::traits::PayloadDef;
684 use dvb_common::Parse;
685
686 #[derive(Debug)]
687 #[cfg_attr(feature = "serde", derive(serde::Serialize))]
688 struct PrivatePayload {
689 val: u8,
690 }
691
692 impl<'a> Parse<'a> for PrivatePayload {
693 type Error = crate::Error;
694 fn parse(bytes: &'a [u8]) -> crate::Result<Self> {
695 if bytes.is_empty() {
696 return Err(crate::Error::BufferTooShort {
697 need: 1,
698 have: 0,
699 what: "PrivatePayload",
700 });
701 }
702 Ok(Self { val: bytes[0] })
703 }
704 }
705
706 impl<'a> PayloadDef<'a> for PrivatePayload {
707 const PACKET_TYPE: u8 = 0x42;
708 const NAME: &'static str = "PRIVATE_0X42";
709 }
710
711 let mut reg = PayloadRegistry::new();
712 reg.register::<PrivatePayload>();
713
714 let private_body = [0xABu8];
715 let t2mi = make_t2mi_packet(0x42, &private_body);
716 let pkt = ts_packet(0x0006, &t2mi, true, 0);
717
718 let mut pump = T2miPump::new(0x0006);
719 let events: Vec<_> = pump.feed_ts(&pkt).collect();
720 assert_eq!(events.len(), 1, "expected one event");
721
722 let result = events[0].payload_with(®).expect("payload_with parse");
723 match result {
724 AnyPayload::Other {
725 packet_type,
726 ref value,
727 } => {
728 assert_eq!(packet_type, 0x42);
729 let downcast = value.downcast_ref::<PrivatePayload>().unwrap();
730 assert_eq!(downcast.val, 0xAB);
731 }
732 other => panic!("expected Other, got {other:?}"),
733 }
734
735 let no_reg = events[0].payload().expect("payload without registry");
736 match no_reg {
737 AnyPayload::Unknown {
738 packet_type,
739 body: _,
740 } => {
741 assert_eq!(packet_type, 0x42);
742 }
743 other => panic!("expected Unknown, got {other:?}"),
744 }
745 }
746}