Skip to main content

mpeg_ts/
resync.rs

1//! Stateful TS byte-stream resynchroniser — ITU-T H.222.0 §2.4.3.2 (= ISO/IEC 13818-1 §2.4.3.2).
2//!
3//! Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
4//! (file reads, UDP payloads) that may start mid-packet or contain leading
5//! garbage.  Also detects 204-byte Reed-Solomon-coded packets (DVB RS-coded
6//! outer forward-error-correction layer) and strips the 16 parity bytes,
7//! yielding standard 188-byte TS packets in both cases.
8//!
9//! # Feature gate
10//!
11//! This module is only compiled when the `ts` feature is enabled (the
12//! default), because it depends on the TS constants in [`crate::ts`].
13//!
14//! # Example
15//!
16//! ```
17//! use mpeg_ts::resync::TsResync;
18//!
19//! let mut r = TsResync::new();
20//! // Feed arbitrary bytes (file chunks, UDP datagrams, etc.).
21//! let packets: Vec<[u8; 188]> = r.feed(b"some raw bytes");
22//! let stats = r.stats();
23//! ```
24
25use alloc::vec::Vec;
26
27use crate::ts::{TS_PACKET_SIZE, TS_SYNC_BYTE};
28
29/// Reed-Solomon-coded TS packet size: 188-byte payload + 16 parity bytes
30/// (DVB RS outer FEC, ISO/IEC 13818-1 §2.4.3.2 informative note).
31pub const RS_PACKET_SIZE: usize = 204;
32
33/// Number of Reed-Solomon parity bytes appended to a 204-byte packet.
34pub const RS_PARITY_LEN: usize = RS_PACKET_SIZE - TS_PACKET_SIZE;
35
36/// Consecutive sync bytes at the candidate stride required to declare lock.
37pub const LOCK_CONFIRMATIONS: usize = 5;
38
39/// Detected packet size after locking.
40#[non_exhaustive]
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42#[cfg_attr(feature = "serde", derive(serde::Serialize))]
43pub enum PacketStride {
44    /// Standard 188-byte TS packets (ISO/IEC 13818-1 §2.4.3.2).
45    Ts188,
46    /// 204-byte packets (188-byte TS + 16 Reed-Solomon parity bytes).
47    Rs204,
48}
49
50impl PacketStride {
51    /// Short machine-readable label for this stride.
52    pub fn name(&self) -> &'static str {
53        match self {
54            PacketStride::Ts188 => "ts_188",
55            PacketStride::Rs204 => "rs_204",
56        }
57    }
58}
59
60broadcast_common::impl_spec_display!(PacketStride);
61
62/// Counters accumulated during resynchronisation.
63#[non_exhaustive]
64#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
65#[cfg_attr(feature = "serde", derive(serde::Serialize))]
66pub struct ResyncStats {
67    /// Total 188-byte TS packets emitted.
68    pub packets: u64,
69    /// Times sync was lost and reacquired.
70    pub resyncs: u64,
71    /// Bytes skipped/dropped (junk before lock + sync-loss bytes).
72    pub dropped_bytes: u64,
73}
74
75/// Stateful TS byte-stream resynchroniser (ISO/IEC 13818-1 §2.4.3.2).
76///
77/// Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
78/// that may start mid-packet or contain junk, and detects 204-byte
79/// Reed-Solomon-coded packets (stripping the 16 parity bytes).
80///
81/// Feed raw bytes with [`feed`](Self::feed); each call returns a `Vec` of
82/// aligned 188-byte TS packets.  Bytes are buffered across calls so that
83/// packet boundaries may span call boundaries freely.
84///
85/// Lock is declared after [`LOCK_CONFIRMATIONS`] consecutive sync bytes are
86/// found at the candidate stride (188 or 204).  On sync loss the resynchroniser
87/// re-scans from the byte after the lost position.
88///
89/// # Stats
90///
91/// [`stats`](Self::stats) returns cumulative counters (packets emitted,
92/// resyncs, dropped bytes).
93#[derive(Debug, Default)]
94pub struct TsResync {
95    buf: Vec<u8>,
96    /// Logical read head into `buf`; compacted periodically.
97    head: usize,
98    stride: Option<PacketStride>,
99    stats: ResyncStats,
100}
101
102impl TsResync {
103    /// Create a new resynchroniser with an empty internal buffer.
104    pub fn new() -> Self {
105        Self::default()
106    }
107
108    /// Feed `data` and emit every newly-aligned 188-byte TS packet.
109    ///
110    /// For a 204-byte stream the 16 Reed-Solomon parity bytes are stripped;
111    /// only the 188-byte TS payload is returned.  Bytes that cannot yet form
112    /// a complete packet (or fall before lock) are buffered for the next call.
113    pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
114        self.buf.extend_from_slice(data);
115        let mut emitted = Vec::new();
116
117        loop {
118            match self.stride {
119                None => {
120                    if let Some((offset, s)) = find_sync(&self.buf[self.head..]) {
121                        self.stats.dropped_bytes += offset as u64;
122                        self.head += offset;
123                        self.stride = Some(s);
124                    } else {
125                        // Keep at most enough bytes to detect a future lock.
126                        let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
127                        let tail_len = self.buf.len() - self.head;
128                        if tail_len > keep {
129                            let excess = tail_len - keep;
130                            self.stats.dropped_bytes += excess as u64;
131                            self.head += excess;
132                        }
133                        self.compact();
134                        return emitted;
135                    }
136                }
137                Some(stride) => {
138                    let s = match stride {
139                        PacketStride::Ts188 => TS_PACKET_SIZE,
140                        PacketStride::Rs204 => RS_PACKET_SIZE,
141                    };
142                    let tail_len = self.buf.len() - self.head;
143                    if tail_len < s {
144                        self.compact();
145                        return emitted;
146                    }
147                    if self.buf[self.head] == TS_SYNC_BYTE {
148                        let mut packet = [0u8; TS_PACKET_SIZE];
149                        packet.copy_from_slice(&self.buf[self.head..self.head + TS_PACKET_SIZE]);
150                        emitted.push(packet);
151                        self.head += s;
152                        self.stats.packets += 1;
153                    } else {
154                        // Sync byte missing — record the loss and re-scan.
155                        self.stats.resyncs += 1;
156                        self.stats.dropped_bytes += 1;
157                        self.head += 1;
158                        self.stride = None;
159                    }
160                }
161            }
162        }
163    }
164
165    /// Detected packet stride, or [`None`] before the stream has locked.
166    pub fn stride(&self) -> Option<PacketStride> {
167        self.stride
168    }
169
170    /// Accumulated statistics.
171    pub fn stats(&self) -> ResyncStats {
172        self.stats
173    }
174
175    /// Compact the internal buffer by discarding consumed bytes.
176    fn compact(&mut self) {
177        if self.head > 0 {
178            self.buf.drain(..self.head);
179            self.head = 0;
180        }
181    }
182}
183
184/// Scan `buf` for the smallest offset `o` such that stride `S` (tried 188
185/// first, then 204) yields [`LOCK_CONFIRMATIONS`] consecutive sync bytes at
186/// positions `o + k*S` for `k = 0 .. LOCK_CONFIRMATIONS`.
187///
188/// Returns `(offset, stride)` on success, or `None` if no lock is found.
189fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
190    for o in 0..buf.len() {
191        if buf[o] != TS_SYNC_BYTE {
192            continue;
193        }
194        if try_stride(buf, o, TS_PACKET_SIZE) {
195            return Some((o, PacketStride::Ts188));
196        }
197        if try_stride(buf, o, RS_PACKET_SIZE) {
198            return Some((o, PacketStride::Rs204));
199        }
200    }
201    None
202}
203
204/// Return `true` if `LOCK_CONFIRMATIONS` consecutive sync bytes exist at
205/// stride `s` starting from `offset` (the first is already known to be
206/// [`TS_SYNC_BYTE`]).
207fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
208    for k in 1..LOCK_CONFIRMATIONS {
209        let pos = offset + k * s;
210        if pos >= buf.len() || buf[pos] != TS_SYNC_BYTE {
211            return false;
212        }
213    }
214    true
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220    use alloc::vec;
221    use alloc::vec::Vec;
222    extern crate std;
223
224    /// Build a 188-byte TS packet starting with `TS_SYNC_BYTE` followed by
225    /// `tag` (repeated).  All non-sync bytes are kept away from `0x47` so
226    /// that no false lock occurs in the fixture data.
227    fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
228        assert_ne!(tag, TS_SYNC_BYTE, "tag must not equal sync byte");
229        let mut pkt = [tag; TS_PACKET_SIZE];
230        pkt[0] = TS_SYNC_BYTE;
231        pkt
232    }
233
234    /// Build a 204-byte RS-coded packet: 188-byte TS payload (with `ts_tag`)
235    /// plus 16 parity bytes filled with `parity` (must not be `0x47`).
236    fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
237        assert_ne!(ts_tag, TS_SYNC_BYTE);
238        assert_ne!(parity, TS_SYNC_BYTE);
239        let mut pkt = [parity; RS_PACKET_SIZE];
240        pkt[0] = TS_SYNC_BYTE;
241        pkt[1..TS_PACKET_SIZE].fill(ts_tag);
242        pkt
243    }
244
245    /// Concatenate several 188-byte packet arrays into a flat `Vec<u8>`.
246    fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
247        let mut v = Vec::with_capacity(packets.len() * TS_PACKET_SIZE);
248        for p in packets {
249            v.extend_from_slice(p);
250        }
251        v
252    }
253
254    // ------------------------------------------------------------------
255    // Test 1 — aligned 188-byte passthrough
256    // ------------------------------------------------------------------
257    #[test]
258    fn aligned_188_passthrough() {
259        let packets: Vec<_> = (0u8..5).map(|i| ts_packet(i + 1)).collect();
260        let data = concat_ts(&packets);
261
262        let mut r = TsResync::new();
263        let emitted = r.feed(&data);
264
265        assert_eq!(emitted.len(), 5);
266        for (i, (e, p)) in emitted.iter().zip(packets.iter()).enumerate() {
267            assert_eq!(e, p, "packet {i} mismatch");
268        }
269        assert_eq!(r.stride(), Some(PacketStride::Ts188));
270        let s = r.stats();
271        assert_eq!(s.packets, 5);
272        assert_eq!(s.resyncs, 0);
273        assert_eq!(s.dropped_bytes, 0);
274    }
275
276    // ------------------------------------------------------------------
277    // Test 2 — lock only after LOCK_CONFIRMATIONS sync bytes
278    // ------------------------------------------------------------------
279    #[test]
280    fn requires_n_confirmations_before_lock() {
281        // Pin the default lock window. The hardcoded 4-vs-5 boundary below only
282        // bites the threshold when this is 5 — assert it explicitly so a change
283        // to the default (or to this test's literals) cannot silently pass.
284        assert_eq!(
285            LOCK_CONFIRMATIONS, 5,
286            "this test pins the default lock window of 5"
287        );
288
289        // FOUR confirming, stride-aligned packets are one short of the window:
290        // only four sync bytes sit at the 188-stride boundaries, so the
291        // resynchroniser must NOT lock and must emit nothing (it buffers).
292        let four = concat_ts(&(1u8..=4).map(ts_packet).collect::<Vec<_>>());
293        let mut r = TsResync::new();
294        assert_eq!(
295            r.feed(&four).len(),
296            0,
297            "4 confirmations (< 5) must not lock or emit"
298        );
299        assert_eq!(r.stride(), None, "stride must remain None below the window");
300
301        // The FIFTH stride-aligned sync byte completes the window → lock.
302        let mut out = r.feed(&ts_packet(5));
303        out.extend(r.feed(&[]));
304        assert!(
305            r.stride().is_some(),
306            "the 5th confirmation must trigger lock"
307        );
308        // Once locked, the buffered packets are emitted (5 fed, all aligned).
309        assert_eq!(out.len(), 5, "all five aligned packets emit once locked");
310    }
311
312    // ------------------------------------------------------------------
313    // Test 3 — junk prefix: leading garbage dropped, correct count returned
314    // ------------------------------------------------------------------
315    #[test]
316    fn junk_prefix_correct_dropped_count() {
317        let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
318        let stream = concat_ts(&pkts);
319
320        let junk_len = 7usize;
321        let junk: Vec<u8> = vec![0x00; junk_len];
322        let mut data = junk;
323        data.extend_from_slice(&stream);
324
325        let mut r = TsResync::new();
326        let emitted = r.feed(&data);
327
328        assert_eq!(emitted.len(), 6);
329        for (i, (e, p)) in emitted.iter().zip(pkts.iter()).enumerate() {
330            assert_eq!(*e, *p, "packet {i} mismatch after junk prefix");
331        }
332        let s = r.stats();
333        assert_eq!(
334            s.dropped_bytes, junk_len as u64,
335            "dropped bytes must equal junk prefix"
336        );
337        assert_eq!(s.resyncs, 0);
338        assert_eq!(s.packets, 6);
339    }
340
341    // ------------------------------------------------------------------
342    // Test 4 — mid-stream sync loss and reacquisition
343    // ------------------------------------------------------------------
344    #[test]
345    fn midstream_loss_resync() {
346        // Need > LOCK_CONFIRMATIONS clean packets before and after the stray
347        // byte so the stream locks, loses sync, and re-locks.
348        let pkts: Vec<_> = (0u8..14).map(|i| ts_packet(i + 1)).collect();
349        let clean = concat_ts(&pkts);
350
351        // Insert a single stray byte 12 bytes into packet 7.
352        let insert_at = 7 * TS_PACKET_SIZE + 12;
353        let mut data = clean[..insert_at].to_vec();
354        data.push(0x00);
355        data.extend_from_slice(&clean[insert_at..]);
356
357        let mut r = TsResync::new();
358        let emitted = r.feed(&data);
359
360        let s = r.stats();
361        assert!(
362            s.resyncs >= 1,
363            "mid-stream corruption must trigger a resync, got {}",
364            s.resyncs
365        );
366        assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
367        assert!(
368            emitted.len() >= 10,
369            "should recover and emit most packets, got {}",
370            emitted.len()
371        );
372    }
373
374    // ------------------------------------------------------------------
375    // Test 5 — 204-byte RS-coded packets detected + RS stripped
376    // ------------------------------------------------------------------
377    #[test]
378    fn rs204_detected_and_stripped() {
379        let mut stream = Vec::new();
380        let mut expected = Vec::new();
381        for i in 0u8..6 {
382            let tag = i + 1;
383            let rs = rs_packet(tag, 0xFF);
384            stream.extend_from_slice(&rs);
385            expected.push(ts_packet(tag));
386        }
387
388        let mut r = TsResync::new();
389        let emitted = r.feed(&stream);
390
391        assert_eq!(emitted.len(), 6, "RS-coded stream must emit 6 packets");
392        assert_eq!(
393            r.stride(),
394            Some(PacketStride::Rs204),
395            "stride must be Rs204"
396        );
397        for (i, (e, p)) in emitted.iter().zip(expected.iter()).enumerate() {
398            assert_eq!(e, p, "RS-stripped packet {i} mismatch");
399        }
400        // Confirm the emitted 188-byte packets are parseable by TsPacket.
401        for (i, pkt) in emitted.iter().enumerate() {
402            crate::ts::TsPacket::parse(pkt)
403                .unwrap_or_else(|e| panic!("RS-stripped packet {i} TsPacket::parse failed: {e}"));
404        }
405        let s = r.stats();
406        assert_eq!(s.packets, 6);
407        assert_eq!(s.resyncs, 0);
408        assert_eq!(s.dropped_bytes, 0);
409    }
410
411    // ------------------------------------------------------------------
412    // Test 6 — aligned 188 stream yields same packets as plain chunks(188)
413    //   (equivalence: fixture-less variant using synthetic data)
414    // ------------------------------------------------------------------
415    #[test]
416    fn aligned_188_matches_plain_chunks() {
417        let pkts: Vec<_> = (0u8..10).map(|i| ts_packet(i + 1)).collect();
418        let data = concat_ts(&pkts);
419
420        // Oracle: plain chunks(188) filtered by sync byte.
421        let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
422            .chunks_exact(TS_PACKET_SIZE)
423            .filter(|c| c[0] == TS_SYNC_BYTE)
424            .map(|c| c.try_into().unwrap())
425            .collect();
426
427        let mut r = TsResync::new();
428        let emitted = r.feed(&data);
429
430        assert_eq!(emitted.len(), oracle.len(), "count mismatch");
431        for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
432            assert_eq!(e, o, "packet {i} differs from chunks-oracle");
433        }
434    }
435
436    // ------------------------------------------------------------------
437    // Test 7 — chunked feed equivalence (same result in small increments)
438    // ------------------------------------------------------------------
439    #[test]
440    fn chunked_feed_equivalence() {
441        let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
442        let stream = concat_ts(&pkts);
443
444        let whole = {
445            let mut r = TsResync::new();
446            r.feed(&stream)
447        };
448
449        let chunked = {
450            let mut r = TsResync::new();
451            let mut out = Vec::new();
452            for chunk in stream.chunks(100) {
453                out.extend(r.feed(chunk));
454            }
455            out
456        };
457
458        assert_eq!(whole.len(), chunked.len());
459        for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
460            assert_eq!(w, c, "packet {i} mismatch");
461        }
462    }
463
464    // ------------------------------------------------------------------
465    // Test 8 — fixture-based: m6-single.ts differential
466    //   feeding the real fixture through TsResync must yield the same
467    //   188-byte packets as plain chunks_exact(188) + sync-byte filter.
468    // ------------------------------------------------------------------
469    #[test]
470    fn fixture_m6_matches_plain_chunks() {
471        let path = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/fixtures/m6-single.ts");
472        let data = std::fs::read(path).expect("m6-single.ts fixture not found");
473
474        // Oracle: plain aligned 188-byte reads.
475        let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
476            .chunks_exact(TS_PACKET_SIZE)
477            .filter(|c| c[0] == TS_SYNC_BYTE)
478            .map(|c| c.try_into().unwrap())
479            .collect();
480        assert!(!oracle.is_empty(), "oracle empty — fixture empty?");
481
482        let mut r = TsResync::new();
483        let emitted = r.feed(&data);
484
485        assert_eq!(
486            emitted.len(),
487            oracle.len(),
488            "packet count: TsResync={} oracle={}",
489            emitted.len(),
490            oracle.len()
491        );
492        for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
493            assert_eq!(e, o, "packet {i} mismatch vs oracle");
494        }
495    }
496}