dvb_si/resync.rs
1//! Stateful TS byte-stream resynchroniser — ISO/IEC 13818-1 §2.4.3.2.
2//!
3//! Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
4//! (file reads, UDP payloads) that may start mid-packet or contain leading
5//! garbage. Also detects 204-byte Reed-Solomon-coded packets (DVB RS-coded
6//! outer forward-error-correction layer) and strips the 16 parity bytes,
7//! yielding standard 188-byte TS packets in both cases.
8//!
9//! # Feature gate
10//!
11//! This module is only compiled when the `ts` feature is enabled (the
12//! default), because it depends on the TS constants in [`crate::ts`].
13//!
14//! # Example
15//!
16//! ```
17//! use dvb_si::resync::TsResync;
18//!
19//! let mut r = TsResync::new();
20//! // Feed arbitrary bytes (file chunks, UDP datagrams, etc.).
21//! let packets: Vec<[u8; 188]> = r.feed(b"some raw bytes");
22//! let stats = r.stats();
23//! ```
24
25use alloc::vec::Vec;
26
27use crate::ts::{TS_PACKET_SIZE, TS_SYNC_BYTE};
28
29/// Reed-Solomon-coded TS packet size: 188-byte payload + 16 parity bytes
30/// (DVB RS outer FEC, ISO/IEC 13818-1 §2.4.3.2 informative note).
31pub const RS_PACKET_SIZE: usize = 204;
32
33/// Number of Reed-Solomon parity bytes appended to a 204-byte packet.
34pub const RS_PARITY_LEN: usize = RS_PACKET_SIZE - TS_PACKET_SIZE;
35
36/// Consecutive sync bytes at the candidate stride required to declare lock.
37pub const LOCK_CONFIRMATIONS: usize = 5;
38
/// Packet stride the resynchroniser has locked onto.
///
/// The stride is the distance in bytes between two consecutive sync bytes in
/// the input. Whatever the stride, the packets handed back to the caller are
/// always 188 bytes long.
///
/// `Hash` is derived alongside `Eq` so the stride can be used directly as a
/// key in hashed collections (for example, per-stride counters).
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum PacketStride {
    /// Standard 188-byte TS packets (ISO/IEC 13818-1 §2.4.3.2).
    Ts188,
    /// 204-byte packets: a 188-byte TS packet followed by 16 Reed-Solomon
    /// parity bytes.
    Rs204,
}
49
/// Running totals kept by the resynchroniser across all `feed` calls.
///
/// All counters start at zero and only ever grow.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ResyncStats {
    /// Number of 188-byte TS packets handed back to the caller.
    pub packets: u64,
    /// Number of sync losses: incremented each time a locked stream shows a
    /// non-sync byte where a packet should begin, after which lock is sought
    /// again.
    pub resyncs: u64,
    /// Bytes discarded without being emitted: junk skipped ahead of a lock
    /// position, the byte dropped at each sync loss, and unlocked bytes
    /// trimmed from the front of the buffer. Parity bytes stripped from
    /// 204-byte packets are not counted here.
    pub dropped_bytes: u64,
}
62
63/// Stateful TS byte-stream resynchroniser (ISO/IEC 13818-1 §2.4.3.2).
64///
65/// Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
66/// that may start mid-packet or contain junk, and detects 204-byte
67/// Reed-Solomon-coded packets (stripping the 16 parity bytes).
68///
69/// Feed raw bytes with [`feed`](Self::feed); each call returns a `Vec` of
70/// aligned 188-byte TS packets. Bytes are buffered across calls so that
71/// packet boundaries may span call boundaries freely.
72///
73/// Lock is declared after [`LOCK_CONFIRMATIONS`] consecutive sync bytes are
74/// found at the candidate stride (188 or 204). On sync loss the resynchroniser
75/// re-scans from the byte after the lost position.
76///
77/// # Stats
78///
79/// [`stats`](Self::stats) returns cumulative counters (packets emitted,
80/// resyncs, dropped bytes).
81#[derive(Debug, Default)]
82pub struct TsResync {
83 buf: Vec<u8>,
84 /// Logical read head into `buf`; compacted periodically.
85 head: usize,
86 stride: Option<PacketStride>,
87 stats: ResyncStats,
88}
89
90impl TsResync {
91 /// Create a new resynchroniser with an empty internal buffer.
92 pub fn new() -> Self {
93 Self::default()
94 }
95
96 /// Feed `data` and emit every newly-aligned 188-byte TS packet.
97 ///
98 /// For a 204-byte stream the 16 Reed-Solomon parity bytes are stripped;
99 /// only the 188-byte TS payload is returned. Bytes that cannot yet form
100 /// a complete packet (or fall before lock) are buffered for the next call.
101 pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
102 self.buf.extend_from_slice(data);
103 let mut emitted = Vec::new();
104
105 loop {
106 match self.stride {
107 None => {
108 if let Some((offset, s)) = find_sync(&self.buf[self.head..]) {
109 self.stats.dropped_bytes += offset as u64;
110 self.head += offset;
111 self.stride = Some(s);
112 } else {
113 // Keep at most enough bytes to detect a future lock.
114 let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
115 let tail_len = self.buf.len() - self.head;
116 if tail_len > keep {
117 let excess = tail_len - keep;
118 self.stats.dropped_bytes += excess as u64;
119 self.head += excess;
120 }
121 self.compact();
122 return emitted;
123 }
124 }
125 Some(stride) => {
126 let s = match stride {
127 PacketStride::Ts188 => TS_PACKET_SIZE,
128 PacketStride::Rs204 => RS_PACKET_SIZE,
129 };
130 let tail_len = self.buf.len() - self.head;
131 if tail_len < s {
132 self.compact();
133 return emitted;
134 }
135 if self.buf[self.head] == TS_SYNC_BYTE {
136 let mut packet = [0u8; TS_PACKET_SIZE];
137 packet.copy_from_slice(&self.buf[self.head..self.head + TS_PACKET_SIZE]);
138 emitted.push(packet);
139 self.head += s;
140 self.stats.packets += 1;
141 } else {
142 // Sync byte missing — record the loss and re-scan.
143 self.stats.resyncs += 1;
144 self.stats.dropped_bytes += 1;
145 self.head += 1;
146 self.stride = None;
147 }
148 }
149 }
150 }
151 }
152
153 /// Detected packet stride, or [`None`] before the stream has locked.
154 pub fn stride(&self) -> Option<PacketStride> {
155 self.stride
156 }
157
158 /// Accumulated statistics.
159 pub fn stats(&self) -> ResyncStats {
160 self.stats
161 }
162
163 /// Compact the internal buffer by discarding consumed bytes.
164 fn compact(&mut self) {
165 if self.head > 0 {
166 self.buf.drain(..self.head);
167 self.head = 0;
168 }
169 }
170}
171
172/// Scan `buf` for the smallest offset `o` such that stride `S` (tried 188
173/// first, then 204) yields [`LOCK_CONFIRMATIONS`] consecutive sync bytes at
174/// positions `o + k*S` for `k = 0 .. LOCK_CONFIRMATIONS`.
175///
176/// Returns `(offset, stride)` on success, or `None` if no lock is found.
177fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
178 for o in 0..buf.len() {
179 if buf[o] != TS_SYNC_BYTE {
180 continue;
181 }
182 if try_stride(buf, o, TS_PACKET_SIZE) {
183 return Some((o, PacketStride::Ts188));
184 }
185 if try_stride(buf, o, RS_PACKET_SIZE) {
186 return Some((o, PacketStride::Rs204));
187 }
188 }
189 None
190}
191
192/// Return `true` if `LOCK_CONFIRMATIONS` consecutive sync bytes exist at
193/// stride `s` starting from `offset` (the first is already known to be
194/// [`TS_SYNC_BYTE`]).
195fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
196 for k in 1..LOCK_CONFIRMATIONS {
197 let pos = offset + k * s;
198 if pos >= buf.len() || buf[pos] != TS_SYNC_BYTE {
199 return false;
200 }
201 }
202 true
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 188-byte TS packet starting with `TS_SYNC_BYTE` followed by
    /// `tag` (repeated). All non-sync bytes are kept away from `0x47` so
    /// that no false lock occurs in the fixture data.
    fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
        assert_ne!(tag, TS_SYNC_BYTE, "tag must not equal sync byte");
        let mut pkt = [tag; TS_PACKET_SIZE];
        pkt[0] = TS_SYNC_BYTE;
        pkt
    }

    /// Build a 204-byte RS-coded packet: 188-byte TS payload (with `ts_tag`)
    /// plus 16 parity bytes filled with `parity` (must not be `0x47`).
    fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
        assert_ne!(ts_tag, TS_SYNC_BYTE);
        assert_ne!(parity, TS_SYNC_BYTE);
        let mut pkt = [parity; RS_PACKET_SIZE];
        pkt[0] = TS_SYNC_BYTE;
        pkt[1..TS_PACKET_SIZE].fill(ts_tag);
        pkt
    }

    /// Concatenate several 188-byte packet arrays into a flat `Vec<u8>`.
    fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
        packets.concat()
    }

    // ------------------------------------------------------------------
    // Test 1 — aligned 188-byte passthrough
    // ------------------------------------------------------------------
    #[test]
    fn aligned_188_passthrough() {
        let packets: Vec<_> = (0u8..5).map(|i| ts_packet(i + 1)).collect();
        let data = concat_ts(&packets);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), 5);
        for (i, (e, p)) in emitted.iter().zip(packets.iter()).enumerate() {
            assert_eq!(e, p, "packet {i} mismatch");
        }
        assert_eq!(r.stride(), Some(PacketStride::Ts188));
        let s = r.stats();
        assert_eq!(s.packets, 5);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 0);
    }

    // ------------------------------------------------------------------
    // Test 2 — lock only after LOCK_CONFIRMATIONS sync bytes
    // ------------------------------------------------------------------
    #[test]
    fn requires_n_confirmations_before_lock() {
        // Pin the default lock window. The hardcoded 4-vs-5 boundary below only
        // bites the threshold when this is 5 — assert it explicitly so a change
        // to the default (or to this test's literals) cannot silently pass.
        assert_eq!(
            LOCK_CONFIRMATIONS, 5,
            "this test pins the default lock window of 5"
        );

        // FOUR confirming, stride-aligned packets are one short of the window:
        // only four sync bytes sit at the 188-stride boundaries, so the
        // resynchroniser must NOT lock and must emit nothing (it buffers).
        let four = concat_ts(&(1u8..=4).map(ts_packet).collect::<Vec<_>>());
        let mut r = TsResync::new();
        assert_eq!(
            r.feed(&four).len(),
            0,
            "4 confirmations (< 5) must not lock or emit"
        );
        assert_eq!(r.stride(), None, "stride must remain None below the window");

        // The FIFTH stride-aligned sync byte completes the window → lock.
        let mut out = r.feed(&ts_packet(5));
        out.extend(r.feed(&[]));
        assert!(
            r.stride().is_some(),
            "the 5th confirmation must trigger lock"
        );
        // Once locked, the buffered packets are emitted (5 fed, all aligned).
        assert_eq!(out.len(), 5, "all five aligned packets emit once locked");
    }

    // ------------------------------------------------------------------
    // Test 3 — junk prefix: leading garbage dropped, correct count returned
    // ------------------------------------------------------------------
    #[test]
    fn junk_prefix_correct_dropped_count() {
        let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
        let stream = concat_ts(&pkts);

        let junk_len = 7usize;
        let junk: Vec<u8> = vec![0x00; junk_len];
        let mut data = junk;
        data.extend_from_slice(&stream);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), 6);
        for (i, (e, p)) in emitted.iter().zip(pkts.iter()).enumerate() {
            assert_eq!(*e, *p, "packet {i} mismatch after junk prefix");
        }
        let s = r.stats();
        assert_eq!(
            s.dropped_bytes, junk_len as u64,
            "dropped bytes must equal junk prefix"
        );
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.packets, 6);
    }

    // ------------------------------------------------------------------
    // Test 4 — mid-stream sync loss and reacquisition
    // ------------------------------------------------------------------
    #[test]
    fn midstream_loss_resync() {
        // Need > LOCK_CONFIRMATIONS clean packets before and after the stray
        // byte so the stream locks, loses sync, and re-locks.
        let pkts: Vec<_> = (0u8..14).map(|i| ts_packet(i + 1)).collect();
        let clean = concat_ts(&pkts);

        // Insert a single stray byte 12 bytes into packet 7.
        let corrupted = 7usize;
        let insert_at = corrupted * TS_PACKET_SIZE + 12;
        let mut data = clean[..insert_at].to_vec();
        data.push(0x00);
        data.extend_from_slice(&clean[insert_at..]);

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        let s = r.stats();
        assert!(
            s.resyncs >= 1,
            "mid-stream corruption must trigger a resync, got {}",
            s.resyncs
        );
        assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
        assert!(
            emitted.len() >= 10,
            "should recover and emit most packets, got {}",
            emitted.len()
        );

        // Packets ahead of the corruption must pass through untouched.
        assert_eq!(
            &emitted[..corrupted],
            &pkts[..corrupted],
            "packets before the stray byte must be emitted unchanged"
        );

        // A packet count alone cannot tell a correct re-lock from a
        // misaligned one, so check the content: every packet after the
        // corrupted one must come out intact, in order, at the end.
        assert!(
            emitted.ends_with(&pkts[corrupted + 1..]),
            "packets after the stray byte must be recovered on their true boundaries"
        );

        // The input ends on a packet boundary, so nothing stays buffered and
        // every input byte is either part of an emitted packet or dropped.
        assert_eq!(
            s.packets * TS_PACKET_SIZE as u64 + s.dropped_bytes,
            data.len() as u64,
            "emitted + dropped bytes must account for the whole input"
        );
        assert_eq!(
            r.stride(),
            Some(PacketStride::Ts188),
            "stream must end re-locked at the 188-byte stride"
        );
    }

    // ------------------------------------------------------------------
    // Test 5 — 204-byte RS-coded packets detected + RS stripped
    // ------------------------------------------------------------------
    #[test]
    fn rs204_detected_and_stripped() {
        let mut stream = Vec::new();
        let mut expected = Vec::new();
        for i in 0u8..6 {
            let tag = i + 1;
            let rs = rs_packet(tag, 0xFF);
            stream.extend_from_slice(&rs);
            expected.push(ts_packet(tag));
        }

        let mut r = TsResync::new();
        let emitted = r.feed(&stream);

        assert_eq!(emitted.len(), 6, "RS-coded stream must emit 6 packets");
        assert_eq!(
            r.stride(),
            Some(PacketStride::Rs204),
            "stride must be Rs204"
        );
        for (i, (e, p)) in emitted.iter().zip(expected.iter()).enumerate() {
            assert_eq!(e, p, "RS-stripped packet {i} mismatch");
        }
        // Confirm the emitted 188-byte packets are parseable by TsPacket.
        for (i, pkt) in emitted.iter().enumerate() {
            crate::ts::TsPacket::parse(pkt)
                .unwrap_or_else(|e| panic!("RS-stripped packet {i} TsPacket::parse failed: {e}"));
        }
        let s = r.stats();
        assert_eq!(s.packets, 6);
        assert_eq!(s.resyncs, 0);
        assert_eq!(s.dropped_bytes, 0);
    }

    // ------------------------------------------------------------------
    // Test 6 — aligned 188 stream yields same packets as plain chunks(188)
    // (equivalence: fixture-less variant using synthetic data)
    // ------------------------------------------------------------------
    #[test]
    fn aligned_188_matches_plain_chunks() {
        let pkts: Vec<_> = (0u8..10).map(|i| ts_packet(i + 1)).collect();
        let data = concat_ts(&pkts);

        // Oracle: plain chunks(188) filtered by sync byte.
        let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
            .chunks_exact(TS_PACKET_SIZE)
            .filter(|c| c[0] == TS_SYNC_BYTE)
            .map(|c| c.try_into().unwrap())
            .collect();

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(emitted.len(), oracle.len(), "count mismatch");
        for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
            assert_eq!(e, o, "packet {i} differs from chunks-oracle");
        }
    }

    // ------------------------------------------------------------------
    // Test 7 — chunked feed equivalence (same result in small increments)
    // ------------------------------------------------------------------
    #[test]
    fn chunked_feed_equivalence() {
        let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
        let stream = concat_ts(&pkts);

        let whole = {
            let mut r = TsResync::new();
            r.feed(&stream)
        };

        let chunked = {
            let mut r = TsResync::new();
            let mut out = Vec::new();
            for chunk in stream.chunks(100) {
                out.extend(r.feed(chunk));
            }
            out
        };

        assert_eq!(whole.len(), chunked.len());
        for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
            assert_eq!(w, c, "packet {i} mismatch");
        }
    }

    // ------------------------------------------------------------------
    // Test 8 — fixture-based: m6-single.ts differential
    // feeding the real fixture through TsResync must yield the same
    // 188-byte packets as plain chunks_exact(188) + sync-byte filter.
    // ------------------------------------------------------------------
    #[test]
    fn fixture_m6_matches_plain_chunks() {
        let path = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/fixtures/m6-single.ts");
        let data = std::fs::read(path).expect("m6-single.ts fixture not found");

        // Oracle: plain aligned 188-byte reads.
        let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
            .chunks_exact(TS_PACKET_SIZE)
            .filter(|c| c[0] == TS_SYNC_BYTE)
            .map(|c| c.try_into().unwrap())
            .collect();
        assert!(!oracle.is_empty(), "oracle empty — fixture empty?");

        let mut r = TsResync::new();
        let emitted = r.feed(&data);

        assert_eq!(
            emitted.len(),
            oracle.len(),
            "packet count: TsResync={} oracle={}",
            emitted.len(),
            oracle.len()
        );
        for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
            assert_eq!(e, o, "packet {i} mismatch vs oracle");
        }
    }
}
481}