use crate::demultiplex;
use crate::packet;
use crate::packet::ClockRef;
use log::warn;
use std::fmt::Formatter;
use std::marker;
use std::{fmt, num};
/// Callbacks through which `PesPacketFilter` reports the structure it finds in an elementary
/// stream.
///
/// The `ctx` value handed to `PesPacketFilter::consume()` is passed on to every callback.
pub trait ElementaryStreamConsumer<Ctx> {
/// Called when a packet with `payload_unit_start_indicator` set arrives while the filter is
/// still in its initial state, before any `begin_packet()` call for that packet.
fn start_stream(&mut self, ctx: &mut Ctx);
/// Called with the PES header parsed from the payload of a packet that has
/// `payload_unit_start_indicator` set.  Not called if the header could not be parsed.
fn begin_packet(&mut self, ctx: &mut Ctx, header: PesHeader<'_>);
/// Called with the non-empty payload of a packet that does not start a new payload unit,
/// while a PES packet is in progress.
fn continue_packet(&mut self, ctx: &mut Ctx, data: &[u8]);
/// Called when a new payload unit starts while an earlier PES packet was still in progress.
fn end_packet(&mut self, ctx: &mut Ctx);
/// Called when a packet's continuity counter is not consistent with the previous packet's.
fn continuity_error(&mut self, ctx: &mut Ctx);
}
/// Position of `PesPacketFilter` within the elementary stream.
#[derive(Debug, PartialEq)]
enum PesState {
/// Initial state; no packet with `payload_unit_start_indicator` has been handled yet.
Begin,
/// A payload unit start has been handled; continuation payloads go to the consumer.
Started,
/// A continuity error was seen; continuation payloads are dropped until the next payload
/// unit start.
IgnoreRest,
}
/// Packet filter that tracks PES packet boundaries across transport packets and reports them
/// to an `ElementaryStreamConsumer`.
pub struct PesPacketFilter<Ctx, E>
where
Ctx: demultiplex::DemuxContext,
E: ElementaryStreamConsumer<Ctx>,
{
// receiver of the stream / packet callbacks
stream_consumer: E,
// continuity counter of the most recently consumed packet; `None` before the first packet
ccounter: Option<packet::ContinuityCounter>,
// current position within the stream
state: PesState,
// uses the `Ctx` type parameter, which no other field mentions
phantom: marker::PhantomData<Ctx>,
}
impl<Ctx, E> PesPacketFilter<Ctx, E>
where
    Ctx: demultiplex::DemuxContext,
    E: ElementaryStreamConsumer<Ctx>,
{
    /// Builds a filter in its initial state that will report to `stream_consumer`.
    pub fn new(stream_consumer: E) -> PesPacketFilter<Ctx, E> {
        Self {
            state: PesState::Begin,
            ccounter: None,
            phantom: marker::PhantomData,
            stream_consumer,
        }
    }

    /// Reports whether `packet`'s continuity counter is consistent with the counter recorded
    /// from the previous packet.
    ///
    /// With no previous counter the answer is always `true`.  Otherwise a packet carrying a
    /// payload must have a counter that follows the previous one, while a packet without a
    /// payload must repeat the previous count.
    fn is_continuous(&self, packet: &packet::Packet<'_>) -> bool {
        match self.ccounter {
            None => true,
            Some(previous) if packet.adaptation_control().has_payload() => {
                packet.continuity_counter().follows(previous)
            }
            Some(previous) => packet.continuity_counter().count() == previous.count(),
        }
    }
}
impl<Ctx, E> demultiplex::PacketFilter for PesPacketFilter<Ctx, E>
where
    Ctx: demultiplex::DemuxContext,
    E: ElementaryStreamConsumer<Ctx>,
{
    type Ctx = Ctx;

    /// Feeds one transport packet through the PES state machine, invoking the consumer's
    /// callbacks as boundaries are found.
    ///
    /// A continuity problem is reported first and switches the filter to ignoring
    /// continuation payloads.  A packet with `payload_unit_start_indicator` set then ends any
    /// PES packet in progress (or starts the stream, if this is the first one), and its PES
    /// header, if one can be parsed, is passed to `begin_packet()`.  Any other packet
    /// contributes its non-empty payload to the PES packet in progress.
    #[inline(always)]
    fn consume(&mut self, ctx: &mut Self::Ctx, packet: &packet::Packet<'_>) {
        let continuous = self.is_continuous(packet);
        if !continuous {
            self.stream_consumer.continuity_error(ctx);
            self.state = PesState::IgnoreRest;
        }
        self.ccounter = Some(packet.continuity_counter());

        if packet.payload_unit_start_indicator() {
            match self.state {
                // a PES packet was already in flight; close it off
                PesState::Started => self.stream_consumer.end_packet(ctx),
                PesState::Begin => {
                    self.stream_consumer.start_stream(ctx);
                    self.state = PesState::Started;
                }
                // resume normal processing after an earlier continuity error
                PesState::IgnoreRest => self.state = PesState::Started,
            }
            if let Some(header) = packet.payload().and_then(PesHeader::from_bytes) {
                self.stream_consumer.begin_packet(ctx, header);
            }
            return;
        }

        match self.state {
            PesState::Started => {
                if let Some(data) = packet.payload().filter(|bytes| !bytes.is_empty()) {
                    self.stream_consumer.continue_packet(ctx, data);
                }
            }
            PesState::Begin => {
                warn!("{:?}: Ignoring elementary stream content without a payload_start_indicator", packet.pid());
            }
            PesState::IgnoreRest => (),
        }
    }
}
/// Value of the 16-bit length field of a PES header, as returned by
/// `PesHeader::pes_packet_length()`.
#[derive(Debug, PartialEq)]
pub enum PesLength {
/// The length field held zero.
Unbounded,
/// The non-zero value held by the length field.
Bounded(num::NonZeroU16),
}
/// The one-byte `stream_id` value taken from a PES header.
///
/// The associated constants name the individually assigned values; the `Debug`
/// implementation additionally labels the ranges `0xC0..=0xDF` as `Audio(n)` and
/// `0xE0..=0xEF` as `Video(n)`.
#[derive(PartialEq, Eq)]
pub struct StreamId(u8);

impl StreamId {
    // Individually assigned stream_id values.
    pub const PROGRAM_STREAM_MAP: StreamId = StreamId(0b1011_1100);
    pub const PRIVATE_STREAM1: StreamId = StreamId(0b1011_1101);
    pub const PADDING_STREAM: StreamId = StreamId(0b1011_1110);
    pub const PRIVATE_STREAM2: StreamId = StreamId(0b1011_1111);
    pub const ECM_STREAM: StreamId = StreamId(0b1111_0000);
    pub const EMM_STREAM: StreamId = StreamId(0b1111_0001);
    pub const DSM_CC: StreamId = StreamId(0b1111_0010);
    pub const ISO_13522_STREAM: StreamId = StreamId(0b1111_0011);
    pub const H222_1_TYPE_A: StreamId = StreamId(0b1111_0100);
    pub const H222_1_TYPE_B: StreamId = StreamId(0b1111_0101);
    pub const H222_1_TYPE_C: StreamId = StreamId(0b1111_0110);
    pub const H222_1_TYPE_D: StreamId = StreamId(0b1111_0111);
    pub const H222_1_TYPE_E: StreamId = StreamId(0b1111_1000);
    pub const ANCILLARY_STREAM: StreamId = StreamId(0b1111_1001);
    pub const SL_PACKETIZED_STREAM: StreamId = StreamId(0b1111_1010);
    pub const FLEX_MUX_STREAM: StreamId = StreamId(0b1111_1011);
    pub const METADATA_STREAM: StreamId = StreamId(0b1111_1100);
    pub const EXTENDED_STREAM_ID: StreamId = StreamId(0b1111_1101);
    pub const RESERVED_DATA_STREAM: StreamId = StreamId(0b1111_1110);
    pub const PROGRAM_STREAM_DIRECTORY: StreamId = StreamId(0b1111_1111);

    /// Returns `false` for the eight stream ids listed below and `true` for every other
    /// value.
    fn is_parsed(&self) -> bool {
        const UNPARSED: [StreamId; 8] = [
            StreamId::PROGRAM_STREAM_MAP,
            StreamId::PADDING_STREAM,
            StreamId::PRIVATE_STREAM2,
            StreamId::ECM_STREAM,
            StreamId::EMM_STREAM,
            StreamId::PROGRAM_STREAM_DIRECTORY,
            StreamId::DSM_CC,
            StreamId::H222_1_TYPE_E,
        ];
        !UNPARSED.contains(self)
    }
}

impl fmt::Debug for StreamId {
    /// Writes the constant's name for assigned values, `Audio(n)` / `Video(n)` for the two
    /// numbered ranges, and `Unknown(value)` for anything else.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let label = match *self {
            StreamId(id @ 0b1100_0000..=0b1101_1111) => {
                return write!(f, "Audio({})", id & 0b0001_1111);
            }
            StreamId(id @ 0b1110_0000..=0b1110_1111) => {
                return write!(f, "Video({})", id & 0b0001_1111);
            }
            StreamId::PROGRAM_STREAM_MAP => "PROGRAM_STREAM_MAP",
            StreamId::PRIVATE_STREAM1 => "PRIVATE_STREAM1",
            StreamId::PADDING_STREAM => "PADDING_STREAM",
            StreamId::PRIVATE_STREAM2 => "PRIVATE_STREAM2",
            StreamId::ECM_STREAM => "ECM_STREAM",
            StreamId::EMM_STREAM => "EMM_STREAM",
            StreamId::DSM_CC => "DSM_CC",
            StreamId::ISO_13522_STREAM => "ISO_13522_STREAM",
            StreamId::H222_1_TYPE_A => "H222_1_TYPE_A",
            StreamId::H222_1_TYPE_B => "H222_1_TYPE_B",
            StreamId::H222_1_TYPE_C => "H222_1_TYPE_C",
            StreamId::H222_1_TYPE_D => "H222_1_TYPE_D",
            StreamId::H222_1_TYPE_E => "H222_1_TYPE_E",
            StreamId::ANCILLARY_STREAM => "ANCILLARY_STREAM",
            StreamId::SL_PACKETIZED_STREAM => "SL_PACKETIZED_STREAM",
            StreamId::FLEX_MUX_STREAM => "FLEX_MUX_STREAM",
            StreamId::METADATA_STREAM => "METADATA_STREAM",
            StreamId::EXTENDED_STREAM_ID => "EXTENDED_STREAM_ID",
            StreamId::RESERVED_DATA_STREAM => "RESERVED_DATA_STREAM",
            StreamId::PROGRAM_STREAM_DIRECTORY => "PROGRAM_STREAM_DIRECTORY",
            StreamId(other) => return write!(f, "Unknown({})", other),
        };
        f.write_str(label)
    }
}
/// View over the bytes of a PES packet, starting at its `packet_start_code_prefix`.
///
/// Instances are obtained from `from_bytes()`, which guarantees that the six fixed header
/// bytes read by the accessors are present.
pub struct PesHeader<'buf> {
    buf: &'buf [u8],
}

impl<'buf> PesHeader<'buf> {
    /// Start code prefix (3 bytes) + stream id (1 byte) + packet length (2 bytes).
    const FIXED_HEADER_SIZE: usize = 6;

    /// Wraps `buf` as a PES header.
    ///
    /// Logs a warning and returns `None` if `buf` holds fewer than six bytes, or if its first
    /// three bytes are not the start code prefix `0x000001`.
    pub fn from_bytes(buf: &'buf [u8]) -> Option<PesHeader<'buf>> {
        let available = buf.len();
        if available < Self::FIXED_HEADER_SIZE {
            warn!("Buffer size {} too small to hold PES header", buf.len());
            return None;
        }
        // widen the 24-bit big-endian prefix to a u32 by prepending a zero byte
        let packet_start_code_prefix = u32::from_be_bytes([0, buf[0], buf[1], buf[2]]);
        if packet_start_code_prefix == 1 {
            Some(PesHeader { buf })
        } else {
            warn!(
                "invalid packet_start_code_prefix 0x{:06x}, expected 0x000001",
                packet_start_code_prefix
            );
            None
        }
    }

    /// The stream id held in the fourth header byte.
    pub fn stream_id(&self) -> StreamId {
        let id_byte = self.buf[3];
        StreamId(id_byte)
    }

    /// The big-endian 16-bit length field from the fifth and sixth header bytes; a value of
    /// zero is reported as `PesLength::Unbounded`.
    pub fn pes_packet_length(&self) -> PesLength {
        let raw = u16::from_be_bytes([self.buf[4], self.buf[5]]);
        num::NonZeroU16::new(raw).map_or(PesLength::Unbounded, PesLength::Bounded)
    }

    /// Everything after the six fixed header bytes.
    ///
    /// When `StreamId::is_parsed()` is true for this packet's stream id the bytes are handed
    /// to `PesParsedContents::from_bytes()`, otherwise they are returned untouched as
    /// `PesContents::Payload`.
    pub fn contents(&self) -> PesContents<'buf> {
        let rest = &self.buf[Self::FIXED_HEADER_SIZE..];
        if !self.stream_id().is_parsed() {
            return PesContents::Payload(rest);
        }
        PesContents::Parsed(PesParsedContents::from_bytes(rest))
    }
}
/// Errors returned by the field accessors of `PesParsedContents`.
#[derive(Debug, PartialEq, Eq)]
pub enum PesError {
/// The header flag for the requested field is not set.
FieldNotPresent,
/// The two PTS/DTS flag bits hold the value `0b01`.
PtsDtsFlagsInvalid,
/// The requested field would extend past the declared header length or past the end of the
/// buffer.
NotEnoughData {
// end offset of the slice that was requested
requested: usize,
// end offset actually available
available: usize,
},
/// A marker bit that is expected to be `1` was `0`.
MarkerBitNotSet,
}
/// What follows the six fixed header bytes of a PES packet, as returned by
/// `PesHeader::contents()`.
pub enum PesContents<'buf> {
/// Header fields followed by payload; `None` when `PesParsedContents::from_bytes()`
/// rejected the data.
Parsed(Option<PesParsedContents<'buf>>),
/// The remaining bytes, returned without further parsing.
Payload(&'buf [u8]),
}
/// Decoded form of the one-byte DSM trick mode field.
///
/// `PesParsedContents::dsm_trick_mode()` selects the variant from the top three bits of the
/// byte (the trick mode control value).
#[derive(Debug)]
#[allow(missing_docs)]
pub enum DsmTrickMode {
/// Selected by control value `0b000`.
FastForward {
field_id: u8,
intra_slice_refresh: bool,
frequency_truncation: FrequencyTruncationCoefficientSelection,
},
/// Selected by control value `0b001`.
SlowMotion {
rep_cntrl: u8,
},
/// Selected by control value `0b010`.
FreezeFrame {
field_id: u8,
reserved: u8,
},
/// Selected by control value `0b011`.
FastReverse {
field_id: u8,
intra_slice_refresh: bool,
frequency_truncation: FrequencyTruncationCoefficientSelection,
},
/// Selected by control value `0b100`.
SlowReverse {
rep_cntrl: u8,
},
/// Selected by any other control value.
Reserved {
reserved: u8,
},
}
/// The two-bit frequency truncation selector carried by the fast-forward and fast-reverse
/// trick modes.
#[derive(Debug)]
pub enum FrequencyTruncationCoefficientSelection {
    /// Selector value `0`.
    DCNonZero,
    /// Selector value `1`.
    FirstThreeNonZero,
    /// Selector value `2`.
    FirstSixNonZero,
    /// Selector value `3`.
    AllMaybeNonZero,
}

impl FrequencyTruncationCoefficientSelection {
    /// Maps a two-bit selector value onto its variant.
    ///
    /// # Panics
    ///
    /// Panics if `id` is greater than `3`.
    fn from_id(id: u8) -> Self {
        match id {
            0 => Self::DCNonZero,
            1 => Self::FirstThreeNonZero,
            2 => Self::FirstSixNonZero,
            3 => Self::AllMaybeNonZero,
            _ => panic!("Invalid id {}", id),
        }
    }
}
/// A 22-bit elementary stream rate value, counted in units of 50 bytes per second.
#[derive(Debug)]
pub struct EsRate(u32);

impl EsRate {
    /// Number of bytes per second represented by one unit of the stored value.
    const RATE_BYTES_PER_SECOND: u32 = 50;

    /// Wraps a raw rate value.
    ///
    /// # Panics
    ///
    /// Panics if `es_rate` does not fit in 22 bits.
    pub fn new(es_rate: u32) -> EsRate {
        assert!(es_rate < 1 << 22);
        Self(es_rate)
    }

    /// The rate converted to bytes per second.
    pub fn bytes_per_second(&self) -> u32 {
        let units = self.0;
        Self::RATE_BYTES_PER_SECOND * units
    }
}

impl From<EsRate> for u32 {
    /// Extracts the raw (unscaled) rate value.
    fn from(EsRate(raw): EsRate) -> Self {
        raw
    }
}
/// Holder for the bytes of the PES extension area, as returned by
/// `PesParsedContents::pes_extension()`.  No accessors are provided for the contents.
#[derive(Debug)] pub struct PesExtension<'buf> {
// the bytes between the end of the CRC field and the end of the declared header data
_buf: &'buf [u8],
}
pub struct PesParsedContents<'buf> {
buf: &'buf [u8],
}
impl<'buf> PesParsedContents<'buf> {
pub fn from_bytes(buf: &'buf [u8]) -> Option<PesParsedContents<'buf>> {
if buf.len() < Self::FIXED_HEADER_SIZE {
warn!(
"buf not large enough to hold PES parsed header: {} bytes",
buf.len()
);
return None;
}
let check_bits = buf[0] >> 6;
if check_bits != 0b10 {
warn!(
"unexpected check-bits value {:#b}, expected 0b10",
check_bits
);
return None;
}
let contents = PesParsedContents { buf };
if (Self::FIXED_HEADER_SIZE + contents.pes_header_data_len()) > buf.len() {
warn!(
"reported PES header length {} does not fit within remaining buffer length {}",
contents.pes_header_data_len(),
buf.len() - Self::FIXED_HEADER_SIZE,
);
return None;
}
if contents.pes_crc_end() > (Self::FIXED_HEADER_SIZE + contents.pes_header_data_len()) {
warn!(
"calculated PES header data length {} does not fit with in recorded PES_header_length {}",
contents.pes_crc_end() - Self::FIXED_HEADER_SIZE,
contents.pes_header_data_len(),
);
return None;
}
Some(contents)
}
pub fn pes_priority(&self) -> u8 {
self.buf[0] >> 3 & 1
}
pub fn data_alignment_indicator(&self) -> DataAlignment {
if self.buf[0] & 0b100 != 0 {
DataAlignment::Aligned
} else {
DataAlignment::NotAligned
}
}
pub fn copyright(&self) -> Copyright {
if self.buf[0] & 0b10 != 0 {
Copyright::Undefined
} else {
Copyright::Protected
}
}
pub fn original_or_copy(&self) -> OriginalOrCopy {
if self.buf[0] & 0b1 != 0 {
OriginalOrCopy::Original
} else {
OriginalOrCopy::Copy
}
}
fn pts_dts_flags(&self) -> u8 {
self.buf[1] >> 6
}
fn escr_flag(&self) -> bool {
self.buf[1] >> 5 & 1 != 0
}
fn esrate_flag(&self) -> bool {
self.buf[1] >> 4 & 1 != 0
}
fn dsm_trick_mode_flag(&self) -> bool {
self.buf[1] >> 3 & 1 != 0
}
fn additional_copy_info_flag(&self) -> bool {
self.buf[1] >> 2 & 1 != 0
}
fn pes_crc_flag(&self) -> bool {
self.buf[1] >> 1 & 1 != 0
}
fn pes_extension_flag(&self) -> bool {
self.buf[1] & 1 != 0
}
fn pes_header_data_len(&self) -> usize {
self.buf[2] as usize
}
fn header_slice(&self, from: usize, to: usize) -> Result<&'buf [u8], PesError> {
if to > self.pes_header_data_len() + Self::FIXED_HEADER_SIZE {
Err(PesError::NotEnoughData {
requested: to,
available: self.pes_header_data_len() + Self::FIXED_HEADER_SIZE,
})
} else if to > self.buf.len() {
Err(PesError::NotEnoughData {
requested: to,
available: self.buf.len(),
})
} else {
Ok(&self.buf[from..to])
}
}
const FIXED_HEADER_SIZE: usize = 3;
const TIMESTAMP_SIZE: usize = 5;
fn pts_dts_end(&self) -> usize {
match self.pts_dts_flags() {
0b00 => Self::FIXED_HEADER_SIZE,
0b01 => Self::FIXED_HEADER_SIZE,
0b10 => Self::FIXED_HEADER_SIZE + Self::TIMESTAMP_SIZE,
0b11 => Self::FIXED_HEADER_SIZE + Self::TIMESTAMP_SIZE * 2,
v => panic!("unexpected value {}", v),
}
}
pub fn pts_dts(&self) -> Result<PtsDts, PesError> {
match self.pts_dts_flags() {
0b00 => Err(PesError::FieldNotPresent),
0b01 => Err(PesError::PtsDtsFlagsInvalid),
0b10 => self
.header_slice(Self::FIXED_HEADER_SIZE, self.pts_dts_end())
.map(|s| PtsDts::PtsOnly(Timestamp::from_bytes(s))),
0b11 => self
.header_slice(Self::FIXED_HEADER_SIZE, self.pts_dts_end())
.map(|s| PtsDts::Both {
pts: Timestamp::from_bytes(&s[..Self::TIMESTAMP_SIZE]),
dts: Timestamp::from_bytes(&s[Self::TIMESTAMP_SIZE..]),
}),
v => panic!("unexpected value {}", v),
}
}
const ESCR_SIZE: usize = 6;
pub fn escr(&self) -> Result<ClockRef, PesError> {
if self.escr_flag() {
self.header_slice(self.pts_dts_end(), self.pts_dts_end() + Self::ESCR_SIZE)
.map(|s| {
let base = u64::from(s[0] & 0b0011_1000) << 27
| u64::from(s[0] & 0b0000_0011) << 28
| u64::from(s[1]) << 20
| u64::from(s[2] & 0b1111_1000) << 12
| u64::from(s[2] & 0b0000_0011) << 13
| u64::from(s[3]) << 5
| u64::from(s[4] & 0b1111_1000) >> 3;
let extension =
u16::from(s[4] & 0b0000_0011) << 7 | u16::from(s[5] & 0b1111_1110) >> 1;
ClockRef::from_parts(base, extension)
})
} else {
Err(PesError::FieldNotPresent)
}
}
fn escr_end(&self) -> usize {
self.pts_dts_end() + if self.escr_flag() { Self::ESCR_SIZE } else { 0 }
}
const ES_RATE_SIZE: usize = 3;
pub fn es_rate(&self) -> Result<EsRate, PesError> {
if self.esrate_flag() {
self.header_slice(self.escr_end(), self.escr_end() + Self::ES_RATE_SIZE)
.map(|s| {
EsRate::new(
u32::from(s[0] & 0b0111_1111) << 15
| u32::from(s[1]) << 7
| u32::from(s[2] & 0b1111_1110) >> 1,
)
})
} else {
Err(PesError::FieldNotPresent)
}
}
fn es_rate_end(&self) -> usize {
self.escr_end()
+ if self.esrate_flag() {
Self::ES_RATE_SIZE
} else {
0
}
}
const DSM_TRICK_MODE_SIZE: usize = 1;
pub fn dsm_trick_mode(&self) -> Result<DsmTrickMode, PesError> {
if self.dsm_trick_mode_flag() {
self.header_slice(
self.es_rate_end(),
self.es_rate_end() + Self::DSM_TRICK_MODE_SIZE,
)
.map(|s| {
let trick_mode_control = s[0] >> 5;
let trick_mode_data = s[0] & 0b0001_1111;
match trick_mode_control {
0b000 => DsmTrickMode::FastForward {
field_id: trick_mode_data >> 3,
intra_slice_refresh: (trick_mode_data & 0b100) != 0,
frequency_truncation: FrequencyTruncationCoefficientSelection::from_id(
trick_mode_data & 0b11,
),
},
0b001 => DsmTrickMode::SlowMotion {
rep_cntrl: trick_mode_control,
},
0b010 => DsmTrickMode::FreezeFrame {
field_id: trick_mode_data >> 3,
reserved: trick_mode_data & 0b111,
},
0b011 => DsmTrickMode::FastReverse {
field_id: trick_mode_data >> 3,
intra_slice_refresh: (trick_mode_data & 0b100) != 0,
frequency_truncation: FrequencyTruncationCoefficientSelection::from_id(
trick_mode_data & 0b11,
),
},
0b100 => DsmTrickMode::SlowReverse {
rep_cntrl: trick_mode_control,
},
_ => DsmTrickMode::Reserved {
reserved: trick_mode_control,
},
}
})
} else {
Err(PesError::FieldNotPresent)
}
}
fn dsm_trick_mode_end(&self) -> usize {
self.es_rate_end()
+ if self.dsm_trick_mode_flag() {
Self::DSM_TRICK_MODE_SIZE
} else {
0
}
}
const ADDITIONAL_COPY_INFO_SIZE: usize = 1;
pub fn additional_copy_info(&self) -> Result<u8, PesError> {
if self.additional_copy_info_flag() {
self.header_slice(
self.dsm_trick_mode_end(),
self.dsm_trick_mode_end() + Self::ADDITIONAL_COPY_INFO_SIZE,
)
.and_then(|s| {
if s[0] & 0b1000_0000 == 0 {
Err(PesError::MarkerBitNotSet)
} else {
Ok(s[0] & 0b0111_1111)
}
})
} else {
Err(PesError::FieldNotPresent)
}
}
fn additional_copy_info_end(&self) -> usize {
self.dsm_trick_mode_end()
+ if self.additional_copy_info_flag() {
Self::ADDITIONAL_COPY_INFO_SIZE
} else {
0
}
}
const PREVIOUS_PES_PACKET_CRC_SIZE: usize = 2;
pub fn previous_pes_packet_crc(&self) -> Result<u16, PesError> {
if self.pes_crc_flag() {
self.header_slice(
self.additional_copy_info_end(),
self.additional_copy_info_end() + Self::PREVIOUS_PES_PACKET_CRC_SIZE,
)
.map(|s| u16::from(s[0]) << 8 | u16::from(s[1]))
} else {
Err(PesError::FieldNotPresent)
}
}
fn pes_crc_end(&self) -> usize {
self.additional_copy_info_end()
+ if self.pes_crc_flag() {
Self::PREVIOUS_PES_PACKET_CRC_SIZE
} else {
0
}
}
pub fn pes_extension(&self) -> Result<PesExtension<'buf>, PesError> {
if self.pes_extension_flag() {
self.header_slice(
self.pes_crc_end(),
self.pes_header_data_len() + Self::FIXED_HEADER_SIZE,
)
.map(|s| PesExtension { _buf: s })
} else {
Err(PesError::FieldNotPresent)
}
}
pub fn payload(&self) -> &'buf [u8] {
&self.buf[Self::FIXED_HEADER_SIZE + self.pes_header_data_len()..]
}
}
impl<'buf> fmt::Debug for PesParsedContents<'buf> {
    /// Formats the always-present fields, then each optional field whose accessor succeeds.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        let mut out = f.debug_struct("PesParsedContents");

        // always-present fields (pts_dts is shown as the raw Result, errors included)
        out.field("pes_priority", &self.pes_priority());
        out.field("data_alignment_indicator", &self.data_alignment_indicator());
        out.field("copyright", &self.copyright());
        out.field("original_or_copy", &self.original_or_copy());
        out.field("pts_dts", &self.pts_dts());

        // optional fields are left out entirely when their accessor reports an error
        if let Ok(value) = self.escr() {
            out.field("escr", &value);
        }
        if let Ok(value) = self.es_rate() {
            out.field("es_rate", &value);
        }
        if let Ok(value) = self.dsm_trick_mode() {
            out.field("dsm_trick_mode", &value);
        }
        if let Ok(value) = self.additional_copy_info() {
            out.field("additional_copy_info", &value);
        }
        if let Ok(value) = self.previous_pes_packet_crc() {
            out.field("previous_pes_packet_crc", &value);
        }
        if let Ok(value) = self.pes_extension() {
            out.field("pes_extension", &value);
        }

        out.finish()
    }
}
/// Problems found while decoding a five-byte PTS or DTS field.
#[derive(PartialEq, Eq, Debug)]
pub enum TimestampError {
    /// The top four bits of the first byte did not hold the expected value.
    IncorrectPrefixBits {
        /// The value that was required.
        expected: u8,
        /// The value that was found.
        actual: u8,
    },
    /// One of the marker bits that separate the timestamp's bit groups was `0`.
    MarkerBitNotSet {
        /// Position of the offending bit, counting from `0` at the most significant bit of
        /// the first byte.
        bit_number: u8,
    },
}

/// A 33-bit presentation or decode timestamp.
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub struct Timestamp {
    val: u64,
}

impl Timestamp {
    /// The largest representable timestamp, `2^33 - 1`.
    pub const MAX: Timestamp = Timestamp { val: (1 << 33) - 1 };

    /// Number of timestamp ticks per second (PTS / DTS values count a 90 kHz clock).
    pub const TIMEBASE: u64 = 90_000;

    /// Decodes a PTS from `buf`, additionally requiring the four prefix bits to be `0b0010`.
    ///
    /// # Panics
    ///
    /// Panics if `buf` holds fewer than five bytes.
    pub fn from_pts_bytes(buf: &[u8]) -> Result<Timestamp, TimestampError> {
        Timestamp::check_prefix(buf, 0b0010)?;
        Timestamp::from_bytes(buf)
    }

    /// Decodes a DTS from `buf`, additionally requiring the four prefix bits to be `0b0001`.
    ///
    /// # Panics
    ///
    /// Panics if `buf` holds fewer than five bytes.
    pub fn from_dts_bytes(buf: &[u8]) -> Result<Timestamp, TimestampError> {
        Timestamp::check_prefix(buf, 0b0001)?;
        Timestamp::from_bytes(buf)
    }

    /// Checks that the top four bits of the first byte equal `expected`.
    fn check_prefix(buf: &[u8], expected: u8) -> Result<(), TimestampError> {
        assert!(expected <= 0b1111);
        let actual = buf[0] >> 4;
        if actual == expected {
            Ok(())
        } else {
            Err(TimestampError::IncorrectPrefixBits { expected, actual })
        }
    }

    /// Checks that bit `bit_number` of `buf` is set, numbering bits from `0` at the most
    /// significant bit of the first byte.
    fn check_marker_bit(buf: &[u8], bit_number: u8) -> Result<(), TimestampError> {
        let byte_index = bit_number / 8;
        let bit_index = bit_number % 8;
        let bit_mask = 1 << (7 - bit_index);
        if buf[byte_index as usize] & bit_mask != 0 {
            Ok(())
        } else {
            Err(TimestampError::MarkerBitNotSet { bit_number })
        }
    }

    /// Decodes a timestamp from the first five bytes of `buf` without checking the prefix
    /// bits.
    ///
    /// The 33-bit value is stored as groups of 3, 15 and 15 bits, each followed by a marker
    /// bit (bits 7, 23 and 39) which must be `1`.
    ///
    /// # Panics
    ///
    /// Panics if `buf` holds fewer than five bytes.
    pub fn from_bytes(buf: &[u8]) -> Result<Timestamp, TimestampError> {
        Timestamp::check_marker_bit(buf, 7)?;
        Timestamp::check_marker_bit(buf, 23)?;
        Timestamp::check_marker_bit(buf, 39)?;
        Ok(Timestamp {
            val: (u64::from(buf[0] & 0b0000_1110) << 29)
                | u64::from(buf[1]) << 22
                | (u64::from(buf[2] & 0b1111_1110) << 14)
                | u64::from(buf[3]) << 7
                | u64::from(buf[4]) >> 1,
        })
    }

    /// Wraps an already-decoded timestamp value.
    ///
    /// # Panics
    ///
    /// Panics if `val` is greater than `Timestamp::MAX` (that is, does not fit in 33 bits).
    pub fn from_u64(val: u64) -> Timestamp {
        assert!(
            val <= Self::MAX.val,
            "timestamp value {:#x} does not fit in 33 bits",
            val
        );
        Timestamp { val }
    }

    /// The timestamp as a plain integer.
    pub fn value(self) -> u64 {
        self.val
    }

    /// Heuristic for detecting wrap-around: `true` when this timestamp is numerically smaller
    /// than `since` by more than half of the representable range.
    pub fn likely_wrapped_since(self, since: Self) -> bool {
        self.val <= since.val && since.val - self.val > Self::MAX.val / 2
    }
}
/// The timestamps carried by a PES header, as returned by `PesParsedContents::pts_dts()`.
#[derive(PartialEq, Eq, Debug)]
pub enum PtsDts {
/// NOTE(review): not produced by `PesParsedContents::pts_dts()` in this file, which reports
/// absent timestamps as `PesError::FieldNotPresent` -- confirm whether other code uses it.
None,
/// Only a presentation timestamp is present (PTS/DTS flags `0b10`).
PtsOnly(Result<Timestamp, TimestampError>),
/// NOTE(review): not produced by `PesParsedContents::pts_dts()` in this file, which reports
/// flag value `0b01` as `PesError::PtsDtsFlagsInvalid` -- confirm whether other code uses it.
Invalid,
/// Both presentation and decode timestamps are present (PTS/DTS flags `0b11`).
Both {
pts: Result<Timestamp, TimestampError>,
dts: Result<Timestamp, TimestampError>,
},
}
/// Value of the data alignment indicator bit, as returned by
/// `PesParsedContents::data_alignment_indicator()`.
#[derive(PartialEq, Eq, Debug)]
pub enum DataAlignment {
/// The indicator bit is set.
Aligned,
/// The indicator bit is clear.
NotAligned,
}
/// Value of the copyright bit, as returned by `PesParsedContents::copyright()`.
///
/// NOTE(review): H.222.0 describes a set copyright bit as "protected by copyright"; the
/// mapping used by `PesParsedContents::copyright()` appears to be the reverse -- confirm.
#[derive(PartialEq, Eq, Debug)]
pub enum Copyright {
/// Returned when the copyright bit is clear.
Protected,
/// Returned when the copyright bit is set.
Undefined,
}
/// Value of the original-or-copy bit, as returned by `PesParsedContents::original_or_copy()`.
#[derive(PartialEq, Eq, Debug)]
pub enum OriginalOrCopy {
/// The bit is set.
Original,
/// The bit is clear.
Copy,
}
#[cfg(test)]
mod test {
    use crate::demultiplex;
    use crate::demultiplex::PacketFilter;
    use crate::packet;
    use crate::pes;
    use crate::pes::{
        Copyright, DataAlignment, EsRate, FrequencyTruncationCoefficientSelection, OriginalOrCopy,
        PesContents, PesError, PesHeader, PesLength, PesParsedContents, StreamId,
    };
    use assert_matches::assert_matches;
    use bitstream_io::{BigEndian, BitWrite};
    use bitstream_io::{BitWriter, BE};
    use hex_literal::*;
    use std::io;
    use std::num::NonZeroU16;

    packet_filter_switch! {
        NullFilterSwitch<NullDemuxContext> {
            Nul: demultiplex::NullPacketFilter<NullDemuxContext>,
        }
    }
    demux_context!(NullDemuxContext, NullFilterSwitch);
    impl NullDemuxContext {
        fn do_construct(&mut self, _req: demultiplex::FilterRequest<'_, '_>) -> NullFilterSwitch {
            NullFilterSwitch::Nul(demultiplex::NullPacketFilter::default())
        }
    }

    /// Runs `builder` against a fresh big-endian bit writer and returns the bytes written.
    fn make_test_data<F>(builder: F) -> Vec<u8>
    where
        F: Fn(&mut BitWriter<Vec<u8>, BE>) -> Result<(), io::Error>,
    {
        let data: Vec<u8> = Vec::new();
        let mut w = BitWriter::endian(data, BigEndian);
        builder(&mut w).unwrap();
        w.into_writer()
    }

    /// Writes a five-byte PTS/DTS field: 4 prefix bits, then 3 + 15 + 15 timestamp bits with
    /// a marker bit after each group.
    fn write_ts(w: &mut BitWriter<Vec<u8>, BE>, ts: u64, prefix: u8) -> Result<(), io::Error> {
        assert!(
            ts < 1u64 << 33,
            "ts value too large {:#x} >= {:#x}",
            ts,
            1u64 << 33
        );
        w.write(4, prefix & 0b1111)?;
        w.write(3, (ts & 0b1_1100_0000_0000_0000_0000_0000_0000_0000) >> 30)?;
        w.write(1, 1)?; // marker bit
        w.write(15, (ts & 0b0_0011_1111_1111_1111_1000_0000_0000_0000) >> 15)?;
        w.write(1, 1)?; // marker bit
        w.write(15, ts & 0b0_0000_0000_0000_0000_0111_1111_1111_1111)?;
        w.write(1, 1) // marker bit
    }

    /// Writes a six-byte ESCR field: 2 leading bits, the 33-bit base in 3 + 15 + 15 bit groups
    /// each followed by a marker bit, then the 9-bit extension and a final marker bit.
    fn write_escr(
        w: &mut BitWriter<Vec<u8>, BE>,
        base: u64,
        extension: u16,
    ) -> Result<(), io::Error> {
        assert!(
            base < 1u64 << 33,
            "base value too large {:#x} >= {:#x}",
            base,
            1u64 << 33
        );
        assert!(
            extension < 1u16 << 9,
            "extension value too large {:#x} >= {:#x}",
            extension,
            1u16 << 9
        );
        w.write(2, 0b11)?; // leading bits
        w.write(
            3,
            (base & 0b1_1100_0000_0000_0000_0000_0000_0000_0000) >> 30,
        )?;
        w.write(1, 1)?; // marker bit
        w.write(
            15,
            (base & 0b0_0011_1111_1111_1111_1000_0000_0000_0000) >> 15,
        )?;
        w.write(1, 1)?; // marker bit
        w.write(15, base & 0b0_0000_0000_0000_0000_0111_1111_1111_1111)?;
        w.write(1, 1)?; // marker bit
        w.write(9, extension)?;
        w.write(1, 1) // marker bit
    }

    /// Writes a three-byte ES rate field: marker bit, 22-bit rate, marker bit.
    fn write_es_rate(w: &mut BitWriter<Vec<u8>, BE>, rate: u32) -> Result<(), io::Error> {
        assert!(
            rate < 1u32 << 22,
            "rate value too large {:#x} >= {:#x}",
            rate,
            1u32 << 22
        );
        w.write(1, 1)?; // marker bit
        w.write(22, rate)?;
        w.write(1, 1) // marker bit
    }

    #[test]
    fn parse_header() {
        let data = make_test_data(|w| {
            w.write(24, 1)?; // start code prefix
            w.write(8, 7)?; // stream id
            w.write(16, 7)?; // packet length

            w.write(2, 0b10)?; // check bits
            w.write(2, 0)?; // scrambling control
            w.write(1, 0)?; // priority
            w.write(1, 1)?; // data alignment indicator
            w.write(1, 0)?; // copyright
            w.write(1, 0)?; // original or copy
            w.write(2, 0b10)?; // PTS/DTS flags
            w.write(1, 1)?; // ESCR flag
            w.write(1, 1)?; // ES rate flag
            w.write(1, 1)?; // DSM trick mode flag
            w.write(1, 1)?; // additional copy info flag
            w.write(1, 1)?; // CRC flag
            w.write(1, 0)?; // extension flag
            let pes_header_length = 5 + 6 + 3 + 1 + 1 + 2;
            w.write(8, pes_header_length)?;
            write_ts(w, 123456789, 0b0010)?;
            write_escr(w, 0b111111111111111111111111111111111, 234)?;
            write_es_rate(w, 1234567)?;

            // DSM trick mode
            w.write(3, 0b00)?; // trick mode control
            w.write(2, 2)?; // field id
            w.write_bit(true)?; // intra slice refresh
            w.write(2, 0)?; // frequency truncation

            // additional copy info
            w.write(1, 1)?; // marker bit
            w.write(7, 123)?;

            // previous packet CRC
            w.write(16, 54321)
        });
        let header = pes::PesHeader::from_bytes(&data[..]).unwrap();
        assert_eq!(pes::StreamId(7), header.stream_id());
        assert_eq!(
            header.pes_packet_length(),
            PesLength::Bounded(NonZeroU16::new(7).unwrap())
        );
        match header.contents() {
            pes::PesContents::Parsed(parsed_contents) => {
                let p =
                    parsed_contents.expect("expected PesContents::Parsed(Some(_)) but was None");
                assert_eq!(0, p.pes_priority());
                assert_eq!(pes::DataAlignment::Aligned, p.data_alignment_indicator());
                assert_eq!(pes::Copyright::Protected, p.copyright());
                assert_eq!(pes::OriginalOrCopy::Copy, p.original_or_copy());
                match p.pts_dts() {
                    Ok(pes::PtsDts::PtsOnly(Ok(ts))) => {
                        let a = ts.value();
                        let b = 123456789;
                        assert_eq!(
                            a, b,
                            "timestamp values don't match:\n  actual:{:#b}\nexpected:{:#b}",
                            a, b
                        );
                    }
                    // show what was actually returned, rather than a literal "_"
                    other => panic!("expected PtsDts::PtsOnly, got {:?}", other),
                }
                assert_eq!(p.payload().len(), 0);
                match p.escr() {
                    Ok(escr) => {
                        assert_eq!(0b111111111111111111111111111111111, escr.base());
                        assert_eq!(234, escr.extension());
                    }
                    e => panic!("expected escr value, got {:?}", e),
                }
                assert_matches!(
                    p.dsm_trick_mode(),
                    Ok(pes::DsmTrickMode::FastForward {
                        field_id: 2,
                        intra_slice_refresh: true,
                        frequency_truncation:
                            pes::FrequencyTruncationCoefficientSelection::DCNonZero,
                    })
                );
                assert_matches!(p.es_rate(), Ok(pes::EsRate(1234567)));
                assert_matches!(p.additional_copy_info(), Ok(123));
                assert_matches!(p.previous_pes_packet_crc(), Ok(54321));
            }
            pes::PesContents::Payload(_) => {
                panic!("expected PesContents::Parsed, got PesContents::Payload")
            }
        }
    }

    #[test]
    fn pts() {
        let pts_prefix = 0b0010;
        let pts = make_test_data(|w| {
            write_ts(w, 0b1_0101_0101_0101_0101_0101_0101_0101_0101, pts_prefix)
        });
        let a = pes::Timestamp::from_pts_bytes(&pts[..]).unwrap().value();
        let b = 0b1_0101_0101_0101_0101_0101_0101_0101_0101;
        assert_eq!(
            a, b,
            "timestamp values don't match:\n  actual:{:#b}\nexpected:{:#b}",
            a, b
        );
    }

    #[test]
    fn dts() {
        let dts_prefix = 0b0001;
        let dts = make_test_data(|w| {
            write_ts(w, 0b0_1010_1010_1010_1010_1010_1010_1010_1010, dts_prefix)
        });
        let a = pes::Timestamp::from_dts_bytes(&dts[..]).unwrap().value();
        let b = 0b0_1010_1010_1010_1010_1010_1010_1010_1010;
        assert_eq!(
            a, b,
            "timestamp values don't match:\n  actual:{:#b}\nexpected:{:#b}",
            a, b
        );
    }

    #[test]
    fn timestamp_ones() {
        let pts_prefix = 0b0010;
        let pts = make_test_data(|w| {
            write_ts(w, 0b1_1111_1111_1111_1111_1111_1111_1111_1111, pts_prefix)
        });
        let a = pes::Timestamp::from_pts_bytes(&pts[..]).unwrap().value();
        let b = 0b1_1111_1111_1111_1111_1111_1111_1111_1111;
        assert_eq!(
            a, b,
            "timestamp values don't match:\n  actual:{:#b}\nexpected:{:#b}",
            a, b
        );
    }

    #[test]
    fn timestamp_zeros() {
        let pts_prefix = 0b0010;
        let pts = make_test_data(|w| {
            write_ts(w, 0b0_0000_0000_0000_0000_0000_0000_0000_0000, pts_prefix)
        });
        let a = pes::Timestamp::from_pts_bytes(&pts[..]).unwrap().value();
        let b = 0b0_0000_0000_0000_0000_0000_0000_0000_0000;
        assert_eq!(
            a, b,
            "timestamp values don't match:\n  actual:{:#b}\nexpected:{:#b}",
            a, b
        );
    }

    #[test]
    fn timestamp_wrap() {
        let zero: pes::Timestamp = pes::Timestamp::from_u64(0);
        assert!(zero.likely_wrapped_since(pes::Timestamp::MAX));
        assert!(!pes::Timestamp::MAX.likely_wrapped_since(zero));
        assert!(!zero.likely_wrapped_since(pes::Timestamp::from_u64(1)));
        assert!(!pes::Timestamp::from_u64(1).likely_wrapped_since(zero));
    }

    #[test]
    fn timestamp_bad_prefix() {
        let pts_prefix = 0b0010;
        let mut pts = make_test_data(|w| write_ts(w, 1234, pts_prefix));
        // flip the top prefix bit
        pts[0] |= 0b10000000;
        assert_matches!(
            pes::Timestamp::from_pts_bytes(&pts[..]),
            Err(pes::TimestampError::IncorrectPrefixBits {
                expected: 0b0010,
                actual: 0b1010
            })
        )
    }

    #[test]
    fn timestamp_bad_marker() {
        let pts_prefix = 0b0010;
        let mut pts = make_test_data(|w| write_ts(w, 1234, pts_prefix));
        // clear the first marker bit
        pts[0] &= 0b11111110;
        assert_matches!(
            pes::Timestamp::from_pts_bytes(&pts[..]),
            Err(pes::TimestampError::MarkerBitNotSet { bit_number: 7 })
        )
    }

    /// Records which consumer callbacks have been invoked.
    struct MockState {
        start_stream_called: bool,
        begin_packet_called: bool,
        continuity_error_called: bool,
    }
    impl MockState {
        fn new() -> MockState {
            MockState {
                start_stream_called: false,
                begin_packet_called: false,
                continuity_error_called: false,
            }
        }
    }

    /// Consumer that notes each callback in a shared `MockState`.
    struct MockElementaryStreamConsumer {
        state: std::rc::Rc<std::cell::RefCell<MockState>>,
    }
    impl MockElementaryStreamConsumer {
        fn new(state: std::rc::Rc<std::cell::RefCell<MockState>>) -> MockElementaryStreamConsumer {
            MockElementaryStreamConsumer { state }
        }
    }
    impl pes::ElementaryStreamConsumer<NullDemuxContext> for MockElementaryStreamConsumer {
        fn start_stream(&mut self, _ctx: &mut NullDemuxContext) {
            self.state.borrow_mut().start_stream_called = true;
        }
        fn begin_packet(&mut self, _ctx: &mut NullDemuxContext, _header: pes::PesHeader<'_>) {
            self.state.borrow_mut().begin_packet_called = true;
        }
        fn continue_packet(&mut self, _ctx: &mut NullDemuxContext, _data: &[u8]) {}
        fn end_packet(&mut self, _ctx: &mut NullDemuxContext) {}
        fn continuity_error(&mut self, _ctx: &mut NullDemuxContext) {
            self.state.borrow_mut().continuity_error_called = true;
        }
    }

    #[test]
    fn pes_packet_consumer() {
        let state = std::rc::Rc::new(std::cell::RefCell::new(MockState::new()));
        let mock = MockElementaryStreamConsumer::new(state.clone());
        let mut pes_filter = pes::PesPacketFilter::new(mock);
        let buf = hex!("4741F510000001E0000084C00A355DDD11B1155DDBF5910000000109100000000167640029AD843FFFC21FFFE10FFFF087FFF843FFFC21FFFE10FFFFFFFFFFFFFFFF087FFFFFFFFFFFFFFF2CC501E0113F780A1010101F00000303E80000C350940000000168FF3CB0000001060001C006018401103A0408D2BA80000050204E95D400000302040AB500314454473141FEFF53040000C815540DF04F77FFFFFFFFFFFFFFFFFFFF80000000016588800005DB001008673FC365F48EAE");
        let pk = packet::Packet::new(&buf[..]);
        let mut ctx = NullDemuxContext::new();
        pes_filter.consume(&mut ctx, &pk);
        {
            let state = state.borrow();
            assert!(state.start_stream_called);
            assert!(state.begin_packet_called);
            assert!(!state.continuity_error_called);
        }
        // presenting the same packet again repeats its continuity counter, which the filter
        // must report
        let pk = packet::Packet::new(&buf[..]);
        pes_filter.consume(&mut ctx, &pk);
        {
            let state = state.borrow();
            assert!(state.continuity_error_called);
        }
    }

    #[test]
    fn header_length_doesnt_fit() {
        let data = make_test_data(|w| {
            w.write(24, 1)?; // start code prefix
            w.write(8, 7)?; // stream id
            w.write(16, 7)?; // packet length

            w.write(2, 0b10)?; // check bits
            w.write(2, 0)?; // scrambling control
            w.write(1, 0)?; // priority
            w.write(1, 1)?; // data alignment indicator
            w.write(1, 0)?; // copyright
            w.write(1, 0)?; // original or copy
            w.write(2, 0b00)?; // PTS/DTS flags
            w.write(1, 0)?; // ESCR flag
            w.write(1, 0)?; // ES rate flag
            w.write(1, 0)?; // DSM trick mode flag
            w.write(1, 0)?; // additional copy info flag
            w.write(1, 1)?; // CRC flag
            w.write(1, 0)?; // extension flag
            // declares two bytes of header data, but only one byte follows
            let pes_header_length = 2;
            w.write(8, pes_header_length)?;
            w.write(8, 1)
        });
        let header = pes::PesHeader::from_bytes(&data[..]).unwrap();
        assert!(matches!(header.contents(), pes::PesContents::Parsed(None)));
    }

    #[test]
    fn pes_header_data_length_too_short() {
        let data = make_test_data(|w| {
            w.write(24, 1)?; // start code prefix
            w.write(8, 7)?; // stream id
            w.write(16, 7)?; // packet length

            w.write(2, 0b10)?; // check bits
            w.write(2, 0)?; // scrambling control
            w.write(1, 0)?; // priority
            w.write(1, 1)?; // data alignment indicator
            w.write(1, 0)?; // copyright
            w.write(1, 0)?; // original or copy
            w.write(2, 0b00)?; // PTS/DTS flags
            w.write(1, 0)?; // ESCR flag
            w.write(1, 0)?; // ES rate flag
            w.write(1, 0)?; // DSM trick mode flag
            w.write(1, 0)?; // additional copy info flag
            w.write(1, 1)?; // CRC flag
            w.write(1, 0)?; // extension flag
            // declares one byte of header data, but the CRC announced above needs two
            let pes_header_length = 1;
            w.write(8, pes_header_length)?;
            w.write(8, 2)
        });
        let header = pes::PesHeader::from_bytes(&data[..]).unwrap();
        assert!(matches!(header.contents(), pes::PesContents::Parsed(None)));
    }

    #[test]
    fn should_convert_u8_to_stream_id_without_panic() {
        for i in 0..=255 {
            let sid = StreamId(i);
            let _ = format!("{:?}", sid);
        }
    }

    #[test]
    fn should_reject_too_short_pes_header() {
        assert!(PesHeader::from_bytes(&[0, 0, 1, 0, 0]).is_none());
    }

    #[test]
    fn should_reject_bad_start_code_prefix() {
        assert!(PesHeader::from_bytes(&[0, 0, 2, 0, 0, 0]).is_none());
        assert!(PesHeader::from_bytes(&[0, 1, 1, 0, 0, 0]).is_none());
        assert!(PesHeader::from_bytes(&[1, 0, 1, 0, 0, 0]).is_none());
    }

    #[test]
    fn should_report_unbounded_length() {
        let header = PesHeader::from_bytes(&[0, 0, 1, 0, 0, 0]).unwrap();
        assert_matches!(header.pes_packet_length(), PesLength::Unbounded);
    }

    #[test]
    fn should_produce_unparsed_payload() {
        let header = PesHeader::from_bytes(&[0, 0, 1, 0b1011_1110, 0, 0, 47]).unwrap();
        assert_eq!(header.stream_id(), StreamId::PADDING_STREAM);
        match header.contents() {
            PesContents::Parsed(_) => panic!("expected PesContents::Payload"),
            PesContents::Payload(_) => {}
        }
    }

    #[test]
    fn should_convert_freq_truncation_from_u8() {
        assert_matches!(
            FrequencyTruncationCoefficientSelection::from_id(0),
            FrequencyTruncationCoefficientSelection::DCNonZero
        );
        assert_matches!(
            FrequencyTruncationCoefficientSelection::from_id(1),
            FrequencyTruncationCoefficientSelection::FirstThreeNonZero
        );
        assert_matches!(
            FrequencyTruncationCoefficientSelection::from_id(2),
            FrequencyTruncationCoefficientSelection::FirstSixNonZero
        );
        assert_matches!(
            FrequencyTruncationCoefficientSelection::from_id(3),
            FrequencyTruncationCoefficientSelection::AllMaybeNonZero
        );
    }

    #[test]
    fn should_convert_es_rate() {
        for r in 0..1 << 22 {
            let es_rate = EsRate::new(r);
            assert_eq!(es_rate.bytes_per_second(), r * 50);
            assert_eq!(u32::from(es_rate), r);
        }
    }

    #[test]
    fn should_reject_parsed_pes_too_short_for_header() {
        assert!(PesParsedContents::from_bytes(&[0b10000000, 0]).is_none());
    }

    #[test]
    fn should_reject_parsed_pes_too_short_for_payload() {
        assert!(PesParsedContents::from_bytes(&[0b10000000, 0, 1]).is_none());
    }

    #[test]
    fn should_reject_parsed_pes_bad_check_bits() {
        assert!(PesParsedContents::from_bytes(&[0b01000000, 0, 1]).is_none());
    }

    #[test]
    fn should_report_zero_flags() {
        let contents = PesParsedContents::from_bytes(&[0b10000000, 0, 0]).unwrap();
        assert_eq!(
            contents.data_alignment_indicator(),
            DataAlignment::NotAligned
        );
        assert_eq!(contents.copyright(), Copyright::Protected);
        assert_eq!(contents.original_or_copy(), OriginalOrCopy::Copy);
        assert_matches!(contents.escr(), Err(PesError::FieldNotPresent));
        assert_matches!(contents.es_rate(), Err(PesError::FieldNotPresent));
        assert_matches!(contents.pes_extension(), Err(PesError::FieldNotPresent));
        assert_matches!(
            contents.previous_pes_packet_crc(),
            Err(PesError::FieldNotPresent)
        );
    }

    #[test]
    fn should_report_one_flags() {
        let contents = PesParsedContents::from_bytes(&[0b10111111, 0, 0]).unwrap();
        assert_eq!(contents.data_alignment_indicator(), DataAlignment::Aligned);
        assert_eq!(contents.copyright(), Copyright::Undefined);
        assert_eq!(contents.original_or_copy(), OriginalOrCopy::Original);
    }
}