Skip to main content

dvb_t2mi/
ts.rs

1//! TS packet reassembly utilities.
2//!
3//! Reconstructs T2-MI packets from MPEG-2 TS payloads per ETSI TS 102 773 §6.1.1.
4
5use std::collections::VecDeque;
6
7use crate::crc::CRC_LEN;
8
9/// Per-PID T2-MI packet reassembler.
10///
11/// Accepts TS payload slices with PUSI state and emits complete T2-MI packets.
12#[derive(Default)]
13pub struct PacketReassembler {
14    buf: bytes::BytesMut,
15    synced: bool,
16    pending: VecDeque<bytes::Bytes>,
17}
18
19/// Total bytes in a T2-MI header.
20const HEADER_LEN: usize = 6;
21
22impl PacketReassembler {
23    /// Create a new empty reassembler.
24    #[must_use]
25    pub fn new() -> Self {
26        Self::default()
27    }
28
29    /// Feed a TS payload slice with its PUSI state.
30    ///
31    /// The reassembler is single-stream: the caller demultiplexes by PID
32    /// (typically 0x0006 data piping, or whatever the PMT assigns) and feeds
33    /// only the T2-MI PID's payloads — one `PacketReassembler` per PID.
34    ///
35    /// Per §6.1.1:
36    /// - If PUSI is set, byte 0 is `pointer_field` indicating offset to next T2-MI packet.
37    /// - T2-MI packets are packed back-to-back; a new one can start mid-payload.
38    /// - If PUSI is clear, continuation bytes extend the current packet.
39    pub fn feed(&mut self, payload: &[u8], pusi: bool) {
40        if payload.is_empty() {
41            return;
42        }
43
44        if pusi {
45            let ptr = payload[0] as usize;
46
47            // Bytes 1..boundary belong to the packet in progress (if synced).
48            let boundary = 1 + ptr;
49            if self.synced {
50                let end = boundary.min(payload.len());
51                if end > 1 {
52                    self.buf.extend_from_slice(&payload[1..end]);
53                }
54                // Try to extract complete packets from accumulated buffer
55                self.try_extract_packets();
56            }
57
58            // §6.1.1: pointer_field is authoritative — a new T2-MI packet
59            // starts exactly at `boundary`. Anything still buffered belongs to
60            // a packet that never completed (corrupt payload_len_bits or lost
61            // TS packets); drop it so the corruption cannot swallow the good
62            // packets that follow.
63            self.buf.clear();
64
65            if boundary <= payload.len() {
66                if boundary < payload.len() {
67                    self.buf.extend_from_slice(&payload[boundary..]);
68                    self.try_extract_packets();
69                }
70                // boundary == payload.len(): the new packet starts at the very
71                // end — zero bytes yet; continuation arrives on the next feed.
72                self.synced = true;
73            } else {
74                // pointer_field points past the payload — malformed; wait for
75                // the next PUSI to resync.
76                self.synced = false;
77            }
78        } else if self.synced {
79            // Continuation: all payload bytes extend current T2-MI packet
80            self.buf.extend_from_slice(payload);
81            self.try_extract_packets();
82        }
83        // !synced && !pusi: discard (waiting for first PUSI)
84    }
85
86    /// Attempt to extract one or more complete T2-MI packets from buf.
87    fn try_extract_packets(&mut self) {
88        loop {
89            // Need at least header bytes to determine packet size
90            if self.buf.len() < HEADER_LEN {
91                break;
92            }
93
94            // Parse payload_len_bits from header (bytes 4-5, big-endian)
95            let payload_len_bits = ((self.buf[4] as u16) << 8) | (self.buf[5] as u16);
96            let payload_len_bytes = payload_len_bits.div_ceil(8);
97            let total_packet_len = HEADER_LEN + payload_len_bytes as usize + CRC_LEN;
98
99            if self.buf.len() < total_packet_len {
100                break;
101            }
102
103            // Extract complete packet
104            let packet_bytes = self.buf.split_to(total_packet_len);
105            self.pending.push_back(packet_bytes.freeze());
106        }
107    }
108
109    /// Drain the next completed T2-MI packet.
110    pub fn pop_packet(&mut self) -> Option<bytes::Bytes> {
111        self.pending.pop_front()
112    }
113
114    /// Drain all pending packets.
115    pub fn drain_packets(&mut self) -> impl Iterator<Item = bytes::Bytes> + '_ {
116        self.pending.drain(..)
117    }
118}
119
120#[cfg(test)]
121mod tests {
122    use super::*;
123
124    fn make_t2mi_packet(packet_type: u8, count: u8, payload: &[u8]) -> Vec<u8> {
125        let mut pkt = Vec::with_capacity(6 + payload.len() + 4);
126        pkt.push(packet_type);
127        pkt.push(count);
128        // superframe_idx=0, rfu=0, t2mi_stream_id=0
129        let header_byte2 = 0x00;
130        let header_byte3 = 0x00;
131        pkt.push(header_byte2);
132        pkt.push(header_byte3);
133        let bits = (payload.len() * 8) as u16;
134        pkt.extend_from_slice(&bits.to_be_bytes());
135        pkt.extend_from_slice(payload);
136        // CRC (zeros for reassembler tests — we don't validate here)
137        pkt.extend_from_slice(&[0u8; 4]);
138        pkt
139    }
140
141    #[test]
142    fn reassembles_single_packet_with_pusi_offset_0() {
143        let t2mi = make_t2mi_packet(0x00, 0, &[0xAA, 0xBB]);
144        let mut reasm = PacketReassembler::new();
145        // TS payload: pointer_field=0, then T2-MI packet
146        let mut ts_payload = vec![0x00];
147        ts_payload.extend_from_slice(&t2mi);
148        reasm.feed(&ts_payload, true);
149        let pkt = reasm.pop_packet().unwrap();
150        assert_eq!(&pkt[..], &t2mi[..]);
151    }
152
153    #[test]
154    fn reassembles_packet_spanning_two_ts_packets() {
155        let t2mi = make_t2mi_packet(0x00, 0, &[0xCC; 200]);
156        let mut reasm = PacketReassembler::new();
157
158        // First TS: PUSI=1, pointer=0, first 100 bytes of T2-MI
159        let mut ts1 = vec![0x00];
160        ts1.extend_from_slice(&t2mi[..100]);
161        reasm.feed(&ts1, true);
162        assert!(reasm.pop_packet().is_none());
163
164        // Second TS: !PUSI, remaining bytes
165        reasm.feed(&t2mi[100..], false);
166        let pkt = reasm.pop_packet().unwrap();
167        assert_eq!(&pkt[..], &t2mi[..]);
168    }
169
170    #[test]
171    fn reassembles_two_packets_in_one_ts_payload() {
172        let t1 = make_t2mi_packet(0x00, 0, &[0x11]);
173        let t2 = make_t2mi_packet(0x01, 1, &[0x22]);
174        let mut reasm = PacketReassembler::new();
175
176        let mut ts_payload = vec![0x00]; // pointer=0
177        ts_payload.extend_from_slice(&t1);
178        ts_payload.extend_from_slice(&t2);
179        reasm.feed(&ts_payload, true);
180
181        let p1 = reasm.pop_packet().unwrap();
182        let p2 = reasm.pop_packet().unwrap();
183        assert_eq!(&p1[..], &t1[..]);
184        assert_eq!(&p2[..], &t2[..]);
185    }
186
187    #[test]
188    fn handles_pusi_with_nonzero_pointer() {
189        let t2mi = make_t2mi_packet(0x00, 5, &[0xDE]);
190        let mut reasm = PacketReassembler::new();
191        // TS payload: pointer=3, 3 bytes junk, then T2-MI packet
192        let mut ts_payload = vec![0x03, 0xFF, 0xFF, 0xFF];
193        ts_payload.extend_from_slice(&t2mi);
194        reasm.feed(&ts_payload, true);
195        let pkt = reasm.pop_packet().unwrap();
196        assert_eq!(&pkt[..], &t2mi[..]);
197    }
198
199    #[test]
200    fn discards_data_before_first_pusi() {
201        let mut reasm = PacketReassembler::new();
202        reasm.feed(&[0xAA, 0xBB], false); // !synced, !pusi → discard
203        assert!(reasm.pop_packet().is_none());
204    }
205
206    #[test]
207    fn handles_empty_payload() {
208        let mut reasm = PacketReassembler::new();
209        reasm.feed(&[], true);
210        assert!(reasm.pop_packet().is_none());
211    }
212
213    /// §6.1.1: pointer_field is authoritative. A buffered partial whose
214    /// declared length over-ran (corruption / lost TS packets) must be
215    /// dropped at the next PUSI instead of swallowing the good packet that
216    /// starts there.
217    #[test]
218    fn corrupt_length_resyncs_at_next_pusi() {
219        let mut reasm = PacketReassembler::new();
220
221        // A partial packet that claims a huge payload (8000 bits = 1000
222        // bytes) but only ever delivers a few bytes.
223        let mut corrupt = vec![0x00u8, 0x00, 0x00, 0x00];
224        corrupt.extend_from_slice(&8000u16.to_be_bytes());
225        corrupt.extend_from_slice(&[0xEE; 20]);
226        let mut ts1 = vec![0x00]; // PUSI, pointer=0
227        ts1.extend_from_slice(&corrupt);
228        reasm.feed(&ts1, true);
229        assert!(reasm.pop_packet().is_none());
230
231        // Next PUSI with pointer=0: a clean, complete packet starts here.
232        let good = make_t2mi_packet(0x00, 7, &[0xAB, 0xCD]);
233        let mut ts2 = vec![0x00];
234        ts2.extend_from_slice(&good);
235        reasm.feed(&ts2, true);
236
237        let pkt = reasm.pop_packet().expect("good packet must survive resync");
238        assert_eq!(&pkt[..], &good[..]);
239        assert!(reasm.pop_packet().is_none());
240    }
241
242    /// A pointer_field that points past the payload end is malformed — the
243    /// reassembler must drop sync and recover on the following PUSI.
244    #[test]
245    fn pointer_past_payload_end_drops_sync() {
246        let mut reasm = PacketReassembler::new();
247        reasm.feed(&[0xFF, 0xAA, 0xBB], true); // ptr=255 > payload len
248        assert!(reasm.pop_packet().is_none());
249        // Continuation while unsynced is discarded.
250        reasm.feed(&[0xCC; 8], false);
251        assert!(reasm.pop_packet().is_none());
252        // Clean PUSI recovers.
253        let good = make_t2mi_packet(0x00, 1, &[0x55]);
254        let mut ts = vec![0x00];
255        ts.extend_from_slice(&good);
256        reasm.feed(&ts, true);
257        assert_eq!(&reasm.pop_packet().unwrap()[..], &good[..]);
258    }
259
260    #[test]
261    fn drains_multiple_pending_packets() {
262        let t1 = make_t2mi_packet(0x00, 0, &[0xAA]);
263        let t2 = make_t2mi_packet(0x00, 1, &[0xBB]);
264        let t3 = make_t2mi_packet(0x00, 2, &[0xCC]);
265        let mut reasm = PacketReassembler::new();
266
267        let mut ts_payload = vec![0x00];
268        ts_payload.extend_from_slice(&t1);
269        ts_payload.extend_from_slice(&t2);
270        ts_payload.extend_from_slice(&t3);
271        reasm.feed(&ts_payload, true);
272
273        let packets: Vec<_> = reasm.drain_packets().collect();
274        assert_eq!(packets.len(), 3);
275        assert_eq!(&packets[0][..], &t1[..]);
276        assert_eq!(&packets[1][..], &t2[..]);
277        assert_eq!(&packets[2][..], &t3[..]);
278    }
279}