// dvb_si/resync.rs

//! TS Byte-Stream Resynchroniser — ISO/IEC 13818-1 §2.4.3.2.
//!
//! Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
//! (file reads, UDP payloads) that may start mid-packet or contain junk.
//! Also detects 204-byte packets (188-byte TS + 16 Reed-Solomon parity)
//! and strips the parity bytes.

/// Sync byte value that starts every TS packet (ISO/IEC 13818-1 §2.4.3.2).
pub const SYNC_BYTE: u8 = 0x47;

/// Transport Stream packet size in bytes (ISO/IEC 13818-1 §2.4.3.2).
pub const TS_PACKET_SIZE: usize = 188;

/// Size in bytes of a TS packet followed by the 16-byte Reed-Solomon outer
/// code (DVB): 188 + 16 = 204.
pub const RS_PACKET_SIZE: usize = TS_PACKET_SIZE + 16;

/// Consecutive sync bytes at the candidate stride required to declare lock.
const LOCK_CONFIRMATIONS: usize = 5;

/// Packet stride detected once the resynchroniser has locked.
///
/// The stride is the distance in bytes between successive sync bytes in the
/// input stream.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum PacketStride {
    /// Standard 188-byte TS packets.
    Ts188,
    /// 204-byte packets (188-byte TS + 16 Reed-Solomon parity bytes).
    Rs204,
}

/// Counters accumulated during resynchronisation.
///
/// All counters start at zero (see [`Default`]) and only ever increase.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ResyncStats {
    /// Total 188-byte TS packets emitted.
    pub packets: u64,
    /// Number of times a locked stream lost sync. The counter is bumped when
    /// the loss is detected, i.e. before lock is reacquired.
    pub resyncs: u64,
    /// Bytes skipped/dropped: junk before a lock, the byte at which sync was
    /// lost, and unlockable bytes trimmed from the buffer while searching.
    pub dropped_bytes: u64,
}

44/// TS byte-stream resynchroniser.
45///
46/// Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
47/// that may start mid-packet or contain junk, and detects 204-byte
48/// Reed-Solomon-coded packets (stripping the 16 parity bytes).
49///
50/// ISO/IEC 13818-1 §2.4.3.2.
51#[derive(Debug, Default)]
52pub struct TsResync {
53    buf: Vec<u8>,
54    head: usize,
55    stride: Option<PacketStride>,
56    stats: ResyncStats,
57}
58
59impl TsResync {
60    /// Create a new resynchroniser with an empty buffer.
61    pub fn new() -> Self {
62        Self::default()
63    }
64
65    /// Append `data`, emit every newly-aligned 188-byte TS packet.
66    ///
67    /// When the stream is detected as 204-byte, the 16 Reed-Solomon parity
68    /// bytes are stripped and only the 188-byte TS payload is emitted.
69    /// Bytes are buffered across calls.
70    pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
71        self.buf.extend_from_slice(data);
72        let mut emitted = Vec::new();
73
74        loop {
75            match self.stride {
76                None => {
77                    if let Some((offset, s)) = find_sync(&self.buf[self.head..]) {
78                        self.stats.dropped_bytes += offset as u64;
79                        self.head += offset;
80                        self.stride = Some(s);
81                    } else {
82                        let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
83                        let tail_len = self.buf.len() - self.head;
84                        if tail_len > keep {
85                            let excess = tail_len - keep;
86                            self.stats.dropped_bytes += excess as u64;
87                            self.head += excess;
88                        }
89                        self.compact();
90                        return emitted;
91                    }
92                }
93                Some(stride) => {
94                    let s = match stride {
95                        PacketStride::Ts188 => TS_PACKET_SIZE,
96                        PacketStride::Rs204 => RS_PACKET_SIZE,
97                    };
98                    let tail_len = self.buf.len() - self.head;
99                    if tail_len < s {
100                        self.compact();
101                        return emitted;
102                    }
103                    if self.buf[self.head] == SYNC_BYTE {
104                        let mut packet = [0u8; TS_PACKET_SIZE];
105                        packet.copy_from_slice(&self.buf[self.head..self.head + TS_PACKET_SIZE]);
106                        emitted.push(packet);
107                        self.head += s;
108                        self.stats.packets += 1;
109                    } else {
110                        self.stats.resyncs += 1;
111                        self.stats.dropped_bytes += 1;
112                        self.head += 1;
113                        self.stride = None;
114                    }
115                }
116            }
117        }
118    }
119
120    /// Detected packet stride, or [`None`] before the stream has locked.
121    pub fn stride(&self) -> Option<PacketStride> {
122        self.stride
123    }
124
125    /// Accumulated statistics.
126    pub fn stats(&self) -> ResyncStats {
127        self.stats
128    }
129
130    fn compact(&mut self) {
131        if self.head > 0 {
132            self.buf.drain(..self.head);
133            self.head = 0;
134        }
135    }
136}
137
138/// Scan the buffer for the smallest offset `o` such that a stride S
139/// (tried 188 first, then 204) yields [`LOCK_CONFIRMATIONS`] consecutive
140/// sync bytes at positions `o + k*S` for `k = 0 .. LOCK_CONFIRMATIONS`.
141///
142/// Returns `(offset, stride)` on success, or [`None`] if no lock is found.
143fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
144    for o in 0..buf.len() {
145        if buf[o] != SYNC_BYTE {
146            continue;
147        }
148        if try_stride(buf, o, TS_PACKET_SIZE) {
149            return Some((o, PacketStride::Ts188));
150        }
151        if try_stride(buf, o, RS_PACKET_SIZE) {
152            return Some((o, PacketStride::Rs204));
153        }
154    }
155    None
156}
157
158/// Check that `LOCK_CONFIRMATIONS` consecutive sync bytes exist at stride `s`
159/// starting from `offset` (inclusive).  The first byte (`buf[offset]`) is
160/// already known to be [`SYNC_BYTE`]; this checks the next
161/// `LOCK_CONFIRMATIONS - 1` positions.
162fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
163    for k in 1..LOCK_CONFIRMATIONS {
164        let pos = offset + k * s;
165        if pos >= buf.len() || buf[pos] != SYNC_BYTE {
166            return false;
167        }
168    }
169    true
170}
171
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 188-byte TS packet starting with 0x47 followed by a distinct
    /// payload byte `tag` (repeated).  All non-sync bytes are kept away from
    /// 0x47 so that no false lock can occur.
    fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
        assert_ne!(tag, SYNC_BYTE, "tag must not equal sync byte");
        let mut pkt = [tag; TS_PACKET_SIZE];
        pkt[0] = SYNC_BYTE;
        pkt
    }

    /// Build a 204-byte RS-coded packet: 188-byte TS payload (with tag) +
    /// 16 parity bytes filled with `parity` (must not be 0x47).
    fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
        assert_ne!(ts_tag, SYNC_BYTE);
        assert_ne!(parity, SYNC_BYTE);
        let mut pkt = [parity; RS_PACKET_SIZE];
        pkt[0] = SYNC_BYTE;
        pkt[1..TS_PACKET_SIZE].fill(ts_tag);
        pkt
    }

    /// Concatenate several 188-byte packet arrays into a flat `Vec<u8>`.
    fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
        let mut v = Vec::with_capacity(packets.len() * TS_PACKET_SIZE);
        for p in packets {
            v.extend_from_slice(p);
        }
        v
    }

    /// Feed `data` into a fresh [`TsResync`] and return everything emitted.
    fn feed_once(data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
        TsResync::new().feed(data)
    }

    // ------------------------------------------------------------------
    // Test 1 — aligned 188-byte passthrough
    // ------------------------------------------------------------------
    #[test]
    fn aligned_188_passthrough() {
        let p0 = ts_packet(0x01);
        let p1 = ts_packet(0x02);
        let p2 = ts_packet(0x03);
        let p3 = ts_packet(0x04);
        let p4 = ts_packet(0x05);
        let data = concat_ts(&[p0, p1, p2, p3, p4]);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), 5);
        assert_eq!(emitted[0], p0);
        assert_eq!(emitted[1], p1);
        assert_eq!(emitted[2], p2);
        assert_eq!(emitted[3], p3);
        assert_eq!(emitted[4], p4);
        assert_eq!(r.stride(), Some(PacketStride::Ts188));
        let s = r.stats();
        assert_eq!(s.packets, 5);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 0);
    }

    // ------------------------------------------------------------------
    // Test 2 — junk prefix, then lock and recover all packets
    // ------------------------------------------------------------------
    #[test]
    fn junk_prefix_locks() {
        let pkts: Vec<_> = (0..6).map(|i| ts_packet(i + 1)).collect();
        let stream = concat_ts(&pkts);

        // Seven junk bytes ahead of the first real packet.
        let mut data: Vec<u8> = vec![0x00; 7];
        data.extend_from_slice(&stream);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), 6);
        for (i, p) in emitted.iter().enumerate() {
            assert_eq!(*p, pkts[i], "packet {i} mismatch");
        }
        assert_eq!(r.stride(), Some(PacketStride::Ts188));
        let s = r.stats();
        assert_eq!(s.packets, 6);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 7);
    }

    // ------------------------------------------------------------------
    // Test 3 — chunked feed equivalence
    // ------------------------------------------------------------------
    #[test]
    fn chunked_feed_equivalence() {
        let pkts: Vec<_> = (0..6).map(|i| ts_packet(i + 1)).collect();
        let stream = concat_ts(&pkts);

        // Whole feed
        let whole = feed_once(&stream);

        // Chunked feed — 100 bytes at a time
        let mut r = TsResync::new();
        let mut chunked = Vec::new();
        for chunk in stream.chunks(100) {
            chunked.extend(r.feed(chunk));
        }

        assert_eq!(whole.len(), chunked.len());
        for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
            assert_eq!(w, c, "packet {i} mismatch");
        }
    }

    // ------------------------------------------------------------------
    // Test 4 — mid-stream sync loss and reacquisition
    // ------------------------------------------------------------------
    #[test]
    fn midstream_loss_resync() {
        // Need >= LOCK_CONFIRMATIONS clean packets BEFORE the corruption (so
        // the stream actually locks first, making the disruption a genuine
        // resync rather than a delayed initial lock) and >= LOCK_CONFIRMATIONS
        // after (so it can re-lock and recover). 14 packets, corruption inside
        // packet 7, satisfies both.
        let pkts: Vec<_> = (0..14).map(|i| ts_packet(i + 1)).collect();
        let clean = concat_ts(&pkts);

        // Insert a single stray byte 12 bytes into packet 7.
        let insert_at = 7 * TS_PACKET_SIZE + 12;
        let stray: u8 = 0x00;
        let mut data = clean[..insert_at].to_vec();
        data.push(stray);
        data.extend_from_slice(&clean[insert_at..]);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        let s = r.stats();
        // A locked stream losing alignment mid-way must record a resync.
        assert!(
            s.resyncs >= 1,
            "mid-stream corruption must trigger a resync, got {}",
            s.resyncs
        );
        // Locks on the clean prefix: the first emitted packet is P0.
        assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
        // Re-locks after the corruption and recovers most of the stream.
        assert!(
            emitted.len() >= 10,
            "should recover and emit most packets, got {}",
            emitted.len()
        );
    }

    // ------------------------------------------------------------------
    // Test 5 — 204-byte RS-coded packets detected and stripped
    // ------------------------------------------------------------------
    #[test]
    fn rs204_detected_and_stripped() {
        let mut stream = Vec::new();
        let mut expected_payloads = Vec::new();
        for i in 0u8..6 {
            let tag = i + 1;
            let rs = rs_packet(tag, 0xFF);
            stream.extend_from_slice(&rs);
            expected_payloads.push(ts_packet(tag));
        }

        let mut r = TsResync::new();
        let emitted = r.feed(&stream);

        assert_eq!(emitted.len(), 6);
        assert_eq!(r.stride(), Some(PacketStride::Rs204));
        for (i, (e, p)) in emitted.iter().zip(expected_payloads.iter()).enumerate() {
            assert_eq!(e, p, "packet {i} mismatch");
        }
        let s = r.stats();
        assert_eq!(s.packets, 6);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 0);
    }

    // ------------------------------------------------------------------
    // Test 6 — large single-feed O(n) exercise (not O(n²))
    // ------------------------------------------------------------------
    #[test]
    fn large_buffer_single_feed() {
        const NUM_PACKETS: usize = 500;
        let pkts: Vec<_> = (0..NUM_PACKETS)
            .map(|i| {
                // Tags cycle through 1..=254 while skipping the sync byte.
                let mut tag = (i % 253 + 1) as u8;
                if tag >= SYNC_BYTE {
                    tag += 1;
                }
                ts_packet(tag)
            })
            .collect();
        let stream = concat_ts(&pkts);

        // Prepend some garbage so the resync path is exercised.
        let garbage: Vec<u8> = vec![0x00; 13];
        let mut data = garbage;
        data.extend_from_slice(&stream);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), NUM_PACKETS);
        for (i, (e, p)) in emitted.iter().zip(pkts.iter()).enumerate() {
            assert_eq!(e, p, "packet {i} mismatch");
        }
        assert_eq!(r.stride(), Some(PacketStride::Ts188));
        let s = r.stats();
        assert_eq!(s.packets, NUM_PACKETS as u64);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 13);
    }
}