use crate::ts::{TS_PACKET_SIZE, TS_SYNC_BYTE};
pub const RS_PACKET_SIZE: usize = 204;
pub const RS_PARITY_LEN: usize = RS_PACKET_SIZE - TS_PACKET_SIZE;
pub const LOCK_CONFIRMATIONS: usize = 5;
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub enum PacketStride {
Ts188,
Rs204,
}
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ResyncStats {
pub packets: u64,
pub resyncs: u64,
pub dropped_bytes: u64,
}
#[derive(Debug, Default)]
pub struct TsResync {
buf: Vec<u8>,
head: usize,
stride: Option<PacketStride>,
stats: ResyncStats,
}
impl TsResync {
pub fn new() -> Self {
Self::default()
}
pub fn feed(&mut self, data: &[u8]) -> Vec<[u8; TS_PACKET_SIZE]> {
self.buf.extend_from_slice(data);
let mut emitted = Vec::new();
loop {
match self.stride {
None => {
if let Some((offset, s)) = find_sync(&self.buf[self.head..]) {
self.stats.dropped_bytes += offset as u64;
self.head += offset;
self.stride = Some(s);
} else {
let keep = LOCK_CONFIRMATIONS * RS_PACKET_SIZE;
let tail_len = self.buf.len() - self.head;
if tail_len > keep {
let excess = tail_len - keep;
self.stats.dropped_bytes += excess as u64;
self.head += excess;
}
self.compact();
return emitted;
}
}
Some(stride) => {
let s = match stride {
PacketStride::Ts188 => TS_PACKET_SIZE,
PacketStride::Rs204 => RS_PACKET_SIZE,
};
let tail_len = self.buf.len() - self.head;
if tail_len < s {
self.compact();
return emitted;
}
if self.buf[self.head] == TS_SYNC_BYTE {
let mut packet = [0u8; TS_PACKET_SIZE];
packet.copy_from_slice(&self.buf[self.head..self.head + TS_PACKET_SIZE]);
emitted.push(packet);
self.head += s;
self.stats.packets += 1;
} else {
self.stats.resyncs += 1;
self.stats.dropped_bytes += 1;
self.head += 1;
self.stride = None;
}
}
}
}
}
pub fn stride(&self) -> Option<PacketStride> {
self.stride
}
pub fn stats(&self) -> ResyncStats {
self.stats
}
fn compact(&mut self) {
if self.head > 0 {
self.buf.drain(..self.head);
self.head = 0;
}
}
}
fn find_sync(buf: &[u8]) -> Option<(usize, PacketStride)> {
for o in 0..buf.len() {
if buf[o] != TS_SYNC_BYTE {
continue;
}
if try_stride(buf, o, TS_PACKET_SIZE) {
return Some((o, PacketStride::Ts188));
}
if try_stride(buf, o, RS_PACKET_SIZE) {
return Some((o, PacketStride::Rs204));
}
}
None
}
fn try_stride(buf: &[u8], offset: usize, s: usize) -> bool {
for k in 1..LOCK_CONFIRMATIONS {
let pos = offset + k * s;
if pos >= buf.len() || buf[pos] != TS_SYNC_BYTE {
return false;
}
}
true
}
#[cfg(test)]
mod tests {
use super::*;
fn ts_packet(tag: u8) -> [u8; TS_PACKET_SIZE] {
assert_ne!(tag, TS_SYNC_BYTE, "tag must not equal sync byte");
let mut pkt = [tag; TS_PACKET_SIZE];
pkt[0] = TS_SYNC_BYTE;
pkt
}
fn rs_packet(ts_tag: u8, parity: u8) -> [u8; RS_PACKET_SIZE] {
assert_ne!(ts_tag, TS_SYNC_BYTE);
assert_ne!(parity, TS_SYNC_BYTE);
let mut pkt = [parity; RS_PACKET_SIZE];
pkt[0] = TS_SYNC_BYTE;
pkt[1..TS_PACKET_SIZE].fill(ts_tag);
pkt
}
fn concat_ts(packets: &[[u8; TS_PACKET_SIZE]]) -> Vec<u8> {
let mut v = Vec::with_capacity(packets.len() * TS_PACKET_SIZE);
for p in packets {
v.extend_from_slice(p);
}
v
}
#[test]
fn aligned_188_passthrough() {
let packets: Vec<_> = (0u8..5).map(|i| ts_packet(i + 1)).collect();
let data = concat_ts(&packets);
let mut r = TsResync::new();
let emitted = r.feed(&data);
assert_eq!(emitted.len(), 5);
for (i, (e, p)) in emitted.iter().zip(packets.iter()).enumerate() {
assert_eq!(e, p, "packet {i} mismatch");
}
assert_eq!(r.stride(), Some(PacketStride::Ts188));
let s = r.stats();
assert_eq!(s.packets, 5);
assert_eq!(s.resyncs, 0);
assert_eq!(s.dropped_bytes, 0);
}
#[test]
fn requires_n_confirmations_before_lock() {
assert_eq!(
LOCK_CONFIRMATIONS, 5,
"this test pins the default lock window of 5"
);
let four = concat_ts(&(1u8..=4).map(ts_packet).collect::<Vec<_>>());
let mut r = TsResync::new();
assert_eq!(
r.feed(&four).len(),
0,
"4 confirmations (< 5) must not lock or emit"
);
assert_eq!(r.stride(), None, "stride must remain None below the window");
let mut out = r.feed(&ts_packet(5));
out.extend(r.feed(&[]));
assert!(
r.stride().is_some(),
"the 5th confirmation must trigger lock"
);
assert_eq!(out.len(), 5, "all five aligned packets emit once locked");
}
#[test]
fn junk_prefix_correct_dropped_count() {
let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
let stream = concat_ts(&pkts);
let junk_len = 7usize;
let junk: Vec<u8> = vec![0x00; junk_len];
let mut data = junk;
data.extend_from_slice(&stream);
let mut r = TsResync::new();
let emitted = r.feed(&data);
assert_eq!(emitted.len(), 6);
for (i, (e, p)) in emitted.iter().zip(pkts.iter()).enumerate() {
assert_eq!(*e, *p, "packet {i} mismatch after junk prefix");
}
let s = r.stats();
assert_eq!(
s.dropped_bytes, junk_len as u64,
"dropped bytes must equal junk prefix"
);
assert_eq!(s.resyncs, 0);
assert_eq!(s.packets, 6);
}
#[test]
fn midstream_loss_resync() {
let pkts: Vec<_> = (0u8..14).map(|i| ts_packet(i + 1)).collect();
let clean = concat_ts(&pkts);
let insert_at = 7 * TS_PACKET_SIZE + 12;
let mut data = clean[..insert_at].to_vec();
data.push(0x00);
data.extend_from_slice(&clean[insert_at..]);
let mut r = TsResync::new();
let emitted = r.feed(&data);
let s = r.stats();
assert!(
s.resyncs >= 1,
"mid-stream corruption must trigger a resync, got {}",
s.resyncs
);
assert_eq!(emitted[0], pkts[0], "first emitted packet is P0");
assert!(
emitted.len() >= 10,
"should recover and emit most packets, got {}",
emitted.len()
);
}
#[test]
fn rs204_detected_and_stripped() {
let mut stream = Vec::new();
let mut expected = Vec::new();
for i in 0u8..6 {
let tag = i + 1;
let rs = rs_packet(tag, 0xFF);
stream.extend_from_slice(&rs);
expected.push(ts_packet(tag));
}
let mut r = TsResync::new();
let emitted = r.feed(&stream);
assert_eq!(emitted.len(), 6, "RS-coded stream must emit 6 packets");
assert_eq!(
r.stride(),
Some(PacketStride::Rs204),
"stride must be Rs204"
);
for (i, (e, p)) in emitted.iter().zip(expected.iter()).enumerate() {
assert_eq!(e, p, "RS-stripped packet {i} mismatch");
}
for (i, pkt) in emitted.iter().enumerate() {
crate::ts::TsPacket::parse(pkt)
.unwrap_or_else(|e| panic!("RS-stripped packet {i} TsPacket::parse failed: {e}"));
}
let s = r.stats();
assert_eq!(s.packets, 6);
assert_eq!(s.resyncs, 0);
assert_eq!(s.dropped_bytes, 0);
}
#[test]
fn aligned_188_matches_plain_chunks() {
let pkts: Vec<_> = (0u8..10).map(|i| ts_packet(i + 1)).collect();
let data = concat_ts(&pkts);
let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
.chunks_exact(TS_PACKET_SIZE)
.filter(|c| c[0] == TS_SYNC_BYTE)
.map(|c| c.try_into().unwrap())
.collect();
let mut r = TsResync::new();
let emitted = r.feed(&data);
assert_eq!(emitted.len(), oracle.len(), "count mismatch");
for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
assert_eq!(e, o, "packet {i} differs from chunks-oracle");
}
}
#[test]
fn chunked_feed_equivalence() {
let pkts: Vec<_> = (0u8..6).map(|i| ts_packet(i + 1)).collect();
let stream = concat_ts(&pkts);
let whole = {
let mut r = TsResync::new();
r.feed(&stream)
};
let chunked = {
let mut r = TsResync::new();
let mut out = Vec::new();
for chunk in stream.chunks(100) {
out.extend(r.feed(chunk));
}
out
};
assert_eq!(whole.len(), chunked.len());
for (i, (w, c)) in whole.iter().zip(chunked.iter()).enumerate() {
assert_eq!(w, c, "packet {i} mismatch");
}
}
#[test]
fn fixture_m6_matches_plain_chunks() {
let path = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/fixtures/m6-single.ts");
let data = std::fs::read(path).expect("m6-single.ts fixture not found");
let oracle: Vec<[u8; TS_PACKET_SIZE]> = data
.chunks_exact(TS_PACKET_SIZE)
.filter(|c| c[0] == TS_SYNC_BYTE)
.map(|c| c.try_into().unwrap())
.collect();
assert!(!oracle.is_empty(), "oracle empty — fixture empty?");
let mut r = TsResync::new();
let emitted = r.feed(&data);
assert_eq!(
emitted.len(),
oracle.len(),
"packet count: TsResync={} oracle={}",
emitted.len(),
oracle.len()
);
for (i, (e, o)) in emitted.iter().zip(oracle.iter()).enumerate() {
assert_eq!(e, o, "packet {i} mismatch vs oracle");
}
}
}