dvb_si/resync.rs
1//! Stateful TS byte-stream resynchroniser — ISO/IEC 13818-1 §2.4.3.2.
2//!
3//! Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
4//! (file reads, UDP payloads) that may start mid-packet or contain leading
5//! garbage. Also detects 204-byte Reed-Solomon-coded packets (DVB RS-coded
6//! outer forward-error-correction layer) and strips the 16 parity bytes,
7//! yielding standard 188-byte TS packets in both cases.
8//!
9//! # Feature gate
10//!
11//! This module is only compiled when the `ts` feature is enabled (the
12//! default), because it depends on the TS constants in [`crate::ts`].
13//!
14//! # Example
15//!
16//! ```
17//! use dvb_si::resync::TsResync;
18//!
19//! let mut r = TsResync::new();
20//! // Feed arbitrary bytes (file chunks, UDP datagrams, etc.).
21//! let packets: Vec<[u8; 188]> = r.feed(b"some raw bytes");
22//! let stats = r.stats();
23//! ```
24
25use crate::ts::{TS_PACKET_SIZE, TS_SYNC_BYTE};
26
27/// Reed-Solomon-coded TS packet size: 188-byte payload + 16 parity bytes
28/// (DVB RS outer FEC, ISO/IEC 13818-1 §2.4.3.2 informative note).
29pub const RS_PACKET_SIZE: usize = 204;
30
31/// Number of Reed-Solomon parity bytes appended to a 204-byte packet.
32pub const RS_PARITY_LEN: usize = RS_PACKET_SIZE - TS_PACKET_SIZE;
33
34/// Consecutive sync bytes at the candidate stride required to declare lock.
35pub const LOCK_CONFIRMATIONS: usize = 5;
36
37/// Detected packet size after locking.
38#[non_exhaustive]
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40#[cfg_attr(feature = "serde", derive(serde::Serialize))]
41pub enum PacketStride {
42 /// Standard 188-byte TS packets (ISO/IEC 13818-1 §2.4.3.2).
43 Ts188,
44 /// 204-byte packets (188-byte TS + 16 Reed-Solomon parity bytes).
45 Rs204,
46}
47
48/// Counters accumulated during resynchronisation.
49#[non_exhaustive]
50#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
51#[cfg_attr(feature = "serde", derive(serde::Serialize))]
52pub struct ResyncStats {
53 /// Total 188-byte TS packets emitted.
54 pub packets: u64,
55 /// Times sync was lost and reacquired.
56 pub resyncs: u64,
57 /// Bytes skipped/dropped (junk before lock + sync-loss bytes).
58 pub dropped_bytes: u64,
59}
60
61/// Stateful TS byte-stream resynchroniser (ISO/IEC 13818-1 §2.4.3.2).
62///
63/// Recovers 188-byte MPEG-TS packet alignment from an arbitrary byte stream
64/// that may start mid-packet or contain junk, and detects 204-byte
65/// Reed-Solomon-coded packets (stripping the 16 parity bytes).
66///
67/// Feed raw bytes with [`feed`](Self::feed); each call returns a `Vec` of
68/// aligned 188-byte TS packets. Bytes are buffered across calls so that
69/// packet boundaries may span call boundaries freely.
70///
71/// Lock is declared after [`LOCK_CONFIRMATIONS`] consecutive sync bytes are
72/// found at the candidate stride (188 or 204). On sync loss the resynchroniser
73/// re-scans from the byte after the lost position.
74///
75/// # Stats
76///
77/// [`stats`](Self::stats) returns cumulative counters (packets emitted,
78/// resyncs, dropped bytes).
79#[derive(Debug, Default)]
80pub struct TsResync {
81 buf: Vec<u8>,
82 /// Logical read head into `buf`; compacted periodically.
83 head: usize,
84 stride: Option<PacketStride>,
85 stats: ResyncStats,
86}
87
88impl TsResync {
89 /// Create a new resynchroniser with an empty internal buffer.
90 pub fn new() -> Self {
91 Self::default()
92 }
93
94 /// Feed `data` and emit every newly-aligned 188-byte TS packet.
95 ///
96 /// For a 204-byte stream the 16 Reed-Solomon parity bytes are stripped;
97 /// only the 188-byte TS payload is returned. Bytes that cannot yet form
98 /// a complete packet (or fall before lock) are buffered for the next call.
99 pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
100 self.buf.extend_from_slice(data);
101 let mut emitted = Vec::new();
102
103 loop {
104 match self.stride {
105 None => {
106 if let Some((offset, s)) = find_sync(&self.buf[self.head..]) {
107 self.stats.dropped_bytes += offset as u64;
108 self.head += offset;
109 self.stride = Some(s);
110 } else {
111 // Keep at most enough bytes to detect a future lock.
112 let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
113 let tail_len = self.buf.len() - self.head;
114 if tail_len > keep {
115 let excess = tail_len - keep;
116 self.stats.dropped_bytes += excess as u64;
117 self.head += excess;
118 }
119 self.compact();
120 return emitted;
121 }
122 }
123 Some(stride) => {
124 let s = match stride {
125 PacketStride::Ts188 => TS_PACKET_SIZE,
126 PacketStride::Rs204 => RS_PACKET_SIZE,
127 };
128 let tail_len = self.buf.len() - self.head;
129 if tail_len < s {
130 self.compact();
131 return emitted;
132 }
133 if self.buf[self.head] == TS_SYNC_BYTE {
134 let mut packet = [0u8; TS_PACKET_SIZE];
135 packet.copy_from_slice(&self.buf[self.head..self.head + TS_PACKET_SIZE]);
136 emitted.push(packet);
137 self.head += s;
138 self.stats.packets += 1;
139 } else {
140 // Sync byte missing — record the loss and re-scan.
141 self.stats.resyncs += 1;
142 self.stats.dropped_bytes += 1;
143 self.head += 1;
144 self.stride = None;
145 }
146 }
147 }
148 }
149 }
150
151 /// Detected packet stride, or [`None`] before the stream has locked.
152 pub fn stride(&self) -> Option<PacketStride> {
153 self.stride
154 }
155
156 /// Accumulated statistics.
157 pub fn stats(&self) -> ResyncStats {
158 self.stats
159 }
160
161 /// Compact the internal buffer by discarding consumed bytes.
162 fn compact(&mut self) {
163 if self.head > 0 {
164 self.buf.drain(..self.head);
165 self.head = 0;
166 }
167 }
168}
169
170/// Scan `buf` for the smallest offset `o` such that stride `S` (tried 188
171/// first, then 204) yields [`LOCK_CONFIRMATIONS`] consecutive sync bytes at
172/// positions `o + k*S` for `k = 0 .. LOCK_CONFIRMATIONS`.
173///
174/// Returns `(offset, stride)` on success, or `None` if no lock is found.
175fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
176 for o in 0..buf.len() {
177 if buf[o] != TS_SYNC_BYTE {
178 continue;
179 }
180 if try_stride(buf, o, TS_PACKET_SIZE) {
181 return Some((o, PacketStride::Ts188));
182 }
183 if try_stride(buf, o, RS_PACKET_SIZE) {
184 return Some((o, PacketStride::Rs204));
185 }
186 }
187 None
188}
189
190/// Return `true` if `LOCK_CONFIRMATIONS` consecutive sync bytes exist at
191/// stride `s` starting from `offset` (the first is already known to be
192/// [`TS_SYNC_BYTE`]).
193fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
194 for k in 1..LOCK_CONFIRMATIONS {
195 let pos = offset + k * s;
196 if pos >= buf.len() || buf[pos] != TS_SYNC_BYTE {
197 return false;
198 }
199 }
200 true
201}
202
203#[cfg(test)]
204mod tests {
205 use super::*;
206
207 /// Build a 188-byte TS packet starting with `TS_SYNC_BYTE` followed by
208 /// `tag` (repeated). All non-sync bytes are kept away from `0x47` so
209 /// that no false lock occurs in the fixture data.
210 fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
211 assert_ne!(tag, TS_SYNC_BYTE, "tag must not equal sync byte");
212 let mut pkt = [tag; TS_PACKET_SIZE];
213 pkt[0] = TS_SYNC_BYTE;
214 pkt
215 }
216
217 /// Build a 204-byte RS-coded packet: 188-byte TS payload (with `ts_tag`)
218 /// plus 16 parity bytes filled with `parity` (must not be `0x47`).
219 fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
220 assert_ne!(ts_tag, TS_SYNC_BYTE);
221 assert_ne!(parity, TS_SYNC_BYTE);
222 let mut pkt = [parity; RS_PACKET_SIZE];
223 pkt[0] = TS_SYNC_BYTE;
224 pkt[1..TS_PACKET_SIZE].fill(ts_tag);
225 pkt
226 }
227
228 /// Concatenate several 188-byte packet arrays into a flat `Vec<u8>`.
229 fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
230 let mut v = Vec::with_capacity(packets.len() * TS_PACKET_SIZE);
231 for p in packets {
232 v.extend_from_slice(p);
233 }
234 v
235 }
236
237 // ------------------------------------------------------------------
238 // Test 1 — aligned 188-byte passthrough
239 // ------------------------------------------------------------------
240 #[test]
241 fn aligned_188_passthrough() {
242 let packets: Vec<_> = (0u8..5).map(|i| ts_packet(i + 1)).collect();
243 let data = concat_ts(&packets);
244
245 let mut r = TsResync::new();
246 let emitted = r.feed(&data);
247
248 assert_eq!(emitted.len(), 5);
249 for (i, (e, p)) in emitted.iter().zip(packets.iter()).enumerate() {
250 assert_eq!(e, p, "packet {i} mismatch");
251 }
252 assert_eq!(r.stride(), Some(PacketStride::Ts188));
253 let s = r.stats();
254 assert_eq!(s.packets, 5);
255 assert_eq!(s.resyncs, 0);
256 assert_eq!(s.dropped_bytes, 0);
257 }
258
259 // ------------------------------------------------------------------
260 // Test 2 — lock only after LOCK_CONFIRMATIONS sync bytes
261 // ------------------------------------------------------------------
262 #[test]
263 fn requires_n_confirmations_before_lock() {
264 // Pin the default lock window. The hardcoded 4-vs-5 boundary below only
265 // bites the threshold when this is 5 — assert it explicitly so a change
266 // to the default (or to this test's literals) cannot silently pass.
267 assert_eq!(
268 LOCK_CONFIRMATIONS, 5,
269 "this test pins the default lock window of 5"
270 );
271
272 // FOUR confirming, stride-aligned packets are one short of the window:
273 // only four sync bytes sit at the 188-stride boundaries, so the
274 // resynchroniser must NOT lock and must emit nothing (it buffers).
275 let four = concat_ts(&(1u8..=4).map(ts_packet).collect::<Vec<_>>());
276 let mut r = TsResync::new();
277 assert_eq!(
278 r.feed(&four).len(),
279 0,
280 "4 confirmations (< 5) must not lock or emit"
281 );
282 assert_eq!(r.stride(), None, "stride must remain None below the window");
283
284 // The FIFTH stride-aligned sync byte completes the window → lock.
285 let mut out = r.feed(&ts_packet(5));
286 out.extend(r.feed(&[]));
287 assert!(
288 r.stride().is_some(),
289 "the 5th confirmation must trigger lock"
290 );
291 // Once locked, the buffered packets are emitted (5 fed, all aligned).
292 assert_eq!(out.len(), 5, "all five aligned packets emit once locked");
293 }
294
295 // ------------------------------------------------------------------
296 // Test 3 — junk prefix: leading garbage dropped, correct count returned
297 // ------------------------------------------------------------------
298 #[test]
299 fn junk_prefix_correct_dropped_count() {
300 let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
301 let stream = concat_ts(&pkts);
302
303 let junk_len = 7usize;
304 let junk: Vec<u8> = vec![0x00; junk_len];
305 let mut data = junk;
306 data.extend_from_slice(&stream);
307
308 let mut r = TsResync::new();
309 let emitted = r.feed(&data);
310
311 assert_eq!(emitted.len(), 6);
312 for (i, (e, p)) in emitted.iter().zip(pkts.iter()).enumerate() {
313 assert_eq!(*e, *p, "packet {i} mismatch after junk prefix");
314 }
315 let s = r.stats();
316 assert_eq!(
317 s.dropped_bytes, junk_len as u64,
318 "dropped bytes must equal junk prefix"
319 );
320 assert_eq!(s.resyncs, 0);
321 assert_eq!(s.packets, 6);
322 }
323
324 // ------------------------------------------------------------------
325 // Test 4 — mid-stream sync loss and reacquisition
326 // ------------------------------------------------------------------
327 #[test]
328 fn midstream_loss_resync() {
329 // Need > LOCK_CONFIRMATIONS clean packets before and after the stray
330 // byte so the stream locks, loses sync, and re-locks.
331 let pkts: Vec<_> = (0u8..14).map(|i| ts_packet(i + 1)).collect();
332 let clean = concat_ts(&pkts);
333
334 // Insert a single stray byte 12 bytes into packet 7.
335 let insert_at = 7 * TS_PACKET_SIZE + 12;
336 let mut data = clean[..insert_at].to_vec();
337 data.push(0x00);
338 data.extend_from_slice(&clean[insert_at..]);
339
340 let mut r = TsResync::new();
341 let emitted = r.feed(&data);
342
343 let s = r.stats();
344 assert!(
345 s.resyncs >= 1,
346 "mid-stream corruption must trigger a resync, got {}",
347 s.resyncs
348 );
349 assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
350 assert!(
351 emitted.len() >= 10,
352 "should recover and emit most packets, got {}",
353 emitted.len()
354 );
355 }
356
357 // ------------------------------------------------------------------
358 // Test 5 — 204-byte RS-coded packets detected + RS stripped
359 // ------------------------------------------------------------------
360 #[test]
361 fn rs204_detected_and_stripped() {
362 let mut stream = Vec::new();
363 let mut expected = Vec::new();
364 for i in 0u8..6 {
365 let tag = i + 1;
366 let rs = rs_packet(tag, 0xFF);
367 stream.extend_from_slice(&rs);
368 expected.push(ts_packet(tag));
369 }
370
371 let mut r = TsResync::new();
372 let emitted = r.feed(&stream);
373
374 assert_eq!(emitted.len(), 6, "RS-coded stream must emit 6 packets");
375 assert_eq!(
376 r.stride(),
377 Some(PacketStride::Rs204),
378 "stride must be Rs204"
379 );
380 for (i, (e, p)) in emitted.iter().zip(expected.iter()).enumerate() {
381 assert_eq!(e, p, "RS-stripped packet {i} mismatch");
382 }
383 // Confirm the emitted 188-byte packets are parseable by TsPacket.
384 for (i, pkt) in emitted.iter().enumerate() {
385 crate::ts::TsPacket::parse(pkt)
386 .unwrap_or_else(|e| panic!("RS-stripped packet {i} TsPacket::parse failed: {e}"));
387 }
388 let s = r.stats();
389 assert_eq!(s.packets, 6);
390 assert_eq!(s.resyncs, 0);
391 assert_eq!(s.dropped_bytes, 0);
392 }
393
394 // ------------------------------------------------------------------
395 // Test 6 — aligned 188 stream yields same packets as plain chunks(188)
396 // (equivalence: fixture-less variant using synthetic data)
397 // ------------------------------------------------------------------
398 #[test]
399 fn aligned_188_matches_plain_chunks() {
400 let pkts: Vec<_> = (0u8..10).map(|i| ts_packet(i + 1)).collect();
401 let data = concat_ts(&pkts);
402
403 // Oracle: plain chunks(188) filtered by sync byte.
404 let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
405 .chunks_exact(TS_PACKET_SIZE)
406 .filter(|c| c[0] == TS_SYNC_BYTE)
407 .map(|c| c.try_into().unwrap())
408 .collect();
409
410 let mut r = TsResync::new();
411 let emitted = r.feed(&data);
412
413 assert_eq!(emitted.len(), oracle.len(), "count mismatch");
414 for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
415 assert_eq!(e, o, "packet {i} differs from chunks-oracle");
416 }
417 }
418
419 // ------------------------------------------------------------------
420 // Test 7 — chunked feed equivalence (same result in small increments)
421 // ------------------------------------------------------------------
422 #[test]
423 fn chunked_feed_equivalence() {
424 let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
425 let stream = concat_ts(&pkts);
426
427 let whole = {
428 let mut r = TsResync::new();
429 r.feed(&stream)
430 };
431
432 let chunked = {
433 let mut r = TsResync::new();
434 let mut out = Vec::new();
435 for chunk in stream.chunks(100) {
436 out.extend(r.feed(chunk));
437 }
438 out
439 };
440
441 assert_eq!(whole.len(), chunked.len());
442 for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
443 assert_eq!(w, c, "packet {i} mismatch");
444 }
445 }
446
447 // ------------------------------------------------------------------
448 // Test 8 — fixture-based: m6-single.ts differential
449 // feeding the real fixture through TsResync must yield the same
450 // 188-byte packets as plain chunks_exact(188) + sync-byte filter.
451 // ------------------------------------------------------------------
452 #[test]
453 fn fixture_m6_matches_plain_chunks() {
454 let path = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/fixtures/m6-single.ts");
455 let data = std::fs::read(path).expect("m6-single.ts fixture not found");
456
457 // Oracle: plain aligned 188-byte reads.
458 let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
459 .chunks_exact(TS_PACKET_SIZE)
460 .filter(|c| c[0] == TS_SYNC_BYTE)
461 .map(|c| c.try_into().unwrap())
462 .collect();
463 assert!(!oracle.is_empty(), "oracle empty — fixture empty?");
464
465 let mut r = TsResync::new();
466 let emitted = r.feed(&data);
467
468 assert_eq!(
469 emitted.len(),
470 oracle.len(),
471 "packet count: TsResync={} oracle={}",
472 emitted.len(),
473 oracle.len()
474 );
475 for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
476 assert_eq!(e, o, "packet {i} mismatch vs oracle");
477 }
478 }
479}