mod merge;
mod summary;
mod timecode;
#[cfg(feature = "python")]
use pyo3::{prelude::*, types::PyBytes};
use std::fmt::Display;
use std::io::Read;
use serde::{Deserialize, Serialize};
pub use crate::prelude::*;
pub use merge::*;
pub use summary::*;
pub use timecode::*;
/// Application process identifier (APID): the value decoded from the low 11 bits of the
/// first primary-header word.
pub type Apid = u16;
/// One packet: its decoded primary header together with the raw packet bytes.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "python", pyclass(frozen))]
pub struct Packet {
/// Header decoded from the first `PrimaryHeader::LEN` bytes of the packet.
pub header: PrimaryHeader,
/// Every byte of the packet, primary header included (`decode` and `read` store
/// `buf[..total_len]` here).
pub data: Vec<u8>,
/// Byte position of this packet in the stream it came from. `decode` and `read` leave it
/// at 0; the `PacketDecoder` iterator overwrites it with its running byte count.
pub(crate) offset: usize,
}
impl Display for Packet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Packet{{header: {:?}, data:[len={}]}}",
self.header,
self.data.len()
)?;
Ok(())
}
}
#[cfg_attr(feature = "python", pymethods)]
impl Packet {
    // Largest possible encoded packet: the primary header plus the largest data field that a
    // 16-bit "length minus one" value can describe (65536 bytes). `Packet::read` sizes its
    // scratch buffer with this and asserts that the header-declared total fits, so the header
    // bytes must be counted here; with a bare 65536 a length field >= 65530 tripped that
    // assert.
    const MAX_LEN: usize = PrimaryHeader::LEN + u16::MAX as usize + 1;

    /// The decoded primary header.
    #[cfg(feature = "python")]
    #[getter]
    fn header(&self) -> PrimaryHeader {
        self.header
    }

    /// Python constructor: decode a packet from `buf` using `Packet::decode`, propagating
    /// any decode error through `?`.
    #[cfg(feature = "python")]
    #[new]
    fn py_new(buf: &[u8]) -> PyResult<Self> {
        Ok(Packet::decode(buf)?)
    }

    /// All packet bytes, primary header included, as Python `bytes`.
    #[cfg(feature = "python")]
    #[getter]
    fn data<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        PyBytes::new_bound(py, &self.data)
    }

    /// The packet bytes that follow the primary header, as Python `bytes`.
    #[cfg(feature = "python")]
    #[getter]
    fn user_data<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
        PyBytes::new_bound(py, &self.data[PrimaryHeader::LEN..])
    }

    /// Python `str()` support; same text as the `Display` implementation.
    #[cfg(feature = "python")]
    fn __str__(&self) -> String {
        format!("{self}")
    }

    /// True when the sequence flags equal `PrimaryHeader::SEQ_FIRST`.
    #[must_use]
    pub fn is_first(&self) -> bool {
        self.header.sequence_flags == PrimaryHeader::SEQ_FIRST
    }

    /// True when the sequence flags equal `PrimaryHeader::SEQ_LAST`.
    #[must_use]
    pub fn is_last(&self) -> bool {
        self.header.sequence_flags == PrimaryHeader::SEQ_LAST
    }

    /// True when the sequence flags equal `PrimaryHeader::SEQ_CONTINUATION`.
    #[must_use]
    pub fn is_cont(&self) -> bool {
        self.header.sequence_flags == PrimaryHeader::SEQ_CONTINUATION
    }

    /// True when the sequence flags equal `PrimaryHeader::SEQ_UNSEGMENTED`.
    #[must_use]
    pub fn is_standalone(&self) -> bool {
        self.header.sequence_flags == PrimaryHeader::SEQ_UNSEGMENTED
    }
}
impl Packet {
pub fn decode(buf: &[u8]) -> Result<Packet> {
if buf.len() < PrimaryHeader::LEN {
return Err(Error::NotEnoughData {
got: buf.len(),
wanted: PrimaryHeader::LEN,
});
}
let ph = PrimaryHeader::decode(&buf[..PrimaryHeader::LEN])?;
let data_len = ph.len_minus1 as usize + 1;
let total_len = PrimaryHeader::LEN + data_len;
if buf.len() < total_len {
return Err(Error::NotEnoughData {
got: buf.len(),
wanted: total_len,
});
}
Ok(Packet {
header: ph,
data: buf[..total_len].to_vec(),
offset: 0,
})
}
pub fn read<R>(reader: &mut R) -> Result<Packet>
where
R: Read + Send,
{
let mut buf = vec![0u8; Packet::MAX_LEN];
reader.read_exact(&mut buf[..PrimaryHeader::LEN])?;
let ph = PrimaryHeader::decode(&buf[..PrimaryHeader::LEN])?;
let data_len = ph.len_minus1 as usize + 1;
let total_len = PrimaryHeader::LEN + data_len;
assert!(
total_len <= buf.len(),
"total packet len from header should never be larger than max packet len"
);
reader.read_exact(&mut buf[PrimaryHeader::LEN..total_len])?;
Ok(Packet {
header: ph,
data: buf[..total_len].to_vec(),
offset: 0,
})
}
}
/// The fixed-size primary header found at the start of every packet, decoded from three
/// big-endian 16-bit words (see `PrimaryHeader::decode`).
#[derive(Serialize, Deserialize, Debug, Copy, Clone)]
#[cfg_attr(feature = "python", pyclass(frozen))]
pub struct PrimaryHeader {
/// Top 3 bits of the first word.
pub version: u8,
/// Bit 12 of the first word (0 or 1).
pub type_flag: u8,
/// True when bit 11 of the first word is set.
pub has_secondary_header: bool,
/// Low 11 bits of the first word.
pub apid: Apid,
/// Top 2 bits of the second word; compared against the `SEQ_*` constants.
pub sequence_flags: u8,
/// Low 14 bits of the second word.
pub sequence_id: u16,
/// Third word: the number of bytes following the header, minus one.
pub len_minus1: u16,
}
// Python-facing accessors. Every method here is compiled only with the "python" feature and
// simply returns a copy of the corresponding field.
#[cfg_attr(feature = "python", pymethods)]
impl PrimaryHeader {
/// The `version` field.
#[cfg(feature = "python")]
#[getter]
fn version(&self) -> u8 {
self.version
}
/// The `type_flag` field.
#[cfg(feature = "python")]
#[getter]
fn type_flag(&self) -> u8 {
self.type_flag
}
/// The `has_secondary_header` field.
#[cfg(feature = "python")]
#[getter]
fn has_secondary_header(&self) -> bool {
self.has_secondary_header
}
/// The `apid` field.
#[cfg(feature = "python")]
#[getter]
fn apid(&self) -> Apid {
self.apid
}
/// The `sequence_flags` field.
#[cfg(feature = "python")]
#[getter]
fn sequence_flags(&self) -> u8 {
self.sequence_flags
}
/// The `sequence_id` field.
#[cfg(feature = "python")]
#[getter]
fn sequence_id(&self) -> u16 {
self.sequence_id
}
/// The `len_minus1` field.
#[cfg(feature = "python")]
#[getter]
fn len_minus1(&self) -> u16 {
self.len_minus1
}
/// Python `str()` support: the header's `Debug` representation.
#[cfg(feature = "python")]
fn __str__(&self) -> String {
format!("{self:?}")
}
}
impl PrimaryHeader {
    /// Size in bytes of an encoded primary header.
    pub const LEN: usize = 6;
    /// Largest value the 14-bit `sequence_id` field can hold.
    pub const SEQ_MAX: u16 = 0x3fff;
    /// Sequence flags value tested by `Packet::is_first`.
    pub const SEQ_FIRST: u8 = 1;
    /// Sequence flags value tested by `Packet::is_cont`.
    pub const SEQ_CONTINUATION: u8 = 0;
    /// Sequence flags value tested by `Packet::is_last`.
    pub const SEQ_LAST: u8 = 2;
    /// Sequence flags value tested by `Packet::is_standalone`.
    pub const SEQ_UNSEGMENTED: u8 = 3;

    /// Decode a header from the first [`Self::LEN`] bytes of `buf`; extra bytes are ignored.
    ///
    /// The header is three big-endian 16-bit words:
    /// version (3 bits), type flag (1), secondary-header flag (1), APID (11);
    /// sequence flags (2), sequence id (14); and the length-minus-one value (16).
    ///
    /// # Errors
    /// `Error::NotEnoughData` when `buf` holds fewer than [`Self::LEN`] bytes.
    pub fn decode(buf: &[u8]) -> Result<Self> {
        if buf.len() < Self::LEN {
            return Err(Error::NotEnoughData {
                got: buf.len(),
                wanted: Self::LEN,
            });
        }
        let word = |at: usize| u16::from_be_bytes([buf[at], buf[at + 1]]);
        let (ident, sequence, length) = (word(0), word(2), word(4));
        Ok(PrimaryHeader {
            version: (ident >> 13) as u8 & 0x7,
            type_flag: u8::from(ident & 0x1000 != 0),
            has_secondary_header: ident & 0x0800 != 0,
            apid: ident & 0x07ff,
            sequence_flags: (sequence >> 14) as u8 & 0x3,
            sequence_id: sequence & 0x3fff,
            len_minus1: length,
        })
    }
}
/// A run of packets collected together, as produced by `collect_groups`.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "python", pyclass(frozen))]
pub struct PacketGroup {
/// APID of the group; the grouping iterator sets it from the group's first packet.
pub apid: Apid,
/// The group's packets, in the order they were received.
pub packets: Vec<Packet>,
}
#[cfg_attr(feature = "python", pymethods)]
impl PacketGroup {
    /// The group's APID.
    #[cfg(feature = "python")]
    #[getter]
    fn apid(&self) -> Apid {
        self.apid
    }

    /// A copy of the group's packets.
    #[cfg(feature = "python")]
    #[getter]
    fn packets(&self) -> Vec<Packet> {
        self.packets.clone()
    }

    /// Python `str()` support: the APID and the number of packets.
    #[cfg(feature = "python")]
    fn __str__(&self) -> String {
        format!(
            "PacketGroup {{apid={} packets[len={}]}}",
            self.apid,
            self.packets.len()
        )
    }

    /// Whether the group holds a whole sequence.
    ///
    /// * an empty group is never complete;
    /// * a single packet is complete only when it is standalone;
    /// * two or more packets are complete when the first packet is a "first" packet, the
    ///   final packet is a "last" packet, and no sequence ids are missing in between.
    #[must_use]
    pub fn complete(&self) -> bool {
        match self.packets.as_slice() {
            [] => false,
            [only] => only.is_standalone(),
            [first, .., last] => first.is_first() && last.is_last() && !self.have_missing(),
        }
    }

    /// Whether any two neighbouring packets have a gap between their sequence ids, as
    /// reported by `missing_packets`.
    ///
    /// Groups with fewer than two packets have no neighbours and return `false`. Using
    /// `windows(2)` rather than slicing from index 1 keeps an empty group from panicking.
    #[must_use]
    pub fn have_missing(&self) -> bool {
        self.packets.windows(2).any(|pair| {
            missing_packets(pair[1].header.sequence_id, pair[0].header.sequence_id) > 0
        })
    }
}
#[must_use]
pub fn missing_packets(cur: u16, last: u16) -> u16 {
let expected = if last + 1 > PrimaryHeader::SEQ_MAX {
0
} else {
last + 1
};
if cur != expected {
if last + 1 > cur {
return cur + PrimaryHeader::SEQ_MAX - last;
}
return cur - last - 1;
}
0
}
/// Pulls packets one at a time out of a byte reader, tracking how many bytes the packets
/// yielded so far have consumed.
struct PacketDecoder<R: Read + Send> {
    /// Source of packet bytes.
    pub reader: R,
    /// Total bytes consumed by the packets yielded so far; assigned to the next packet as
    /// its `offset`.
    pub offset: usize,
}

impl<R> PacketDecoder<R>
where
    R: Read + Send,
{
    /// Wrap `reader` with the byte offset starting at zero.
    pub fn new(reader: R) -> Self {
        Self { reader, offset: 0 }
    }
}
impl<R> Iterator for PacketDecoder<R>
where
R: Read + Send,
{
type Item = Result<Packet>;
fn next(&mut self) -> Option<Self::Item> {
match Packet::read(&mut self.reader) {
Ok(mut p) => {
p.offset = self.offset;
self.offset += PrimaryHeader::LEN + p.header.len_minus1 as usize + 1;
Some(Ok(p))
}
Err(err) => {
if let Error::Io(ref ioerr) = err {
if ioerr.kind() == std::io::ErrorKind::UnexpectedEof {
return None;
}
}
Some(Err(err))
}
}
}
}
/// Return an iterator that reads packets from `reader` one after another.
///
/// Each item is the result of one `Packet::read` call, with the packet's `offset` set to the
/// number of bytes consumed by the packets before it. Iteration ends when a read fails with
/// an `UnexpectedEof` I/O error; any other error is yielded as an `Err` item.
pub fn decode_packets<R>(reader: R) -> impl Iterator<Item = Result<Packet>> + Send
where
R: Read + Send,
{
PacketDecoder::new(reader)
}
/// Group consecutive packets from `packets` into [`PacketGroup`]s.
///
/// A standalone packet that arrives while no group is being built is yielded as a group of
/// one. Otherwise packets accumulate into the current group until a packet with a different
/// APID arrives or the input ends. The current implementation never yields an `Err` item.
pub fn collect_groups<I>(packets: I) -> impl Iterator<Item = Result<PacketGroup>> + Send
where
I: Iterator<Item = Packet> + Send,
{
PacketGroupIter::with_packets(packets)
}
/// Iterator state for turning a stream of packets into packet groups.
struct PacketGroupIter<I>
where
    I: Iterator<Item = Packet> + Send,
{
    /// Upstream packet source.
    packets: I,
    /// A packet that was pulled from `packets` but belongs to the next group.
    cached: Option<Packet>,
    /// Set once the source has been drained and nothing remains to yield.
    done: bool,
}

impl<I> PacketGroupIter<I>
where
    I: Iterator<Item = Packet> + Send,
{
    /// Start grouping `packets`, with nothing cached and the iterator not yet finished.
    fn with_packets(packets: I) -> Self {
        Self {
            packets,
            cached: None,
            done: false,
        }
    }
}
/// Builds one [`PacketGroup`] per call from the upstream packets.
///
/// * A standalone packet seen while no group is open is returned as a group of one.
/// * Otherwise packets are appended to the open group until a packet with a different APID
///   shows up. That packet is stashed in `cached` for the next call and the open group is
///   returned.
/// * When the source runs dry, any open group is returned. The following call marks the
///   iterator as done.
///
/// Sequence flags are only consulted for the standalone check above. A group is never closed
/// by a "last" packet, so back-to-back sequences with the same APID end up in one group.
impl<I> Iterator for PacketGroupIter<I>
where
    I: Iterator<Item = Packet> + Send,
{
    type Item = Result<PacketGroup>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        let mut collected: Vec<Packet> = Vec::new();
        loop {
            // Prefer the packet left over from the previous call over pulling a new one.
            let incoming = match self.cached.take() {
                Some(held) => Some(held),
                None => self.packets.next(),
            };
            let packet = match incoming {
                Some(packet) => packet,
                None => break,
            };
            let open_apid = collected.first().map(|head| head.header.apid);
            match open_apid {
                None if packet.is_standalone() => {
                    return Some(Ok(PacketGroup {
                        apid: packet.header.apid,
                        packets: vec![packet],
                    }));
                }
                Some(apid) if apid != packet.header.apid => {
                    self.cached = Some(packet);
                    return Some(Ok(PacketGroup {
                        apid,
                        packets: collected,
                    }));
                }
                _ => collected.push(packet),
            }
        }
        let open_apid = collected.first().map(|head| head.header.apid);
        if let Some(apid) = open_apid {
            return Some(Ok(PacketGroup {
                apid,
                packets: collected,
            }));
        }
        self.done = true;
        self.cached.take().map(|packet| {
            Ok(PacketGroup {
                apid: packet.header.apid,
                packets: vec![packet],
            })
        })
    }
}
// Unit tests for header/packet decoding, the packet iterator, and sequence-gap counting.
#[cfg(test)]
mod tests {
use summary::Summary;
use super::*;
// A 15-byte buffer: 6 header bytes whose length field is 8, followed by 9 data bytes.
#[test]
fn test_decode_packet() {
let dat: [u8; 15] = [
0xd, 0x59, 0xd2, 0xab, 0x0, 0x8, 0x52, 0xc0, 0x0, 0x0, 0x0, 0xa7, 0x0, 0xdb, 0xff,
];
let packet = Packet::decode(&dat).unwrap();
assert_eq!(packet.header.version, 0);
}
// Checks every header field against hand-computed values for the words
// 0x0d59, 0xd2ab and 0x0a8f.
#[test]
fn test_decode_header() {
let dat: [u8; 6] = [
0xd, 0x59, 0xd2, 0xab, 0xa, 0x8f,
];
let ph = PrimaryHeader::decode(&dat).unwrap();
assert_eq!(ph.version, 0);
assert_eq!(ph.type_flag, 0);
assert!(ph.has_secondary_header);
assert_eq!(ph.apid, 1369);
assert_eq!(ph.sequence_flags, 3);
assert_eq!(ph.sequence_id, 4779);
assert_eq!(ph.len_minus1, 2703);
}
// Two back-to-back 15-byte packets with sequence ids 1 and 2 must come out of
// `decode_packets` in order, each carrying exactly its own bytes.
#[test]
fn packet_iter_test() {
#[rustfmt::skip]
let dat: &[u8] = &[
0xd, 0x59, 0xc0, 0x01, 0x0, 0x8, 0x52, 0xc0, 0x0, 0x0, 0x0, 0xa7, 0x0, 0xdb, 0xff,
0xd, 0x59, 0xc0, 0x02, 0x0, 0x8, 0x52, 0xc0, 0x0, 0x0, 0x0, 0xa7, 0x0, 0xdb, 0xff,
];
// The summary is fed every packet but nothing is asserted on it here.
let mut summary = Summary::default();
let packets: Vec<Packet> = decode_packets(dat)
.map(|z| z.unwrap())
.inspect(|p| {
summary.add(p);
})
.collect();
assert_eq!(packets.len(), 2);
assert_eq!(packets[0].header.apid, 1369);
assert_eq!(packets[0].header.sequence_id, 1);
assert_eq!(&packets[0].data[..], &dat[..15]);
assert_eq!(packets[1].header.sequence_id, 2);
assert_eq!(&packets[1].data[..], &dat[15..]);
}
// Gap counting, including the wrap from SEQ_MAX back to 0 and a repeated id.
#[test]
fn test_missing_packets() {
assert_eq!(missing_packets(5, 4), 0);
assert_eq!(missing_packets(5, 3), 1);
assert_eq!(missing_packets(0, PrimaryHeader::SEQ_MAX), 0);
assert_eq!(missing_packets(0, PrimaryHeader::SEQ_MAX - 1), 1);
assert_eq!(missing_packets(0, 0), PrimaryHeader::SEQ_MAX);
}
}