mpeg_ts/resync.rs
1//! Stateful TS byte-stream resynchroniser — ITU-T H.222.0 §2.4.3.2 (= ISO/IEC 13818-1 §2.4.3.2).
2//!
3//! Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
4//! (file reads, UDP payloads) that may start mid-packet or contain leading
5//! garbage. Also detects 204-byte Reed-Solomon-coded packets (DVB RS-coded
6//! outer forward-error-correction layer) and strips the 16 parity bytes,
7//! yielding standard 188-byte TS packets in both cases.
8//!
9//! # Feature gate
10//!
11//! This module is only compiled when the `ts` feature is enabled (the
12//! default), because it depends on the TS constants in [`crate::ts`].
13//!
14//! # Example
15//!
16//! ```
17//! use mpeg_ts::resync::TsResync;
18//!
19//! let mut r = TsResync::new();
20//! // Feed arbitrary bytes (file chunks, UDP datagrams, etc.).
21//! let packets: Vec<[u8; 188]> = r.feed(b"some raw bytes");
22//! let stats = r.stats();
23//! ```
24
25use alloc::vec::Vec;
26
27use crate::ts::{TS_PACKET_SIZE, TS_SYNC_BYTE};
28
29/// Reed-Solomon-coded TS packet size: 188-byte payload + 16 parity bytes
30/// (DVB RS outer FEC, ISO/IEC 13818-1 §2.4.3.2 informative note).
31pub const RS_PACKET_SIZE: usize = 204;
32
33/// Number of Reed-Solomon parity bytes appended to a 204-byte packet.
34pub const RS_PARITY_LEN: usize = RS_PACKET_SIZE - TS_PACKET_SIZE;
35
36/// Consecutive sync bytes at the candidate stride required to declare lock.
37pub const LOCK_CONFIRMATIONS: usize = 5;
38
39/// Detected packet size after locking.
40#[non_exhaustive]
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42#[cfg_attr(feature = "serde", derive(serde::Serialize))]
43pub enum PacketStride {
44 /// Standard 188-byte TS packets (ISO/IEC 13818-1 §2.4.3.2).
45 Ts188,
46 /// 204-byte packets (188-byte TS + 16 Reed-Solomon parity bytes).
47 Rs204,
48}
49
50impl PacketStride {
51 /// Short machine-readable label for this stride.
52 pub fn name(&self) -> &'static str {
53 match self {
54 PacketStride::Ts188 => "ts_188",
55 PacketStride::Rs204 => "rs_204",
56 }
57 }
58}
59
60broadcast_common::impl_spec_display!(PacketStride);
61
62/// Counters accumulated during resynchronisation.
63#[non_exhaustive]
64#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
65#[cfg_attr(feature = "serde", derive(serde::Serialize))]
66pub struct ResyncStats {
67 /// Total 188-byte TS packets emitted.
68 pub packets: u64,
69 /// Times sync was lost and reacquired.
70 pub resyncs: u64,
71 /// Bytes skipped/dropped (junk before lock + sync-loss bytes).
72 pub dropped_bytes: u64,
73}
74
75/// Stateful TS byte-stream resynchroniser (ISO/IEC 13818-1 §2.4.3.2).
76///
77/// Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
78/// that may start mid-packet or contain junk, and detects 204-byte
79/// Reed-Solomon-coded packets (stripping the 16 parity bytes).
80///
81/// Feed raw bytes with [`feed`](Self::feed); each call returns a `Vec` of
82/// aligned 188-byte TS packets. Bytes are buffered across calls so that
83/// packet boundaries may span call boundaries freely.
84///
85/// Lock is declared after [`LOCK_CONFIRMATIONS`] consecutive sync bytes are
86/// found at the candidate stride (188 or 204). On sync loss the resynchroniser
87/// re-scans from the byte after the lost position.
88///
89/// # Stats
90///
91/// [`stats`](Self::stats) returns cumulative counters (packets emitted,
92/// resyncs, dropped bytes).
93#[derive(Debug, Default)]
94pub struct TsResync {
95 buf: Vec<u8>,
96 /// Logical read head into `buf`; compacted periodically.
97 head: usize,
98 stride: Option<PacketStride>,
99 stats: ResyncStats,
100}
101
102impl TsResync {
103 /// Create a new resynchroniser with an empty internal buffer.
104 pub fn new() -> Self {
105 Self::default()
106 }
107
108 /// Feed `data` and emit every newly-aligned 188-byte TS packet.
109 ///
110 /// For a 204-byte stream the 16 Reed-Solomon parity bytes are stripped;
111 /// only the 188-byte TS payload is returned. Bytes that cannot yet form
112 /// a complete packet (or fall before lock) are buffered for the next call.
113 pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
114 self.buf.extend_from_slice(data);
115 let mut emitted = Vec::new();
116
117 loop {
118 match self.stride {
119 None => {
120 if let Some((offset, s)) = find_sync(&self.buf[self.head..]) {
121 self.stats.dropped_bytes += offset as u64;
122 self.head += offset;
123 self.stride = Some(s);
124 } else {
125 // Keep at most enough bytes to detect a future lock.
126 let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
127 let tail_len = self.buf.len() - self.head;
128 if tail_len > keep {
129 let excess = tail_len - keep;
130 self.stats.dropped_bytes += excess as u64;
131 self.head += excess;
132 }
133 self.compact();
134 return emitted;
135 }
136 }
137 Some(stride) => {
138 let s = match stride {
139 PacketStride::Ts188 => TS_PACKET_SIZE,
140 PacketStride::Rs204 => RS_PACKET_SIZE,
141 };
142 let tail_len = self.buf.len() - self.head;
143 if tail_len < s {
144 self.compact();
145 return emitted;
146 }
147 if self.buf[self.head] == TS_SYNC_BYTE {
148 let mut packet = [0u8; TS_PACKET_SIZE];
149 packet.copy_from_slice(&self.buf[self.head..self.head + TS_PACKET_SIZE]);
150 emitted.push(packet);
151 self.head += s;
152 self.stats.packets += 1;
153 } else {
154 // Sync byte missing — record the loss and re-scan.
155 self.stats.resyncs += 1;
156 self.stats.dropped_bytes += 1;
157 self.head += 1;
158 self.stride = None;
159 }
160 }
161 }
162 }
163 }
164
165 /// Detected packet stride, or [`None`] before the stream has locked.
166 pub fn stride(&self) -> Option<PacketStride> {
167 self.stride
168 }
169
170 /// Accumulated statistics.
171 pub fn stats(&self) -> ResyncStats {
172 self.stats
173 }
174
175 /// Compact the internal buffer by discarding consumed bytes.
176 fn compact(&mut self) {
177 if self.head > 0 {
178 self.buf.drain(..self.head);
179 self.head = 0;
180 }
181 }
182}
183
184/// Scan `buf` for the smallest offset `o` such that stride `S` (tried 188
185/// first, then 204) yields [`LOCK_CONFIRMATIONS`] consecutive sync bytes at
186/// positions `o + k*S` for `k = 0 .. LOCK_CONFIRMATIONS`.
187///
188/// Returns `(offset, stride)` on success, or `None` if no lock is found.
189fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
190 for o in 0..buf.len() {
191 if buf[o] != TS_SYNC_BYTE {
192 continue;
193 }
194 if try_stride(buf, o, TS_PACKET_SIZE) {
195 return Some((o, PacketStride::Ts188));
196 }
197 if try_stride(buf, o, RS_PACKET_SIZE) {
198 return Some((o, PacketStride::Rs204));
199 }
200 }
201 None
202}
203
204/// Return `true` if `LOCK_CONFIRMATIONS` consecutive sync bytes exist at
205/// stride `s` starting from `offset` (the first is already known to be
206/// [`TS_SYNC_BYTE`]).
207fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
208 for k in 1..LOCK_CONFIRMATIONS {
209 let pos = offset + k * s;
210 if pos >= buf.len() || buf[pos] != TS_SYNC_BYTE {
211 return false;
212 }
213 }
214 true
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220 use alloc::vec;
221 use alloc::vec::Vec;
222 extern crate std;
223
224 /// Build a 188-byte TS packet starting with `TS_SYNC_BYTE` followed by
225 /// `tag` (repeated). All non-sync bytes are kept away from `0x47` so
226 /// that no false lock occurs in the fixture data.
227 fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
228 assert_ne!(tag, TS_SYNC_BYTE, "tag must not equal sync byte");
229 let mut pkt = [tag; TS_PACKET_SIZE];
230 pkt[0] = TS_SYNC_BYTE;
231 pkt
232 }
233
234 /// Build a 204-byte RS-coded packet: 188-byte TS payload (with `ts_tag`)
235 /// plus 16 parity bytes filled with `parity` (must not be `0x47`).
236 fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
237 assert_ne!(ts_tag, TS_SYNC_BYTE);
238 assert_ne!(parity, TS_SYNC_BYTE);
239 let mut pkt = [parity; RS_PACKET_SIZE];
240 pkt[0] = TS_SYNC_BYTE;
241 pkt[1..TS_PACKET_SIZE].fill(ts_tag);
242 pkt
243 }
244
245 /// Concatenate several 188-byte packet arrays into a flat `Vec<u8>`.
246 fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
247 let mut v = Vec::with_capacity(packets.len() * TS_PACKET_SIZE);
248 for p in packets {
249 v.extend_from_slice(p);
250 }
251 v
252 }
253
254 // ------------------------------------------------------------------
255 // Test 1 — aligned 188-byte passthrough
256 // ------------------------------------------------------------------
257 #[test]
258 fn aligned_188_passthrough() {
259 let packets: Vec<_> = (0u8..5).map(|i| ts_packet(i + 1)).collect();
260 let data = concat_ts(&packets);
261
262 let mut r = TsResync::new();
263 let emitted = r.feed(&data);
264
265 assert_eq!(emitted.len(), 5);
266 for (i, (e, p)) in emitted.iter().zip(packets.iter()).enumerate() {
267 assert_eq!(e, p, "packet {i} mismatch");
268 }
269 assert_eq!(r.stride(), Some(PacketStride::Ts188));
270 let s = r.stats();
271 assert_eq!(s.packets, 5);
272 assert_eq!(s.resyncs, 0);
273 assert_eq!(s.dropped_bytes, 0);
274 }
275
276 // ------------------------------------------------------------------
277 // Test 2 — lock only after LOCK_CONFIRMATIONS sync bytes
278 // ------------------------------------------------------------------
279 #[test]
280 fn requires_n_confirmations_before_lock() {
281 // Pin the default lock window. The hardcoded 4-vs-5 boundary below only
282 // bites the threshold when this is 5 — assert it explicitly so a change
283 // to the default (or to this test's literals) cannot silently pass.
284 assert_eq!(
285 LOCK_CONFIRMATIONS, 5,
286 "this test pins the default lock window of 5"
287 );
288
289 // FOUR confirming, stride-aligned packets are one short of the window:
290 // only four sync bytes sit at the 188-stride boundaries, so the
291 // resynchroniser must NOT lock and must emit nothing (it buffers).
292 let four = concat_ts(&(1u8..=4).map(ts_packet).collect::<Vec<_>>());
293 let mut r = TsResync::new();
294 assert_eq!(
295 r.feed(&four).len(),
296 0,
297 "4 confirmations (< 5) must not lock or emit"
298 );
299 assert_eq!(r.stride(), None, "stride must remain None below the window");
300
301 // The FIFTH stride-aligned sync byte completes the window → lock.
302 let mut out = r.feed(&ts_packet(5));
303 out.extend(r.feed(&[]));
304 assert!(
305 r.stride().is_some(),
306 "the 5th confirmation must trigger lock"
307 );
308 // Once locked, the buffered packets are emitted (5 fed, all aligned).
309 assert_eq!(out.len(), 5, "all five aligned packets emit once locked");
310 }
311
312 // ------------------------------------------------------------------
313 // Test 3 — junk prefix: leading garbage dropped, correct count returned
314 // ------------------------------------------------------------------
315 #[test]
316 fn junk_prefix_correct_dropped_count() {
317 let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
318 let stream = concat_ts(&pkts);
319
320 let junk_len = 7usize;
321 let junk: Vec<u8> = vec![0x00; junk_len];
322 let mut data = junk;
323 data.extend_from_slice(&stream);
324
325 let mut r = TsResync::new();
326 let emitted = r.feed(&data);
327
328 assert_eq!(emitted.len(), 6);
329 for (i, (e, p)) in emitted.iter().zip(pkts.iter()).enumerate() {
330 assert_eq!(*e, *p, "packet {i} mismatch after junk prefix");
331 }
332 let s = r.stats();
333 assert_eq!(
334 s.dropped_bytes, junk_len as u64,
335 "dropped bytes must equal junk prefix"
336 );
337 assert_eq!(s.resyncs, 0);
338 assert_eq!(s.packets, 6);
339 }
340
341 // ------------------------------------------------------------------
342 // Test 4 — mid-stream sync loss and reacquisition
343 // ------------------------------------------------------------------
344 #[test]
345 fn midstream_loss_resync() {
346 // Need > LOCK_CONFIRMATIONS clean packets before and after the stray
347 // byte so the stream locks, loses sync, and re-locks.
348 let pkts: Vec<_> = (0u8..14).map(|i| ts_packet(i + 1)).collect();
349 let clean = concat_ts(&pkts);
350
351 // Insert a single stray byte 12 bytes into packet 7.
352 let insert_at = 7 * TS_PACKET_SIZE + 12;
353 let mut data = clean[..insert_at].to_vec();
354 data.push(0x00);
355 data.extend_from_slice(&clean[insert_at..]);
356
357 let mut r = TsResync::new();
358 let emitted = r.feed(&data);
359
360 let s = r.stats();
361 assert!(
362 s.resyncs >= 1,
363 "mid-stream corruption must trigger a resync, got {}",
364 s.resyncs
365 );
366 assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
367 assert!(
368 emitted.len() >= 10,
369 "should recover and emit most packets, got {}",
370 emitted.len()
371 );
372 }
373
374 // ------------------------------------------------------------------
375 // Test 5 — 204-byte RS-coded packets detected + RS stripped
376 // ------------------------------------------------------------------
377 #[test]
378 fn rs204_detected_and_stripped() {
379 let mut stream = Vec::new();
380 let mut expected = Vec::new();
381 for i in 0u8..6 {
382 let tag = i + 1;
383 let rs = rs_packet(tag, 0xFF);
384 stream.extend_from_slice(&rs);
385 expected.push(ts_packet(tag));
386 }
387
388 let mut r = TsResync::new();
389 let emitted = r.feed(&stream);
390
391 assert_eq!(emitted.len(), 6, "RS-coded stream must emit 6 packets");
392 assert_eq!(
393 r.stride(),
394 Some(PacketStride::Rs204),
395 "stride must be Rs204"
396 );
397 for (i, (e, p)) in emitted.iter().zip(expected.iter()).enumerate() {
398 assert_eq!(e, p, "RS-stripped packet {i} mismatch");
399 }
400 // Confirm the emitted 188-byte packets are parseable by TsPacket.
401 for (i, pkt) in emitted.iter().enumerate() {
402 crate::ts::TsPacket::parse(pkt)
403 .unwrap_or_else(|e| panic!("RS-stripped packet {i} TsPacket::parse failed: {e}"));
404 }
405 let s = r.stats();
406 assert_eq!(s.packets, 6);
407 assert_eq!(s.resyncs, 0);
408 assert_eq!(s.dropped_bytes, 0);
409 }
410
411 // ------------------------------------------------------------------
412 // Test 6 — aligned 188 stream yields same packets as plain chunks(188)
413 // (equivalence: fixture-less variant using synthetic data)
414 // ------------------------------------------------------------------
415 #[test]
416 fn aligned_188_matches_plain_chunks() {
417 let pkts: Vec<_> = (0u8..10).map(|i| ts_packet(i + 1)).collect();
418 let data = concat_ts(&pkts);
419
420 // Oracle: plain chunks(188) filtered by sync byte.
421 let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
422 .chunks_exact(TS_PACKET_SIZE)
423 .filter(|c| c[0] == TS_SYNC_BYTE)
424 .map(|c| c.try_into().unwrap())
425 .collect();
426
427 let mut r = TsResync::new();
428 let emitted = r.feed(&data);
429
430 assert_eq!(emitted.len(), oracle.len(), "count mismatch");
431 for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
432 assert_eq!(e, o, "packet {i} differs from chunks-oracle");
433 }
434 }
435
436 // ------------------------------------------------------------------
437 // Test 7 — chunked feed equivalence (same result in small increments)
438 // ------------------------------------------------------------------
439 #[test]
440 fn chunked_feed_equivalence() {
441 let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
442 let stream = concat_ts(&pkts);
443
444 let whole = {
445 let mut r = TsResync::new();
446 r.feed(&stream)
447 };
448
449 let chunked = {
450 let mut r = TsResync::new();
451 let mut out = Vec::new();
452 for chunk in stream.chunks(100) {
453 out.extend(r.feed(chunk));
454 }
455 out
456 };
457
458 assert_eq!(whole.len(), chunked.len());
459 for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
460 assert_eq!(w, c, "packet {i} mismatch");
461 }
462 }
463
464 // ------------------------------------------------------------------
465 // Test 8 — fixture-based: m6-single.ts differential
466 // feeding the real fixture through TsResync must yield the same
467 // 188-byte packets as plain chunks_exact(188) + sync-byte filter.
468 // ------------------------------------------------------------------
469 #[test]
470 fn fixture_m6_matches_plain_chunks() {
471 let path = concat!(env!("CARGO_MANIFEST_DIR"), "/../fixtures/ts/m6-single.ts");
472 let data = std::fs::read(path).expect("m6-single.ts fixture not found");
473
474 // Oracle: plain aligned 188-byte reads.
475 let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
476 .chunks_exact(TS_PACKET_SIZE)
477 .filter(|c| c[0] == TS_SYNC_BYTE)
478 .map(|c| c.try_into().unwrap())
479 .collect();
480 assert!(!oracle.is_empty(), "oracle empty — fixture empty?");
481
482 let mut r = TsResync::new();
483 let emitted = r.feed(&data);
484
485 assert_eq!(
486 emitted.len(),
487 oracle.len(),
488 "packet count: TsResync={} oracle={}",
489 emitted.len(),
490 oracle.len()
491 );
492 for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
493 assert_eq!(e, o, "packet {i} mismatch vs oracle");
494 }
495 }
496}