1use crate::packet;
16use crate::psi;
17use crate::psi::pat;
18use crate::psi::pmt::PmtSection;
19use crate::psi::pmt::StreamInfo;
20use crate::StreamType;
21use log::warn;
22use std::marker;
23
24pub trait PacketFilter {
31 type Ctx: DemuxContext;
34
35 fn consume(&mut self, ctx: &mut Self::Ctx, pk: &packet::Packet<'_>);
37}
38
39pub struct NullPacketFilter<Ctx: DemuxContext> {
44 phantom: marker::PhantomData<Ctx>,
45}
46impl<Ctx: DemuxContext> Default for NullPacketFilter<Ctx> {
47 fn default() -> NullPacketFilter<Ctx> {
48 NullPacketFilter {
49 phantom: marker::PhantomData,
50 }
51 }
52}
53impl<Ctx: DemuxContext> PacketFilter for NullPacketFilter<Ctx> {
54 type Ctx = Ctx;
55 fn consume(&mut self, _ctx: &mut Self::Ctx, _pk: &packet::Packet<'_>) {
56 }
58}
59
60#[macro_export]
104macro_rules! demux_context {
105 ($name:ident, $filter:ty) => {
106 pub struct $name {
107 changeset: $crate::demultiplex::FilterChangeset<$filter>,
108 }
109 impl $name {
110 pub fn new() -> Self {
111 $name {
112 changeset: $crate::demultiplex::FilterChangeset::default(),
113 }
114 }
115 }
116 impl $crate::demultiplex::DemuxContext for $name {
117 type F = $filter;
118
119 fn filter_changeset(&mut self) -> &mut $crate::demultiplex::FilterChangeset<Self::F> {
120 &mut self.changeset
121 }
122 fn construct(&mut self, req: $crate::demultiplex::FilterRequest<'_, '_>) -> Self::F {
123 self.do_construct(req)
124 }
125 }
126 };
127}
128
129#[macro_export]
137macro_rules! packet_filter_switch {
138 (
139 $name:ident<$ctx:ty> {
140 $( $case_name:ident : $t:ty ),*,
141 }
142 ) => {
143 pub enum $name {
144 $( $case_name($t), )*
145 }
146 impl $crate::demultiplex::PacketFilter for $name {
147 type Ctx = $ctx;
148 #[inline(always)]
149 fn consume(&mut self, ctx: &mut $ctx, pk: &$crate::packet::Packet<'_>) {
150 match self {
151 $( &mut $name::$case_name(ref mut f) => f.consume(ctx, pk), )*
152
153 }
154 }
155 }
156 }
157}
158
159struct Filters<F: PacketFilter> {
163 filters_by_pid: Vec<Option<F>>,
164}
165impl<F: PacketFilter> Default for Filters<F> {
166 fn default() -> Filters<F> {
167 Filters {
168 filters_by_pid: vec![],
169 }
170 }
171}
172impl<F: PacketFilter> Filters<F> {
173 pub fn contains(&self, pid: packet::Pid) -> bool {
174 usize::from(pid) < self.filters_by_pid.len()
175 && self.filters_by_pid[usize::from(pid)].is_some()
176 }
177
178 pub fn get(&mut self, pid: packet::Pid) -> Option<&mut F> {
179 if usize::from(pid) >= self.filters_by_pid.len() {
180 None
181 } else {
182 self.filters_by_pid[usize::from(pid)].as_mut()
183 }
184 }
185
186 pub fn insert(&mut self, pid: packet::Pid, filter: F) {
187 let diff = usize::from(pid) as isize - self.filters_by_pid.len() as isize;
188 if diff >= 0 {
189 for _ in 0..=diff {
190 self.filters_by_pid.push(None);
191 }
192 }
193 self.filters_by_pid[usize::from(pid)] = Some(filter);
194 }
195
196 pub fn remove(&mut self, pid: packet::Pid) {
197 if usize::from(pid) < self.filters_by_pid.len() {
198 self.filters_by_pid[usize::from(pid)] = None;
199 }
200 }
201}
202
203pub enum FilterChange<F: PacketFilter> {
210 Insert(packet::Pid, F),
212 Remove(packet::Pid),
214}
215impl<F: PacketFilter> FilterChange<F> {
216 fn apply(self, filters: &mut Filters<F>) {
217 match self {
218 FilterChange::Insert(pid, filter) => filters.insert(pid, filter),
219 FilterChange::Remove(pid) => filters.remove(pid),
220 };
221 }
222}
223impl<F: PacketFilter> std::fmt::Debug for FilterChange<F> {
224 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
225 match *self {
226 FilterChange::Insert(pid, _) => write!(f, "FilterChange::Insert {{ {:?}, ... }}", pid),
227 FilterChange::Remove(pid) => write!(f, "FilterChange::Remove {{ {:?}, ... }}", pid),
228 }
229 }
230}
231
232#[derive(Debug)]
242pub struct FilterChangeset<F: PacketFilter> {
243 updates: Vec<FilterChange<F>>,
244}
245impl<F: PacketFilter> Default for FilterChangeset<F> {
246 fn default() -> FilterChangeset<F> {
247 FilterChangeset {
248 updates: Vec::new(),
249 }
250 }
251}
252impl<F: PacketFilter> FilterChangeset<F> {
253 pub fn insert(&mut self, pid: packet::Pid, filter: F) {
256 self.updates.push(FilterChange::Insert(pid, filter))
257 }
258 pub fn remove(&mut self, pid: packet::Pid) {
261 self.updates.push(FilterChange::Remove(pid))
262 }
263
264 fn apply(&mut self, filters: &mut Filters<F>) {
265 for update in self.updates.drain(..) {
266 update.apply(filters);
267 }
268 }
269 pub fn is_empty(&self) -> bool {
271 self.updates.is_empty()
272 }
273}
274
275impl<F: PacketFilter> std::iter::IntoIterator for FilterChangeset<F> {
276 type Item = FilterChange<F>;
277 type IntoIter = std::vec::IntoIter<FilterChange<F>>;
278
279 fn into_iter(self) -> std::vec::IntoIter<FilterChange<F>> {
280 self.updates.into_iter()
281 }
282}
283
284#[derive(Debug)]
287pub enum FilterRequest<'a, 'buf> {
288 ByPid(packet::Pid),
291 ByStream {
294 program_pid: packet::Pid,
296 stream_type: StreamType,
298 pmt: &'a PmtSection<'buf>,
300 stream_info: &'a StreamInfo<'buf>,
303 },
304 Pmt {
306 pid: packet::Pid,
308 program_number: u16,
310 },
311 Nit {
313 pid: packet::Pid,
315 },
316}
317
318struct PmtProcessor<Ctx: DemuxContext> {
319 pid: packet::Pid,
320 program_number: u16,
321 filters_registered: fixedbitset::FixedBitSet,
322 phantom: marker::PhantomData<Ctx>,
323}
324
325impl<Ctx: DemuxContext> PmtProcessor<Ctx> {
326 pub fn new(pid: packet::Pid, program_number: u16) -> PmtProcessor<Ctx> {
327 PmtProcessor {
328 pid,
329 program_number,
330 filters_registered: fixedbitset::FixedBitSet::with_capacity(packet::Pid::PID_COUNT),
331 phantom: marker::PhantomData,
332 }
333 }
334
335 fn new_table(
336 &mut self,
337 ctx: &mut Ctx,
338 header: &psi::SectionCommonHeader,
339 _table_syntax_header: &psi::TableSyntaxHeader<'_>,
340 sect: &PmtSection<'_>,
341 ) {
342 if 0x02 != header.table_id {
343 warn!(
344 "[PMT {:?} program:{}] Expected PMT to have table id 0x2, but got {:#x}",
345 self.pid, self.program_number, header.table_id
346 );
347 return;
348 }
349 let mut pids_seen = fixedbitset::FixedBitSet::with_capacity(packet::Pid::PID_COUNT);
351 for stream_info in sect.streams() {
352 let pes_packet_consumer = ctx.construct(FilterRequest::ByStream {
353 program_pid: self.pid,
354 stream_type: stream_info.stream_type(),
355 pmt: sect,
356 stream_info: &stream_info,
357 });
358 ctx.filter_changeset()
359 .insert(stream_info.elementary_pid(), pes_packet_consumer);
360 pids_seen.insert(usize::from(stream_info.elementary_pid()));
361 self.filters_registered
362 .insert(usize::from(stream_info.elementary_pid()));
363 }
364 self.remove_outdated(ctx, pids_seen);
367 }
368
369 fn remove_outdated(&mut self, ctx: &mut Ctx, pids_seen: fixedbitset::FixedBitSet) {
370 for pid in self.filters_registered.difference(&pids_seen) {
371 ctx.filter_changeset().remove(packet::Pid::new(pid as u16));
372 }
373 self.filters_registered = pids_seen;
374 }
375}
376
377impl<Ctx: DemuxContext> psi::WholeSectionSyntaxPayloadParser for PmtProcessor<Ctx> {
378 type Context = Ctx;
379
380 fn section(
381 &mut self,
382 ctx: &mut Self::Context,
383 header: &psi::SectionCommonHeader,
384 table_syntax_header: &psi::TableSyntaxHeader<'_>,
385 data: &[u8],
386 ) {
387 let start = psi::SectionCommonHeader::SIZE + psi::TableSyntaxHeader::SIZE;
388 let end = data.len() - 4; match PmtSection::from_bytes(&data[start..end]) {
390 Ok(sect) => self.new_table(ctx, header, table_syntax_header, §),
391 Err(e) => warn!(
392 "[PMT {:?} program:{}] problem reading data: {:?}",
393 self.pid, self.program_number, e
394 ),
395 }
396 }
397}
398
399#[derive(Debug)]
401pub enum DemuxError {
402 NotEnoughData {
405 field: &'static str,
407 expected: usize,
409 actual: usize,
411 },
412}
413
414type PacketFilterConsumer<Proc> = psi::SectionPacketConsumer<
415 psi::SectionSyntaxSectionProcessor<
416 psi::DedupSectionSyntaxPayloadParser<
417 psi::BufferSectionSyntaxParser<psi::CrcCheckWholeSectionSyntaxPayloadParser<Proc>>,
418 >,
419 >,
420>;
421
422pub struct PmtPacketFilter<Ctx: DemuxContext + 'static> {
429 pmt_section_packet_consumer: PacketFilterConsumer<PmtProcessor<Ctx>>,
430}
431impl<Ctx: DemuxContext> PmtPacketFilter<Ctx> {
432 pub fn new(pid: packet::Pid, program_number: u16) -> PmtPacketFilter<Ctx> {
435 let pmt_proc = PmtProcessor::new(pid, program_number);
436 PmtPacketFilter {
437 pmt_section_packet_consumer: psi::SectionPacketConsumer::new(
438 psi::SectionSyntaxSectionProcessor::new(psi::DedupSectionSyntaxPayloadParser::new(
439 psi::BufferSectionSyntaxParser::new(
440 psi::CrcCheckWholeSectionSyntaxPayloadParser::new(pmt_proc),
441 ),
442 )),
443 ),
444 }
445 }
446}
447impl<Ctx: DemuxContext> PacketFilter for PmtPacketFilter<Ctx> {
448 type Ctx = Ctx;
449
450 fn consume(&mut self, ctx: &mut Self::Ctx, pk: &packet::Packet<'_>) {
451 self.pmt_section_packet_consumer.consume(ctx, pk);
452 }
453}
454
455struct PatProcessor<Ctx: DemuxContext> {
456 filters_registered: fixedbitset::FixedBitSet, phantom: marker::PhantomData<Ctx>,
458}
459
460impl<Ctx: DemuxContext> Default for PatProcessor<Ctx> {
461 fn default() -> PatProcessor<Ctx> {
462 PatProcessor {
463 filters_registered: fixedbitset::FixedBitSet::with_capacity(packet::Pid::PID_COUNT),
464 phantom: marker::PhantomData,
465 }
466 }
467}
468impl<Ctx: DemuxContext> PatProcessor<Ctx> {
469 fn new_table(
470 &mut self,
471 ctx: &mut Ctx,
472 header: &psi::SectionCommonHeader,
473 _table_syntax_header: &psi::TableSyntaxHeader<'_>,
474 sect: &pat::PatSection<'_>,
475 ) {
476 if 0x00 != header.table_id {
477 warn!(
478 "Expected PAT to have table id 0x0, but got {:#x}",
479 header.table_id
480 );
481 return;
482 }
483 let mut pids_seen = fixedbitset::FixedBitSet::with_capacity(packet::Pid::PID_COUNT);
484 for desc in sect.programs() {
486 let filter = match desc {
487 pat::ProgramDescriptor::Program {
488 program_number,
489 pid,
490 } => ctx.construct(FilterRequest::Pmt {
491 pid,
492 program_number,
493 }),
494 pat::ProgramDescriptor::Network { pid } => {
495 ctx.construct(FilterRequest::Nit { pid })
496 }
497 };
498 ctx.filter_changeset().insert(desc.pid(), filter);
499 pids_seen.insert(usize::from(desc.pid()));
500 self.filters_registered.insert(usize::from(desc.pid()));
501 }
502 self.remove_outdated(ctx, pids_seen);
505 }
506
507 fn remove_outdated(&mut self, ctx: &mut Ctx, pids_seen: fixedbitset::FixedBitSet) {
508 for pid in self.filters_registered.difference(&pids_seen) {
509 ctx.filter_changeset().remove(packet::Pid::new(pid as u16));
510 }
511 self.filters_registered = pids_seen;
512 }
513}
514
515impl<Ctx: DemuxContext> psi::WholeSectionSyntaxPayloadParser for PatProcessor<Ctx> {
516 type Context = Ctx;
517
518 fn section(
519 &mut self,
520 ctx: &mut Self::Context,
521 header: &psi::SectionCommonHeader,
522 table_syntax_header: &psi::TableSyntaxHeader<'_>,
523 data: &[u8],
524 ) {
525 let start = psi::SectionCommonHeader::SIZE + psi::TableSyntaxHeader::SIZE;
526 let end = data.len() - 4; self.new_table(
528 ctx,
529 header,
530 table_syntax_header,
531 &pat::PatSection::new(&data[start..end]),
532 );
533 }
534}
535
536pub trait DemuxContext: Sized {
545 type F: PacketFilter<Ctx = Self>;
547
548 fn filter_changeset(&mut self) -> &mut FilterChangeset<Self::F>;
550
551 fn construct(&mut self, req: FilterRequest<'_, '_>) -> Self::F;
555}
556
557pub struct PatPacketFilter<Ctx: DemuxContext> {
564 pat_section_packet_consumer: PacketFilterConsumer<PatProcessor<Ctx>>,
565}
566impl<Ctx: DemuxContext> Default for PatPacketFilter<Ctx> {
567 fn default() -> PatPacketFilter<Ctx> {
568 let pat_proc = PatProcessor::default();
569 PatPacketFilter {
570 pat_section_packet_consumer: psi::SectionPacketConsumer::new(
571 psi::SectionSyntaxSectionProcessor::new(psi::DedupSectionSyntaxPayloadParser::new(
572 psi::BufferSectionSyntaxParser::new(
573 psi::CrcCheckWholeSectionSyntaxPayloadParser::new(pat_proc),
574 ),
575 )),
576 ),
577 }
578 }
579}
580impl<Ctx: DemuxContext> PacketFilter for PatPacketFilter<Ctx> {
581 type Ctx = Ctx;
582
583 fn consume(&mut self, ctx: &mut Self::Ctx, pk: &packet::Packet<'_>) {
584 self.pat_section_packet_consumer.consume(ctx, pk);
585 }
586}
587
588pub struct Demultiplex<Ctx: DemuxContext> {
600 processor_by_pid: Filters<Ctx::F>,
601}
602impl<Ctx: DemuxContext> Demultiplex<Ctx> {
603 pub fn new(ctx: &mut Ctx) -> Demultiplex<Ctx> {
607 let mut result = Demultiplex {
608 processor_by_pid: Filters::default(),
609 };
610
611 result.processor_by_pid.insert(
612 psi::pat::PAT_PID,
613 ctx.construct(FilterRequest::ByPid(psi::pat::PAT_PID)),
614 );
615
616 result
617 }
618
619 pub fn push(&mut self, ctx: &mut Ctx, buf: &[u8]) {
622 let mut itr = buf
624 .chunks_exact(packet::Packet::SIZE)
625 .map(packet::Packet::try_new);
626 let mut pk = if let Some(Some(p)) = itr.next() {
627 p
628 } else {
629 return;
630 };
631 'outer: loop {
632 let this_pid = pk.pid();
633 if !self.processor_by_pid.contains(this_pid) {
634 self.add_pid_filter(ctx, this_pid);
635 };
636 let this_proc = self.processor_by_pid.get(this_pid).unwrap();
637 'inner: loop {
638 if pk.transport_error_indicator() {
639 warn!("{:?} transport_error_indicator", pk.pid());
642 } else if pk.transport_scrambling_control().is_scrambled() {
643 warn!(
645 "{:?} dropping scrambled packet {:?}",
646 pk.pid(),
647 pk.transport_scrambling_control()
648 );
649 } else {
650 this_proc.consume(ctx, &pk);
651 if !ctx.filter_changeset().is_empty() {
652 break 'inner;
653 }
654 }
655 pk = if let Some(Some(p)) = itr.next() {
656 p
657 } else {
658 break 'outer;
659 };
660 if pk.pid() != this_pid {
661 continue 'outer;
662 }
663 }
664 if !ctx.filter_changeset().is_empty() {
665 ctx.filter_changeset().apply(&mut self.processor_by_pid);
666 }
667 debug_assert!(ctx.filter_changeset().is_empty());
668 pk = if let Some(Some(p)) = itr.next() {
669 p
670 } else {
671 break 'outer;
672 };
673 }
674 }
675
676 fn add_pid_filter(&mut self, ctx: &mut Ctx, this_pid: packet::Pid) {
677 let filter = ctx.construct(FilterRequest::ByPid(this_pid));
678 self.processor_by_pid.insert(this_pid, filter);
679 }
680}
681
682#[cfg(test)]
683pub(crate) mod test {
684 use bitstream_io::{BitWrite, BitWriter, BE};
685 use hex_literal::*;
686 use std::io;
687
688 use crate::demultiplex;
689 use crate::demultiplex::{Filters, NullPacketFilter, PacketFilter};
690 use crate::packet;
691 use crate::packet::{Packet, Pid};
692 use crate::psi;
693 use crate::psi::WholeSectionSyntaxPayloadParser;
694 use bitstream_io::BigEndian;
695
696 pub struct CountPacketFilter {
697 count: u64,
698 }
699 impl PacketFilter for CountPacketFilter {
700 type Ctx = NullDemuxContext;
701
702 fn consume(&mut self, _ctx: &mut Self::Ctx, _pk: &Packet<'_>) {
703 self.count += 1;
704 }
705 }
706
707 packet_filter_switch! {
708 NullFilterSwitch<NullDemuxContext> {
709 Pat: demultiplex::PatPacketFilter<NullDemuxContext>,
710 Pmt: demultiplex::PmtPacketFilter<NullDemuxContext>,
711 Nul: demultiplex::NullPacketFilter<NullDemuxContext>,
712 Count: CountPacketFilter,
713 }
714 }
715 demux_context!(NullDemuxContext, NullFilterSwitch);
716 impl NullDemuxContext {
717 fn do_construct(&mut self, req: demultiplex::FilterRequest<'_, '_>) -> NullFilterSwitch {
718 match req {
719 demultiplex::FilterRequest::ByPid(psi::pat::PAT_PID) => {
720 NullFilterSwitch::Pat(demultiplex::PatPacketFilter::default())
721 }
722 demultiplex::FilterRequest::ByPid(_) => {
723 NullFilterSwitch::Nul(demultiplex::NullPacketFilter::default())
724 }
725 demultiplex::FilterRequest::ByStream { .. } => {
726 NullFilterSwitch::Nul(demultiplex::NullPacketFilter::default())
727 }
728 demultiplex::FilterRequest::Pmt {
729 pid,
730 program_number,
731 } => NullFilterSwitch::Pmt(demultiplex::PmtPacketFilter::new(pid, program_number)),
732 demultiplex::FilterRequest::Nit { .. } => {
733 NullFilterSwitch::Nul(demultiplex::NullPacketFilter::default())
734 }
735 }
736 }
737 }
738
739 #[test]
740 fn demux_empty() {
741 let mut ctx = NullDemuxContext::new();
742 let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
743 deplex.push(&mut ctx, &[0x0; 0][..]);
744 }
745
746 #[test]
747 fn pat() {
748 let buf = hex!("474000150000B00D0001C100000001E1E02D507804FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF");
750 let mut ctx = NullDemuxContext::new();
751 let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
752 deplex.push(&mut ctx, &buf[..]);
753 assert!(deplex.processor_by_pid.contains(Pid::new(480)));
755 }
756
757 #[test]
758 fn pat_no_existing_program() {
759 let mut processor = demultiplex::PatProcessor::default();
760 let section = vec![
761 0, 0, 0, 0x0D, 0x00, 0b00000001, 0xC1, 0x00, 0, 1, 0, 101, 0, 0, 0, 0, ];
768 let header = psi::SectionCommonHeader::new(§ion[..psi::SectionCommonHeader::SIZE]);
769 let table_syntax_header =
770 psi::TableSyntaxHeader::new(§ion[psi::SectionCommonHeader::SIZE..]);
771 let mut ctx = NullDemuxContext::new();
772 processor.section(&mut ctx, &header, &table_syntax_header, §ion[..]);
773 let mut changes = ctx.changeset.updates.into_iter();
774 if let Some(demultiplex::FilterChange::Insert(pid, _)) = changes.next() {
775 assert_eq!(packet::Pid::new(101), pid);
776 } else {
777 panic!();
778 }
779 }
780
781 #[test]
782 fn pat_remove_existing_program() {
783 let mut ctx = NullDemuxContext::new();
784 let mut processor = demultiplex::PatProcessor::default();
785 {
786 let section = vec![
787 0, 0, 0, 0x0D, 0x00, 0b00000001, 0xC1, 0x00,
790 0, 1, 0, 101, 0, 0, 0, 0, ];
795 let header = psi::SectionCommonHeader::new(§ion[..psi::SectionCommonHeader::SIZE]);
796 let table_syntax_header =
797 psi::TableSyntaxHeader::new(§ion[psi::SectionCommonHeader::SIZE..]);
798 processor.section(&mut ctx, &header, &table_syntax_header, §ion[..]);
799 }
800 ctx.changeset.updates.clear();
801 {
802 let section = vec![
803 0, 0, 0, 0x0D, 0x00, 0b00000011, 0xC1, 0x00, 0, 0, 0, 0, ];
809 let header = psi::SectionCommonHeader::new(§ion[..psi::SectionCommonHeader::SIZE]);
810 let table_syntax_header =
811 psi::TableSyntaxHeader::new(§ion[psi::SectionCommonHeader::SIZE..]);
812 processor.section(&mut ctx, &header, &table_syntax_header, §ion[..]);
813 }
814 let mut changes = ctx.changeset.updates.into_iter();
815 if let Some(demultiplex::FilterChange::Remove(pid)) = changes.next() {
816 assert_eq!(packet::Pid::new(101), pid);
817 } else {
818 panic!();
819 }
820 }
821
822 pub(crate) fn make_test_data<F>(builder: F) -> Vec<u8>
823 where
824 F: Fn(&mut BitWriter<Vec<u8>, BE>) -> Result<(), io::Error>,
825 {
826 let data: Vec<u8> = Vec::new();
827 let mut w = BitWriter::endian(data, BigEndian);
828 builder(&mut w).unwrap();
829 w.into_writer()
830 }
831
832 #[test]
833 fn pmt_new_stream() {
834 let pid = packet::Pid::new(101);
836 let program_number = 1001;
837 let mut processor = demultiplex::PmtProcessor::new(pid, program_number);
838 let section = make_test_data(|w| {
839 w.write(8, 0x02)?; w.write_bit(true)?; w.write_bit(false)?; w.write(2, 3)?; w.write(12, 20)?; w.write(16, 0)?; w.write(2, 3)?; w.write(5, 0)?; w.write(1, 1)?; w.write(8, 0)?; w.write(8, 0)?; w.write(3, 7)?; w.write(13, 123)?; w.write(4, 15)?; w.write(12, 0)?; w.write(8, 0)?; w.write(3, 7)?; w.write(13, 201)?; w.write(4, 15)?; w.write(12, 6)?; w.write(8, 0)?; w.write(8, 1)?; w.write(8, 0)?; w.write(8, 0)?; w.write(8, 1)?; w.write(8, 0)?; w.write(32, 0) });
878 let header = psi::SectionCommonHeader::new(§ion[..psi::SectionCommonHeader::SIZE]);
879 let table_syntax_header =
880 psi::TableSyntaxHeader::new(§ion[psi::SectionCommonHeader::SIZE..]);
881 let mut ctx = NullDemuxContext::new();
882 processor.section(&mut ctx, &header, &table_syntax_header, §ion[..]);
883 let mut changes = ctx.changeset.updates.into_iter();
884 if let Some(demultiplex::FilterChange::Insert(pid, _)) = changes.next() {
885 assert_eq!(packet::Pid::new(201), pid);
886 } else {
887 panic!();
888 }
889 }
890
891 #[test]
892 fn filters() {
893 let mut filters = Filters::<NullPacketFilter<NullDemuxContext>>::default();
894 assert!(!filters.contains(Pid::PAT));
895 assert!(filters.get(Pid::PAT).is_none());
896 filters.insert(Pid::PAT, NullPacketFilter::default());
897 assert!(filters.contains(Pid::PAT));
898 assert!(filters.get(Pid::PAT).is_some());
899 filters.remove(Pid::PAT);
900 assert!(!filters.contains(Pid::PAT));
901 assert!(filters.get(Pid::PAT).is_none());
902 }
903
904 #[test]
905 fn bad_sync() {
906 let mut buf = hex!("474000150000B00D0001C100000001E1E02D507804FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
907 .to_vec();
908 buf[0] = 0; let mut ctx = NullDemuxContext::new();
910 let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
911 deplex.push(&mut ctx, &buf[..]);
912 }
914
915 #[test]
916 fn unknown_pid() {
917 let mut buf = hex!("474000150000B00D0001C100000001E1E02D507804FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
918 .to_vec();
919 let new_pid: u16 = 123;
920 assert_eq!(new_pid & 0b1110_0000_0000_0000, 0);
921 buf[1] = buf[1] & 0b1110_0000 | ((new_pid >> 8) as u8);
922 buf[2] = (new_pid & 0xff) as u8;
923 let mut ctx = NullDemuxContext::new();
925 let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
926 assert!(!deplex.processor_by_pid.contains(Pid::new(new_pid)));
927 deplex.push(&mut ctx, &buf[..]);
928 assert!(deplex.processor_by_pid.contains(Pid::new(new_pid)));
931 }
932
933 #[test]
934 fn ignore_scrambled_packet() {
935 let mut ctx = NullDemuxContext::new();
936 let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
937 let count = CountPacketFilter { count: 0 };
938 deplex
939 .processor_by_pid
940 .insert(Pid::STUFFING, NullFilterSwitch::Count(count));
941
942 let mut buf = hex!("47400015FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
943 .to_vec();
944 let new_pid: u16 = Pid::STUFFING.into();
945 assert_eq!(new_pid & 0b1110_0000_0000_0000, 0);
946 buf[1] = buf[1] & 0b1110_0000 | ((new_pid >> 8) as u8);
947 buf[2] = (new_pid & 0xff) as u8;
948
949 deplex.push(&mut ctx, &buf);
950 if let Some(NullFilterSwitch::Count(count)) = deplex.processor_by_pid.get(Pid::STUFFING) {
952 assert_eq!(count.count, 1);
953 } else {
954 unreachable!()
955 }
956
957 let new_transport_scrambling_control = 1;
958 buf[3] = buf[3] & 0b0011_1111 | (new_transport_scrambling_control << 6);
959
960 deplex.push(&mut ctx, &buf);
961 if let Some(NullFilterSwitch::Count(count)) = deplex.processor_by_pid.get(Pid::STUFFING) {
964 assert_eq!(count.count, 1);
965 } else {
966 unreachable!()
967 }
968 }
969
970 #[test]
971 fn ignore_error_packet() {
972 let _ = env_logger::builder()
973 .filter_level(log::LevelFilter::Warn)
974 .is_test(true)
975 .try_init();
976 let mut ctx = NullDemuxContext::new();
977 let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
978 let count = CountPacketFilter { count: 0 };
979 deplex
980 .processor_by_pid
981 .insert(Pid::STUFFING, NullFilterSwitch::Count(count));
982
983 let mut buf = hex!("47400015FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
984 .to_vec();
985 let new_pid: u16 = Pid::STUFFING.into();
986 assert_eq!(new_pid & 0b1110_0000_0000_0000, 0);
987 buf[1] = buf[1] & 0b1110_0000 | ((new_pid >> 8) as u8);
988 buf[2] = (new_pid & 0xff) as u8;
989
990 deplex.push(&mut ctx, &buf);
991 if let Some(NullFilterSwitch::Count(count)) = deplex.processor_by_pid.get(Pid::STUFFING) {
993 assert_eq!(count.count, 1);
994 } else {
995 unreachable!()
996 }
997
998 buf[1] = buf[1] | 0b1000_0000;
1000
1001 deplex.push(&mut ctx, &buf);
1002 if let Some(NullFilterSwitch::Count(count)) = deplex.processor_by_pid.get(Pid::STUFFING) {
1005 assert_eq!(count.count, 1);
1006 } else {
1007 unreachable!()
1008 }
1009 }
1010}