/// Marker byte expected at the first position of every packet (0x47, the MPEG-TS sync byte).
pub const SYNC_BYTE: u8 = 0x47;

/// Length in bytes of one transport-stream packet; also the size of every packet that
/// `TsResync::feed` emits and the packet-to-packet distance for `PacketStride::Ts188`.
pub const TS_PACKET_SIZE: usize = 188;

/// Packet-to-packet distance in bytes for `PacketStride::Rs204`: a 188-byte packet followed by
/// 16 extra bytes that `TsResync::feed` skips (presumably Reed-Solomon parity, going by the
/// name; the unit tests model them as parity bytes).
pub const RS_PACKET_SIZE: usize = 204;

/// Number of sync bytes, spaced exactly one stride apart and counting the candidate itself,
/// that must be present before a stride is accepted. Also sizes the tail that is kept while no
/// lock is held (`LOCK_CONFIRMATIONS * RS_PACKET_SIZE` bytes).
const LOCK_CONFIRMATIONS: usize = 5;
19
/// Distance between consecutive sync bytes that the resynchroniser has locked onto.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum PacketStride {
    /// Packets are back to back, `TS_PACKET_SIZE` (188) bytes apart.
    Ts188,
    /// Packets start every `RS_PACKET_SIZE` (204) bytes; the 16 bytes after each 188-byte
    /// packet are skipped when packets are emitted.
    Rs204,
}
29
/// Running counters maintained by `TsResync::feed`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ResyncStats {
    /// Number of packets emitted so far.
    pub packets: u64,
    /// Number of times an established lock was lost because the byte at an expected packet
    /// start was not the sync byte.
    pub resyncs: u64,
    /// Bytes discarded while searching for a lock: junk ahead of a lock position, the byte
    /// that broke a lock, and data trimmed from an over-long unlocked buffer. The 16 trailing
    /// bytes skipped per packet under the 204-byte stride are not counted here.
    pub dropped_bytes: u64,
}
41
/// Incremental packet resynchroniser: accepts arbitrary byte chunks through `feed` and hands
/// back aligned 188-byte packets once a 188- or 204-byte stride has been confirmed.
#[derive(Debug, Default)]
pub struct TsResync {
    /// Bytes received but not yet emitted or discarded.
    buf: Vec<u8>,
    /// Currently locked stride, or `None` while still searching for one.
    stride: Option<PacketStride>,
    /// Counters updated by `feed`.
    stats: ResyncStats,
}
55
56impl TsResync {
57 pub fn new() -> Self {
59 Self::default()
60 }
61
62 pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
68 self.buf.extend_from_slice(data);
69 let mut emitted = Vec::new();
70
71 loop {
72 match self.stride {
73 None => {
74 if let Some((offset, s)) = find_sync(&self.buf) {
75 self.stats.dropped_bytes += offset as u64;
76 self.buf.drain(..offset);
77 self.stride = Some(s);
78 } else {
79 let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
80 if self.buf.len() > keep {
81 let excess = self.buf.len() - keep;
82 self.stats.dropped_bytes += excess as u64;
83 self.buf.drain(..excess);
84 }
85 return emitted;
86 }
87 }
88 Some(stride) => {
89 let s = match stride {
90 PacketStride::Ts188 => TS_PACKET_SIZE,
91 PacketStride::Rs204 => RS_PACKET_SIZE,
92 };
93 if self.buf.len() < s {
94 return emitted;
95 }
96 if self.buf[0] == SYNC_BYTE {
97 let mut packet = [0u8; TS_PACKET_SIZE];
98 packet.copy_from_slice(&self.buf[..TS_PACKET_SIZE]);
99 emitted.push(packet);
100 self.buf.drain(..s);
101 self.stats.packets += 1;
102 } else {
103 self.stats.resyncs += 1;
104 self.stats.dropped_bytes += 1;
105 self.buf.drain(..1);
106 self.stride = None;
107 }
108 }
109 }
110 }
111 }
112
113 pub fn stride(&self) -> Option<PacketStride> {
115 self.stride
116 }
117
118 pub fn stats(&self) -> ResyncStats {
120 self.stats
121 }
122}
123
124fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
130 for o in 0..buf.len() {
131 if buf[o] != SYNC_BYTE {
132 continue;
133 }
134 if try_stride(buf, o, TS_PACKET_SIZE) {
135 return Some((o, PacketStride::Ts188));
136 }
137 if try_stride(buf, o, RS_PACKET_SIZE) {
138 return Some((o, PacketStride::Rs204));
139 }
140 }
141 None
142}
143
144fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
149 for k in 1..LOCK_CONFIRMATIONS {
150 let pos = offset + k * s;
151 if pos >= buf.len() || buf[pos] != SYNC_BYTE {
152 return false;
153 }
154 }
155 true
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 188-byte packet: the sync byte followed by 187 copies of `tag`.
    fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
        assert_ne!(tag, SYNC_BYTE, "tag must not equal sync byte");
        let mut packet = [SYNC_BYTE; TS_PACKET_SIZE];
        packet[1..].fill(tag);
        packet
    }

    /// Builds a 204-byte frame: a `ts_packet(ts_tag)` followed by 16 copies of `parity`.
    fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
        assert_ne!(ts_tag, SYNC_BYTE);
        assert_ne!(parity, SYNC_BYTE);
        let mut frame = [ts_tag; RS_PACKET_SIZE];
        frame[0] = SYNC_BYTE;
        frame[TS_PACKET_SIZE..].fill(parity);
        frame
    }

    /// Flattens packets into one contiguous byte stream.
    fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
        packets.concat()
    }

    /// Feeds `data` to a fresh resynchroniser in a single call.
    fn feed_once(data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
        let mut resync = TsResync::new();
        resync.feed(data)
    }

    /// Five aligned packets are enough to lock and are all passed through untouched.
    #[test]
    fn aligned_188_passthrough() {
        let expected = [
            ts_packet(0x01),
            ts_packet(0x02),
            ts_packet(0x03),
            ts_packet(0x04),
            ts_packet(0x05),
        ];

        let mut resync = TsResync::new();
        let emitted = resync.feed(&concat_ts(&expected));

        assert_eq!(emitted.len(), 5);
        for (got, want) in emitted.iter().zip(expected.iter()) {
            assert_eq!(got, want);
        }
        assert_eq!(resync.stride(), Some(PacketStride::Ts188));
        let stats = resync.stats();
        assert_eq!((stats.packets, stats.resyncs, stats.dropped_bytes), (5, 0, 0));
    }

    /// Junk ahead of the first packet is skipped and counted as dropped.
    #[test]
    fn junk_prefix_locks() {
        let pkts: Vec<_> = (1..=6u8).map(ts_packet).collect();

        let mut data = vec![0x00u8; 7];
        data.extend_from_slice(&concat_ts(&pkts));

        let mut resync = TsResync::new();
        let emitted = resync.feed(&data);

        assert_eq!(emitted.len(), 6);
        for (i, (got, want)) in emitted.iter().zip(pkts.iter()).enumerate() {
            assert_eq!(got, want, "packet {i} mismatch");
        }
        assert_eq!(resync.stride(), Some(PacketStride::Ts188));
        let stats = resync.stats();
        assert_eq!((stats.packets, stats.resyncs, stats.dropped_bytes), (6, 0, 7));
    }

    /// Feeding in 100-byte chunks yields the same packets as feeding everything at once.
    #[test]
    fn chunked_feed_equivalence() {
        let pkts: Vec<_> = (1..=6u8).map(ts_packet).collect();
        let stream = concat_ts(&pkts);

        let whole = feed_once(&stream);

        let mut resync = TsResync::new();
        let chunked: Vec<_> = stream
            .chunks(100)
            .flat_map(|chunk| resync.feed(chunk))
            .collect();

        assert_eq!(whole.len(), chunked.len());
        for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
            assert_eq!(w, c, "packet {i} mismatch");
        }
    }

    /// One stray byte inserted mid-stream breaks the lock; the resynchroniser recovers.
    #[test]
    fn midstream_loss_resync() {
        let pkts: Vec<_> = (1..=14u8).map(ts_packet).collect();

        let mut data = concat_ts(&pkts);
        data.insert(7 * TS_PACKET_SIZE + 12, 0x00);

        let mut resync = TsResync::new();
        let emitted = resync.feed(&data);

        let stats = resync.stats();
        assert!(
            stats.resyncs >= 1,
            "mid-stream corruption must trigger a resync, got {}",
            stats.resyncs
        );
        assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
        assert!(
            emitted.len() >= 10,
            "should recover and emit most packets, got {}",
            emitted.len()
        );
    }

    /// A 204-byte stride is detected and only the leading 188 bytes of each frame are emitted.
    #[test]
    fn rs204_detected_and_stripped() {
        let tags = 1u8..=6;
        let stream: Vec<u8> = tags
            .clone()
            .flat_map(|tag| rs_packet(tag, 0xFF))
            .collect();
        let expected_payloads: Vec<_> = tags.map(ts_packet).collect();

        let mut resync = TsResync::new();
        let emitted = resync.feed(&stream);

        assert_eq!(emitted.len(), 6);
        assert_eq!(resync.stride(), Some(PacketStride::Rs204));
        for (i, (got, want)) in emitted.iter().zip(expected_payloads.iter()).enumerate() {
            assert_eq!(got, want, "packet {i} mismatch");
        }
        let stats = resync.stats();
        assert_eq!((stats.packets, stats.resyncs, stats.dropped_bytes), (6, 0, 0));
    }
}