dvb_t2mi/pump.rs
1//! [`T2miPump`] — owning-[`Bytes`] feed-and-iterate T2-MI pump.
2//!
3//! Feed raw bytes (TS-encapsulated or bare T2-MI stream) in; get back an
4//! iterator of [`T2miEvent`]s — one per **CRC-valid** complete T2-MI packet.
5//! Lazy zero-copy: events own their [`bytes::Bytes`] slice and expose typed
6//! views ([`T2miEvent::header`], [`T2miEvent::payload`]) that borrow from it
7//! on demand.
8//!
9//! ```no_run
10//! use dvb_t2mi::pump::T2miPump;
11//! use dvb_t2mi::payload::AnyPayload;
12//!
13//! let mut pump = T2miPump::new(0x0006); // T2-MI PID from the PMT
14//! let ts_packet = [0u8; 188]; // a real TS packet from your source
15//! for event in pump.feed_ts(&ts_packet) {
16//! if let Ok(AnyPayload::Bbframe(bb)) = event.payload() {
17//! println!("BBFrame plp_id={}", bb.plp_id);
18//! }
19//! }
20//! ```
21//!
22//! # CRC policy
23//!
24//! Every complete packet is validated against its 4-byte CRC-32 trailer
25//! (ETSI TS 102 773 Annex A / [`crate::crc::validate_crc`]) before being
26//! emitted. Packets that fail CRC are silently dropped and counted in
27//! [`Stats::crc_failures`]. The caller never sees a corrupted packet.
28//!
29//! # TS header parsing
30//!
31//! [`T2miPump::feed_ts`] extracts the MPEG-TS payload in-place — sync byte
32//! 0x47, PID, PUSI flag, and adaptation-field skip per ISO/IEC 13818-1
33//! §2.4.3.2 — and passes it to [`crate::ts::PacketReassembler`]. No
34//! `dvb-si` dependency is introduced; the TS header reader is a private
35//! helper below.
36
37use bytes::Bytes;
38
39use crate::crc;
40use crate::packet::Header;
41use crate::payload::AnyPayload;
42use crate::ts::PacketReassembler;
43
44// ── TS header constants (ISO/IEC 13818-1 §2.4.3.2) ──────────────────────────
45
46/// TS sync byte.
47const TS_SYNC: u8 = 0x47;
48/// Expected size of one MPEG-TS packet.
49const TS_PACKET_SIZE: usize = 188;
50/// Byte 1 bit 6 = PUSI (Payload Unit Start Indicator).
51const PUSI_MASK: u8 = 0x40;
52/// Byte 1 bits 4..=0 = PID upper 5 bits.
53const PID_HI_MASK: u8 = 0x1F;
54/// Byte 3 bit 5 = adaptation_field_control bit 1 (adaptation field present).
55const ADAPTATION_FLAG: u8 = 0x20;
56/// Byte 3 bit 4 = adaptation_field_control bit 0 (payload present).
57const PAYLOAD_FLAG: u8 = 0x10;
58
59/// Minimal result of TS header parsing needed by the pump.
60struct TsInfo {
61 pid: u16,
62 pusi: bool,
63 /// Byte offset within the 188-byte packet where the payload starts.
64 payload_start: usize,
65}
66
67/// Parse the 4-byte MPEG-TS header and skip any adaptation field.
68///
69/// Returns `None` when:
70/// - `buf` is shorter than [`TS_PACKET_SIZE`],
71/// - the sync byte is not `0x47`,
72/// - the payload-present flag is clear, or
73/// - the adaptation field length overflows the packet.
74///
75/// Citation: ISO/IEC 13818-1:2019 §2.4.3.2 (transport_packet header) and
76/// §2.4.3.5 (adaptation_field length).
77fn parse_ts_header(buf: &[u8]) -> Option<TsInfo> {
78 if buf.len() < TS_PACKET_SIZE || buf[0] != TS_SYNC {
79 return None;
80 }
81 let b1 = buf[1];
82 let b3 = buf[3];
83
84 let pusi = (b1 & PUSI_MASK) != 0;
85 let pid = (((b1 & PID_HI_MASK) as u16) << 8) | (buf[2] as u16);
86 let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
87 let has_payload = (b3 & PAYLOAD_FLAG) != 0;
88
89 if !has_payload {
90 return None;
91 }
92
93 let mut cursor: usize = 4;
94 if has_adaptation {
95 if cursor >= TS_PACKET_SIZE {
96 return None;
97 }
98 let af_len = buf[cursor] as usize;
99 cursor += 1 + af_len;
100 if cursor > TS_PACKET_SIZE {
101 return None;
102 }
103 }
104
105 Some(TsInfo {
106 pid,
107 pusi,
108 payload_start: cursor,
109 })
110}
111
112// ── T2miEvent ─────────────────────────────────────────────────────────────────
113
114/// One complete, CRC-valid T2-MI packet. Owns its bytes — `'static`, cheap clone.
115///
116/// Only constructed after CRC-32 validation (ETSI TS 102 773 Annex A).
117/// [`T2miEvent::header`] and [`T2miEvent::payload`] are lazy: they borrow from
118/// the owned [`Bytes`] on demand.
119#[derive(Debug, Clone)]
120pub struct T2miEvent {
121 bytes: Bytes,
122}
123
124impl T2miEvent {
125 /// The full packet bytes (header + payload + CRC trailer).
126 #[must_use]
127 pub fn bytes(&self) -> &Bytes {
128 &self.bytes
129 }
130
131 /// The raw `packet_type` byte (byte 0 of the T2-MI header per §5.1).
132 ///
133 /// Never panics — events are only built for CRC-valid packets which are at
134 /// least `6` (header) + `4` (CRC) = 10 bytes.
135 #[must_use]
136 pub fn packet_type(&self) -> u8 {
137 self.bytes[0]
138 }
139
140 /// Parse the 6-byte T2-MI packet header (lazy, borrows this event's bytes).
141 ///
142 /// # Errors
143 ///
144 /// Propagates [`crate::Error`] from [`dvb_common::Parse::parse`] on [`Header`].
145 pub fn header(&self) -> crate::Result<Header> {
146 use dvb_common::Parse;
147 Header::parse(&self.bytes)
148 }
149
150 /// Parse the payload by dispatching on `packet_type`.
151 ///
152 /// Parses the 6-byte header to obtain `payload_len_bytes`, slices
153 /// `bytes[6..6+payload_len_bytes]`, and calls
154 /// [`AnyPayload::dispatch`]. Unrecognised packet types produce
155 /// [`AnyPayload::Unknown`] with the raw payload bytes.
156 ///
157 /// # Errors
158 ///
159 /// Returns [`crate::Error`] from parsing the [`Header`] or from the typed
160 /// payload parser.
161 pub fn payload(&self) -> crate::Result<AnyPayload<'_>> {
162 use dvb_common::Parse;
163 let hdr = Header::parse(&self.bytes)?;
164 let payload_bytes = hdr.payload_bytes(&self.bytes)?;
165 let packet_type = self.bytes[0];
166 Ok(match AnyPayload::dispatch(packet_type, payload_bytes) {
167 Some(result) => result?,
168 None => AnyPayload::Unknown {
169 packet_type,
170 body: payload_bytes,
171 },
172 })
173 }
174}
175
176// ── Stats ─────────────────────────────────────────────────────────────────────
177
178/// Accumulated pump statistics (monotonically growing across all `feed` calls).
179#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
180pub struct Stats {
181 /// TS packets fed via [`T2miPump::feed_ts`].
182 pub ts_packets: u64,
183 /// Complete T2-MI packets produced by the reassembler (pre-CRC check).
184 pub t2mi_packets: u64,
185 /// Packets dropped due to CRC-32 mismatch (ETSI TS 102 773 Annex A).
186 pub crc_failures: u64,
187 /// Malformed inputs: bad TS sync byte, truncated TS packet, overflowed
188 /// adaptation field, or `feed_ts` called on a raw-mode pump.
189 pub malformed_packets: u64,
190}
191
192// ── T2miPump ──────────────────────────────────────────────────────────────────
193
194/// Feed-and-iterate T2-MI pump.
195///
196/// Supports two operating modes:
197///
198/// - **TS-encapsulated** (most common): construct with [`T2miPump::new`],
199/// passing the 13-bit PID carrying T2-MI (from the PMT). Feed 188-byte
200/// MPEG-TS packets with [`T2miPump::feed_ts`]. The pump filters by PID,
201/// strips the TS header per ISO/IEC 13818-1 §2.4.3.2, and forwards the
202/// payload to the internal [`PacketReassembler`] (ETSI TS 102 773 §6.1.1).
203///
204/// - **Raw** (un-encapsulated): construct with [`T2miPump::raw`]. Feed
205/// arbitrary byte slices with [`T2miPump::feed_raw`]. The pump buffers bytes
206/// and emits events once a full packet (determined by the header's
207/// `payload_len_bits`) is available.
208///
209/// # PID note
210///
211/// PIDs are 13-bit values (0x0000–0x1FFF per ISO/IEC 13818-1 §2.4.3.2).
212/// This type uses `u16` directly; no newtype is introduced. Values above
213/// 0x1FFF are accepted without error — the PID filter simply never matches.
214pub struct T2miPump {
215 mode: PumpMode,
216 reasm: PacketReassembler,
217 stats: Stats,
218 scratch: Vec<T2miEvent>,
219 /// Raw-mode sync flag: true once the first raw feed has initialised the
220 /// reassembler via a PUSI=true, pointer=0 signal.
221 raw_started: bool,
222}
223
224enum PumpMode {
225 /// TS-encapsulated: filter packets to this PID.
226 Ts { pid: u16 },
227 /// Un-encapsulated raw byte stream.
228 Raw,
229}
230
231impl T2miPump {
232 /// Create a TS-encapsulated pump that filters to `pid`.
233 ///
234 /// `pid` is the 13-bit T2-MI PID from the PMT (e.g. 0x0006 for data
235 /// piping).
236 ///
237 /// # PID range
238 ///
239 /// Valid MPEG-TS PIDs are 13-bit (0x0000–0x1FFF); this parameter is `u16`.
240 /// No newtype is introduced to keep the API lightweight.
241 #[must_use]
242 pub fn new(pid: u16) -> Self {
243 Self {
244 mode: PumpMode::Ts { pid },
245 reasm: PacketReassembler::new(),
246 stats: Stats::default(),
247 scratch: Vec::new(),
248 raw_started: false,
249 }
250 }
251
252 /// Create an un-encapsulated raw-stream pump.
253 ///
254 /// Use [`T2miPump::feed_raw`] to supply bytes. The pump buffers internally
255 /// and emits events by packet boundary, not by call boundary — a packet
256 /// split across two `feed_raw` calls produces exactly one event.
257 #[must_use]
258 pub fn raw() -> Self {
259 Self {
260 mode: PumpMode::Raw,
261 reasm: PacketReassembler::new(),
262 stats: Stats::default(),
263 scratch: Vec::new(),
264 raw_started: false,
265 }
266 }
267
268 /// Accumulated statistics.
269 #[must_use]
270 pub fn stats(&self) -> Stats {
271 self.stats
272 }
273
274 /// Feed one 188-byte MPEG-TS packet. Infallible: malformed packets are
275 /// counted in [`Stats::malformed_packets`] and discarded.
276 ///
277 /// Packets on the wrong PID are silently ignored (only [`Stats::ts_packets`]
278 /// is incremented).
279 ///
280 /// Returns a draining iterator over any T2-MI events completed by this feed.
281 pub fn feed_ts(&mut self, packet: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
282 self.scratch.clear();
283
284 match self.mode {
285 PumpMode::Raw => {
286 // feed_ts on a raw-mode pump is a caller error.
287 self.stats.malformed_packets += 1;
288 }
289 PumpMode::Ts { pid: filter_pid } => {
290 self.stats.ts_packets += 1;
291 match parse_ts_header(packet) {
292 None => {
293 self.stats.malformed_packets += 1;
294 }
295 Some(info) => {
296 if info.pid == filter_pid {
297 let payload = &packet[info.payload_start..TS_PACKET_SIZE];
298 self.reasm.feed(payload, info.pusi);
299 Self::drain_reasm_into(
300 &mut self.reasm,
301 &mut self.stats,
302 &mut self.scratch,
303 );
304 }
305 // Wrong PID: ignored cheaply — no stats beyond ts_packets.
306 }
307 }
308 }
309 }
310
311 self.scratch.drain(..)
312 }
313
314 /// Feed raw T2-MI bytes (un-encapsulated mode).
315 ///
316 /// The slice may contain a partial packet; bytes are buffered internally.
317 /// A packet split across two `feed_raw` calls produces exactly one event.
318 ///
319 /// Returns a draining iterator over any T2-MI events completed by this feed.
320 pub fn feed_raw(&mut self, data: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
321 self.scratch.clear();
322
323 match self.mode {
324 PumpMode::Ts { .. } => {
325 // feed_raw on a TS-mode pump is a caller error.
326 self.stats.malformed_packets += 1;
327 }
328 PumpMode::Raw => {
329 if !self.raw_started {
330 // First call: initialise the reassembler with PUSI=true and
331 // pointer_field=0. PacketReassembler::feed interprets the
332 // first byte of the payload as the pointer_field when PUSI is
333 // set (ETSI TS 102 773 §6.1.1). We prepend a 0x00 byte so the
334 // reassembler sees pointer=0 and treats the rest as the start
335 // of a new T2-MI packet.
336 let mut buf = Vec::with_capacity(1 + data.len());
337 buf.push(0x00); // pointer_field = 0
338 buf.extend_from_slice(data);
339 self.reasm.feed(&buf, true);
340 self.raw_started = true;
341 } else {
342 // Continuation: feed without PUSI — bytes extend the
343 // current T2-MI packet in progress.
344 self.reasm.feed(data, false);
345 }
346 Self::drain_reasm_into(&mut self.reasm, &mut self.stats, &mut self.scratch);
347 }
348 }
349
350 self.scratch.drain(..)
351 }
352
353 /// Drain all pending packets from the reassembler, CRC-validate each one,
354 /// and push valid packets to `scratch`.
355 fn drain_reasm_into(
356 reasm: &mut PacketReassembler,
357 stats: &mut Stats,
358 scratch: &mut Vec<T2miEvent>,
359 ) {
360 for raw in reasm.drain_packets() {
361 stats.t2mi_packets += 1;
362 match crc::validate_crc(&raw) {
363 Ok(()) => scratch.push(T2miEvent { bytes: raw }),
364 Err(_) => stats.crc_failures += 1,
365 }
366 }
367 }
368}
369
370// ── Tests ─────────────────────────────────────────────────────────────────────
371
372#[cfg(test)]
373mod tests {
374 use super::*;
375 use dvb_common::crc32_mpeg2;
376
377 // ── Test helpers ─────────────────────────────────────────────────────────
378
379 /// Build a syntactically valid T2-MI packet (header + payload + CRC-32).
380 ///
381 /// `packet_type` is the raw byte (Table 1 of TS 102 773).
382 /// `payload` is the post-header, pre-CRC data.
383 /// Returns the full byte vector including the 4-byte CRC trailer.
384 fn make_t2mi_packet(packet_type: u8, payload: &[u8]) -> Vec<u8> {
385 let payload_len_bits = (payload.len() * 8) as u16;
386 let mut pkt = Vec::with_capacity(6 + payload.len() + 4);
387 pkt.push(packet_type);
388 pkt.push(0x01); // packet_count
389 pkt.push(0x00); // superframe_idx=0, rfu=0, t2mi_stream_id=0
390 pkt.push(0x00); // rfu byte = 0
391 pkt.extend_from_slice(&payload_len_bits.to_be_bytes());
392 pkt.extend_from_slice(payload);
393 let crc = crc32_mpeg2::compute(&pkt);
394 pkt.extend_from_slice(&crc.to_be_bytes());
395 pkt
396 }
397
398 /// Wrap a T2-MI payload slice in a single 188-byte MPEG-TS packet.
399 ///
400 /// Sets PUSI=true and pointer_field=0 so the reassembler treats
401 /// the T2-MI data as starting at byte 0 of the payload.
402 /// The T2-MI bytes must fit in 183 bytes (188 − 4 header − 1 pointer).
403 fn ts_packet(pid: u16, t2mi_data: &[u8], pusi: bool, pointer_field: u8) -> [u8; 188] {
404 let mut pkt = [0xFFu8; 188];
405 pkt[0] = TS_SYNC;
406 pkt[1] = if pusi { PUSI_MASK } else { 0 };
407 pkt[1] |= ((pid >> 8) as u8) & PID_HI_MASK;
408 pkt[2] = (pid & 0xFF) as u8;
409 pkt[3] = PAYLOAD_FLAG; // payload present, no adaptation field
410 if pusi {
411 pkt[4] = pointer_field;
412 let start = 5 + pointer_field as usize;
413 assert!(
414 start + t2mi_data.len() <= 188,
415 "T2-MI data too large for one TS packet"
416 );
417 pkt[start..start + t2mi_data.len()].copy_from_slice(t2mi_data);
418 } else {
419 let start = 4;
420 assert!(
421 start + t2mi_data.len() <= 188,
422 "T2-MI data too large for one TS packet"
423 );
424 pkt[start..start + t2mi_data.len()].copy_from_slice(t2mi_data);
425 }
426 pkt
427 }
428
429 // ── (a) valid T2-MI packet in TS → one event, typed payload ──────────────
430
431 #[test]
432 fn ts_packet_emits_one_event_with_typed_payload() {
433 // Build a valid BBFrame T2-MI packet.
434 // BbframePayload minimum: frame_idx(1) + plp_id(1) + flags(1) = 3 bytes.
435 let bbframe_payload = [0x01u8, 0x02, 0x00];
436 let t2mi = make_t2mi_packet(0x00, &bbframe_payload);
437
438 let pkt = ts_packet(0x0006, &t2mi, true, 0);
439 let mut pump = T2miPump::new(0x0006);
440 let events: Vec<_> = pump.feed_ts(&pkt).collect();
441
442 assert_eq!(events.len(), 1, "expected exactly one event");
443 assert_eq!(events[0].packet_type(), 0x00);
444
445 let payload = events[0].payload().expect("payload parse should succeed");
446 assert!(
447 matches!(payload, AnyPayload::Bbframe(_)),
448 "expected Bbframe, got {payload:?}"
449 );
450
451 let stats = pump.stats();
452 assert_eq!(stats.ts_packets, 1);
453 assert_eq!(stats.t2mi_packets, 1);
454 assert_eq!(stats.crc_failures, 0);
455 assert_eq!(stats.malformed_packets, 0);
456 }
457
458 // ── (b) corrupted CRC → zero events, crc_failures=1 ─────────────────────
459
460 #[test]
461 fn corrupted_crc_drops_packet_and_counts() {
462 let payload = [0x00u8, 0x00, 0x00]; // minimal BBFrame payload
463 let mut t2mi = make_t2mi_packet(0x00, &payload);
464 // Corrupt the last CRC byte.
465 *t2mi.last_mut().unwrap() ^= 0xFF;
466
467 let pkt = ts_packet(0x0006, &t2mi, true, 0);
468 let mut pump = T2miPump::new(0x0006);
469 let events: Vec<_> = pump.feed_ts(&pkt).collect();
470
471 assert_eq!(events.len(), 0, "corrupted packet must not emit");
472 let stats = pump.stats();
473 assert_eq!(stats.crc_failures, 1);
474 assert_eq!(stats.t2mi_packets, 1); // reassembler produced it, CRC gate dropped it
475 }
476
477 // ── (c) feed_raw with packet split across two calls → one event ──────────
478
479 #[test]
480 fn feed_raw_split_across_two_calls_emits_one_event() {
481 // Use a timestamp payload (11 bytes, all zeros), packet_type=0x20.
482 let ts_payload = [0x00u8; 11];
483 let t2mi = make_t2mi_packet(0x20, &ts_payload);
484
485 // Split at an arbitrary boundary (e.g. after the header).
486 let split = 6;
487 let first = &t2mi[..split];
488 let second = &t2mi[split..];
489
490 let mut pump = T2miPump::raw();
491
492 let ev1: Vec<_> = pump.feed_raw(first).collect();
493 assert_eq!(ev1.len(), 0, "no complete packet yet after first chunk");
494
495 let ev2: Vec<_> = pump.feed_raw(second).collect();
496 assert_eq!(
497 ev2.len(),
498 1,
499 "one event after second chunk completes the packet"
500 );
501
502 let stats = pump.stats();
503 assert_eq!(stats.t2mi_packets, 1);
504 assert_eq!(stats.crc_failures, 0);
505 }
506
507 // ── (d) garbage TS packet → malformed counted, no panic ──────────────────
508
509 #[test]
510 fn garbage_ts_packet_counted_no_panic() {
511 let mut pump = T2miPump::new(0x0006);
512 let garbage = [0x00u8; 188]; // bad sync byte
513 let events: Vec<_> = pump.feed_ts(&garbage).collect();
514 assert_eq!(events.len(), 0);
515 assert_eq!(pump.stats().malformed_packets, 1);
516 assert_eq!(pump.stats().ts_packets, 1);
517 }
518
519 // ── (e) wrong-PID TS packet → ignored cheaply ────────────────────────────
520
521 #[test]
522 fn wrong_pid_ts_packet_ignored() {
523 let payload = [0x00u8, 0x00, 0x00];
524 let t2mi = make_t2mi_packet(0x00, &payload);
525 let pkt = ts_packet(0x0100, &t2mi, true, 0); // PID 0x0100, pump listens on 0x0006
526
527 let mut pump = T2miPump::new(0x0006);
528 let events: Vec<_> = pump.feed_ts(&pkt).collect();
529
530 assert_eq!(events.len(), 0, "wrong-PID packet must not emit");
531 // ts_packets incremented, but nothing else moves.
532 let stats = pump.stats();
533 assert_eq!(stats.ts_packets, 1);
534 assert_eq!(stats.t2mi_packets, 0);
535 assert_eq!(stats.crc_failures, 0);
536 assert_eq!(stats.malformed_packets, 0);
537 }
538
539 // ── additional: header() lazy parse ──────────────────────────────────────
540
541 #[test]
542 fn event_header_lazy_parse_matches_packet_type() {
543 let payload = [0x00u8; 11]; // Timestamp payload
544 let t2mi = make_t2mi_packet(0x20, &payload);
545 let pkt = ts_packet(0x0010, &t2mi, true, 0);
546
547 let mut pump = T2miPump::new(0x0010);
548 let events: Vec<_> = pump.feed_ts(&pkt).collect();
549 assert_eq!(events.len(), 1);
550
551 let hdr = events[0].header().expect("header parse should succeed");
552 assert_eq!(hdr.packet_type as u8, 0x20);
553 assert_eq!(hdr.packet_count, 0x01);
554 }
555
556 // ── additional: stats() method ───────────────────────────────────────────
557
558 #[test]
559 fn stats_accumulate_across_feeds() {
560 let payload = [0x00u8, 0x00, 0x00];
561 let t2mi = make_t2mi_packet(0x00, &payload);
562 let pkt = ts_packet(0x0006, &t2mi, true, 0);
563
564 let mut pump = T2miPump::new(0x0006);
565 pump.feed_ts(&pkt).for_each(drop);
566 pump.feed_ts(&pkt).for_each(drop);
567
568 let stats = pump.stats();
569 assert_eq!(stats.ts_packets, 2);
570 // The reassembler resets on PUSI so we get 2 complete packets.
571 assert_eq!(stats.t2mi_packets, 2);
572 }
573}