// dvb_si/resync.rs

//! Stateful TS byte-stream resynchroniser — ISO/IEC 13818-1 §2.4.3.2.
//!
//! Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
//! (file reads, UDP payloads) that may start mid-packet or contain leading
//! garbage.  Also detects 204-byte Reed-Solomon-coded packets (DVB RS-coded
//! outer forward-error-correction layer) and strips the 16 parity bytes,
//! yielding standard 188-byte TS packets in both cases.
//!
//! # Feature gate
//!
//! This module is only compiled when the `ts` feature is enabled (the
//! default), because it depends on the TS constants in [`crate::ts`].
//!
//! # Example
//!
//! ```
//! use dvb_si::resync::TsResync;
//!
//! let mut r = TsResync::new();
//! // Feed arbitrary bytes (file chunks, UDP datagrams, etc.).
//! let packets: Vec<[u8; 188]> = r.feed(b"some raw bytes");
//! let stats = r.stats();
//! ```

use crate::ts::{TS_PACKET_SIZE, TS_SYNC_BYTE};

/// Size in bytes of a Reed-Solomon-coded TS packet: the 188-byte TS packet
/// followed by 16 parity bytes (DVB RS outer FEC, ISO/IEC 13818-1 §2.4.3.2
/// informative note).
pub const RS_PACKET_SIZE: usize = 204;

31/// Number of Reed-Solomon parity bytes appended to a 204-byte packet.
32pub const RS_PARITY_LEN: usize = RS_PACKET_SIZE - TS_PACKET_SIZE;
33
/// Number of consecutive sync bytes, spaced exactly one candidate stride
/// apart, required to declare lock.
///
/// The count includes the candidate's own first sync byte, so a lock needs
/// `LOCK_CONFIRMATIONS - 1` further sync bytes after it.
pub const LOCK_CONFIRMATIONS: usize = 5;

/// Packet stride detected once the resynchroniser has locked.
///
/// The enum is `#[non_exhaustive]`: code outside this crate must keep a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum PacketStride {
    /// Standard 188-byte TS packets (ISO/IEC 13818-1 §2.4.3.2).
    Ts188,
    /// 204-byte packets (188-byte TS + 16 Reed-Solomon parity bytes).
    Rs204,
}

/// Cumulative counters maintained by [`TsResync`] across all `feed` calls.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ResyncStats {
    /// Total 188-byte TS packets emitted.
    pub packets: u64,
    /// Number of sync losses: incremented each time a locked stream shows a
    /// non-sync byte where a packet should start.  The loss is counted
    /// immediately, before any attempt to re-lock.
    pub resyncs: u64,
    /// Bytes discarded without being emitted: junk skipped before a lock, one
    /// byte per sync loss, and excess buffered bytes trimmed while no lock
    /// can be found.  Stripped Reed-Solomon parity bytes are not counted.
    pub dropped_bytes: u64,
}

61/// Stateful TS byte-stream resynchroniser (ISO/IEC 13818-1 §2.4.3.2).
62///
63/// Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
64/// that may start mid-packet or contain junk, and detects 204-byte
65/// Reed-Solomon-coded packets (stripping the 16 parity bytes).
66///
67/// Feed raw bytes with [`feed`](Self::feed); each call returns a `Vec` of
68/// aligned 188-byte TS packets.  Bytes are buffered across calls so that
69/// packet boundaries may span call boundaries freely.
70///
71/// Lock is declared after [`LOCK_CONFIRMATIONS`] consecutive sync bytes are
72/// found at the candidate stride (188 or 204).  On sync loss the resynchroniser
73/// re-scans from the byte after the lost position.
74///
75/// # Stats
76///
77/// [`stats`](Self::stats) returns cumulative counters (packets emitted,
78/// resyncs, dropped bytes).
79#[derive(Debug, Default)]
80pub struct TsResync {
81    buf: Vec<u8>,
82    /// Logical read head into `buf`; compacted periodically.
83    head: usize,
84    stride: Option<PacketStride>,
85    stats: ResyncStats,
86}
87
88impl TsResync {
89    /// Create a new resynchroniser with an empty internal buffer.
90    pub fn new() -> Self {
91        Self::default()
92    }
93
94    /// Feed `data` and emit every newly-aligned 188-byte TS packet.
95    ///
96    /// For a 204-byte stream the 16 Reed-Solomon parity bytes are stripped;
97    /// only the 188-byte TS payload is returned.  Bytes that cannot yet form
98    /// a complete packet (or fall before lock) are buffered for the next call.
99    pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
100        self.buf.extend_from_slice(data);
101        let mut emitted = Vec::new();
102
103        loop {
104            match self.stride {
105                None => {
106                    if let Some((offset, s)) = find_sync(&self.buf[self.head..]) {
107                        self.stats.dropped_bytes += offset as u64;
108                        self.head += offset;
109                        self.stride = Some(s);
110                    } else {
111                        // Keep at most enough bytes to detect a future lock.
112                        let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
113                        let tail_len = self.buf.len() - self.head;
114                        if tail_len > keep {
115                            let excess = tail_len - keep;
116                            self.stats.dropped_bytes += excess as u64;
117                            self.head += excess;
118                        }
119                        self.compact();
120                        return emitted;
121                    }
122                }
123                Some(stride) => {
124                    let s = match stride {
125                        PacketStride::Ts188 => TS_PACKET_SIZE,
126                        PacketStride::Rs204 => RS_PACKET_SIZE,
127                    };
128                    let tail_len = self.buf.len() - self.head;
129                    if tail_len < s {
130                        self.compact();
131                        return emitted;
132                    }
133                    if self.buf[self.head] == TS_SYNC_BYTE {
134                        let mut packet = [0u8; TS_PACKET_SIZE];
135                        packet.copy_from_slice(&self.buf[self.head..self.head + TS_PACKET_SIZE]);
136                        emitted.push(packet);
137                        self.head += s;
138                        self.stats.packets += 1;
139                    } else {
140                        // Sync byte missing — record the loss and re-scan.
141                        self.stats.resyncs += 1;
142                        self.stats.dropped_bytes += 1;
143                        self.head += 1;
144                        self.stride = None;
145                    }
146                }
147            }
148        }
149    }
150
151    /// Detected packet stride, or [`None`] before the stream has locked.
152    pub fn stride(&self) -> Option<PacketStride> {
153        self.stride
154    }
155
156    /// Accumulated statistics.
157    pub fn stats(&self) -> ResyncStats {
158        self.stats
159    }
160
161    /// Compact the internal buffer by discarding consumed bytes.
162    fn compact(&mut self) {
163        if self.head > 0 {
164            self.buf.drain(..self.head);
165            self.head = 0;
166        }
167    }
168}
169
170/// Scan `buf` for the smallest offset `o` such that stride `S` (tried 188
171/// first, then 204) yields [`LOCK_CONFIRMATIONS`] consecutive sync bytes at
172/// positions `o + k*S` for `k = 0 .. LOCK_CONFIRMATIONS`.
173///
174/// Returns `(offset, stride)` on success, or `None` if no lock is found.
175fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
176    for o in 0..buf.len() {
177        if buf[o] != TS_SYNC_BYTE {
178            continue;
179        }
180        if try_stride(buf, o, TS_PACKET_SIZE) {
181            return Some((o, PacketStride::Ts188));
182        }
183        if try_stride(buf, o, RS_PACKET_SIZE) {
184            return Some((o, PacketStride::Rs204));
185        }
186    }
187    None
188}
189
190/// Return `true` if `LOCK_CONFIRMATIONS` consecutive sync bytes exist at
191/// stride `s` starting from `offset` (the first is already known to be
192/// [`TS_SYNC_BYTE`]).
193fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
194    for k in 1..LOCK_CONFIRMATIONS {
195        let pos = offset + k * s;
196        if pos >= buf.len() || buf[pos] != TS_SYNC_BYTE {
197            return false;
198        }
199    }
200    true
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 188-byte TS packet starting with `TS_SYNC_BYTE` followed by
    /// `tag` (repeated).  All non-sync bytes are kept away from `0x47` so
    /// that no false lock occurs in the fixture data.
    fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
        assert_ne!(tag, TS_SYNC_BYTE, "tag must not equal sync byte");
        let mut pkt = [tag; TS_PACKET_SIZE];
        pkt[0] = TS_SYNC_BYTE;
        pkt
    }

    /// Build a 204-byte RS-coded packet: 188-byte TS payload (with `ts_tag`)
    /// plus 16 parity bytes filled with `parity` (must not be `0x47`).
    fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
        assert_ne!(ts_tag, TS_SYNC_BYTE);
        assert_ne!(parity, TS_SYNC_BYTE);
        let mut pkt = [parity; RS_PACKET_SIZE];
        pkt[0] = TS_SYNC_BYTE;
        pkt[1..TS_PACKET_SIZE].fill(ts_tag);
        pkt
    }

    /// Concatenate several 188-byte packet arrays into a flat `Vec<u8>`.
    fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
        let mut v = Vec::with_capacity(packets.len() * TS_PACKET_SIZE);
        for p in packets {
            v.extend_from_slice(p);
        }
        v
    }

    // ------------------------------------------------------------------
    // Test 1 — aligned 188-byte passthrough
    // ------------------------------------------------------------------
    #[test]
    fn aligned_188_passthrough() {
        let packets: Vec<_> = (0u8..5).map(|i| ts_packet(i + 1)).collect();
        let data = concat_ts(&packets);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), 5);
        for (i, (e, p)) in emitted.iter().zip(packets.iter()).enumerate() {
            assert_eq!(e, p, "packet {i} mismatch");
        }
        assert_eq!(r.stride(), Some(PacketStride::Ts188));
        let s = r.stats();
        assert_eq!(s.packets, 5);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 0);
    }

    // ------------------------------------------------------------------
    // Test 2 — lock only after LOCK_CONFIRMATIONS sync bytes
    // ------------------------------------------------------------------
    #[test]
    fn requires_n_confirmations_before_lock() {
        // Pin the default lock window. The hardcoded 4-vs-5 boundary below only
        // bites the threshold when this is 5 — assert it explicitly so a change
        // to the default (or to this test's literals) cannot silently pass.
        assert_eq!(
            LOCK_CONFIRMATIONS, 5,
            "this test pins the default lock window of 5"
        );

        // FOUR confirming, stride-aligned packets are one short of the window:
        // only four sync bytes sit at the 188-stride boundaries, so the
        // resynchroniser must NOT lock and must emit nothing (it buffers).
        let four = concat_ts(&(1u8..=4).map(ts_packet).collect::<Vec<_>>());
        let mut r = TsResync::new();
        assert_eq!(
            r.feed(&four).len(),
            0,
            "4 confirmations (< 5) must not lock or emit"
        );
        assert_eq!(r.stride(), None, "stride must remain None below the window");

        // The FIFTH stride-aligned sync byte completes the window → lock.
        let mut out = r.feed(&ts_packet(5));
        out.extend(r.feed(&[]));
        assert!(
            r.stride().is_some(),
            "the 5th confirmation must trigger lock"
        );
        // Once locked, the buffered packets are emitted (5 fed, all aligned).
        assert_eq!(out.len(), 5, "all five aligned packets emit once locked");
    }

    // ------------------------------------------------------------------
    // Test 3 — junk prefix: leading garbage dropped, correct count returned
    // ------------------------------------------------------------------
    #[test]
    fn junk_prefix_correct_dropped_count() {
        let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
        let stream = concat_ts(&pkts);

        let junk_len = 7usize;
        let junk: Vec<u8> = vec![0x00; junk_len];
        let mut data = junk;
        data.extend_from_slice(&stream);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), 6);
        for (i, (e, p)) in emitted.iter().zip(pkts.iter()).enumerate() {
            assert_eq!(*e, *p, "packet {i} mismatch after junk prefix");
        }
        let s = r.stats();
        assert_eq!(
            s.dropped_bytes, junk_len as u64,
            "dropped bytes must equal junk prefix"
        );
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.packets, 6);
    }

    // ------------------------------------------------------------------
    // Test 4 — mid-stream sync loss and reacquisition
    // ------------------------------------------------------------------
    #[test]
    fn midstream_loss_resync() {
        // Need > LOCK_CONFIRMATIONS clean packets before and after the stray
        // byte so the stream locks, loses sync, and re-locks.
        let pkts: Vec<_> = (0u8..14).map(|i| ts_packet(i + 1)).collect();
        let clean = concat_ts(&pkts);

        // Insert a single stray byte 12 bytes into packet 7.
        let insert_at = 7 * TS_PACKET_SIZE + 12;
        let mut data = clean[..insert_at].to_vec();
        data.push(0x00);
        data.extend_from_slice(&clean[insert_at..]);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        let s = r.stats();
        assert!(
            s.resyncs >= 1,
            "mid-stream corruption must trigger a resync, got {}",
            s.resyncs
        );
        assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
        assert!(
            emitted.len() >= 10,
            "should recover and emit most packets, got {}",
            emitted.len()
        );
    }

    // ------------------------------------------------------------------
    // Test 5 — 204-byte RS-coded packets detected + RS stripped
    // ------------------------------------------------------------------
    #[test]
    fn rs204_detected_and_stripped() {
        let mut stream = Vec::new();
        let mut expected = Vec::new();
        for i in 0u8..6 {
            let tag = i + 1;
            let rs = rs_packet(tag, 0xFF);
            stream.extend_from_slice(&rs);
            expected.push(ts_packet(tag));
        }

        let mut r = TsResync::new();
        let emitted = r.feed(&stream);

        assert_eq!(emitted.len(), 6, "RS-coded stream must emit 6 packets");
        assert_eq!(
            r.stride(),
            Some(PacketStride::Rs204),
            "stride must be Rs204"
        );
        for (i, (e, p)) in emitted.iter().zip(expected.iter()).enumerate() {
            assert_eq!(e, p, "RS-stripped packet {i} mismatch");
        }
        // Confirm the emitted 188-byte packets are parseable by TsPacket.
        for (i, pkt) in emitted.iter().enumerate() {
            crate::ts::TsPacket::parse(pkt)
                .unwrap_or_else(|e| panic!("RS-stripped packet {i} TsPacket::parse failed: {e}"));
        }
        let s = r.stats();
        assert_eq!(s.packets, 6);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 0);
    }

    // ------------------------------------------------------------------
    // Test 6 — aligned 188 stream yields same packets as plain chunks(188)
    //   (equivalence: fixture-less variant using synthetic data)
    // ------------------------------------------------------------------
    #[test]
    fn aligned_188_matches_plain_chunks() {
        let pkts: Vec<_> = (0u8..10).map(|i| ts_packet(i + 1)).collect();
        let data = concat_ts(&pkts);

        // Oracle: plain chunks(188) filtered by sync byte.
        let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
            .chunks_exact(TS_PACKET_SIZE)
            .filter(|c| c[0] == TS_SYNC_BYTE)
            .map(|c| c.try_into().unwrap())
            .collect();

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), oracle.len(), "count mismatch");
        for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
            assert_eq!(e, o, "packet {i} differs from chunks-oracle");
        }
    }

    // ------------------------------------------------------------------
    // Test 7 — chunked feed equivalence (same result in small increments)
    // ------------------------------------------------------------------
    #[test]
    fn chunked_feed_equivalence() {
        let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
        let stream = concat_ts(&pkts);

        let whole = {
            let mut r = TsResync::new();
            r.feed(&stream)
        };

        let chunked = {
            let mut r = TsResync::new();
            let mut out = Vec::new();
            for chunk in stream.chunks(100) {
                out.extend(r.feed(chunk));
            }
            out
        };

        assert_eq!(whole.len(), chunked.len());
        for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
            assert_eq!(w, c, "packet {i} mismatch");
        }
    }

    // ------------------------------------------------------------------
    // Test 8 — fixture-based: m6-single.ts differential
    //   feeding the real fixture through TsResync must yield the same
    //   188-byte packets as plain chunks_exact(188) + sync-byte filter.
    // ------------------------------------------------------------------
    #[test]
    fn fixture_m6_matches_plain_chunks() {
        let path = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/fixtures/m6-single.ts");
        let data = std::fs::read(path).expect("m6-single.ts fixture not found");

        // Oracle: plain aligned 188-byte reads.
        let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
            .chunks_exact(TS_PACKET_SIZE)
            .filter(|c| c[0] == TS_SYNC_BYTE)
            .map(|c| c.try_into().unwrap())
            .collect();
        assert!(!oracle.is_empty(), "oracle empty — fixture empty?");

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(
            emitted.len(),
            oracle.len(),
            "packet count: TsResync={} oracle={}",
            emitted.len(),
            oracle.len()
        );
        for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
            assert_eq!(e, o, "packet {i} mismatch vs oracle");
        }
    }
}