// dvb_si/resync.rs

//! TS Byte-Stream Resynchroniser — ISO/IEC 13818-1 §2.4.3.2.
//!
//! Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
//! (file reads, UDP payloads) that may start mid-packet or contain junk.
//! Also detects 204-byte packets (188-byte TS + 16 Reed-Solomon parity)
//! and strips the parity bytes.
7
/// Sync byte value that starts every transport packet
/// (ISO/IEC 13818-1 §2.4.3.2).
pub const SYNC_BYTE: u8 = 0x47;

/// Transport Stream packet size in bytes (ISO/IEC 13818-1 §2.4.3.2).
pub const TS_PACKET_SIZE: usize = 188;

/// Size in bytes of a TS packet followed by its 16-byte Reed-Solomon outer
/// code (DVB): 188 + 16.
pub const RS_PACKET_SIZE: usize = 204;

/// Consecutive sync bytes at the candidate stride required to declare lock.
const LOCK_CONFIRMATIONS: usize = 5;
19
/// Packet size detected once the resynchroniser has locked onto the stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum PacketStride {
    /// Standard 188-byte TS packets.
    Ts188,
    /// 204-byte packets (188-byte TS + 16 Reed-Solomon parity bytes).
    Rs204,
}
29
/// Counters accumulated during resynchronisation.
///
/// All counters start at zero (`Default`) and only ever increase.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ResyncStats {
    /// Total 188-byte TS packets emitted.
    pub packets: u64,
    /// Times a locked stream lost sync. The counter is bumped when the loss
    /// is detected, i.e. before (and regardless of whether) lock is regained.
    pub resyncs: u64,
    /// Bytes skipped/dropped (junk before lock + sync-loss bytes).
    pub dropped_bytes: u64,
}
41
42/// TS byte-stream resynchroniser.
43///
44/// Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
45/// that may start mid-packet or contain junk, and detects 204-byte
46/// Reed-Solomon-coded packets (stripping the 16 parity bytes).
47///
48/// ISO/IEC 13818-1 §2.4.3.2.
49#[derive(Debug, Default)]
50pub struct TsResync {
51    buf: Vec<u8>,
52    stride: Option<PacketStride>,
53    stats: ResyncStats,
54}
55
56impl TsResync {
57    /// Create a new resynchroniser with an empty buffer.
58    pub fn new() -> Self {
59        Self::default()
60    }
61
62    /// Append `data`, emit every newly-aligned 188-byte TS packet.
63    ///
64    /// When the stream is detected as 204-byte, the 16 Reed-Solomon parity
65    /// bytes are stripped and only the 188-byte TS payload is emitted.
66    /// Bytes are buffered across calls.
67    pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
68        self.buf.extend_from_slice(data);
69        let mut emitted = Vec::new();
70
71        loop {
72            match self.stride {
73                None => {
74                    if let Some((offset, s)) = find_sync(&self.buf) {
75                        self.stats.dropped_bytes += offset as u64;
76                        self.buf.drain(..offset);
77                        self.stride = Some(s);
78                    } else {
79                        let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
80                        if self.buf.len() > keep {
81                            let excess = self.buf.len() - keep;
82                            self.stats.dropped_bytes += excess as u64;
83                            self.buf.drain(..excess);
84                        }
85                        return emitted;
86                    }
87                }
88                Some(stride) => {
89                    let s = match stride {
90                        PacketStride::Ts188 => TS_PACKET_SIZE,
91                        PacketStride::Rs204 => RS_PACKET_SIZE,
92                    };
93                    if self.buf.len() < s {
94                        return emitted;
95                    }
96                    if self.buf[0] == SYNC_BYTE {
97                        let mut packet = [0u8; TS_PACKET_SIZE];
98                        packet.copy_from_slice(&self.buf[..TS_PACKET_SIZE]);
99                        emitted.push(packet);
100                        self.buf.drain(..s);
101                        self.stats.packets += 1;
102                    } else {
103                        self.stats.resyncs += 1;
104                        self.stats.dropped_bytes += 1;
105                        self.buf.drain(..1);
106                        self.stride = None;
107                    }
108                }
109            }
110        }
111    }
112
113    /// Detected packet stride, or [`None`] before the stream has locked.
114    pub fn stride(&self) -> Option<PacketStride> {
115        self.stride
116    }
117
118    /// Accumulated statistics.
119    pub fn stats(&self) -> ResyncStats {
120        self.stats
121    }
122}
123
124/// Scan the buffer for the smallest offset `o` such that a stride S
125/// (tried 188 first, then 204) yields [`LOCK_CONFIRMATIONS`] consecutive
126/// sync bytes at positions `o + k*S` for `k = 0 .. LOCK_CONFIRMATIONS`.
127///
128/// Returns `(offset, stride)` on success, or [`None`] if no lock is found.
129fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
130    for o in 0..buf.len() {
131        if buf[o] != SYNC_BYTE {
132            continue;
133        }
134        if try_stride(buf, o, TS_PACKET_SIZE) {
135            return Some((o, PacketStride::Ts188));
136        }
137        if try_stride(buf, o, RS_PACKET_SIZE) {
138            return Some((o, PacketStride::Rs204));
139        }
140    }
141    None
142}
143
144/// Check that `LOCK_CONFIRMATIONS` consecutive sync bytes exist at stride `s`
145/// starting from `offset` (inclusive).  The first byte (`buf[offset]`) is
146/// already known to be [`SYNC_BYTE`]; this checks the next
147/// `LOCK_CONFIRMATIONS - 1` positions.
148fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
149    for k in 1..LOCK_CONFIRMATIONS {
150        let pos = offset + k * s;
151        if pos >= buf.len() || buf[pos] != SYNC_BYTE {
152            return false;
153        }
154    }
155    true
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 188-byte TS packet starting with 0x47 followed by a distinct
    /// payload byte `tag` (repeated).  All non-sync bytes are kept away from
    /// 0x47 so that no false lock can occur.
    fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
        assert_ne!(tag, SYNC_BYTE, "tag must not equal sync byte");
        let mut pkt = [tag; TS_PACKET_SIZE];
        pkt[0] = SYNC_BYTE;
        pkt
    }

    /// Build a 204-byte RS-coded packet: 188-byte TS payload (with tag) +
    /// 16 parity bytes filled with `parity` (must not be 0x47).
    fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
        assert_ne!(ts_tag, SYNC_BYTE);
        assert_ne!(parity, SYNC_BYTE);
        let mut pkt = [parity; RS_PACKET_SIZE];
        pkt[0] = SYNC_BYTE;
        pkt[1..TS_PACKET_SIZE].fill(ts_tag);
        pkt
    }

    /// Concatenate several 188-byte packet arrays into a flat `Vec<u8>`.
    fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
        packets.concat()
    }

    /// Feed `data` into a fresh [`TsResync`] and return everything emitted.
    fn feed_once(data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
        TsResync::new().feed(data)
    }

    // ------------------------------------------------------------------
    // Test 1 — aligned 188-byte passthrough
    // ------------------------------------------------------------------
    #[test]
    fn aligned_188_passthrough() {
        let p0 = ts_packet(0x01);
        let p1 = ts_packet(0x02);
        let p2 = ts_packet(0x03);
        let p3 = ts_packet(0x04);
        let p4 = ts_packet(0x05);
        let data = concat_ts(&[p0, p1, p2, p3, p4]);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), 5);
        assert_eq!(emitted[0], p0);
        assert_eq!(emitted[1], p1);
        assert_eq!(emitted[2], p2);
        assert_eq!(emitted[3], p3);
        assert_eq!(emitted[4], p4);
        assert_eq!(r.stride(), Some(PacketStride::Ts188));
        let s = r.stats();
        assert_eq!(s.packets, 5);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 0);
    }

    // ------------------------------------------------------------------
    // Test 2 — junk prefix, then lock and recover all packets
    // ------------------------------------------------------------------
    #[test]
    fn junk_prefix_locks() {
        let pkts: Vec<_> = (0..6).map(|i| ts_packet(i + 1)).collect();
        let stream = concat_ts(&pkts);

        // Seven junk bytes ahead of the first real packet.
        let mut data: Vec<u8> = vec![0x00; 7];
        data.extend_from_slice(&stream);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), 6);
        for (i, p) in emitted.iter().enumerate() {
            assert_eq!(*p, pkts[i], "packet {i} mismatch");
        }
        assert_eq!(r.stride(), Some(PacketStride::Ts188));
        let s = r.stats();
        assert_eq!(s.packets, 6);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 7);
    }

    // ------------------------------------------------------------------
    // Test 3 — chunked feed equivalence
    // ------------------------------------------------------------------
    #[test]
    fn chunked_feed_equivalence() {
        let pkts: Vec<_> = (0..6).map(|i| ts_packet(i + 1)).collect();
        let stream = concat_ts(&pkts);

        // Whole feed
        let whole = feed_once(&stream);

        // Chunked feed — 100 bytes at a time
        let mut r = TsResync::new();
        let mut chunked = Vec::new();
        for chunk in stream.chunks(100) {
            chunked.extend(r.feed(chunk));
        }

        assert_eq!(whole.len(), chunked.len());
        for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
            assert_eq!(w, c, "packet {i} mismatch");
        }
    }

    // ------------------------------------------------------------------
    // Test 4 — mid-stream sync loss and reacquisition
    // ------------------------------------------------------------------
    #[test]
    fn midstream_loss_resync() {
        // Need >= LOCK_CONFIRMATIONS clean packets BEFORE the corruption (so
        // the stream actually locks first, making the disruption a genuine
        // resync rather than a delayed initial lock) and >= LOCK_CONFIRMATIONS
        // after (so it can re-lock and recover). 14 packets, corruption inside
        // packet 7, satisfies both.
        let pkts: Vec<_> = (0..14).map(|i| ts_packet(i + 1)).collect();
        let clean = concat_ts(&pkts);

        // Insert a single stray byte 12 bytes into packet 7.
        let insert_at = 7 * TS_PACKET_SIZE + 12;
        let stray: u8 = 0x00;
        let mut data = clean[..insert_at].to_vec();
        data.push(stray);
        data.extend_from_slice(&clean[insert_at..]);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        let s = r.stats();
        // A locked stream losing alignment mid-way must record a resync.
        assert!(
            s.resyncs >= 1,
            "mid-stream corruption must trigger a resync, got {}",
            s.resyncs
        );
        // Locks on the clean prefix: the first emitted packet is P0.
        assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
        // Re-locks after the corruption and recovers most of the stream.
        assert!(
            emitted.len() >= 10,
            "should recover and emit most packets, got {}",
            emitted.len()
        );
    }

    // ------------------------------------------------------------------
    // Test 5 — 204-byte RS-coded packets detected and stripped
    // ------------------------------------------------------------------
    #[test]
    fn rs204_detected_and_stripped() {
        let mut stream = Vec::new();
        let mut expected_payloads = Vec::new();
        for i in 0u8..6 {
            let tag = i + 1;
            let rs = rs_packet(tag, 0xFF);
            stream.extend_from_slice(&rs);
            expected_payloads.push(ts_packet(tag));
        }

        let mut r = TsResync::new();
        let emitted = r.feed(&stream);

        assert_eq!(emitted.len(), 6);
        assert_eq!(r.stride(), Some(PacketStride::Rs204));
        for (i, (e, p)) in emitted.iter().zip(expected_payloads.iter()).enumerate() {
            assert_eq!(e, p, "packet {i} mismatch");
        }
        let s = r.stats();
        assert_eq!(s.packets, 6);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 0);
    }
}