Skip to main content

dvb_t2mi/
ts.rs

1//! TS packet reassembly utilities.
2//!
3//! Reconstructs T2-MI packets from MPEG-2 TS payloads per ETSI TS 102 773 §6.1.1.
4
5use std::collections::VecDeque;
6
7use crate::crc::CRC_LEN;
8
9/// Per-PID T2-MI packet reassembler.
10///
11/// Accepts TS payload slices with PUSI state and emits complete T2-MI packets.
12#[derive(Default)]
13pub struct PacketReassembler {
14    buf: bytes::BytesMut,
15    synced: bool,
16    pending: VecDeque<bytes::Bytes>,
17}
18
/// Size in bytes of the fixed T2-MI header that precedes the payload.
///
/// Written as a sum of its fields: packet_type (1) + packet_count (1) +
/// superframe_idx / rfu / t2mi_stream_id (2) + payload length in bits (2).
const HEADER_LEN: usize = 1 + 1 + 2 + 2;
21
22impl PacketReassembler {
23    /// Create a new empty reassembler.
24    pub fn new() -> Self {
25        Self::default()
26    }
27
28    /// Feed a TS payload slice with its PUSI state.
29    ///
30    /// The reassembler is single-stream: the caller demultiplexes by PID
31    /// (typically 0x0006 data piping, or whatever the PMT assigns) and feeds
32    /// only the T2-MI PID's payloads — one `PacketReassembler` per PID.
33    ///
34    /// Per §6.1.1:
35    /// - If PUSI is set, byte 0 is `pointer_field` indicating offset to next T2-MI packet.
36    /// - T2-MI packets are packed back-to-back; a new one can start mid-payload.
37    /// - If PUSI is clear, continuation bytes extend the current packet.
38    pub fn feed(&mut self, payload: &[u8], pusi: bool) {
39        if payload.is_empty() {
40            return;
41        }
42
43        if pusi {
44            let ptr = payload[0] as usize;
45
46            // Bytes 1..boundary belong to the packet in progress (if synced).
47            let boundary = 1 + ptr;
48            if self.synced {
49                let end = boundary.min(payload.len());
50                if end > 1 {
51                    self.buf.extend_from_slice(&payload[1..end]);
52                }
53                // Try to extract complete packets from accumulated buffer
54                self.try_extract_packets();
55            }
56
57            // §6.1.1: pointer_field is authoritative — a new T2-MI packet
58            // starts exactly at `boundary`. Anything still buffered belongs to
59            // a packet that never completed (corrupt payload_len_bits or lost
60            // TS packets); drop it so the corruption cannot swallow the good
61            // packets that follow.
62            self.buf.clear();
63
64            if boundary <= payload.len() {
65                if boundary < payload.len() {
66                    self.buf.extend_from_slice(&payload[boundary..]);
67                    self.try_extract_packets();
68                }
69                // boundary == payload.len(): the new packet starts at the very
70                // end — zero bytes yet; continuation arrives on the next feed.
71                self.synced = true;
72            } else {
73                // pointer_field points past the payload — malformed; wait for
74                // the next PUSI to resync.
75                self.synced = false;
76            }
77        } else if self.synced {
78            // Continuation: all payload bytes extend current T2-MI packet
79            self.buf.extend_from_slice(payload);
80            self.try_extract_packets();
81        }
82        // !synced && !pusi: discard (waiting for first PUSI)
83    }
84
85    /// Attempt to extract one or more complete T2-MI packets from buf.
86    fn try_extract_packets(&mut self) {
87        loop {
88            // Need at least header bytes to determine packet size
89            if self.buf.len() < HEADER_LEN {
90                break;
91            }
92
93            // Parse payload_len_bits from header (bytes 4-5, big-endian)
94            let payload_len_bits = ((self.buf[4] as u16) << 8) | (self.buf[5] as u16);
95            let payload_len_bytes = payload_len_bits.div_ceil(8);
96            let total_packet_len = HEADER_LEN + payload_len_bytes as usize + CRC_LEN;
97
98            if self.buf.len() < total_packet_len {
99                break;
100            }
101
102            // Extract complete packet
103            let packet_bytes = self.buf.split_to(total_packet_len);
104            self.pending.push_back(packet_bytes.freeze());
105        }
106    }
107
108    /// Drain the next completed T2-MI packet.
109    pub fn pop_packet(&mut self) -> Option<bytes::Bytes> {
110        self.pending.pop_front()
111    }
112
113    /// Drain all pending packets.
114    pub fn drain_packets(&mut self) -> impl Iterator<Item = bytes::Bytes> + '_ {
115        self.pending.drain(..)
116    }
117}
118
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a well-formed T2-MI packet: 6-byte header, `payload`, zeroed CRC.
    ///
    /// Panics if `payload` is too long for the 16-bit bit-length field, so an
    /// oversized fixture fails loudly instead of producing a header whose
    /// length silently wrapped.
    fn make_t2mi_packet(packet_type: u8, count: u8, payload: &[u8]) -> Vec<u8> {
        let bits = u16::try_from(payload.len() * 8)
            .expect("test payload does not fit the 16-bit payload_len field");

        let mut pkt = Vec::with_capacity(HEADER_LEN + payload.len() + CRC_LEN);
        pkt.push(packet_type);
        pkt.push(count);
        // superframe_idx=0, rfu=0, t2mi_stream_id=0
        pkt.extend_from_slice(&[0x00, 0x00]);
        pkt.extend_from_slice(&bits.to_be_bytes());
        pkt.extend_from_slice(payload);
        // CRC (zeros for reassembler tests — we don't validate here)
        pkt.resize(pkt.len() + CRC_LEN, 0);
        pkt
    }

    #[test]
    fn reassembles_single_packet_with_pusi_offset_0() {
        let t2mi = make_t2mi_packet(0x00, 0, &[0xAA, 0xBB]);
        let mut reasm = PacketReassembler::new();
        // TS payload: pointer_field=0, then T2-MI packet
        let mut ts_payload = vec![0x00];
        ts_payload.extend_from_slice(&t2mi);
        reasm.feed(&ts_payload, true);
        let pkt = reasm.pop_packet().unwrap();
        assert_eq!(&pkt[..], &t2mi[..]);
    }

    #[test]
    fn reassembles_packet_spanning_two_ts_packets() {
        let t2mi = make_t2mi_packet(0x00, 0, &[0xCC; 200]);
        let mut reasm = PacketReassembler::new();

        // First TS: PUSI=1, pointer=0, first 100 bytes of T2-MI
        let mut ts1 = vec![0x00];
        ts1.extend_from_slice(&t2mi[..100]);
        reasm.feed(&ts1, true);
        assert!(reasm.pop_packet().is_none());

        // Second TS: !PUSI, remaining bytes
        reasm.feed(&t2mi[100..], false);
        let pkt = reasm.pop_packet().unwrap();
        assert_eq!(&pkt[..], &t2mi[..]);
    }

    #[test]
    fn reassembles_two_packets_in_one_ts_payload() {
        let t1 = make_t2mi_packet(0x00, 0, &[0x11]);
        let t2 = make_t2mi_packet(0x01, 1, &[0x22]);
        let mut reasm = PacketReassembler::new();

        let mut ts_payload = vec![0x00]; // pointer=0
        ts_payload.extend_from_slice(&t1);
        ts_payload.extend_from_slice(&t2);
        reasm.feed(&ts_payload, true);

        let p1 = reasm.pop_packet().unwrap();
        let p2 = reasm.pop_packet().unwrap();
        assert_eq!(&p1[..], &t1[..]);
        assert_eq!(&p2[..], &t2[..]);
    }

    #[test]
    fn handles_pusi_with_nonzero_pointer() {
        let t2mi = make_t2mi_packet(0x00, 5, &[0xDE]);
        let mut reasm = PacketReassembler::new();
        // TS payload: pointer=3, 3 bytes junk, then T2-MI packet
        let mut ts_payload = vec![0x03, 0xFF, 0xFF, 0xFF];
        ts_payload.extend_from_slice(&t2mi);
        reasm.feed(&ts_payload, true);
        let pkt = reasm.pop_packet().unwrap();
        assert_eq!(&pkt[..], &t2mi[..]);
    }

    /// A PUSI payload with a non-zero pointer carries the tail of the packet
    /// in progress before the pointed-to start; both the finished packet and
    /// the one that starts at the pointer must come out, in order.
    #[test]
    fn pusi_tail_completes_in_progress_packet() {
        let first = make_t2mi_packet(0x00, 0, &[0x11; 50]); // 60 bytes total
        let second = make_t2mi_packet(0x00, 1, &[0x22]);
        let mut reasm = PacketReassembler::new();

        // First TS: PUSI, pointer=0, first 40 bytes of `first`.
        let mut ts1 = vec![0x00];
        ts1.extend_from_slice(&first[..40]);
        reasm.feed(&ts1, true);
        assert!(reasm.pop_packet().is_none());

        // Second TS: PUSI, pointer=20 → 20 tail bytes of `first`, then `second`.
        let mut ts2 = vec![20u8];
        ts2.extend_from_slice(&first[40..]);
        ts2.extend_from_slice(&second);
        reasm.feed(&ts2, true);

        assert_eq!(&reasm.pop_packet().unwrap()[..], &first[..]);
        assert_eq!(&reasm.pop_packet().unwrap()[..], &second[..]);
        assert!(reasm.pop_packet().is_none());
    }

    /// pointer_field may land exactly on the payload end: the tail completes
    /// the current packet, the new packet has zero bytes so far, and sync is
    /// kept so the following continuation payload is accepted.
    #[test]
    fn pointer_at_payload_end_keeps_sync() {
        let first = make_t2mi_packet(0x00, 0, &[0x33; 10]); // 20 bytes total
        let next = make_t2mi_packet(0x00, 1, &[0x44]);
        let mut reasm = PacketReassembler::new();

        let mut ts1 = vec![0x00];
        ts1.extend_from_slice(&first[..8]);
        reasm.feed(&ts1, true);
        assert!(reasm.pop_packet().is_none());

        // pointer=12 and exactly 12 bytes follow: boundary == payload.len().
        let mut ts2 = vec![12u8];
        ts2.extend_from_slice(&first[8..]);
        reasm.feed(&ts2, true);
        assert_eq!(&reasm.pop_packet().unwrap()[..], &first[..]);
        assert!(reasm.pop_packet().is_none());

        // The packet announced by the pointer arrives as a continuation.
        reasm.feed(&next, false);
        assert_eq!(&reasm.pop_packet().unwrap()[..], &next[..]);
    }

    #[test]
    fn discards_data_before_first_pusi() {
        let mut reasm = PacketReassembler::new();
        reasm.feed(&[0xAA, 0xBB], false); // !synced, !pusi → discard
        assert!(reasm.pop_packet().is_none());
    }

    #[test]
    fn handles_empty_payload() {
        let mut reasm = PacketReassembler::new();
        reasm.feed(&[], true);
        assert!(reasm.pop_packet().is_none());
    }

    /// §6.1.1: pointer_field is authoritative. A buffered partial whose
    /// declared length over-ran (corruption / lost TS packets) must be
    /// dropped at the next PUSI instead of swallowing the good packet that
    /// starts there.
    #[test]
    fn corrupt_length_resyncs_at_next_pusi() {
        let mut reasm = PacketReassembler::new();

        // A partial packet that claims a huge payload (8000 bits = 1000
        // bytes) but only ever delivers a few bytes.
        let mut corrupt = vec![0x00u8, 0x00, 0x00, 0x00];
        corrupt.extend_from_slice(&8000u16.to_be_bytes());
        corrupt.extend_from_slice(&[0xEE; 20]);
        let mut ts1 = vec![0x00]; // PUSI, pointer=0
        ts1.extend_from_slice(&corrupt);
        reasm.feed(&ts1, true);
        assert!(reasm.pop_packet().is_none());

        // Next PUSI with pointer=0: a clean, complete packet starts here.
        let good = make_t2mi_packet(0x00, 7, &[0xAB, 0xCD]);
        let mut ts2 = vec![0x00];
        ts2.extend_from_slice(&good);
        reasm.feed(&ts2, true);

        let pkt = reasm.pop_packet().expect("good packet must survive resync");
        assert_eq!(&pkt[..], &good[..]);
        assert!(reasm.pop_packet().is_none());
    }

    /// A pointer_field that points past the payload end is malformed — the
    /// reassembler must drop sync and recover on the following PUSI.
    #[test]
    fn pointer_past_payload_end_drops_sync() {
        let mut reasm = PacketReassembler::new();
        reasm.feed(&[0xFF, 0xAA, 0xBB], true); // ptr=255 > payload len
        assert!(reasm.pop_packet().is_none());
        // Continuation while unsynced is discarded.
        reasm.feed(&[0xCC; 8], false);
        assert!(reasm.pop_packet().is_none());
        // Clean PUSI recovers.
        let good = make_t2mi_packet(0x00, 1, &[0x55]);
        let mut ts = vec![0x00];
        ts.extend_from_slice(&good);
        reasm.feed(&ts, true);
        assert_eq!(&reasm.pop_packet().unwrap()[..], &good[..]);
    }

    #[test]
    fn drains_multiple_pending_packets() {
        let t1 = make_t2mi_packet(0x00, 0, &[0xAA]);
        let t2 = make_t2mi_packet(0x00, 1, &[0xBB]);
        let t3 = make_t2mi_packet(0x00, 2, &[0xCC]);
        let mut reasm = PacketReassembler::new();

        let mut ts_payload = vec![0x00];
        ts_payload.extend_from_slice(&t1);
        ts_payload.extend_from_slice(&t2);
        ts_payload.extend_from_slice(&t3);
        reasm.feed(&ts_payload, true);

        let packets: Vec<_> = reasm.drain_packets().collect();
        assert_eq!(packets.len(), 3);
        assert_eq!(&packets[0][..], &t1[..]);
        assert_eq!(&packets[1][..], &t2[..]);
        assert_eq!(&packets[2][..], &t3[..]);
    }
}