1use crate::demultiplex;
15use crate::packet;
16use crate::packet::ClockRef;
17use log::warn;
18use std::fmt::Formatter;
19use std::marker;
20use std::{fmt, num};
21
22pub trait ElementaryStreamConsumer<Ctx> {
29 fn start_stream(&mut self, ctx: &mut Ctx);
31
32 fn begin_packet(&mut self, ctx: &mut Ctx, header: PesHeader<'_>);
35
36 fn continue_packet(&mut self, ctx: &mut Ctx, data: &[u8]);
39
40 fn end_packet(&mut self, ctx: &mut Ctx);
42
43 fn continuity_error(&mut self, ctx: &mut Ctx);
46
47 }
49
50#[derive(Debug, PartialEq)]
51enum PesState {
52 Begin,
53 Started,
54 IgnoreRest,
55}
56
57pub struct PesPacketFilter<Ctx, E>
63where
64 Ctx: demultiplex::DemuxContext,
65 E: ElementaryStreamConsumer<Ctx>,
66{
67 stream_consumer: E,
68 ccounter: Option<packet::ContinuityCounter>,
69 state: PesState,
70 phantom: marker::PhantomData<Ctx>,
71}
72impl<Ctx, E> PesPacketFilter<Ctx, E>
73where
74 Ctx: demultiplex::DemuxContext,
75 E: ElementaryStreamConsumer<Ctx>,
76{
77 pub fn new(stream_consumer: E) -> PesPacketFilter<Ctx, E> {
80 PesPacketFilter {
81 stream_consumer,
82 ccounter: None,
83 state: PesState::Begin,
84 phantom: marker::PhantomData,
85 }
86 }
87
88 fn is_continuous(&self, packet: &packet::Packet<'_>) -> bool {
89 if let Some(cc) = self.ccounter {
90 let result = if packet.adaptation_control().has_payload() {
92 packet.continuity_counter().follows(cc)
93 } else {
94 packet.continuity_counter().count() == cc.count()
95 };
96 if !result {
97 }
99 result
100 } else {
101 true
102 }
103 }
104}
105impl<Ctx, E> demultiplex::PacketFilter for PesPacketFilter<Ctx, E>
106where
107 Ctx: demultiplex::DemuxContext,
108 E: ElementaryStreamConsumer<Ctx>,
109{
110 type Ctx = Ctx;
111
112 #[inline(always)]
113 fn consume(&mut self, ctx: &mut Self::Ctx, packet: &packet::Packet<'_>) {
114 if !self.is_continuous(packet) {
115 self.stream_consumer.continuity_error(ctx);
116 self.state = PesState::IgnoreRest;
117 }
118 self.ccounter = Some(packet.continuity_counter());
119 if packet.payload_unit_start_indicator() {
120 if self.state == PesState::Started {
121 self.stream_consumer.end_packet(ctx);
122 } else {
123 if self.state == PesState::Begin {
127 self.stream_consumer.start_stream(ctx);
128 }
129 self.state = PesState::Started;
130 }
131 if let Some(payload) = packet.payload() {
132 if let Some(header) = PesHeader::from_bytes(payload) {
133 self.stream_consumer.begin_packet(ctx, header);
134 }
135 }
136 } else {
137 match self.state {
138 PesState::Started => {
139 if let Some(payload) = packet.payload() {
140 if !payload.is_empty() {
141 self.stream_consumer.continue_packet(ctx, payload);
142 }
143 }
144 }
145 PesState::Begin => {
146 warn!("{:?}: Ignoring elementary stream content without a payload_start_indicator", packet.pid());
147 }
148 PesState::IgnoreRest => (),
149 }
150 }
151 }
152}
153
154#[derive(Debug, PartialEq)]
156pub enum PesLength {
157 Unbounded,
161 Bounded(num::NonZeroU16),
164}
165
166#[derive(PartialEq, Eq)]
170pub struct StreamId(u8);
171
172impl StreamId {
173 pub const PROGRAM_STREAM_MAP: StreamId = StreamId(0b1011_1100);
175 pub const PRIVATE_STREAM1: StreamId = StreamId(0b1011_1101);
177 pub const PADDING_STREAM: StreamId = StreamId(0b1011_1110);
179 pub const PRIVATE_STREAM2: StreamId = StreamId(0b1011_1111);
181 pub const ECM_STREAM: StreamId = StreamId(0b1111_0000);
188 pub const EMM_STREAM: StreamId = StreamId(0b1111_0001);
190 pub const DSM_CC: StreamId = StreamId(0b1111_0010);
192 pub const ISO_13522_STREAM: StreamId = StreamId(0b1111_0011);
194 pub const H222_1_TYPE_A: StreamId = StreamId(0b1111_0100);
196 pub const H222_1_TYPE_B: StreamId = StreamId(0b1111_0101);
198 pub const H222_1_TYPE_C: StreamId = StreamId(0b1111_0110);
200 pub const H222_1_TYPE_D: StreamId = StreamId(0b1111_0111);
202 pub const H222_1_TYPE_E: StreamId = StreamId(0b1111_1000);
204 pub const ANCILLARY_STREAM: StreamId = StreamId(0b1111_1001);
206 pub const SL_PACKETIZED_STREAM: StreamId = StreamId(0b1111_1010);
208 pub const FLEX_MUX_STREAM: StreamId = StreamId(0b1111_1011);
210 pub const METADATA_STREAM: StreamId = StreamId(0b1111_1100);
212 pub const EXTENDED_STREAM_ID: StreamId = StreamId(0b1111_1101);
214 pub const RESERVED_DATA_STREAM: StreamId = StreamId(0b1111_1110);
216 pub const PROGRAM_STREAM_DIRECTORY: StreamId = StreamId(0b1111_1111);
218
219 fn is_parsed(&self) -> bool {
220 !matches!(
221 *self,
222 StreamId::PROGRAM_STREAM_MAP
223 | StreamId::PADDING_STREAM
224 | StreamId::PRIVATE_STREAM2
225 | StreamId::ECM_STREAM
226 | StreamId::EMM_STREAM
227 | StreamId::PROGRAM_STREAM_DIRECTORY
228 | StreamId::DSM_CC
229 | StreamId::H222_1_TYPE_E
230 )
231 }
232}
233impl fmt::Debug for StreamId {
234 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
235 match *self {
236 StreamId::PROGRAM_STREAM_MAP => f.write_str("PROGRAM_STREAM_MAP"),
237 StreamId::PRIVATE_STREAM1 => f.write_str("PRIVATE_STREAM1"),
238 StreamId::PADDING_STREAM => f.write_str("PADDING_STREAM"),
239 StreamId::PRIVATE_STREAM2 => f.write_str("PRIVATE_STREAM2"),
240 StreamId(0b1100_0000..=0b1101_1111) => {
241 f.write_fmt(format_args!("Audio({})", self.0 & 0b0001_1111))
242 }
243 StreamId(0b1110_0000..=0b1110_1111) => {
244 f.write_fmt(format_args!("Video({})", self.0 & 0b0001_1111))
245 }
246 StreamId::ECM_STREAM => f.write_str("ECM_STREAM"),
247 StreamId::EMM_STREAM => f.write_str("EMM_STREAM"),
248 StreamId::DSM_CC => f.write_str("DSM_CC"),
249 StreamId::ISO_13522_STREAM => f.write_str("ISO_13522_STREAM"),
250 StreamId::H222_1_TYPE_A => f.write_str("H222_1_TYPE_A"),
251 StreamId::H222_1_TYPE_B => f.write_str("H222_1_TYPE_B"),
252 StreamId::H222_1_TYPE_C => f.write_str("H222_1_TYPE_C"),
253 StreamId::H222_1_TYPE_D => f.write_str("H222_1_TYPE_D"),
254 StreamId::H222_1_TYPE_E => f.write_str("H222_1_TYPE_E"),
255 StreamId::ANCILLARY_STREAM => f.write_str("ANCILLARY_STREAM"),
256 StreamId::SL_PACKETIZED_STREAM => f.write_str("SL_PACKETIZED_STREAM"),
257 StreamId::FLEX_MUX_STREAM => f.write_str("FLEX_MUX_STREAM"),
258 StreamId::METADATA_STREAM => f.write_str("METADATA_STREAM"),
259 StreamId::EXTENDED_STREAM_ID => f.write_str("EXTENDED_STREAM_ID"),
260 StreamId::RESERVED_DATA_STREAM => f.write_str("RESERVED_DATA_STREAM"),
261 StreamId::PROGRAM_STREAM_DIRECTORY => f.write_str("PROGRAM_STREAM_DIRECTORY"),
262 _ => f.write_fmt(format_args!("Unknown({})", self.0)),
263 }
264 }
265}
266
267pub struct PesHeader<'buf> {
281 buf: &'buf [u8],
282}
283impl<'buf> PesHeader<'buf> {
284 const FIXED_HEADER_SIZE: usize = 6;
285
286 pub fn from_bytes(buf: &'buf [u8]) -> Option<PesHeader<'buf>> {
294 if buf.len() < Self::FIXED_HEADER_SIZE {
297 warn!("Buffer size {} too small to hold PES header", buf.len());
298 return None;
299 }
300 let packet_start_code_prefix =
301 u32::from(buf[0]) << 16 | u32::from(buf[1]) << 8 | u32::from(buf[2]);
302 if packet_start_code_prefix != 1 {
303 warn!(
304 "invalid packet_start_code_prefix 0x{:06x}, expected 0x000001",
305 packet_start_code_prefix
306 );
307 return None;
308 }
309 Some(PesHeader { buf })
310 }
311
312 pub fn stream_id(&self) -> StreamId {
314 StreamId(self.buf[3])
315 }
316
317 pub fn pes_packet_length(&self) -> PesLength {
320 let len = u16::from(self.buf[4]) << 8 | u16::from(self.buf[5]);
321 match num::NonZeroU16::new(len) {
322 None => PesLength::Unbounded,
323 Some(l) => PesLength::Bounded(l),
324 }
325 }
326
327 pub fn contents(&self) -> PesContents<'buf> {
332 let rest = &self.buf[Self::FIXED_HEADER_SIZE..];
333 if self.stream_id().is_parsed() {
334 PesContents::Parsed(PesParsedContents::from_bytes(rest))
335 } else {
336 PesContents::Payload(rest)
337 }
338 }
339}
340
341#[derive(Debug, PartialEq, Eq)]
343pub enum PesError {
344 FieldNotPresent,
347 PtsDtsFlagsInvalid,
350 NotEnoughData {
352 requested: usize,
354 available: usize,
356 },
357 MarkerBitNotSet,
360}
361
362pub enum PesContents<'buf> {
385 Parsed(Option<PesParsedContents<'buf>>),
387 Payload(&'buf [u8]),
389}
390
391#[derive(Debug)]
396#[allow(missing_docs)]
397pub enum DsmTrickMode {
398 FastForward {
399 field_id: u8,
400 intra_slice_refresh: bool,
401 frequency_truncation: FrequencyTruncationCoefficientSelection,
402 },
403 SlowMotion {
404 rep_cntrl: u8,
405 },
406 FreezeFrame {
407 field_id: u8,
408 reserved: u8,
409 },
410 FastReverse {
411 field_id: u8,
412 intra_slice_refresh: bool,
413 frequency_truncation: FrequencyTruncationCoefficientSelection,
414 },
415 SlowReverse {
416 rep_cntrl: u8,
417 },
418 Reserved {
419 reserved: u8,
420 },
421}
422
423#[derive(Debug)]
426pub enum FrequencyTruncationCoefficientSelection {
427 DCNonZero,
429 FirstThreeNonZero,
431 FirstSixNonZero,
433 AllMaybeNonZero,
435}
436impl FrequencyTruncationCoefficientSelection {
437 fn from_id(id: u8) -> Self {
438 match id {
439 0b00 => FrequencyTruncationCoefficientSelection::DCNonZero,
440 0b01 => FrequencyTruncationCoefficientSelection::FirstThreeNonZero,
441 0b10 => FrequencyTruncationCoefficientSelection::FirstSixNonZero,
442 0b11 => FrequencyTruncationCoefficientSelection::AllMaybeNonZero,
443 _ => panic!("Invalid id {}", id),
444 }
445 }
446}
447
448#[derive(Debug)]
450pub struct EsRate(u32);
451impl EsRate {
452 const RATE_BYTES_PER_SECOND: u32 = 50;
453 pub fn new(es_rate: u32) -> EsRate {
455 assert!(es_rate < 1 << 22);
456 EsRate(es_rate)
457 }
458 pub fn bytes_per_second(&self) -> u32 {
460 self.0 * Self::RATE_BYTES_PER_SECOND
461 }
462}
463impl From<EsRate> for u32 {
464 fn from(r: EsRate) -> Self {
465 r.0
466 }
467}
468
469#[derive(Debug)] pub struct PesExtension<'buf> {
472 _buf: &'buf [u8],
473}
474
475pub struct PesParsedContents<'buf> {
478 buf: &'buf [u8],
479}
480impl<'buf> PesParsedContents<'buf> {
481 pub fn from_bytes(buf: &'buf [u8]) -> Option<PesParsedContents<'buf>> {
489 if buf.len() < Self::FIXED_HEADER_SIZE {
490 warn!(
491 "buf not large enough to hold PES parsed header: {} bytes",
492 buf.len()
493 );
494 return None;
495 }
496 let check_bits = buf[0] >> 6;
497 if check_bits != 0b10 {
498 warn!(
499 "unexpected check-bits value {:#b}, expected 0b10",
500 check_bits
501 );
502 return None;
503 }
504 let contents = PesParsedContents { buf };
505 if (Self::FIXED_HEADER_SIZE + contents.pes_header_data_len()) > buf.len() {
506 warn!(
507 "reported PES header length {} does not fit within remaining buffer length {}",
508 contents.pes_header_data_len(),
509 buf.len() - Self::FIXED_HEADER_SIZE,
510 );
511 return None;
512 }
513 if contents.pes_crc_end() > (Self::FIXED_HEADER_SIZE + contents.pes_header_data_len()) {
514 warn!(
515 "calculated PES header data length {} does not fit with in recorded PES_header_length {}",
516 contents.pes_crc_end() - Self::FIXED_HEADER_SIZE,
517 contents.pes_header_data_len(),
518 );
519 return None;
520 }
521 Some(contents)
522 }
523
524 pub fn pes_priority(&self) -> u8 {
526 self.buf[0] >> 3 & 1
527 }
528 pub fn data_alignment_indicator(&self) -> DataAlignment {
532 if self.buf[0] & 0b100 != 0 {
533 DataAlignment::Aligned
534 } else {
535 DataAlignment::NotAligned
536 }
537 }
538 pub fn copyright(&self) -> Copyright {
540 if self.buf[0] & 0b10 != 0 {
541 Copyright::Undefined
542 } else {
543 Copyright::Protected
544 }
545 }
546 pub fn original_or_copy(&self) -> OriginalOrCopy {
548 if self.buf[0] & 0b1 != 0 {
549 OriginalOrCopy::Original
550 } else {
551 OriginalOrCopy::Copy
552 }
553 }
554 fn pts_dts_flags(&self) -> u8 {
555 self.buf[1] >> 6
556 }
557 fn escr_flag(&self) -> bool {
558 self.buf[1] >> 5 & 1 != 0
559 }
560 fn esrate_flag(&self) -> bool {
561 self.buf[1] >> 4 & 1 != 0
562 }
563 fn dsm_trick_mode_flag(&self) -> bool {
564 self.buf[1] >> 3 & 1 != 0
565 }
566 fn additional_copy_info_flag(&self) -> bool {
567 self.buf[1] >> 2 & 1 != 0
568 }
569 fn pes_crc_flag(&self) -> bool {
570 self.buf[1] >> 1 & 1 != 0
571 }
572 fn pes_extension_flag(&self) -> bool {
573 self.buf[1] & 1 != 0
574 }
575 fn pes_header_data_len(&self) -> usize {
576 self.buf[2] as usize
577 }
578
579 fn header_slice(&self, from: usize, to: usize) -> Result<&'buf [u8], PesError> {
580 if to > self.pes_header_data_len() + Self::FIXED_HEADER_SIZE {
581 Err(PesError::NotEnoughData {
582 requested: to,
583 available: self.pes_header_data_len() + Self::FIXED_HEADER_SIZE,
584 })
585 } else if to > self.buf.len() {
586 Err(PesError::NotEnoughData {
587 requested: to,
588 available: self.buf.len(),
589 })
590 } else {
591 Ok(&self.buf[from..to])
592 }
593 }
594
595 const FIXED_HEADER_SIZE: usize = 3;
596 const TIMESTAMP_SIZE: usize = 5;
597
598 fn pts_dts_end(&self) -> usize {
599 match self.pts_dts_flags() {
600 0b00 => Self::FIXED_HEADER_SIZE,
601 0b01 => Self::FIXED_HEADER_SIZE,
602 0b10 => Self::FIXED_HEADER_SIZE + Self::TIMESTAMP_SIZE,
603 0b11 => Self::FIXED_HEADER_SIZE + Self::TIMESTAMP_SIZE * 2,
604 v => panic!("unexpected value {}", v),
605 }
606 }
607 pub fn pts_dts(&self) -> Result<PtsDts, PesError> {
614 match self.pts_dts_flags() {
615 0b00 => Err(PesError::FieldNotPresent),
616 0b01 => Err(PesError::PtsDtsFlagsInvalid),
617 0b10 => self
618 .header_slice(Self::FIXED_HEADER_SIZE, self.pts_dts_end())
619 .map(|s| PtsDts::PtsOnly(Timestamp::from_bytes(s))),
620 0b11 => self
621 .header_slice(Self::FIXED_HEADER_SIZE, self.pts_dts_end())
622 .map(|s| PtsDts::Both {
623 pts: Timestamp::from_bytes(&s[..Self::TIMESTAMP_SIZE]),
624 dts: Timestamp::from_bytes(&s[Self::TIMESTAMP_SIZE..]),
625 }),
626 v => panic!("unexpected value {}", v),
627 }
628 }
629 const ESCR_SIZE: usize = 6;
630
631 pub fn escr(&self) -> Result<ClockRef, PesError> {
636 if self.escr_flag() {
637 self.header_slice(self.pts_dts_end(), self.pts_dts_end() + Self::ESCR_SIZE)
638 .map(|s| {
639 let base = u64::from(s[0] & 0b0011_1000) << 27
640 | u64::from(s[0] & 0b0000_0011) << 28
641 | u64::from(s[1]) << 20
642 | u64::from(s[2] & 0b1111_1000) << 12
643 | u64::from(s[2] & 0b0000_0011) << 13
644 | u64::from(s[3]) << 5
645 | u64::from(s[4] & 0b1111_1000) >> 3;
646 let extension =
647 u16::from(s[4] & 0b0000_0011) << 7 | u16::from(s[5] & 0b1111_1110) >> 1;
648 ClockRef::from_parts(base, extension)
649 })
650 } else {
651 Err(PesError::FieldNotPresent)
652 }
653 }
654 fn escr_end(&self) -> usize {
655 self.pts_dts_end() + if self.escr_flag() { Self::ESCR_SIZE } else { 0 }
656 }
657 const ES_RATE_SIZE: usize = 3;
658
659 pub fn es_rate(&self) -> Result<EsRate, PesError> {
664 if self.esrate_flag() {
665 self.header_slice(self.escr_end(), self.escr_end() + Self::ES_RATE_SIZE)
666 .map(|s| {
667 EsRate::new(
668 u32::from(s[0] & 0b0111_1111) << 15
669 | u32::from(s[1]) << 7
670 | u32::from(s[2] & 0b1111_1110) >> 1,
671 )
672 })
673 } else {
674 Err(PesError::FieldNotPresent)
675 }
676 }
677 fn es_rate_end(&self) -> usize {
678 self.escr_end()
679 + if self.esrate_flag() {
680 Self::ES_RATE_SIZE
681 } else {
682 0
683 }
684 }
685 const DSM_TRICK_MODE_SIZE: usize = 1;
686 pub fn dsm_trick_mode(&self) -> Result<DsmTrickMode, PesError> {
691 if self.dsm_trick_mode_flag() {
692 self.header_slice(
693 self.es_rate_end(),
694 self.es_rate_end() + Self::DSM_TRICK_MODE_SIZE,
695 )
696 .map(|s| {
697 let trick_mode_control = s[0] >> 5;
698 let trick_mode_data = s[0] & 0b0001_1111;
699 match trick_mode_control {
700 0b000 => DsmTrickMode::FastForward {
701 field_id: trick_mode_data >> 3,
702 intra_slice_refresh: (trick_mode_data & 0b100) != 0,
703 frequency_truncation: FrequencyTruncationCoefficientSelection::from_id(
704 trick_mode_data & 0b11,
705 ),
706 },
707 0b001 => DsmTrickMode::SlowMotion {
708 rep_cntrl: trick_mode_control,
709 },
710 0b010 => DsmTrickMode::FreezeFrame {
711 field_id: trick_mode_data >> 3,
712 reserved: trick_mode_data & 0b111,
713 },
714 0b011 => DsmTrickMode::FastReverse {
715 field_id: trick_mode_data >> 3,
716 intra_slice_refresh: (trick_mode_data & 0b100) != 0,
717 frequency_truncation: FrequencyTruncationCoefficientSelection::from_id(
718 trick_mode_data & 0b11,
719 ),
720 },
721 0b100 => DsmTrickMode::SlowReverse {
722 rep_cntrl: trick_mode_control,
723 },
724 _ => DsmTrickMode::Reserved {
725 reserved: trick_mode_control,
726 },
727 }
728 })
729 } else {
730 Err(PesError::FieldNotPresent)
731 }
732 }
733 fn dsm_trick_mode_end(&self) -> usize {
734 self.es_rate_end()
735 + if self.dsm_trick_mode_flag() {
736 Self::DSM_TRICK_MODE_SIZE
737 } else {
738 0
739 }
740 }
741 const ADDITIONAL_COPY_INFO_SIZE: usize = 1;
742 pub fn additional_copy_info(&self) -> Result<u8, PesError> {
747 if self.additional_copy_info_flag() {
748 self.header_slice(
749 self.dsm_trick_mode_end(),
750 self.dsm_trick_mode_end() + Self::ADDITIONAL_COPY_INFO_SIZE,
751 )
752 .and_then(|s| {
753 if s[0] & 0b1000_0000 == 0 {
754 Err(PesError::MarkerBitNotSet)
755 } else {
756 Ok(s[0] & 0b0111_1111)
757 }
758 })
759 } else {
760 Err(PesError::FieldNotPresent)
761 }
762 }
763 fn additional_copy_info_end(&self) -> usize {
764 self.dsm_trick_mode_end()
765 + if self.additional_copy_info_flag() {
766 Self::ADDITIONAL_COPY_INFO_SIZE
767 } else {
768 0
769 }
770 }
771 const PREVIOUS_PES_PACKET_CRC_SIZE: usize = 2;
772 pub fn previous_pes_packet_crc(&self) -> Result<u16, PesError> {
780 if self.pes_crc_flag() {
781 self.header_slice(
782 self.additional_copy_info_end(),
783 self.additional_copy_info_end() + Self::PREVIOUS_PES_PACKET_CRC_SIZE,
784 )
785 .map(|s| u16::from(s[0]) << 8 | u16::from(s[1]))
786 } else {
787 Err(PesError::FieldNotPresent)
788 }
789 }
790 fn pes_crc_end(&self) -> usize {
791 self.additional_copy_info_end()
792 + if self.pes_crc_flag() {
793 Self::PREVIOUS_PES_PACKET_CRC_SIZE
794 } else {
795 0
796 }
797 }
798 pub fn pes_extension(&self) -> Result<PesExtension<'buf>, PesError> {
800 if self.pes_extension_flag() {
801 self.header_slice(
802 self.pes_crc_end(),
803 self.pes_header_data_len() + Self::FIXED_HEADER_SIZE,
804 )
805 .map(|s| PesExtension { _buf: s })
806 } else {
807 Err(PesError::FieldNotPresent)
808 }
809 }
810
811 pub fn payload(&self) -> &'buf [u8] {
821 &self.buf[Self::FIXED_HEADER_SIZE + self.pes_header_data_len()..]
822 }
823}
824
825impl<'buf> fmt::Debug for PesParsedContents<'buf> {
826 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
827 let mut s = f.debug_struct("PesParsedContents");
828 s.field("pes_priority", &self.pes_priority())
829 .field("data_alignment_indicator", &self.data_alignment_indicator())
830 .field("copyright", &self.copyright())
831 .field("original_or_copy", &self.original_or_copy())
832 .field("pts_dts", &self.pts_dts());
833 if let Ok(escr) = self.escr() {
834 s.field("escr", &escr);
835 }
836 if let Ok(es_rate) = self.es_rate() {
837 s.field("es_rate", &es_rate);
838 }
839 if let Ok(dsm_trick_mode) = self.dsm_trick_mode() {
840 s.field("dsm_trick_mode", &dsm_trick_mode);
841 }
842 if let Ok(additional_copy_info) = self.additional_copy_info() {
843 s.field("additional_copy_info", &additional_copy_info);
844 }
845 if let Ok(previous_pes_packet_crc) = self.previous_pes_packet_crc() {
846 s.field("previous_pes_packet_crc", &previous_pes_packet_crc);
847 }
848 if let Ok(pes_extension) = self.pes_extension() {
849 s.field("pes_extension", &pes_extension);
850 }
851 s.finish()
852 }
853}
854
855#[derive(PartialEq, Eq, Debug)]
858pub enum TimestampError {
859 IncorrectPrefixBits {
862 expected: u8,
864 actual: u8,
866 },
867 MarkerBitNotSet {
870 bit_number: u8,
872 },
873}
874
875#[derive(PartialEq, Eq, Debug, Clone, Copy)]
878pub struct Timestamp {
879 val: u64,
880}
881impl Timestamp {
882 pub const MAX: Timestamp = Timestamp { val: (1 << 33) - 1 };
884
885 pub const TIMEBASE: u64 = 90_000;
887
888 pub fn from_pts_bytes(buf: &[u8]) -> Result<Timestamp, TimestampError> {
892 Timestamp::check_prefix(buf, 0b0010)?;
893 Timestamp::from_bytes(buf)
894 }
895 pub fn from_dts_bytes(buf: &[u8]) -> Result<Timestamp, TimestampError> {
899 Timestamp::check_prefix(buf, 0b0001)?;
900 Timestamp::from_bytes(buf)
901 }
902 fn check_prefix(buf: &[u8], expected: u8) -> Result<(), TimestampError> {
903 assert!(expected <= 0b1111);
904 let actual = buf[0] >> 4;
905 if actual == expected {
906 Ok(())
907 } else {
908 Err(TimestampError::IncorrectPrefixBits { expected, actual })
909 }
910 }
911 fn check_marker_bit(buf: &[u8], bit_number: u8) -> Result<(), TimestampError> {
912 let byte_index = bit_number / 8;
913 let bit_index = bit_number % 8;
914 let bit_mask = 1 << (7 - bit_index);
915 if buf[byte_index as usize] & bit_mask != 0 {
916 Ok(())
917 } else {
918 Err(TimestampError::MarkerBitNotSet { bit_number })
919 }
920 }
921 pub fn from_bytes(buf: &[u8]) -> Result<Timestamp, TimestampError> {
927 Timestamp::check_marker_bit(buf, 7)?;
928 Timestamp::check_marker_bit(buf, 23)?;
929 Timestamp::check_marker_bit(buf, 39)?;
930 Ok(Timestamp {
931 val: (u64::from(buf[0] & 0b0000_1110) << 29)
932 | u64::from(buf[1]) << 22
933 | (u64::from(buf[2] & 0b1111_1110) << 14)
934 | u64::from(buf[3]) << 7
935 | u64::from(buf[4]) >> 1,
936 })
937 }
938 pub fn from_u64(val: u64) -> Timestamp {
940 assert!(val < 1 << 34);
941 Timestamp { val }
942 }
943 pub fn value(self) -> u64 {
945 self.val
946 }
947
948 pub fn likely_wrapped_since(self, since: Self) -> bool {
953 self.val <= since.val && since.val - self.val > Self::MAX.val / 2
954 }
955}
956
957#[derive(PartialEq, Eq, Debug)]
962pub enum PtsDts {
963 None,
965 PtsOnly(Result<Timestamp, TimestampError>),
967 Invalid,
969 Both {
971 pts: Result<Timestamp, TimestampError>,
973 dts: Result<Timestamp, TimestampError>,
975 },
976}
977
978#[derive(PartialEq, Eq, Debug)]
984pub enum DataAlignment {
985 Aligned,
987 NotAligned,
989}
990#[derive(PartialEq, Eq, Debug)]
994pub enum Copyright {
995 Protected,
997 Undefined,
999}
1000#[derive(PartialEq, Eq, Debug)]
1005pub enum OriginalOrCopy {
1006 Original,
1008 Copy,
1010}
1011
1012#[cfg(test)]
1013mod test {
1014 use crate::demultiplex;
1015 use crate::demultiplex::PacketFilter;
1016 use crate::packet;
1017 use crate::pes;
1018 use crate::pes::{
1019 Copyright, DataAlignment, EsRate, FrequencyTruncationCoefficientSelection, OriginalOrCopy,
1020 PesContents, PesError, PesHeader, PesLength, PesParsedContents, StreamId,
1021 };
1022 use assert_matches::assert_matches;
1023 use bitstream_io::{BigEndian, BitWrite};
1024 use bitstream_io::{BitWriter, BE};
1025 use hex_literal::*;
1026 use std::io;
1027 use std::num::NonZeroU16;
1028
1029 packet_filter_switch! {
1030 NullFilterSwitch<NullDemuxContext> {
1031 Nul: demultiplex::NullPacketFilter<NullDemuxContext>,
1032 }
1033 }
1034 demux_context!(NullDemuxContext, NullFilterSwitch);
1035 impl NullDemuxContext {
1036 fn do_construct(&mut self, _req: demultiplex::FilterRequest<'_, '_>) -> NullFilterSwitch {
1037 NullFilterSwitch::Nul(demultiplex::NullPacketFilter::default())
1038 }
1039 }
1040
1041 fn make_test_data<F>(builder: F) -> Vec<u8>
1042 where
1043 F: Fn(&mut BitWriter<Vec<u8>, BE>) -> Result<(), io::Error>,
1044 {
1045 let data: Vec<u8> = Vec::new();
1046 let mut w = BitWriter::endian(data, BigEndian);
1047 builder(&mut w).unwrap();
1048 w.into_writer()
1049 }
1050
1051 fn write_ts(w: &mut BitWriter<Vec<u8>, BE>, ts: u64, prefix: u8) -> Result<(), io::Error> {
1053 assert!(
1054 ts < 1u64 << 33,
1055 "ts value too large {:#x} >= {:#x}",
1056 ts,
1057 1u64 << 33
1058 );
1059 w.write(4, prefix & 0b1111)?;
1060 w.write(3, (ts & 0b1_1100_0000_0000_0000_0000_0000_0000_0000) >> 30)?;
1061 w.write(1, 1)?; w.write(15, (ts & 0b0_0011_1111_1111_1111_1000_0000_0000_0000) >> 15)?;
1063 w.write(1, 1)?; w.write(15, ts & 0b0_0000_0000_0000_0000_0111_1111_1111_1111)?;
1065 w.write(1, 1) }
1067
1068 fn write_escr(
1069 w: &mut BitWriter<Vec<u8>, BE>,
1070 base: u64,
1071 extension: u16,
1072 ) -> Result<(), io::Error> {
1073 assert!(
1074 base < 1u64 << 33,
1075 "base value too large {:#x} >= {:#x}",
1076 base,
1077 1u64 << 33
1078 );
1079 assert!(
1080 extension < 1u16 << 9,
1081 "extension value too large {:#x} >= {:#x}",
1082 base,
1083 1u16 << 9
1084 );
1085 w.write(2, 0b11)?; w.write(
1087 3,
1088 (base & 0b1_1100_0000_0000_0000_0000_0000_0000_0000) >> 30,
1089 )?;
1090 w.write(1, 1)?; w.write(
1092 15,
1093 (base & 0b0_0011_1111_1111_1111_1000_0000_0000_0000) >> 15,
1094 )?;
1095 w.write(1, 1)?; w.write(15, base & 0b0_0000_0000_0000_0000_0111_1111_1111_1111)?;
1097 w.write(1, 1)?; w.write(9, extension)?;
1099 w.write(1, 1) }
1101 fn write_es_rate(w: &mut BitWriter<Vec<u8>, BE>, rate: u32) -> Result<(), io::Error> {
1102 assert!(
1103 rate < 1u32 << 22,
1104 "rate value too large {:#x} >= {:#x}",
1105 rate,
1106 1u32 << 22
1107 );
1108 w.write(1, 1)?; w.write(22, rate)?;
1110 w.write(1, 1) }
1112
1113 #[test]
1114 fn parse_header() {
1115 let data = make_test_data(|w| {
1116 w.write(24, 1)?; w.write(8, 7)?; w.write(16, 7)?; w.write(2, 0b10)?; w.write(2, 0)?; w.write(1, 0)?; w.write(1, 1)?; w.write(1, 0)?; w.write(1, 0)?; w.write(2, 0b10)?; w.write(1, 1)?; w.write(1, 1)?; w.write(1, 1)?; w.write(1, 1)?; w.write(1, 1)?; w.write(1, 0)?; let pes_header_length = 5 + 6 + 3 + 1 + 1 + 2; w.write(8, pes_header_length)?; write_ts(w, 123456789, 0b0010)?; write_escr(w, 0b111111111111111111111111111111111, 234)?;
1142 write_es_rate(w, 1234567)?;
1143
1144 w.write(3, 0b00)?; w.write(2, 2)?; w.write_bit(true)?; w.write(2, 0)?; w.write(1, 1)?; w.write(7, 123)?; w.write(16, 54321) });
1155 let header = pes::PesHeader::from_bytes(&data[..]).unwrap();
1156 assert_eq!(pes::StreamId(7), header.stream_id());
1157 assert_eq!(
1158 header.pes_packet_length(),
1159 PesLength::Bounded(NonZeroU16::new(7).unwrap())
1160 );
1161
1162 match header.contents() {
1163 pes::PesContents::Parsed(parsed_contents) => {
1164 let p =
1165 parsed_contents.expect("expected PesContents::Parsed(Some(_)) but was None");
1166 assert_eq!(0, p.pes_priority());
1167 assert_eq!(pes::DataAlignment::Aligned, p.data_alignment_indicator());
1168 assert_eq!(pes::Copyright::Protected, p.copyright());
1169 assert_eq!(pes::OriginalOrCopy::Copy, p.original_or_copy());
1170 match p.pts_dts() {
1171 Ok(pes::PtsDts::PtsOnly(Ok(ts))) => {
1172 let a = ts.value();
1173 let b = 123456789;
1174 assert_eq!(
1175 a, b,
1176 "timestamp values don't match:\n actual:{:#b}\nexpected:{:#b}",
1177 a, b
1178 );
1179 }
1180 _ => panic!("expected PtsDts::PtsOnly, got{}", stringify!(_)),
1181 }
1182 assert_eq!(p.payload().len(), 0);
1183 match p.escr() {
1184 Ok(escr) => {
1185 assert_eq!(0b111111111111111111111111111111111, escr.base());
1186 assert_eq!(234, escr.extension());
1187 }
1188 e => panic!("expected escr value, got {:?}", e),
1189 }
1190 assert_matches!(
1191 p.dsm_trick_mode(),
1192 Ok(pes::DsmTrickMode::FastForward {
1193 field_id: 2,
1194 intra_slice_refresh: true,
1195 frequency_truncation:
1196 pes::FrequencyTruncationCoefficientSelection::DCNonZero,
1197 })
1198 );
1199 assert_matches!(p.es_rate(), Ok(pes::EsRate(1234567)));
1200 assert_matches!(p.additional_copy_info(), Ok(123));
1201 assert_matches!(p.previous_pes_packet_crc(), Ok(54321));
1202 }
1203 pes::PesContents::Payload(_) => {
1204 panic!("expected PesContents::Parsed, got PesContents::Payload")
1205 }
1206 }
1207 }
1208
1209 #[test]
1210 fn pts() {
1211 let pts_prefix = 0b0010;
1212 let pts = make_test_data(|w| {
1213 write_ts(w, 0b1_0101_0101_0101_0101_0101_0101_0101_0101, pts_prefix)
1214 });
1215 let a = pes::Timestamp::from_pts_bytes(&pts[..]).unwrap().value();
1216 let b = 0b1_0101_0101_0101_0101_0101_0101_0101_0101;
1217 assert_eq!(
1218 a, b,
1219 "timestamp values don't match:\n actual:{:#b}\nexpected:{:#b}",
1220 a, b
1221 );
1222 }
1223
1224 #[test]
1225 fn dts() {
1226 let pts_prefix = 0b0001;
1227 let pts = make_test_data(|w| {
1228 write_ts(w, 0b0_1010_1010_1010_1010_1010_1010_1010_1010, pts_prefix)
1229 });
1230 let a = pes::Timestamp::from_dts_bytes(&pts[..]).unwrap().value();
1231 let b = 0b0_1010_1010_1010_1010_1010_1010_1010_1010;
1232 assert_eq!(
1233 a, b,
1234 "timestamp values don't match:\n actual:{:#b}\nexpected:{:#b}",
1235 a, b
1236 );
1237 }
1238
1239 #[test]
1240 fn timestamp_ones() {
1241 let pts_prefix = 0b0010;
1242 let pts = make_test_data(|w| {
1243 write_ts(w, 0b1_1111_1111_1111_1111_1111_1111_1111_1111, pts_prefix)
1244 });
1245 let a = pes::Timestamp::from_pts_bytes(&pts[..]).unwrap().value();
1246 let b = 0b1_1111_1111_1111_1111_1111_1111_1111_1111;
1247 assert_eq!(
1248 a, b,
1249 "timestamp values don't match:\n actual:{:#b}\nexpected:{:#b}",
1250 a, b
1251 );
1252 }
1253
1254 #[test]
1255 fn timestamp_zeros() {
1256 let pts_prefix = 0b0010;
1257 let pts = make_test_data(|w| {
1258 write_ts(w, 0b0_0000_0000_0000_0000_0000_0000_0000_0000, pts_prefix)
1259 });
1260 let a = pes::Timestamp::from_pts_bytes(&pts[..]).unwrap().value();
1261 let b = 0b0_0000_0000_0000_0000_0000_0000_0000_0000;
1262 assert_eq!(
1263 a, b,
1264 "timestamp values don't match:\n actual:{:#b}\nexpected:{:#b}",
1265 a, b
1266 );
1267 }
1268
1269 #[test]
1270 fn timestamp_wrap() {
1271 let zero: pes::Timestamp = pes::Timestamp::from_u64(0);
1272 assert!(zero.likely_wrapped_since(pes::Timestamp::MAX));
1273 assert!(!pes::Timestamp::MAX.likely_wrapped_since(zero));
1274 assert!(!zero.likely_wrapped_since(pes::Timestamp::from_u64(1)));
1275 assert!(!pes::Timestamp::from_u64(1).likely_wrapped_since(zero));
1276 }
1277
1278 #[test]
1279 fn timestamp_bad_prefix() {
1280 let pts_prefix = 0b0010;
1281 let mut pts = make_test_data(|w| write_ts(w, 1234, pts_prefix));
1282 pts[0] |= 0b10000000;
1284 assert_matches!(
1285 pes::Timestamp::from_pts_bytes(&pts[..]),
1286 Err(pes::TimestampError::IncorrectPrefixBits {
1287 expected: 0b0010,
1288 actual: 0b1010
1289 })
1290 )
1291 }
1292
1293 #[test]
1294 fn timestamp_bad_marker() {
1295 let pts_prefix = 0b0010;
1296 let mut pts = make_test_data(|w| write_ts(w, 1234, pts_prefix));
1297 pts[0] &= 0b11111110;
1299 assert_matches!(
1300 pes::Timestamp::from_pts_bytes(&pts[..]),
1301 Err(pes::TimestampError::MarkerBitNotSet { bit_number: 7 })
1302 )
1303 }
1304
1305 struct MockState {
1306 start_stream_called: bool,
1307 begin_packet_called: bool,
1308 continuity_error_called: bool,
1309 }
1310 impl MockState {
1311 fn new() -> MockState {
1312 MockState {
1313 start_stream_called: false,
1314 begin_packet_called: false,
1315 continuity_error_called: false,
1316 }
1317 }
1318 }
1319 struct MockElementaryStreamConsumer {
1320 state: std::rc::Rc<std::cell::RefCell<MockState>>,
1321 }
1322 impl MockElementaryStreamConsumer {
1323 fn new(state: std::rc::Rc<std::cell::RefCell<MockState>>) -> MockElementaryStreamConsumer {
1324 MockElementaryStreamConsumer { state }
1325 }
1326 }
1327 impl pes::ElementaryStreamConsumer<NullDemuxContext> for MockElementaryStreamConsumer {
1328 fn start_stream(&mut self, _ctx: &mut NullDemuxContext) {
1329 self.state.borrow_mut().start_stream_called = true;
1330 }
1331 fn begin_packet(&mut self, _ctx: &mut NullDemuxContext, _header: pes::PesHeader<'_>) {
1332 self.state.borrow_mut().begin_packet_called = true;
1333 }
1334 fn continue_packet(&mut self, _ctx: &mut NullDemuxContext, _data: &[u8]) {}
1335 fn end_packet(&mut self, _ctx: &mut NullDemuxContext) {}
1336 fn continuity_error(&mut self, _ctx: &mut NullDemuxContext) {
1337 self.state.borrow_mut().continuity_error_called = true;
1338 }
1339 }
1340
1341 #[test]
1342 fn pes_packet_consumer() {
1343 let state = std::rc::Rc::new(std::cell::RefCell::new(MockState::new()));
1344 let mock = MockElementaryStreamConsumer::new(state.clone());
1345 let mut pes_filter = pes::PesPacketFilter::new(mock);
1346 let buf = hex!("4741F510000001E0000084C00A355DDD11B1155DDBF5910000000109100000000167640029AD843FFFC21FFFE10FFFF087FFF843FFFC21FFFE10FFFFFFFFFFFFFFFF087FFFFFFFFFFFFFFF2CC501E0113F780A1010101F00000303E80000C350940000000168FF3CB0000001060001C006018401103A0408D2BA80000050204E95D400000302040AB500314454473141FEFF53040000C815540DF04F77FFFFFFFFFFFFFFFFFFFF80000000016588800005DB001008673FC365F48EAE");
1347 let pk = packet::Packet::new(&buf[..]);
1348 let mut ctx = NullDemuxContext::new();
1349 pes_filter.consume(&mut ctx, &pk);
1350 {
1351 let state = state.borrow();
1352 assert!(state.start_stream_called);
1353 assert!(state.begin_packet_called);
1354 assert!(!state.continuity_error_called);
1355 }
1356 let pk = packet::Packet::new(&buf[..]);
1359 pes_filter.consume(&mut ctx, &pk);
1360 {
1361 let state = state.borrow();
1362 assert!(state.continuity_error_called);
1363 }
1364 }
1365
1366 #[test]
1367 fn header_length_doesnt_fit() {
1368 let data = make_test_data(|w| {
1369 w.write(24, 1)?; w.write(8, 7)?; w.write(16, 7)?; w.write(2, 0b10)?; w.write(2, 0)?; w.write(1, 0)?; w.write(1, 1)?; w.write(1, 0)?; w.write(1, 0)?; w.write(2, 0b00)?; w.write(1, 0)?; w.write(1, 0)?; w.write(1, 0)?; w.write(1, 0)?; w.write(1, 1)?; w.write(1, 0)?; let pes_header_length = 2; w.write(8, pes_header_length)?; w.write(8, 1) });
1392 let header = pes::PesHeader::from_bytes(&data[..]).unwrap();
1395 assert!(matches!(header.contents(), pes::PesContents::Parsed(None)));
1396 }
1397
1398 #[test]
1399 fn pes_header_data_length_too_short() {
1400 let data = make_test_data(|w| {
1401 w.write(24, 1)?; w.write(8, 7)?; w.write(16, 7)?; w.write(2, 0b10)?; w.write(2, 0)?; w.write(1, 0)?; w.write(1, 1)?; w.write(1, 0)?; w.write(1, 0)?; w.write(2, 0b00)?; w.write(1, 0)?; w.write(1, 0)?; w.write(1, 0)?; w.write(1, 0)?; w.write(1, 1)?; w.write(1, 0)?; let pes_header_length = 1; w.write(8, pes_header_length)?; w.write(8, 2) });
1425 let header = pes::PesHeader::from_bytes(&data[..]).unwrap();
1426 assert!(matches!(header.contents(), pes::PesContents::Parsed(None)));
1427 }
1428
1429 #[test]
1430 fn should_convert_u8_to_stream_id_without_panic() {
1431 for i in 0..=255 {
1432 let sid = StreamId(i);
1433 let _ = format!("{:?}", sid);
1434 }
1435 }
1436
1437 #[test]
1438 fn should_reject_too_short_pes_header() {
1439 assert!(PesHeader::from_bytes(&[0, 0, 1, 0, 0]).is_none());
1440 }
1441
1442 #[test]
1443 fn should_reject_bad_start_code_prefix() {
1444 assert!(PesHeader::from_bytes(&[0, 0, 2, 0, 0, 0]).is_none());
1446 assert!(PesHeader::from_bytes(&[0, 1, 1, 0, 0, 0]).is_none());
1447 assert!(PesHeader::from_bytes(&[1, 0, 1, 0, 0, 0]).is_none());
1448 }
1449
1450 #[test]
1451 fn should_report_unbounded_length() {
1452 let header = PesHeader::from_bytes(&[0, 0, 1, 0, 0, 0]).unwrap();
1455 assert_matches!(header.pes_packet_length(), PesLength::Unbounded);
1456 }
1457
1458 #[test]
1459 fn should_produce_unparsed_payload() {
1460 let header = PesHeader::from_bytes(&[0, 0, 1, 0b1011_1110, 0, 0, 47]).unwrap();
1461 assert_eq!(header.stream_id(), StreamId::PADDING_STREAM);
1462 match header.contents() {
1463 PesContents::Parsed(_) => panic!("expected PesContents::Payload"),
1464 PesContents::Payload(_) => { }
1465 }
1466 }
1467
1468 #[test]
1469 fn should_convert_freq_truncation_from_u8() {
1470 assert_matches!(
1471 FrequencyTruncationCoefficientSelection::from_id(0),
1472 FrequencyTruncationCoefficientSelection::DCNonZero
1473 );
1474 assert_matches!(
1475 FrequencyTruncationCoefficientSelection::from_id(1),
1476 FrequencyTruncationCoefficientSelection::FirstThreeNonZero
1477 );
1478 assert_matches!(
1479 FrequencyTruncationCoefficientSelection::from_id(2),
1480 FrequencyTruncationCoefficientSelection::FirstSixNonZero
1481 );
1482 assert_matches!(
1483 FrequencyTruncationCoefficientSelection::from_id(3),
1484 FrequencyTruncationCoefficientSelection::AllMaybeNonZero
1485 );
1486 }
1487
1488 #[test]
1489 fn should_convert_es_rate() {
1490 for r in 0..1 << 22 {
1491 let es_rate = EsRate::new(r);
1492 assert_eq!(es_rate.bytes_per_second(), r * 50);
1493 assert_eq!(u32::from(es_rate), r);
1494 }
1495 }
1496
1497 #[test]
1498 fn should_reject_parsed_pes_too_short_for_header() {
1499 assert!(PesParsedContents::from_bytes(&[0b10000000, 0]).is_none());
1500 }
1501 #[test]
1502 fn should_reject_parsed_pes_too_short_for_payload() {
1503 assert!(PesParsedContents::from_bytes(&[0b10000000, 0, 1]).is_none());
1504 }
1505 #[test]
1506 fn should_reject_parsed_pes_bad_check_bits() {
1507 assert!(PesParsedContents::from_bytes(&[0b01000000, 0, 1]).is_none());
1509 }
1510 #[test]
1511 fn should_report_zero_flags() {
1512 let contents = PesParsedContents::from_bytes(&[0b10000000, 0, 0]).unwrap();
1514 assert_eq!(
1515 contents.data_alignment_indicator(),
1516 DataAlignment::NotAligned
1517 );
1518 assert_eq!(contents.copyright(), Copyright::Protected);
1519 assert_eq!(contents.original_or_copy(), OriginalOrCopy::Copy);
1520 assert_matches!(contents.escr(), Err(PesError::FieldNotPresent));
1521 assert_matches!(contents.es_rate(), Err(PesError::FieldNotPresent));
1522 assert_matches!(contents.pes_extension(), Err(PesError::FieldNotPresent));
1523 assert_matches!(
1524 contents.previous_pes_packet_crc(),
1525 Err(PesError::FieldNotPresent)
1526 );
1527 }
1528 #[test]
1529 fn should_report_one_flags() {
1530 let contents = PesParsedContents::from_bytes(&[0b10111111, 0, 0]).unwrap();
1532 assert_eq!(contents.data_alignment_indicator(), DataAlignment::Aligned);
1533 assert_eq!(contents.copyright(), Copyright::Undefined);
1534 assert_eq!(contents.original_or_copy(), OriginalOrCopy::Original);
1535 }
1536}