Skip to main content

dvb_si/
resync.rs

1//! Stateful TS byte-stream resynchroniser — ISO/IEC 13818-1 §2.4.3.2.
2//!
3//! Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
4//! (file reads, UDP payloads) that may start mid-packet or contain leading
5//! garbage.  Also detects 204-byte Reed-Solomon-coded packets (DVB RS-coded
6//! outer forward-error-correction layer) and strips the 16 parity bytes,
7//! yielding standard 188-byte TS packets in both cases.
8//!
9//! # Feature gate
10//!
11//! This module is only compiled when the `ts` feature is enabled (the
12//! default), because it depends on the TS constants in [`crate::ts`].
13//!
14//! # Example
15//!
16//! ```
17//! use dvb_si::resync::TsResync;
18//!
19//! let mut r = TsResync::new();
20//! // Feed arbitrary bytes (file chunks, UDP datagrams, etc.).
21//! let packets: Vec<[u8; 188]> = r.feed(b"some raw bytes");
22//! let stats = r.stats();
23//! ```
24
25use alloc::vec::Vec;
26
27use crate::ts::{TS_PACKET_SIZE, TS_SYNC_BYTE};
28
/// Reed-Solomon-coded TS packet size: 188-byte payload + 16 parity bytes
/// (DVB RS outer FEC, ISO/IEC 13818-1 §2.4.3.2 informative note).
///
/// The resynchroniser uses this both as the second candidate stride while
/// hunting for sync bytes and as the step between packets once a 204-byte
/// lock is held.
pub const RS_PACKET_SIZE: usize = 204;

/// Number of Reed-Solomon parity bytes appended to a 204-byte packet,
/// derived as [`RS_PACKET_SIZE`] minus [`TS_PACKET_SIZE`].
pub const RS_PARITY_LEN: usize = RS_PACKET_SIZE - TS_PACKET_SIZE;

/// Consecutive sync bytes at the candidate stride required to declare lock.
///
/// The count includes the candidate sync byte itself: a lock needs the
/// candidate plus `LOCK_CONFIRMATIONS - 1` further sync bytes, each exactly
/// one stride after the previous one.
pub const LOCK_CONFIRMATIONS: usize = 5;
38
/// Detected packet size after locking.
///
/// Reported by [`TsResync::stride`].  The variant only describes the spacing
/// of packets in the *input*; the packets returned by [`TsResync::feed`] are
/// always `TS_PACKET_SIZE` bytes long.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum PacketStride {
    /// Standard 188-byte TS packets (ISO/IEC 13818-1 §2.4.3.2).
    Ts188,
    /// 204-byte packets (188-byte TS + 16 Reed-Solomon parity bytes).
    Rs204,
}
49
/// Counters accumulated during resynchronisation.
///
/// All counters are cumulative over the lifetime of the [`TsResync`] that
/// produced them; a copy is obtained with [`TsResync::stats`].
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ResyncStats {
    /// Total 188-byte TS packets emitted.
    pub packets: u64,
    /// Times an established lock was lost.  The counter is bumped at the
    /// moment of loss (an expected sync byte was missing), not when lock is
    /// subsequently reacquired.
    pub resyncs: u64,
    /// Bytes skipped/dropped: junk in front of a lock position, bytes
    /// discarded while unlocked to keep the internal buffer bounded, and one
    /// byte for every sync loss.
    pub dropped_bytes: u64,
}
62
/// Stateful TS byte-stream resynchroniser (ISO/IEC 13818-1 §2.4.3.2).
///
/// Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
/// that may start mid-packet or contain junk, and detects 204-byte
/// Reed-Solomon-coded packets (stripping the 16 parity bytes).
///
/// Feed raw bytes with [`feed`](Self::feed); each call returns a `Vec` of
/// aligned 188-byte TS packets.  Bytes are buffered across calls so that
/// packet boundaries may span call boundaries freely.
///
/// Lock is declared after [`LOCK_CONFIRMATIONS`] consecutive sync bytes are
/// found at the candidate stride (188 or 204).  On sync loss the resynchroniser
/// re-scans from the byte after the lost position.
///
/// # Stats
///
/// [`stats`](Self::stats) returns cumulative counters (packets emitted,
/// resyncs, dropped bytes).
#[derive(Debug, Default)]
pub struct TsResync {
    /// Bytes received through `feed` that have not been drained yet.
    buf: Vec<u8>,
    /// Logical read head into `buf`; everything before it has already been
    /// emitted or dropped.  Reset to 0 when `feed` returns, after the consumed
    /// prefix has been drained from `buf`.
    head: usize,
    /// Stride of the current lock, or `None` while hunting for sync.
    stride: Option<PacketStride>,
    /// Cumulative counters, handed out by value from `stats`.
    stats: ResyncStats,
}
89
90impl TsResync {
91    /// Create a new resynchroniser with an empty internal buffer.
92    pub fn new() -> Self {
93        Self::default()
94    }
95
96    /// Feed `data` and emit every newly-aligned 188-byte TS packet.
97    ///
98    /// For a 204-byte stream the 16 Reed-Solomon parity bytes are stripped;
99    /// only the 188-byte TS payload is returned.  Bytes that cannot yet form
100    /// a complete packet (or fall before lock) are buffered for the next call.
101    pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
102        self.buf.extend_from_slice(data);
103        let mut emitted = Vec::new();
104
105        loop {
106            match self.stride {
107                None => {
108                    if let Some((offset, s)) = find_sync(&self.buf[self.head..]) {
109                        self.stats.dropped_bytes += offset as u64;
110                        self.head += offset;
111                        self.stride = Some(s);
112                    } else {
113                        // Keep at most enough bytes to detect a future lock.
114                        let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
115                        let tail_len = self.buf.len() - self.head;
116                        if tail_len > keep {
117                            let excess = tail_len - keep;
118                            self.stats.dropped_bytes += excess as u64;
119                            self.head += excess;
120                        }
121                        self.compact();
122                        return emitted;
123                    }
124                }
125                Some(stride) => {
126                    let s = match stride {
127                        PacketStride::Ts188 => TS_PACKET_SIZE,
128                        PacketStride::Rs204 => RS_PACKET_SIZE,
129                    };
130                    let tail_len = self.buf.len() - self.head;
131                    if tail_len < s {
132                        self.compact();
133                        return emitted;
134                    }
135                    if self.buf[self.head] == TS_SYNC_BYTE {
136                        let mut packet = [0u8; TS_PACKET_SIZE];
137                        packet.copy_from_slice(&self.buf[self.head..self.head + TS_PACKET_SIZE]);
138                        emitted.push(packet);
139                        self.head += s;
140                        self.stats.packets += 1;
141                    } else {
142                        // Sync byte missing — record the loss and re-scan.
143                        self.stats.resyncs += 1;
144                        self.stats.dropped_bytes += 1;
145                        self.head += 1;
146                        self.stride = None;
147                    }
148                }
149            }
150        }
151    }
152
153    /// Detected packet stride, or [`None`] before the stream has locked.
154    pub fn stride(&self) -> Option<PacketStride> {
155        self.stride
156    }
157
158    /// Accumulated statistics.
159    pub fn stats(&self) -> ResyncStats {
160        self.stats
161    }
162
163    /// Compact the internal buffer by discarding consumed bytes.
164    fn compact(&mut self) {
165        if self.head > 0 {
166            self.buf.drain(..self.head);
167            self.head = 0;
168        }
169    }
170}
171
172/// Scan `buf` for the smallest offset `o` such that stride `S` (tried 188
173/// first, then 204) yields [`LOCK_CONFIRMATIONS`] consecutive sync bytes at
174/// positions `o + k*S` for `k = 0 .. LOCK_CONFIRMATIONS`.
175///
176/// Returns `(offset, stride)` on success, or `None` if no lock is found.
177fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
178    for o in 0..buf.len() {
179        if buf[o] != TS_SYNC_BYTE {
180            continue;
181        }
182        if try_stride(buf, o, TS_PACKET_SIZE) {
183            return Some((o, PacketStride::Ts188));
184        }
185        if try_stride(buf, o, RS_PACKET_SIZE) {
186            return Some((o, PacketStride::Rs204));
187        }
188    }
189    None
190}
191
192/// Return `true` if `LOCK_CONFIRMATIONS` consecutive sync bytes exist at
193/// stride `s` starting from `offset` (the first is already known to be
194/// [`TS_SYNC_BYTE`]).
195fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
196    for k in 1..LOCK_CONFIRMATIONS {
197        let pos = offset + k * s;
198        if pos >= buf.len() || buf[pos] != TS_SYNC_BYTE {
199            return false;
200        }
201    }
202    true
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208
209    /// Build a 188-byte TS packet starting with `TS_SYNC_BYTE` followed by
210    /// `tag` (repeated).  All non-sync bytes are kept away from `0x47` so
211    /// that no false lock occurs in the fixture data.
212    fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
213        assert_ne!(tag, TS_SYNC_BYTE, "tag must not equal sync byte");
214        let mut pkt = [tag; TS_PACKET_SIZE];
215        pkt[0] = TS_SYNC_BYTE;
216        pkt
217    }
218
219    /// Build a 204-byte RS-coded packet: 188-byte TS payload (with `ts_tag`)
220    /// plus 16 parity bytes filled with `parity` (must not be `0x47`).
221    fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
222        assert_ne!(ts_tag, TS_SYNC_BYTE);
223        assert_ne!(parity, TS_SYNC_BYTE);
224        let mut pkt = [parity; RS_PACKET_SIZE];
225        pkt[0] = TS_SYNC_BYTE;
226        pkt[1..TS_PACKET_SIZE].fill(ts_tag);
227        pkt
228    }
229
230    /// Concatenate several 188-byte packet arrays into a flat `Vec<u8>`.
231    fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
232        let mut v = Vec::with_capacity(packets.len() * TS_PACKET_SIZE);
233        for p in packets {
234            v.extend_from_slice(p);
235        }
236        v
237    }
238
239    // ------------------------------------------------------------------
240    // Test 1 — aligned 188-byte passthrough
241    // ------------------------------------------------------------------
242    #[test]
243    fn aligned_188_passthrough() {
244        let packets: Vec<_> = (0u8..5).map(|i| ts_packet(i + 1)).collect();
245        let data = concat_ts(&packets);
246
247        let mut r = TsResync::new();
248        let emitted = r.feed(&data);
249
250        assert_eq!(emitted.len(), 5);
251        for (i, (e, p)) in emitted.iter().zip(packets.iter()).enumerate() {
252            assert_eq!(e, p, "packet {i} mismatch");
253        }
254        assert_eq!(r.stride(), Some(PacketStride::Ts188));
255        let s = r.stats();
256        assert_eq!(s.packets, 5);
257        assert_eq!(s.resyncs, 0);
258        assert_eq!(s.dropped_bytes, 0);
259    }
260
261    // ------------------------------------------------------------------
262    // Test 2 — lock only after LOCK_CONFIRMATIONS sync bytes
263    // ------------------------------------------------------------------
264    #[test]
265    fn requires_n_confirmations_before_lock() {
266        // Pin the default lock window. The hardcoded 4-vs-5 boundary below only
267        // bites the threshold when this is 5 — assert it explicitly so a change
268        // to the default (or to this test's literals) cannot silently pass.
269        assert_eq!(
270            LOCK_CONFIRMATIONS, 5,
271            "this test pins the default lock window of 5"
272        );
273
274        // FOUR confirming, stride-aligned packets are one short of the window:
275        // only four sync bytes sit at the 188-stride boundaries, so the
276        // resynchroniser must NOT lock and must emit nothing (it buffers).
277        let four = concat_ts(&(1u8..=4).map(ts_packet).collect::<Vec<_>>());
278        let mut r = TsResync::new();
279        assert_eq!(
280            r.feed(&four).len(),
281            0,
282            "4 confirmations (< 5) must not lock or emit"
283        );
284        assert_eq!(r.stride(), None, "stride must remain None below the window");
285
286        // The FIFTH stride-aligned sync byte completes the window → lock.
287        let mut out = r.feed(&ts_packet(5));
288        out.extend(r.feed(&[]));
289        assert!(
290            r.stride().is_some(),
291            "the 5th confirmation must trigger lock"
292        );
293        // Once locked, the buffered packets are emitted (5 fed, all aligned).
294        assert_eq!(out.len(), 5, "all five aligned packets emit once locked");
295    }
296
297    // ------------------------------------------------------------------
298    // Test 3 — junk prefix: leading garbage dropped, correct count returned
299    // ------------------------------------------------------------------
300    #[test]
301    fn junk_prefix_correct_dropped_count() {
302        let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
303        let stream = concat_ts(&pkts);
304
305        let junk_len = 7usize;
306        let junk: Vec<u8> = vec![0x00; junk_len];
307        let mut data = junk;
308        data.extend_from_slice(&stream);
309
310        let mut r = TsResync::new();
311        let emitted = r.feed(&data);
312
313        assert_eq!(emitted.len(), 6);
314        for (i, (e, p)) in emitted.iter().zip(pkts.iter()).enumerate() {
315            assert_eq!(*e, *p, "packet {i} mismatch after junk prefix");
316        }
317        let s = r.stats();
318        assert_eq!(
319            s.dropped_bytes, junk_len as u64,
320            "dropped bytes must equal junk prefix"
321        );
322        assert_eq!(s.resyncs, 0);
323        assert_eq!(s.packets, 6);
324    }
325
326    // ------------------------------------------------------------------
327    // Test 4 — mid-stream sync loss and reacquisition
328    // ------------------------------------------------------------------
329    #[test]
330    fn midstream_loss_resync() {
331        // Need > LOCK_CONFIRMATIONS clean packets before and after the stray
332        // byte so the stream locks, loses sync, and re-locks.
333        let pkts: Vec<_> = (0u8..14).map(|i| ts_packet(i + 1)).collect();
334        let clean = concat_ts(&pkts);
335
336        // Insert a single stray byte 12 bytes into packet 7.
337        let insert_at = 7 * TS_PACKET_SIZE + 12;
338        let mut data = clean[..insert_at].to_vec();
339        data.push(0x00);
340        data.extend_from_slice(&clean[insert_at..]);
341
342        let mut r = TsResync::new();
343        let emitted = r.feed(&data);
344
345        let s = r.stats();
346        assert!(
347            s.resyncs >= 1,
348            "mid-stream corruption must trigger a resync, got {}",
349            s.resyncs
350        );
351        assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
352        assert!(
353            emitted.len() >= 10,
354            "should recover and emit most packets, got {}",
355            emitted.len()
356        );
357    }
358
359    // ------------------------------------------------------------------
360    // Test 5 — 204-byte RS-coded packets detected + RS stripped
361    // ------------------------------------------------------------------
362    #[test]
363    fn rs204_detected_and_stripped() {
364        let mut stream = Vec::new();
365        let mut expected = Vec::new();
366        for i in 0u8..6 {
367            let tag = i + 1;
368            let rs = rs_packet(tag, 0xFF);
369            stream.extend_from_slice(&rs);
370            expected.push(ts_packet(tag));
371        }
372
373        let mut r = TsResync::new();
374        let emitted = r.feed(&stream);
375
376        assert_eq!(emitted.len(), 6, "RS-coded stream must emit 6 packets");
377        assert_eq!(
378            r.stride(),
379            Some(PacketStride::Rs204),
380            "stride must be Rs204"
381        );
382        for (i, (e, p)) in emitted.iter().zip(expected.iter()).enumerate() {
383            assert_eq!(e, p, "RS-stripped packet {i} mismatch");
384        }
385        // Confirm the emitted 188-byte packets are parseable by TsPacket.
386        for (i, pkt) in emitted.iter().enumerate() {
387            crate::ts::TsPacket::parse(pkt)
388                .unwrap_or_else(|e| panic!("RS-stripped packet {i} TsPacket::parse failed: {e}"));
389        }
390        let s = r.stats();
391        assert_eq!(s.packets, 6);
392        assert_eq!(s.resyncs, 0);
393        assert_eq!(s.dropped_bytes, 0);
394    }
395
396    // ------------------------------------------------------------------
397    // Test 6 — aligned 188 stream yields same packets as plain chunks(188)
398    //   (equivalence: fixture-less variant using synthetic data)
399    // ------------------------------------------------------------------
400    #[test]
401    fn aligned_188_matches_plain_chunks() {
402        let pkts: Vec<_> = (0u8..10).map(|i| ts_packet(i + 1)).collect();
403        let data = concat_ts(&pkts);
404
405        // Oracle: plain chunks(188) filtered by sync byte.
406        let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
407            .chunks_exact(TS_PACKET_SIZE)
408            .filter(|c| c[0] == TS_SYNC_BYTE)
409            .map(|c| c.try_into().unwrap())
410            .collect();
411
412        let mut r = TsResync::new();
413        let emitted = r.feed(&data);
414
415        assert_eq!(emitted.len(), oracle.len(), "count mismatch");
416        for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
417            assert_eq!(e, o, "packet {i} differs from chunks-oracle");
418        }
419    }
420
421    // ------------------------------------------------------------------
422    // Test 7 — chunked feed equivalence (same result in small increments)
423    // ------------------------------------------------------------------
424    #[test]
425    fn chunked_feed_equivalence() {
426        let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
427        let stream = concat_ts(&pkts);
428
429        let whole = {
430            let mut r = TsResync::new();
431            r.feed(&stream)
432        };
433
434        let chunked = {
435            let mut r = TsResync::new();
436            let mut out = Vec::new();
437            for chunk in stream.chunks(100) {
438                out.extend(r.feed(chunk));
439            }
440            out
441        };
442
443        assert_eq!(whole.len(), chunked.len());
444        for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
445            assert_eq!(w, c, "packet {i} mismatch");
446        }
447    }
448
449    // ------------------------------------------------------------------
450    // Test 8 — fixture-based: m6-single.ts differential
451    //   feeding the real fixture through TsResync must yield the same
452    //   188-byte packets as plain chunks_exact(188) + sync-byte filter.
453    // ------------------------------------------------------------------
454    #[test]
455    fn fixture_m6_matches_plain_chunks() {
456        let path = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/fixtures/m6-single.ts");
457        let data = std::fs::read(path).expect("m6-single.ts fixture not found");
458
459        // Oracle: plain aligned 188-byte reads.
460        let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
461            .chunks_exact(TS_PACKET_SIZE)
462            .filter(|c| c[0] == TS_SYNC_BYTE)
463            .map(|c| c.try_into().unwrap())
464            .collect();
465        assert!(!oracle.is_empty(), "oracle empty — fixture empty?");
466
467        let mut r = TsResync::new();
468        let emitted = r.feed(&data);
469
470        assert_eq!(
471            emitted.len(),
472            oracle.len(),
473            "packet count: TsResync={} oracle={}",
474            emitted.len(),
475            oracle.len()
476        );
477        for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
478            assert_eq!(e, o, "packet {i} mismatch vs oracle");
479        }
480    }
481}