pub const SYNC_BYTE: u8 = 0x47;
pub const TS_PACKET_SIZE: usize = 188;
pub const RS_PACKET_SIZE: usize = 204;
const LOCK_CONFIRMATIONS: usize = 5;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum PacketStride {
Ts188,
Rs204,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ResyncStats {
pub packets: u64,
pub resyncs: u64,
pub dropped_bytes: u64,
}
#[derive(Debug, Default)]
pub struct TsResync {
buf: Vec<u8>,
stride: Option<PacketStride>,
stats: ResyncStats,
}
impl TsResync {
pub fn new() -> Self {
Self::default()
}
pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
self.buf.extend_from_slice(data);
let mut emitted = Vec::new();
loop {
match self.stride {
None => {
if let Some((offset, s)) = find_sync(&self.buf) {
self.stats.dropped_bytes += offset as u64;
self.buf.drain(..offset);
self.stride = Some(s);
} else {
let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
if self.buf.len() > keep {
let excess = self.buf.len() - keep;
self.stats.dropped_bytes += excess as u64;
self.buf.drain(..excess);
}
return emitted;
}
}
Some(stride) => {
let s = match stride {
PacketStride::Ts188 => TS_PACKET_SIZE,
PacketStride::Rs204 => RS_PACKET_SIZE,
};
if self.buf.len() < s {
return emitted;
}
if self.buf[0] == SYNC_BYTE {
let mut packet = [0u8; TS_PACKET_SIZE];
packet.copy_from_slice(&self.buf[..TS_PACKET_SIZE]);
emitted.push(packet);
self.buf.drain(..s);
self.stats.packets += 1;
} else {
self.stats.resyncs += 1;
self.stats.dropped_bytes += 1;
self.buf.drain(..1);
self.stride = None;
}
}
}
}
}
pub fn stride(&self) -> Option<PacketStride> {
self.stride
}
pub fn stats(&self) -> ResyncStats {
self.stats
}
}
fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
for o in 0..buf.len() {
if buf[o] != SYNC_BYTE {
continue;
}
if try_stride(buf, o, TS_PACKET_SIZE) {
return Some((o, PacketStride::Ts188));
}
if try_stride(buf, o, RS_PACKET_SIZE) {
return Some((o, PacketStride::Rs204));
}
}
None
}
fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
for k in 1..LOCK_CONFIRMATIONS {
let pos = offset + k * s;
if pos >= buf.len() || buf[pos] != SYNC_BYTE {
return false;
}
}
true
}
#[cfg(test)]
mod tests {
use super::*;
fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
assert_ne!(tag, SYNC_BYTE, "tag must not equal sync byte");
let mut pkt = [tag; TS_PACKET_SIZE];
pkt[0] = SYNC_BYTE;
pkt
}
fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
assert_ne!(ts_tag, SYNC_BYTE);
assert_ne!(parity, SYNC_BYTE);
let mut pkt = [parity; RS_PACKET_SIZE];
pkt[0] = SYNC_BYTE;
pkt[1..TS_PACKET_SIZE].fill(ts_tag);
pkt
}
fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
let mut v = Vec::with_capacity(packets.len() * TS_PACKET_SIZE);
for p in packets {
v.extend_from_slice(p);
}
v
}
fn feed_once(data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
TsResync::new().feed(data)
}
#[test]
fn aligned_188_passthrough() {
let p0 = ts_packet(0x01);
let p1 = ts_packet(0x02);
let p2 = ts_packet(0x03);
let p3 = ts_packet(0x04);
let p4 = ts_packet(0x05);
let data = concat_ts(&[p0, p1, p2, p3, p4]);
let mut r = TsResync::new();
let emitted = r.feed(&data);
assert_eq!(emitted.len(), 5);
assert_eq!(emitted[0], p0);
assert_eq!(emitted[1], p1);
assert_eq!(emitted[2], p2);
assert_eq!(emitted[3], p3);
assert_eq!(emitted[4], p4);
assert_eq!(r.stride(), Some(PacketStride::Ts188));
let s = r.stats();
assert_eq!(s.packets, 5);
assert_eq!(s.resyncs, 0);
assert_eq!(s.dropped_bytes, 0);
}
#[test]
fn junk_prefix_locks() {
let pkts: Vec<_> = (0..6).map(|i| ts_packet(i + 1)).collect();
let stream = concat_ts(&pkts);
let junk: Vec<u8> = vec![0x00; 7];
let mut data = junk.clone();
data.extend_from_slice(&stream);
let mut r = TsResync::new();
let emitted = r.feed(&data);
assert_eq!(emitted.len(), 6);
for (i, p) in emitted.iter().enumerate() {
assert_eq!(*p, pkts[i], "packet {i} mismatch");
}
assert_eq!(r.stride(), Some(PacketStride::Ts188));
let s = r.stats();
assert_eq!(s.packets, 6);
assert_eq!(s.resyncs, 0);
assert_eq!(s.dropped_bytes, 7);
}
#[test]
fn chunked_feed_equivalence() {
let pkts: Vec<_> = (0..6).map(|i| ts_packet(i + 1)).collect();
let stream = concat_ts(&pkts);
let whole = feed_once(&stream);
let mut r = TsResync::new();
let mut chunked = Vec::new();
for chunk in stream.chunks(100) {
chunked.extend(r.feed(chunk));
}
assert_eq!(whole.len(), chunked.len());
for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
assert_eq!(w, c, "packet {i} mismatch");
}
}
#[test]
fn midstream_loss_resync() {
let pkts: Vec<_> = (0..14).map(|i| ts_packet(i + 1)).collect();
let clean = concat_ts(&pkts);
let insert_at = 7 * TS_PACKET_SIZE + 12;
let stray: u8 = 0x00;
let mut data = clean[..insert_at].to_vec();
data.push(stray);
data.extend_from_slice(&clean[insert_at..]);
let mut r = TsResync::new();
let emitted = r.feed(&data);
let s = r.stats();
assert!(
s.resyncs >= 1,
"mid-stream corruption must trigger a resync, got {}",
s.resyncs
);
assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
assert!(
emitted.len() >= 10,
"should recover and emit most packets, got {}",
emitted.len()
);
}
#[test]
fn rs204_detected_and_stripped() {
let mut stream = Vec::new();
let mut expected_payloads = Vec::new();
for i in 0u8..6 {
let tag = i + 1;
let rs = rs_packet(tag, 0xFF);
stream.extend_from_slice(&rs);
expected_payloads.push(ts_packet(tag));
}
let mut r = TsResync::new();
let emitted = r.feed(&stream);
assert_eq!(emitted.len(), 6);
assert_eq!(r.stride(), Some(PacketStride::Rs204));
for (i, (e, p)) in emitted.iter().zip(expected_payloads.iter()).enumerate() {
assert_eq!(e, p, "packet {i} mismatch");
}
let s = r.stats();
assert_eq!(s.packets, 6);
assert_eq!(s.resyncs, 0);
assert_eq!(s.dropped_bytes, 0);
}
}