/// Byte value expected at the first position of every packet
/// (`0x47`, the MPEG transport-stream sync byte).
pub const SYNC_BYTE: u8 = 0x47;

/// Length in bytes of one emitted transport-stream packet.
pub const TS_PACKET_SIZE: usize = 188;

/// Length in bytes of one packet on the longer stride. Only the leading
/// [`TS_PACKET_SIZE`] bytes of such a packet are emitted; the trailing 16 are
/// skipped (presumably Reed-Solomon parity, going by the name — confirm
/// against the upstream format).
pub const RS_PACKET_SIZE: usize = 204;

/// Number of sync bytes, spaced exactly one stride apart and counting the
/// candidate itself, that must line up before a stride is accepted as locked.
/// Also sizes the tail of unlocked input that `TsResync::feed` retains.
const LOCK_CONFIRMATIONS: usize = 5;
19
/// Distance between consecutive sync bytes that the resynchroniser has
/// locked onto.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum PacketStride {
    /// Packets are [`TS_PACKET_SIZE`] (188) bytes apart.
    Ts188,
    /// Packets are [`RS_PACKET_SIZE`] (204) bytes apart; only the first
    /// 188 bytes of each are emitted.
    Rs204,
}
30
/// Running counters maintained by [`TsResync`] across all `feed` calls.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ResyncStats {
    /// Number of packets emitted so far.
    pub packets: u64,
    /// Number of times a locked stride was abandoned because the byte at the
    /// expected packet start was not [`SYNC_BYTE`].
    pub resyncs: u64,
    /// Bytes discarded without being emitted: bytes skipped while searching
    /// for sync, the byte skipped when lock is lost, and unlocked input
    /// trimmed from the buffer. The 16 trailing bytes of each 204-byte packet
    /// are NOT counted here.
    pub dropped_bytes: u64,
}
43
/// Incremental packet resynchroniser: arbitrary byte chunks go in through
/// [`TsResync::feed`], aligned [`TS_PACKET_SIZE`]-byte packets come out.
#[derive(Debug, Default)]
pub struct TsResync {
    /// Buffered input; bytes before `head` are already consumed and are
    /// removed by `compact()` at the end of each `feed` call.
    buf: Vec<u8>,
    /// Index into `buf` of the next unconsumed byte.
    head: usize,
    /// Stride currently locked onto, or `None` while searching for sync.
    stride: Option<PacketStride>,
    /// Counters reported by `stats()`.
    stats: ResyncStats,
}
58
59impl TsResync {
60 pub fn new() -> Self {
62 Self::default()
63 }
64
65 pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
71 self.buf.extend_from_slice(data);
72 let mut emitted = Vec::new();
73
74 loop {
75 match self.stride {
76 None => {
77 if let Some((offset, s)) = find_sync(&self.buf[self.head..]) {
78 self.stats.dropped_bytes += offset as u64;
79 self.head += offset;
80 self.stride = Some(s);
81 } else {
82 let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
83 let tail_len = self.buf.len() - self.head;
84 if tail_len > keep {
85 let excess = tail_len - keep;
86 self.stats.dropped_bytes += excess as u64;
87 self.head += excess;
88 }
89 self.compact();
90 return emitted;
91 }
92 }
93 Some(stride) => {
94 let s = match stride {
95 PacketStride::Ts188 => TS_PACKET_SIZE,
96 PacketStride::Rs204 => RS_PACKET_SIZE,
97 };
98 let tail_len = self.buf.len() - self.head;
99 if tail_len < s {
100 self.compact();
101 return emitted;
102 }
103 if self.buf[self.head] == SYNC_BYTE {
104 let mut packet = [0u8; TS_PACKET_SIZE];
105 packet.copy_from_slice(&self.buf[self.head..self.head + TS_PACKET_SIZE]);
106 emitted.push(packet);
107 self.head += s;
108 self.stats.packets += 1;
109 } else {
110 self.stats.resyncs += 1;
111 self.stats.dropped_bytes += 1;
112 self.head += 1;
113 self.stride = None;
114 }
115 }
116 }
117 }
118 }
119
120 pub fn stride(&self) -> Option<PacketStride> {
122 self.stride
123 }
124
125 pub fn stats(&self) -> ResyncStats {
127 self.stats
128 }
129
130 fn compact(&mut self) {
131 if self.head > 0 {
132 self.buf.drain(..self.head);
133 self.head = 0;
134 }
135 }
136}
137
138fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
144 for o in 0..buf.len() {
145 if buf[o] != SYNC_BYTE {
146 continue;
147 }
148 if try_stride(buf, o, TS_PACKET_SIZE) {
149 return Some((o, PacketStride::Ts188));
150 }
151 if try_stride(buf, o, RS_PACKET_SIZE) {
152 return Some((o, PacketStride::Rs204));
153 }
154 }
155 None
156}
157
158fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
163 for k in 1..LOCK_CONFIRMATIONS {
164 let pos = offset + k * s;
165 if pos >= buf.len() || buf[pos] != SYNC_BYTE {
166 return false;
167 }
168 }
169 true
170}
171
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a 188-byte packet made of the sync byte followed by `tag`
    /// repeated. `tag` must differ from the sync byte so that the only sync
    /// byte in the packet is the leading one.
    fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
        assert_ne!(tag, SYNC_BYTE, "tag must not equal sync byte");
        let mut bytes = [SYNC_BYTE; TS_PACKET_SIZE];
        bytes[1..].fill(tag);
        bytes
    }

    /// Fixture: a 204-byte packet whose first 188 bytes equal
    /// `ts_packet(ts_tag)` and whose remaining 16 bytes equal `parity`.
    fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
        assert_ne!(ts_tag, SYNC_BYTE);
        assert_ne!(parity, SYNC_BYTE);
        let mut bytes = [ts_tag; RS_PACKET_SIZE];
        bytes[0] = SYNC_BYTE;
        bytes[TS_PACKET_SIZE..].fill(parity);
        bytes
    }

    /// Flattens a sequence of 188-byte packets into one contiguous stream.
    fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
        packets.iter().flatten().copied().collect()
    }

    /// Runs `data` through a fresh resynchroniser in a single call.
    fn feed_once(data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
        TsResync::new().feed(data)
    }

    /// An already aligned 188-byte stream passes through untouched.
    #[test]
    fn aligned_188_passthrough() {
        let expected = [
            ts_packet(0x01),
            ts_packet(0x02),
            ts_packet(0x03),
            ts_packet(0x04),
            ts_packet(0x05),
        ];

        let mut resync = TsResync::new();
        let emitted = resync.feed(&concat_ts(&expected));

        assert_eq!(emitted.len(), 5);
        for (got, want) in emitted.iter().zip(expected.iter()) {
            assert_eq!(got, want);
        }
        assert_eq!(resync.stride(), Some(PacketStride::Ts188));
        let stats = resync.stats();
        assert_eq!(stats.packets, 5);
        assert_eq!(stats.resyncs, 0);
        assert_eq!(stats.dropped_bytes, 0);
    }

    /// Leading junk is skipped, counted as dropped, and lock is acquired.
    #[test]
    fn junk_prefix_locks() {
        let pkts: Vec<_> = (1..=6).map(ts_packet).collect();

        let mut data = vec![0x00u8; 7];
        data.extend(concat_ts(&pkts));

        let mut resync = TsResync::new();
        let emitted = resync.feed(&data);

        assert_eq!(emitted.len(), 6);
        for (i, (got, want)) in emitted.iter().zip(pkts.iter()).enumerate() {
            assert_eq!(got, want, "packet {i} mismatch");
        }
        assert_eq!(resync.stride(), Some(PacketStride::Ts188));
        let stats = resync.stats();
        assert_eq!(stats.packets, 6);
        assert_eq!(stats.resyncs, 0);
        assert_eq!(stats.dropped_bytes, 7);
    }

    /// Feeding in 100-byte chunks yields the same packets as one big feed.
    #[test]
    fn chunked_feed_equivalence() {
        let pkts: Vec<_> = (1..=6).map(ts_packet).collect();
        let stream = concat_ts(&pkts);

        let whole = feed_once(&stream);

        let mut resync = TsResync::new();
        let chunked: Vec<_> = stream
            .chunks(100)
            .flat_map(|chunk| resync.feed(chunk))
            .collect();

        assert_eq!(whole.len(), chunked.len());
        for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
            assert_eq!(w, c, "packet {i} mismatch");
        }
    }

    /// A single stray byte inserted mid-stream causes a resync, after which
    /// emission continues.
    #[test]
    fn midstream_loss_resync() {
        let pkts: Vec<_> = (1..=14).map(ts_packet).collect();

        // Shift everything after byte 12 of the eighth packet by one.
        let mut data = concat_ts(&pkts);
        data.insert(7 * TS_PACKET_SIZE + 12, 0x00);

        let mut resync = TsResync::new();
        let emitted = resync.feed(&data);

        let stats = resync.stats();
        assert!(
            stats.resyncs >= 1,
            "mid-stream corruption must trigger a resync, got {}",
            stats.resyncs
        );
        assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
        assert!(
            emitted.len() >= 10,
            "should recover and emit most packets, got {}",
            emitted.len()
        );
    }

    /// A 204-byte stream is detected and only the 188-byte payloads come out.
    #[test]
    fn rs204_detected_and_stripped() {
        let tags = 1u8..=6;
        let stream: Vec<u8> = tags
            .clone()
            .flat_map(|tag| rs_packet(tag, 0xFF))
            .collect();
        let expected_payloads: Vec<_> = tags.map(ts_packet).collect();

        let mut resync = TsResync::new();
        let emitted = resync.feed(&stream);

        assert_eq!(emitted.len(), 6);
        assert_eq!(resync.stride(), Some(PacketStride::Rs204));
        for (i, (e, p)) in emitted.iter().zip(expected_payloads.iter()).enumerate() {
            assert_eq!(e, p, "packet {i} mismatch");
        }
        let stats = resync.stats();
        assert_eq!(stats.packets, 6);
        assert_eq!(stats.resyncs, 0);
        assert_eq!(stats.dropped_bytes, 0);
    }

    /// 500 packets behind 13 junk bytes are all recovered from one feed.
    #[test]
    fn large_buffer_single_feed() {
        const NUM_PACKETS: usize = 500;
        let pkts: Vec<_> = (0..NUM_PACKETS)
            .map(|i| {
                // Cycle through 1..=253, then shift everything from the sync
                // value upward by one so no tag equals the sync byte.
                let base = (i % 253 + 1) as u8;
                ts_packet(if base >= SYNC_BYTE { base + 1 } else { base })
            })
            .collect();

        let mut data = vec![0x00u8; 13];
        data.extend(concat_ts(&pkts));

        let mut resync = TsResync::new();
        let emitted = resync.feed(&data);

        assert_eq!(emitted.len(), NUM_PACKETS);
        for (i, (e, p)) in emitted.iter().zip(pkts.iter()).enumerate() {
            assert_eq!(e, p, "packet {i} mismatch");
        }
        assert_eq!(resync.stride(), Some(PacketStride::Ts188));
        let stats = resync.stats();
        assert_eq!(stats.packets, NUM_PACKETS as u64);
        assert_eq!(stats.resyncs, 0);
        assert_eq!(stats.dropped_bytes, 13);
    }
}