Skip to main content

dvb_t2mi/
pump.rs

1//! [`T2miPump`] — owning-[`Bytes`] feed-and-iterate T2-MI pump.
2//!
3//! Feed raw bytes (TS-encapsulated or bare T2-MI stream) in; get back an
4//! iterator of [`T2miEvent`]s — one per **CRC-valid** complete T2-MI packet.
5//! Lazy zero-copy: events own their [`bytes::Bytes`] slice and expose typed
6//! views ([`T2miEvent::header`], [`T2miEvent::payload`]) that borrow from it
7//! on demand.
8//!
9//! ```no_run
10//! use dvb_t2mi::pump::T2miPump;
11//! use dvb_t2mi::payload::AnyPayload;
12//!
13//! let mut pump = T2miPump::new(0x0006); // T2-MI PID from the PMT
14//! let ts_packet = [0u8; 188]; // a real TS packet from your source
15//! for event in pump.feed_ts(&ts_packet) {
16//!     if let Ok(AnyPayload::Bbframe(bb)) = event.payload() {
17//!         println!("BBFrame plp_id={}", bb.plp_id);
18//!     }
19//! }
20//! ```
21//!
22//! # CRC policy
23//!
24//! Every complete packet is validated against its 4-byte CRC-32 trailer
25//! (ETSI TS 102 773 Annex A / [`crate::crc::validate_crc`]) before being
26//! emitted.  Packets that fail CRC are silently dropped and counted in
27//! [`Stats::crc_failures`].  The caller never sees a corrupted packet.
28//!
29//! # TS header parsing
30//!
31//! [`T2miPump::feed_ts`] extracts the MPEG-TS payload in-place — sync byte
32//! 0x47, PID, PUSI flag, and adaptation-field skip per ISO/IEC 13818-1
33//! §2.4.3.2 — and passes it to [`crate::ts::PacketReassembler`].  No
34//! `dvb-si` dependency is introduced; the TS header reader is a private
35//! helper below.
36
37use bytes::Bytes;
38
39use crate::crc;
40use crate::packet::Header;
41use crate::payload::AnyPayload;
42use crate::ts::PacketReassembler;
43
44// ── TS header constants (ISO/IEC 13818-1 §2.4.3.2) ──────────────────────────
45
46/// TS sync byte.
47const TS_SYNC: u8 = 0x47;
48/// Expected size of one MPEG-TS packet.
49const TS_PACKET_SIZE: usize = 188;
50/// Byte 1 bit 6 = PUSI (Payload Unit Start Indicator).
51const PUSI_MASK: u8 = 0x40;
52/// Byte 1 bits 4..=0 = PID upper 5 bits.
53const PID_HI_MASK: u8 = 0x1F;
54/// Byte 3 bit 5 = adaptation_field_control bit 1 (adaptation field present).
55const ADAPTATION_FLAG: u8 = 0x20;
56/// Byte 3 bit 4 = adaptation_field_control bit 0 (payload present).
57const PAYLOAD_FLAG: u8 = 0x10;
58
59/// Minimal result of TS header parsing needed by the pump.
60struct TsInfo {
61    pid: u16,
62    pusi: bool,
63    /// Byte offset within the 188-byte packet where the payload starts.
64    payload_start: usize,
65}
66
67/// Parse the 4-byte MPEG-TS header and skip any adaptation field.
68///
69/// Returns `None` when:
70/// - `buf` is shorter than [`TS_PACKET_SIZE`],
71/// - the sync byte is not `0x47`,
72/// - the payload-present flag is clear, or
73/// - the adaptation field length overflows the packet.
74///
75/// Citation: ISO/IEC 13818-1:2019 §2.4.3.2 (transport_packet header) and
76/// §2.4.3.5 (adaptation_field length).
77fn parse_ts_header(buf: &[u8]) -> Option<TsInfo> {
78    if buf.len() < TS_PACKET_SIZE || buf[0] != TS_SYNC {
79        return None;
80    }
81    let b1 = buf[1];
82    let b3 = buf[3];
83
84    let pusi = (b1 & PUSI_MASK) != 0;
85    let pid = (((b1 & PID_HI_MASK) as u16) << 8) | (buf[2] as u16);
86    let has_adaptation = (b3 & ADAPTATION_FLAG) != 0;
87    let has_payload = (b3 & PAYLOAD_FLAG) != 0;
88
89    if !has_payload {
90        return None;
91    }
92
93    let mut cursor: usize = 4;
94    if has_adaptation {
95        if cursor >= TS_PACKET_SIZE {
96            return None;
97        }
98        let af_len = buf[cursor] as usize;
99        cursor += 1 + af_len;
100        if cursor > TS_PACKET_SIZE {
101            return None;
102        }
103    }
104
105    Some(TsInfo {
106        pid,
107        pusi,
108        payload_start: cursor,
109    })
110}
111
112// ── T2miEvent ─────────────────────────────────────────────────────────────────
113
114/// One complete, CRC-valid T2-MI packet. Owns its bytes — `'static`, cheap clone.
115///
116/// Only constructed after CRC-32 validation (ETSI TS 102 773 Annex A).
117/// [`T2miEvent::header`] and [`T2miEvent::payload`] are lazy: they borrow from
118/// the owned [`Bytes`] on demand.
119#[derive(Debug, Clone)]
120pub struct T2miEvent {
121    bytes: Bytes,
122}
123
124impl T2miEvent {
125    /// The full packet bytes (header + payload + CRC trailer).
126    #[must_use]
127    pub fn bytes(&self) -> &Bytes {
128        &self.bytes
129    }
130
131    /// The raw `packet_type` byte (byte 0 of the T2-MI header per §5.1).
132    ///
133    /// Never panics — events are only built for CRC-valid packets which are at
134    /// least `6` (header) + `4` (CRC) = 10 bytes.
135    #[must_use]
136    pub fn packet_type(&self) -> u8 {
137        self.bytes[0]
138    }
139
140    /// Parse the 6-byte T2-MI packet header (lazy, borrows this event's bytes).
141    ///
142    /// # Errors
143    ///
144    /// Propagates [`crate::Error`] from [`dvb_common::Parse::parse`] on [`Header`].
145    pub fn header(&self) -> crate::Result<Header> {
146        use dvb_common::Parse;
147        Header::parse(&self.bytes)
148    }
149
150    /// Parse the payload by dispatching on `packet_type`.
151    ///
152    /// Parses the 6-byte header to obtain `payload_len_bytes`, slices
153    /// `bytes[6..6+payload_len_bytes]`, and calls
154    /// [`AnyPayload::dispatch`].  Unrecognised packet types produce
155    /// [`AnyPayload::Unknown`] with the raw payload bytes.
156    ///
157    /// # Errors
158    ///
159    /// Returns [`crate::Error`] from parsing the [`Header`] or from the typed
160    /// payload parser.
161    pub fn payload(&self) -> crate::Result<AnyPayload<'_>> {
162        use dvb_common::Parse;
163        let hdr = Header::parse(&self.bytes)?;
164        let payload_bytes = hdr.payload_bytes(&self.bytes)?;
165        let packet_type = self.bytes[0];
166        Ok(match AnyPayload::dispatch(packet_type, payload_bytes) {
167            Some(result) => result?,
168            None => AnyPayload::Unknown {
169                packet_type,
170                body: payload_bytes,
171            },
172        })
173    }
174}
175
176// ── Stats ─────────────────────────────────────────────────────────────────────
177
178/// Accumulated pump statistics (monotonically growing across all `feed` calls).
179#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
180pub struct Stats {
181    /// TS packets fed via [`T2miPump::feed_ts`].
182    pub ts_packets: u64,
183    /// Complete T2-MI packets produced by the reassembler (pre-CRC check).
184    pub t2mi_packets: u64,
185    /// Packets dropped due to CRC-32 mismatch (ETSI TS 102 773 Annex A).
186    pub crc_failures: u64,
187    /// Malformed inputs: bad TS sync byte, truncated TS packet, overflowed
188    /// adaptation field, or `feed_ts` called on a raw-mode pump.
189    pub malformed_packets: u64,
190}
191
192// ── T2miPump ──────────────────────────────────────────────────────────────────
193
194/// Feed-and-iterate T2-MI pump.
195///
196/// Supports two operating modes:
197///
198/// - **TS-encapsulated** (most common): construct with [`T2miPump::new`],
199///   passing the 13-bit PID carrying T2-MI (from the PMT).  Feed 188-byte
200///   MPEG-TS packets with [`T2miPump::feed_ts`].  The pump filters by PID,
201///   strips the TS header per ISO/IEC 13818-1 §2.4.3.2, and forwards the
202///   payload to the internal [`PacketReassembler`] (ETSI TS 102 773 §6.1.1).
203///
204/// - **Raw** (un-encapsulated): construct with [`T2miPump::raw`].  Feed
205///   arbitrary byte slices with [`T2miPump::feed_raw`].  The pump buffers bytes
206///   and emits events once a full packet (determined by the header's
207///   `payload_len_bits`) is available.
208///
209/// # PID note
210///
211/// PIDs are 13-bit values (0x0000–0x1FFF per ISO/IEC 13818-1 §2.4.3.2).
212/// This type uses `u16` directly; no newtype is introduced.  Values above
213/// 0x1FFF are accepted without error — the PID filter simply never matches.
214pub struct T2miPump {
215    mode: PumpMode,
216    reasm: PacketReassembler,
217    stats: Stats,
218    scratch: Vec<T2miEvent>,
219    /// Raw-mode sync flag: true once the first raw feed has initialised the
220    /// reassembler via a PUSI=true, pointer=0 signal.
221    raw_started: bool,
222}
223
224enum PumpMode {
225    /// TS-encapsulated: filter packets to this PID.
226    Ts { pid: u16 },
227    /// Un-encapsulated raw byte stream.
228    Raw,
229}
230
231impl T2miPump {
232    /// Create a TS-encapsulated pump that filters to `pid`.
233    ///
234    /// `pid` is the 13-bit T2-MI PID from the PMT (e.g. 0x0006 for data
235    /// piping).
236    ///
237    /// # PID range
238    ///
239    /// Valid MPEG-TS PIDs are 13-bit (0x0000–0x1FFF); this parameter is `u16`.
240    /// No newtype is introduced to keep the API lightweight.
241    #[must_use]
242    pub fn new(pid: u16) -> Self {
243        Self {
244            mode: PumpMode::Ts { pid },
245            reasm: PacketReassembler::new(),
246            stats: Stats::default(),
247            scratch: Vec::new(),
248            raw_started: false,
249        }
250    }
251
252    /// Create an un-encapsulated raw-stream pump.
253    ///
254    /// Use [`T2miPump::feed_raw`] to supply bytes.  The pump buffers internally
255    /// and emits events by packet boundary, not by call boundary — a packet
256    /// split across two `feed_raw` calls produces exactly one event.
257    #[must_use]
258    pub fn raw() -> Self {
259        Self {
260            mode: PumpMode::Raw,
261            reasm: PacketReassembler::new(),
262            stats: Stats::default(),
263            scratch: Vec::new(),
264            raw_started: false,
265        }
266    }
267
268    /// Accumulated statistics.
269    #[must_use]
270    pub fn stats(&self) -> Stats {
271        self.stats
272    }
273
274    /// Feed one 188-byte MPEG-TS packet. Infallible: malformed packets are
275    /// counted in [`Stats::malformed_packets`] and discarded.
276    ///
277    /// Packets on the wrong PID are silently ignored (only [`Stats::ts_packets`]
278    /// is incremented).
279    ///
280    /// Returns a draining iterator over any T2-MI events completed by this feed.
281    pub fn feed_ts(&mut self, packet: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
282        self.scratch.clear();
283
284        match self.mode {
285            PumpMode::Raw => {
286                // feed_ts on a raw-mode pump is a caller error.
287                self.stats.malformed_packets += 1;
288            }
289            PumpMode::Ts { pid: filter_pid } => {
290                self.stats.ts_packets += 1;
291                match parse_ts_header(packet) {
292                    None => {
293                        self.stats.malformed_packets += 1;
294                    }
295                    Some(info) => {
296                        if info.pid == filter_pid {
297                            let payload = &packet[info.payload_start..TS_PACKET_SIZE];
298                            self.reasm.feed(payload, info.pusi);
299                            Self::drain_reasm_into(
300                                &mut self.reasm,
301                                &mut self.stats,
302                                &mut self.scratch,
303                            );
304                        }
305                        // Wrong PID: ignored cheaply — no stats beyond ts_packets.
306                    }
307                }
308            }
309        }
310
311        self.scratch.drain(..)
312    }
313
314    /// Feed raw T2-MI bytes (un-encapsulated mode).
315    ///
316    /// The slice may contain a partial packet; bytes are buffered internally.
317    /// A packet split across two `feed_raw` calls produces exactly one event.
318    ///
319    /// Returns a draining iterator over any T2-MI events completed by this feed.
320    pub fn feed_raw(&mut self, data: &[u8]) -> impl Iterator<Item = T2miEvent> + '_ {
321        self.scratch.clear();
322
323        match self.mode {
324            PumpMode::Ts { .. } => {
325                // feed_raw on a TS-mode pump is a caller error.
326                self.stats.malformed_packets += 1;
327            }
328            PumpMode::Raw => {
329                if !self.raw_started {
330                    // First call: initialise the reassembler with PUSI=true and
331                    // pointer_field=0.  PacketReassembler::feed interprets the
332                    // first byte of the payload as the pointer_field when PUSI is
333                    // set (ETSI TS 102 773 §6.1.1).  We prepend a 0x00 byte so the
334                    // reassembler sees pointer=0 and treats the rest as the start
335                    // of a new T2-MI packet.
336                    let mut buf = Vec::with_capacity(1 + data.len());
337                    buf.push(0x00); // pointer_field = 0
338                    buf.extend_from_slice(data);
339                    self.reasm.feed(&buf, true);
340                    self.raw_started = true;
341                } else {
342                    // Continuation: feed without PUSI — bytes extend the
343                    // current T2-MI packet in progress.
344                    self.reasm.feed(data, false);
345                }
346                Self::drain_reasm_into(&mut self.reasm, &mut self.stats, &mut self.scratch);
347            }
348        }
349
350        self.scratch.drain(..)
351    }
352
353    /// Drain all pending packets from the reassembler, CRC-validate each one,
354    /// and push valid packets to `scratch`.
355    fn drain_reasm_into(
356        reasm: &mut PacketReassembler,
357        stats: &mut Stats,
358        scratch: &mut Vec<T2miEvent>,
359    ) {
360        for raw in reasm.drain_packets() {
361            stats.t2mi_packets += 1;
362            match crc::validate_crc(&raw) {
363                Ok(()) => scratch.push(T2miEvent { bytes: raw }),
364                Err(_) => stats.crc_failures += 1,
365            }
366        }
367    }
368}
369
370// ── Tests ─────────────────────────────────────────────────────────────────────
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375    use dvb_common::crc32_mpeg2;
376
377    // ── Test helpers ─────────────────────────────────────────────────────────
378
379    /// Build a syntactically valid T2-MI packet (header + payload + CRC-32).
380    ///
381    /// `packet_type` is the raw byte (Table 1 of TS 102 773).
382    /// `payload` is the post-header, pre-CRC data.
383    /// Returns the full byte vector including the 4-byte CRC trailer.
384    fn make_t2mi_packet(packet_type: u8, payload: &[u8]) -> Vec<u8> {
385        let payload_len_bits = (payload.len() * 8) as u16;
386        let mut pkt = Vec::with_capacity(6 + payload.len() + 4);
387        pkt.push(packet_type);
388        pkt.push(0x01); // packet_count
389        pkt.push(0x00); // superframe_idx=0, rfu=0, t2mi_stream_id=0
390        pkt.push(0x00); // rfu byte = 0
391        pkt.extend_from_slice(&payload_len_bits.to_be_bytes());
392        pkt.extend_from_slice(payload);
393        let crc = crc32_mpeg2::compute(&pkt);
394        pkt.extend_from_slice(&crc.to_be_bytes());
395        pkt
396    }
397
398    /// Wrap a T2-MI payload slice in a single 188-byte MPEG-TS packet.
399    ///
400    /// Sets PUSI=true and pointer_field=0 so the reassembler treats
401    /// the T2-MI data as starting at byte 0 of the payload.
402    /// The T2-MI bytes must fit in 183 bytes (188 − 4 header − 1 pointer).
403    fn ts_packet(pid: u16, t2mi_data: &[u8], pusi: bool, pointer_field: u8) -> [u8; 188] {
404        let mut pkt = [0xFFu8; 188];
405        pkt[0] = TS_SYNC;
406        pkt[1] = if pusi { PUSI_MASK } else { 0 };
407        pkt[1] |= ((pid >> 8) as u8) & PID_HI_MASK;
408        pkt[2] = (pid & 0xFF) as u8;
409        pkt[3] = PAYLOAD_FLAG; // payload present, no adaptation field
410        if pusi {
411            pkt[4] = pointer_field;
412            let start = 5 + pointer_field as usize;
413            assert!(
414                start + t2mi_data.len() <= 188,
415                "T2-MI data too large for one TS packet"
416            );
417            pkt[start..start + t2mi_data.len()].copy_from_slice(t2mi_data);
418        } else {
419            let start = 4;
420            assert!(
421                start + t2mi_data.len() <= 188,
422                "T2-MI data too large for one TS packet"
423            );
424            pkt[start..start + t2mi_data.len()].copy_from_slice(t2mi_data);
425        }
426        pkt
427    }
428
429    // ── (a) valid T2-MI packet in TS → one event, typed payload ──────────────
430
431    #[test]
432    fn ts_packet_emits_one_event_with_typed_payload() {
433        // Build a valid BBFrame T2-MI packet.
434        // BbframePayload minimum: frame_idx(1) + plp_id(1) + flags(1) = 3 bytes.
435        let bbframe_payload = [0x01u8, 0x02, 0x00];
436        let t2mi = make_t2mi_packet(0x00, &bbframe_payload);
437
438        let pkt = ts_packet(0x0006, &t2mi, true, 0);
439        let mut pump = T2miPump::new(0x0006);
440        let events: Vec<_> = pump.feed_ts(&pkt).collect();
441
442        assert_eq!(events.len(), 1, "expected exactly one event");
443        assert_eq!(events[0].packet_type(), 0x00);
444
445        let payload = events[0].payload().expect("payload parse should succeed");
446        assert!(
447            matches!(payload, AnyPayload::Bbframe(_)),
448            "expected Bbframe, got {payload:?}"
449        );
450
451        let stats = pump.stats();
452        assert_eq!(stats.ts_packets, 1);
453        assert_eq!(stats.t2mi_packets, 1);
454        assert_eq!(stats.crc_failures, 0);
455        assert_eq!(stats.malformed_packets, 0);
456    }
457
458    // ── (b) corrupted CRC → zero events, crc_failures=1 ─────────────────────
459
460    #[test]
461    fn corrupted_crc_drops_packet_and_counts() {
462        let payload = [0x00u8, 0x00, 0x00]; // minimal BBFrame payload
463        let mut t2mi = make_t2mi_packet(0x00, &payload);
464        // Corrupt the last CRC byte.
465        *t2mi.last_mut().unwrap() ^= 0xFF;
466
467        let pkt = ts_packet(0x0006, &t2mi, true, 0);
468        let mut pump = T2miPump::new(0x0006);
469        let events: Vec<_> = pump.feed_ts(&pkt).collect();
470
471        assert_eq!(events.len(), 0, "corrupted packet must not emit");
472        let stats = pump.stats();
473        assert_eq!(stats.crc_failures, 1);
474        assert_eq!(stats.t2mi_packets, 1); // reassembler produced it, CRC gate dropped it
475    }
476
477    // ── (c) feed_raw with packet split across two calls → one event ──────────
478
479    #[test]
480    fn feed_raw_split_across_two_calls_emits_one_event() {
481        // Use a timestamp payload (11 bytes, all zeros), packet_type=0x20.
482        let ts_payload = [0x00u8; 11];
483        let t2mi = make_t2mi_packet(0x20, &ts_payload);
484
485        // Split at an arbitrary boundary (e.g. after the header).
486        let split = 6;
487        let first = &t2mi[..split];
488        let second = &t2mi[split..];
489
490        let mut pump = T2miPump::raw();
491
492        let ev1: Vec<_> = pump.feed_raw(first).collect();
493        assert_eq!(ev1.len(), 0, "no complete packet yet after first chunk");
494
495        let ev2: Vec<_> = pump.feed_raw(second).collect();
496        assert_eq!(
497            ev2.len(),
498            1,
499            "one event after second chunk completes the packet"
500        );
501
502        let stats = pump.stats();
503        assert_eq!(stats.t2mi_packets, 1);
504        assert_eq!(stats.crc_failures, 0);
505    }
506
507    // ── (d) garbage TS packet → malformed counted, no panic ──────────────────
508
509    #[test]
510    fn garbage_ts_packet_counted_no_panic() {
511        let mut pump = T2miPump::new(0x0006);
512        let garbage = [0x00u8; 188]; // bad sync byte
513        let events: Vec<_> = pump.feed_ts(&garbage).collect();
514        assert_eq!(events.len(), 0);
515        assert_eq!(pump.stats().malformed_packets, 1);
516        assert_eq!(pump.stats().ts_packets, 1);
517    }
518
519    // ── (e) wrong-PID TS packet → ignored cheaply ────────────────────────────
520
521    #[test]
522    fn wrong_pid_ts_packet_ignored() {
523        let payload = [0x00u8, 0x00, 0x00];
524        let t2mi = make_t2mi_packet(0x00, &payload);
525        let pkt = ts_packet(0x0100, &t2mi, true, 0); // PID 0x0100, pump listens on 0x0006
526
527        let mut pump = T2miPump::new(0x0006);
528        let events: Vec<_> = pump.feed_ts(&pkt).collect();
529
530        assert_eq!(events.len(), 0, "wrong-PID packet must not emit");
531        // ts_packets incremented, but nothing else moves.
532        let stats = pump.stats();
533        assert_eq!(stats.ts_packets, 1);
534        assert_eq!(stats.t2mi_packets, 0);
535        assert_eq!(stats.crc_failures, 0);
536        assert_eq!(stats.malformed_packets, 0);
537    }
538
539    // ── additional: header() lazy parse ──────────────────────────────────────
540
541    #[test]
542    fn event_header_lazy_parse_matches_packet_type() {
543        let payload = [0x00u8; 11]; // Timestamp payload
544        let t2mi = make_t2mi_packet(0x20, &payload);
545        let pkt = ts_packet(0x0010, &t2mi, true, 0);
546
547        let mut pump = T2miPump::new(0x0010);
548        let events: Vec<_> = pump.feed_ts(&pkt).collect();
549        assert_eq!(events.len(), 1);
550
551        let hdr = events[0].header().expect("header parse should succeed");
552        assert_eq!(hdr.packet_type as u8, 0x20);
553        assert_eq!(hdr.packet_count, 0x01);
554    }
555
556    // ── additional: stats() method ───────────────────────────────────────────
557
558    #[test]
559    fn stats_accumulate_across_feeds() {
560        let payload = [0x00u8, 0x00, 0x00];
561        let t2mi = make_t2mi_packet(0x00, &payload);
562        let pkt = ts_packet(0x0006, &t2mi, true, 0);
563
564        let mut pump = T2miPump::new(0x0006);
565        pump.feed_ts(&pkt).for_each(drop);
566        pump.feed_ts(&pkt).for_each(drop);
567
568        let stats = pump.stats();
569        assert_eq!(stats.ts_packets, 2);
570        // The reassembler resets on PUSI so we get 2 complete packets.
571        assert_eq!(stats.t2mi_packets, 2);
572    }
573}