mpeg2ts_reader/
demultiplex.rs

1//! Main types implementing the demultiplexer state-machine.
2//!
3//! Construct an instance of [`Demultiplex`](struct.Demultiplex.html) and feed it one a succession
4//! of byte-slices containing the Transport Stream data.
5//!
6//! Users of this crate are expected to provide their own implementations of,
7//!
8//!  - [`PacketFilter`](trait.PacketFilter.html) - for defining per-stream-type handling, possibly
9//!    by using the [`packet_filter_switch!()`](../macro.packet_filter_switch.html) macro to create
10//!    an enum implementing this trait.
11//!  - [`DemuxContext`](trait.DemuxContext.html) - to create specific `PacketFilter` instances
12//!    for each type of sub-stream found within the Transport Stream data. possibly by using the
13//!    [`demux_context!()`](../macro.demux_context.html) macro.
14
15use crate::packet;
16use crate::psi;
17use crate::psi::pat;
18use crate::psi::pmt::PmtSection;
19use crate::psi::pmt::StreamInfo;
20use crate::StreamType;
21use log::warn;
22use std::marker;
23
24/// Trait to which `Demultiplex` delegates handling of subsets of Transport Stream packets.
25///
26/// A packet filter can collaborate with the owning `Demultiplex` instance via the `DemuxContext`
27/// to which they will both be lent mutable access.  For example, by making entries in the
28/// `FilterChangeset` owned by the `DemuxContext`, a `PacketFilter` implementation may alter
29/// the handling of subsequent packets in the Transport Stream.
30pub trait PacketFilter {
31    /// The type of context-object used by the `Demultiplex` instance with which implementing
32    /// `PacketFilters` will be collaborating.
33    type Ctx: DemuxContext;
34
35    /// Implements filter-specific packet processing logic.
36    fn consume(&mut self, ctx: &mut Self::Ctx, pk: &packet::Packet<'_>);
37}
38
39/// No-op implementation of `PacketFilter`.
40///
41/// Sometimes a Transport Stream will contain packets that you know you want to ignore.  This type
42/// can be used when you have to register a filter for such packets with the demultiplexer.
43pub struct NullPacketFilter<Ctx: DemuxContext> {
44    phantom: marker::PhantomData<Ctx>,
45}
46impl<Ctx: DemuxContext> Default for NullPacketFilter<Ctx> {
47    fn default() -> NullPacketFilter<Ctx> {
48        NullPacketFilter {
49            phantom: marker::PhantomData,
50        }
51    }
52}
53impl<Ctx: DemuxContext> PacketFilter for NullPacketFilter<Ctx> {
54    type Ctx = Ctx;
55    fn consume(&mut self, _ctx: &mut Self::Ctx, _pk: &packet::Packet<'_>) {
56        // ignore
57    }
58}
59
60/// Creates the boilerplate needed for a filter-implementation-specific `DemuxContext`.
61///
62/// This macro takes two arguments; the name for the new type, and the name of an existing
63/// implementation of `PacketFilter`.  It then..
64///
65/// 1. creates a struct with the given name, wrapping an instance of `FilterChangeset`
66/// 2. provides an implementation of `default()` for that struct
67/// 3. provides an implementation of `DemuxContext`
68///
69/// **NB** The implementation of `DemuxContext` will assume that your own implementation of the
70/// type will provide a `do_construct()` method, to which its implementation of
71/// [`DemuxContext::construct()`](DemuxContext::construct) will delegate.
72///
73/// # Example
74///
75/// ```
76/// # #[macro_use]
77/// # extern crate mpeg2ts_reader;
78/// use mpeg2ts_reader::demultiplex;
79/// # fn main() {
80/// // Create an enum that implements PacketFilter as required by your application.
81/// packet_filter_switch!{
82///     MyFilterSwitch<MyDemuxContext> {
83///         Pat: demultiplex::PatPacketFilter<MyDemuxContext>,
84///         Pmt: demultiplex::PmtPacketFilter<MyDemuxContext>,
85///         Nul: demultiplex::NullPacketFilter<MyDemuxContext>,
86///     }
87/// };
88///
89/// // Create an implementation of the DemuxContext trait for the PacketFilter implementation
90/// // created above.
91/// demux_context!(MyDemuxContext, MyFilterSwitch);
92/// impl MyDemuxContext {
93///     fn do_construct(&mut self, req: demultiplex::FilterRequest<'_, '_>) -> MyFilterSwitch {
94///         // ...inspect 'req', construct appropriate 'MyFilterSwitch' variant
95/// #        unimplemented!()
96///     }
97/// }
98///
99/// let mut ctx = MyDemuxContext::new();
100/// // .. use the ctx value while demultiplexing some data ..
101/// # }
102/// ```
103#[macro_export]
104macro_rules! demux_context {
105    ($name:ident, $filter:ty) => {
106        pub struct $name {
107            changeset: $crate::demultiplex::FilterChangeset<$filter>,
108        }
109        impl $name {
110            pub fn new() -> Self {
111                $name {
112                    changeset: $crate::demultiplex::FilterChangeset::default(),
113                }
114            }
115        }
116        impl $crate::demultiplex::DemuxContext for $name {
117            type F = $filter;
118
119            fn filter_changeset(&mut self) -> &mut $crate::demultiplex::FilterChangeset<Self::F> {
120                &mut self.changeset
121            }
122            fn construct(&mut self, req: $crate::demultiplex::FilterRequest<'_, '_>) -> Self::F {
123                self.do_construct(req)
124            }
125        }
126    };
127}
128
129/// Creates an enum which implements [`PacketFilter`](demultiplex/trait.PacketFilter.html) by
130/// delegating to other `PacketFilter` implementations, depending on the enum-variant.  It's
131/// intended that the types created by this macro be used as the type-parameter for an instance of
132/// the `Demultiplex` type, allowing `Demultiplex` to support many kinds of `PacketFilter`, without
133/// the cost of having to box them.
134///
135/// See [demux_context!()](macro.demux_context.html) for an example.
136#[macro_export]
137macro_rules! packet_filter_switch {
138    (
139        $name:ident<$ctx:ty> {
140            $( $case_name:ident : $t:ty ),*,
141        }
142    ) => {
143        pub enum $name {
144            $( $case_name($t), )*
145        }
146        impl $crate::demultiplex::PacketFilter for $name {
147            type Ctx = $ctx;
148            #[inline(always)]
149            fn consume(&mut self, ctx: &mut $ctx, pk: &$crate::packet::Packet<'_>) {
150                match self {
151                    $( &mut $name::$case_name(ref mut f) => f.consume(ctx, pk), )*
152
153                }
154            }
155        }
156    }
157}
158
159/// Growable list of filters (implementations of [`PacketFilter`](trait.PacketFilter.html)),
160/// indexed by [`Pid`](../packet/struct.Pid.html).  Lookups produce an `Option`, and the result
161/// is `None` rather than `panic!()` when not found.
162struct Filters<F: PacketFilter> {
163    filters_by_pid: Vec<Option<F>>,
164}
165impl<F: PacketFilter> Default for Filters<F> {
166    fn default() -> Filters<F> {
167        Filters {
168            filters_by_pid: vec![],
169        }
170    }
171}
172impl<F: PacketFilter> Filters<F> {
173    pub fn contains(&self, pid: packet::Pid) -> bool {
174        usize::from(pid) < self.filters_by_pid.len()
175            && self.filters_by_pid[usize::from(pid)].is_some()
176    }
177
178    pub fn get(&mut self, pid: packet::Pid) -> Option<&mut F> {
179        if usize::from(pid) >= self.filters_by_pid.len() {
180            None
181        } else {
182            self.filters_by_pid[usize::from(pid)].as_mut()
183        }
184    }
185
186    pub fn insert(&mut self, pid: packet::Pid, filter: F) {
187        let diff = usize::from(pid) as isize - self.filters_by_pid.len() as isize;
188        if diff >= 0 {
189            for _ in 0..=diff {
190                self.filters_by_pid.push(None);
191            }
192        }
193        self.filters_by_pid[usize::from(pid)] = Some(filter);
194    }
195
196    pub fn remove(&mut self, pid: packet::Pid) {
197        if usize::from(pid) < self.filters_by_pid.len() {
198            self.filters_by_pid[usize::from(pid)] = None;
199        }
200    }
201}
202
203// A filter can't change the map of filters-by-pid that it is itself owned by while the filter is
204// running, so this changeset protocol allows a filter to specify any filter updates required so
205// the demultiplexer can apply them when the filter is complete
206
207/// Represents the intention to either insert a new `PacketFilter` into the `Demultiplex` instance
208/// or remove an old `PacketFilter` from the `Demultiplex` instance.
209pub enum FilterChange<F: PacketFilter> {
210    /// Insert the given filter for the given `Pid`.
211    Insert(packet::Pid, F),
212    /// Remove any filter for the given `Pid`.
213    Remove(packet::Pid),
214}
215impl<F: PacketFilter> FilterChange<F> {
216    fn apply(self, filters: &mut Filters<F>) {
217        match self {
218            FilterChange::Insert(pid, filter) => filters.insert(pid, filter),
219            FilterChange::Remove(pid) => filters.remove(pid),
220        };
221    }
222}
223impl<F: PacketFilter> std::fmt::Debug for FilterChange<F> {
224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
225        match *self {
226            FilterChange::Insert(pid, _) => write!(f, "FilterChange::Insert {{ {:?}, ... }}", pid),
227            FilterChange::Remove(pid) => write!(f, "FilterChange::Remove {{ {:?}, ... }}", pid),
228        }
229    }
230}
231
232/// Owns a queue of [`FilterChange`](enum.FilterChange.html) objects representing pending updates
233/// to the Pid handling of the `Demultiplexer`.
234///
235/// These changes need to be queued since practically a `PacketFilter` implementation cannot be
236/// allowed to remove itself from the owning `Demultiplex` instance while it is in the act of
237/// filtering a packet.
238///
239/// The public interface allows items to be added to the queue, and the internal implementation of
240/// `Demultiplex` will later remove them.
241#[derive(Debug)]
242pub struct FilterChangeset<F: PacketFilter> {
243    updates: Vec<FilterChange<F>>,
244}
245impl<F: PacketFilter> Default for FilterChangeset<F> {
246    fn default() -> FilterChangeset<F> {
247        FilterChangeset {
248            updates: Vec::new(),
249        }
250    }
251}
252impl<F: PacketFilter> FilterChangeset<F> {
253    /// Queue the insertion of the given `PacketFilter` for the given Pid, after the `Demultiplex`
254    /// instance has finished handling the current packet.
255    pub fn insert(&mut self, pid: packet::Pid, filter: F) {
256        self.updates.push(FilterChange::Insert(pid, filter))
257    }
258    /// Queue the removal of the existing `PacketFilter` for the given Pid, after the `Demultiplex`
259    /// instance has finished handling the current packet.
260    pub fn remove(&mut self, pid: packet::Pid) {
261        self.updates.push(FilterChange::Remove(pid))
262    }
263
264    fn apply(&mut self, filters: &mut Filters<F>) {
265        for update in self.updates.drain(..) {
266            update.apply(filters);
267        }
268    }
269    /// Are there any changes queued in this changeset?
270    pub fn is_empty(&self) -> bool {
271        self.updates.is_empty()
272    }
273}
274
275impl<F: PacketFilter> std::iter::IntoIterator for FilterChangeset<F> {
276    type Item = FilterChange<F>;
277    type IntoIter = std::vec::IntoIter<FilterChange<F>>;
278
279    fn into_iter(self) -> std::vec::IntoIter<FilterChange<F>> {
280        self.updates.into_iter()
281    }
282}
283
284/// Request that may be submitted to a
285/// [`DemuxContext::construct()`](trait.DemuxContext.html) implementation.
286#[derive(Debug)]
287pub enum FilterRequest<'a, 'buf> {
288    /// requests a filter implementation for handling a PID contained in the transport stream that
289    /// was not announced via other means (PAT/PMT).
290    ByPid(packet::Pid),
291    /// requests a filter for the stream with the given details which has just been discovered
292    /// within a Program Map Table section.
293    ByStream {
294        /// The `Pid` of the program containing the stream to be handled
295        program_pid: packet::Pid,
296        /// The type of the stream to be handled
297        stream_type: StreamType,
298        /// The full PmtSection defining the stream needing to he handled
299        pmt: &'a PmtSection<'buf>,
300        /// the PMT stream information for the specific stream being handled (which will be one
301        /// of the values inside the `pmt` which is also provided.
302        stream_info: &'a StreamInfo<'buf>,
303    },
304    /// Requests a filter implementation for handling Program Map Table sections
305    Pmt {
306        /// the `Pid` which contains the PMT
307        pid: packet::Pid,
308        /// the _program number_ of the program for which this will be the PMT
309        program_number: u16,
310    },
311    /// requests a filter implementation to handle packets containing Network Information Table data
312    Nit {
313        /// The `Pid` of the packets which contain the NIT.
314        pid: packet::Pid,
315    },
316}
317
318struct PmtProcessor<Ctx: DemuxContext> {
319    pid: packet::Pid,
320    program_number: u16,
321    filters_registered: fixedbitset::FixedBitSet,
322    phantom: marker::PhantomData<Ctx>,
323}
324
325impl<Ctx: DemuxContext> PmtProcessor<Ctx> {
326    pub fn new(pid: packet::Pid, program_number: u16) -> PmtProcessor<Ctx> {
327        PmtProcessor {
328            pid,
329            program_number,
330            filters_registered: fixedbitset::FixedBitSet::with_capacity(packet::Pid::PID_COUNT),
331            phantom: marker::PhantomData,
332        }
333    }
334
335    fn new_table(
336        &mut self,
337        ctx: &mut Ctx,
338        header: &psi::SectionCommonHeader,
339        _table_syntax_header: &psi::TableSyntaxHeader<'_>,
340        sect: &PmtSection<'_>,
341    ) {
342        if 0x02 != header.table_id {
343            warn!(
344                "[PMT {:?} program:{}] Expected PMT to have table id 0x2, but got {:#x}",
345                self.pid, self.program_number, header.table_id
346            );
347            return;
348        }
349        // pass the table_id value this far!
350        let mut pids_seen = fixedbitset::FixedBitSet::with_capacity(packet::Pid::PID_COUNT);
351        for stream_info in sect.streams() {
352            let pes_packet_consumer = ctx.construct(FilterRequest::ByStream {
353                program_pid: self.pid,
354                stream_type: stream_info.stream_type(),
355                pmt: sect,
356                stream_info: &stream_info,
357            });
358            ctx.filter_changeset()
359                .insert(stream_info.elementary_pid(), pes_packet_consumer);
360            pids_seen.insert(usize::from(stream_info.elementary_pid()));
361            self.filters_registered
362                .insert(usize::from(stream_info.elementary_pid()));
363        }
364        // remove filters for descriptors we've seen before that are not present in this updated
365        // table,
366        self.remove_outdated(ctx, pids_seen);
367    }
368
369    fn remove_outdated(&mut self, ctx: &mut Ctx, pids_seen: fixedbitset::FixedBitSet) {
370        for pid in self.filters_registered.difference(&pids_seen) {
371            ctx.filter_changeset().remove(packet::Pid::new(pid as u16));
372        }
373        self.filters_registered = pids_seen;
374    }
375}
376
377impl<Ctx: DemuxContext> psi::WholeSectionSyntaxPayloadParser for PmtProcessor<Ctx> {
378    type Context = Ctx;
379
380    fn section(
381        &mut self,
382        ctx: &mut Self::Context,
383        header: &psi::SectionCommonHeader,
384        table_syntax_header: &psi::TableSyntaxHeader<'_>,
385        data: &[u8],
386    ) {
387        let start = psi::SectionCommonHeader::SIZE + psi::TableSyntaxHeader::SIZE;
388        let end = data.len() - 4; // remove CRC bytes
389        match PmtSection::from_bytes(&data[start..end]) {
390            Ok(sect) => self.new_table(ctx, header, table_syntax_header, &sect),
391            Err(e) => warn!(
392                "[PMT {:?} program:{}] problem reading data: {:?}",
393                self.pid, self.program_number, e
394            ),
395        }
396    }
397}
398
399/// TODO: this type does not belong here
400#[derive(Debug)]
401pub enum DemuxError {
402    /// The transport stream has a syntax error that means there was not enough data present to
403    /// parse the requested structure.
404    NotEnoughData {
405        /// The name of the field we were unable to parse
406        field: &'static str,
407        /// The expected size of the field data
408        expected: usize,
409        /// the actual size of date available within the transport stream
410        actual: usize,
411    },
412}
413
414type PacketFilterConsumer<Proc> = psi::SectionPacketConsumer<
415    psi::SectionSyntaxSectionProcessor<
416        psi::DedupSectionSyntaxPayloadParser<
417            psi::BufferSectionSyntaxParser<psi::CrcCheckWholeSectionSyntaxPayloadParser<Proc>>,
418        >,
419    >,
420>;
421
422/// `PacketFilter` implementation which will insert some other `PacketFilter` into the `Demultiplex`
423/// instance for each sub-stream listed in one of the stream's PMT-sections.
424///
425/// The particular `PacketFilter` to be inserted is determined by querying
426/// [`DemuxContxt::construct()`](trait.DemuxContext.html), passing a
427/// [`FilterRequest::ByStream`](enum.FilterRequest.html#variant.ByStream) request.
428pub struct PmtPacketFilter<Ctx: DemuxContext + 'static> {
429    pmt_section_packet_consumer: PacketFilterConsumer<PmtProcessor<Ctx>>,
430}
431impl<Ctx: DemuxContext> PmtPacketFilter<Ctx> {
432    /// creates a new `PmtPacketFilter` for PMT sections in packets with the given `Pid`, and for
433    /// the given _program number_.
434    pub fn new(pid: packet::Pid, program_number: u16) -> PmtPacketFilter<Ctx> {
435        let pmt_proc = PmtProcessor::new(pid, program_number);
436        PmtPacketFilter {
437            pmt_section_packet_consumer: psi::SectionPacketConsumer::new(
438                psi::SectionSyntaxSectionProcessor::new(psi::DedupSectionSyntaxPayloadParser::new(
439                    psi::BufferSectionSyntaxParser::new(
440                        psi::CrcCheckWholeSectionSyntaxPayloadParser::new(pmt_proc),
441                    ),
442                )),
443            ),
444        }
445    }
446}
447impl<Ctx: DemuxContext> PacketFilter for PmtPacketFilter<Ctx> {
448    type Ctx = Ctx;
449
450    fn consume(&mut self, ctx: &mut Self::Ctx, pk: &packet::Packet<'_>) {
451        self.pmt_section_packet_consumer.consume(ctx, pk);
452    }
453}
454
455struct PatProcessor<Ctx: DemuxContext> {
456    filters_registered: fixedbitset::FixedBitSet, // TODO: https://crates.io/crates/typenum_bitset ?
457    phantom: marker::PhantomData<Ctx>,
458}
459
460impl<Ctx: DemuxContext> Default for PatProcessor<Ctx> {
461    fn default() -> PatProcessor<Ctx> {
462        PatProcessor {
463            filters_registered: fixedbitset::FixedBitSet::with_capacity(packet::Pid::PID_COUNT),
464            phantom: marker::PhantomData,
465        }
466    }
467}
468impl<Ctx: DemuxContext> PatProcessor<Ctx> {
469    fn new_table(
470        &mut self,
471        ctx: &mut Ctx,
472        header: &psi::SectionCommonHeader,
473        _table_syntax_header: &psi::TableSyntaxHeader<'_>,
474        sect: &pat::PatSection<'_>,
475    ) {
476        if 0x00 != header.table_id {
477            warn!(
478                "Expected PAT to have table id 0x0, but got {:#x}",
479                header.table_id
480            );
481            return;
482        }
483        let mut pids_seen = fixedbitset::FixedBitSet::with_capacity(packet::Pid::PID_COUNT);
484        // add or update filters for descriptors we've not seen before,
485        for desc in sect.programs() {
486            let filter = match desc {
487                pat::ProgramDescriptor::Program {
488                    program_number,
489                    pid,
490                } => ctx.construct(FilterRequest::Pmt {
491                    pid,
492                    program_number,
493                }),
494                pat::ProgramDescriptor::Network { pid } => {
495                    ctx.construct(FilterRequest::Nit { pid })
496                }
497            };
498            ctx.filter_changeset().insert(desc.pid(), filter);
499            pids_seen.insert(usize::from(desc.pid()));
500            self.filters_registered.insert(usize::from(desc.pid()));
501        }
502        // remove filters for descriptors we've seen before that are not present in this updated
503        // table,
504        self.remove_outdated(ctx, pids_seen);
505    }
506
507    fn remove_outdated(&mut self, ctx: &mut Ctx, pids_seen: fixedbitset::FixedBitSet) {
508        for pid in self.filters_registered.difference(&pids_seen) {
509            ctx.filter_changeset().remove(packet::Pid::new(pid as u16));
510        }
511        self.filters_registered = pids_seen;
512    }
513}
514
515impl<Ctx: DemuxContext> psi::WholeSectionSyntaxPayloadParser for PatProcessor<Ctx> {
516    type Context = Ctx;
517
518    fn section(
519        &mut self,
520        ctx: &mut Self::Context,
521        header: &psi::SectionCommonHeader,
522        table_syntax_header: &psi::TableSyntaxHeader<'_>,
523        data: &[u8],
524    ) {
525        let start = psi::SectionCommonHeader::SIZE + psi::TableSyntaxHeader::SIZE;
526        let end = data.len() - 4; // remove CRC bytes
527        self.new_table(
528            ctx,
529            header,
530            table_syntax_header,
531            &pat::PatSection::new(&data[start..end]),
532        );
533    }
534}
535
536// ---- demux ----
537
538/// Context shared between the `Dumultiplex` object and the `PacketFilter` instances
539/// which customise its behavior.
540///
541/// This trait defines behaviour that the process of demultiplexing requires from the context
542/// object, but an application has the opportunity to add further state to the type implementing
543/// this trait, if desired.
544pub trait DemuxContext: Sized {
545    /// the type of `PacketFilter` which the `Demultiplex` object needs to use
546    type F: PacketFilter<Ctx = Self>;
547
548    /// mutable reference to the `FilterChangeset` this context holds
549    fn filter_changeset(&mut self) -> &mut FilterChangeset<Self::F>;
550
551    /// The application using this crate should provide an implementation of this method that
552    /// returns an instance of `PacketFilter` implementing the application's desired handling for
553    /// the given content.
554    fn construct(&mut self, req: FilterRequest<'_, '_>) -> Self::F;
555}
556
557/// `PacketFilter` implementation which will insert some other `PacketFilter` into the `Demultiplex`
558/// instance for each program listed in the stream's PAT-section.
559///
560/// The particular `PacketFilter` to be inserted is determined by querying
561/// [`DemuxContext::construct()`](trait.DemuxContext.html), passing a
562/// [`FilterRequest::Pmt`](enum.FilterRequest.html#variant.Pmt) request.
563pub struct PatPacketFilter<Ctx: DemuxContext> {
564    pat_section_packet_consumer: PacketFilterConsumer<PatProcessor<Ctx>>,
565}
566impl<Ctx: DemuxContext> Default for PatPacketFilter<Ctx> {
567    fn default() -> PatPacketFilter<Ctx> {
568        let pat_proc = PatProcessor::default();
569        PatPacketFilter {
570            pat_section_packet_consumer: psi::SectionPacketConsumer::new(
571                psi::SectionSyntaxSectionProcessor::new(psi::DedupSectionSyntaxPayloadParser::new(
572                    psi::BufferSectionSyntaxParser::new(
573                        psi::CrcCheckWholeSectionSyntaxPayloadParser::new(pat_proc),
574                    ),
575                )),
576            ),
577        }
578    }
579}
580impl<Ctx: DemuxContext> PacketFilter for PatPacketFilter<Ctx> {
581    type Ctx = Ctx;
582
583    fn consume(&mut self, ctx: &mut Self::Ctx, pk: &packet::Packet<'_>) {
584        self.pat_section_packet_consumer.consume(ctx, pk);
585    }
586}
587
588/// Transport Stream demultiplexer.
589///
590/// Uses the `DemuxContext` passed to `new()` to create Filters for
591/// processing the payloads of each packet discovered in the TransportStream.
592///
593/// # Incremental parsing
594///
595/// Successive sections of transport stream data can be passed in order to `push()`, and the
596/// demultiplexing process will resume at the start of one buffer where it left off at the end of
597/// the last.  This supports for example the processing of sections of TS data as they are received
598/// from the network, without needing to copy them out of the source network buffer.
599pub struct Demultiplex<Ctx: DemuxContext> {
600    processor_by_pid: Filters<Ctx::F>,
601}
602impl<Ctx: DemuxContext> Demultiplex<Ctx> {
603    /// Create a `Dumultiplex` instance, and populate it with an initial `PacketFilter` for
604    /// handling PAT packets (which is created by the given `DemuxContext` object).
605    /// The returned value does not retain any reference to the given `DemuxContext` reference.
606    pub fn new(ctx: &mut Ctx) -> Demultiplex<Ctx> {
607        let mut result = Demultiplex {
608            processor_by_pid: Filters::default(),
609        };
610
611        result.processor_by_pid.insert(
612            psi::pat::PAT_PID,
613            ctx.construct(FilterRequest::ByPid(psi::pat::PAT_PID)),
614        );
615
616        result
617    }
618
619    /// Parse the Transport Stream packets in the given buffer, using functions from the given
620    /// `DemuxContent` object
621    pub fn push(&mut self, ctx: &mut Ctx, buf: &[u8]) {
622        // TODO: simplify
623        let mut itr = buf
624            .chunks_exact(packet::Packet::SIZE)
625            .map(packet::Packet::try_new);
626        let mut pk = if let Some(Some(p)) = itr.next() {
627            p
628        } else {
629            return;
630        };
631        'outer: loop {
632            let this_pid = pk.pid();
633            if !self.processor_by_pid.contains(this_pid) {
634                self.add_pid_filter(ctx, this_pid);
635            };
636            let this_proc = self.processor_by_pid.get(this_pid).unwrap();
637            'inner: loop {
638                if pk.transport_error_indicator() {
639                    // drop packets that have transport_error_indicator set, on the assumption that
640                    // the contents are nonsense
641                    warn!("{:?} transport_error_indicator", pk.pid());
642                } else if pk.transport_scrambling_control().is_scrambled() {
643                    // TODO: allow descrambler to be plugged in
644                    warn!(
645                        "{:?} dropping scrambled packet {:?}",
646                        pk.pid(),
647                        pk.transport_scrambling_control()
648                    );
649                } else {
650                    this_proc.consume(ctx, &pk);
651                    if !ctx.filter_changeset().is_empty() {
652                        break 'inner;
653                    }
654                }
655                pk = if let Some(Some(p)) = itr.next() {
656                    p
657                } else {
658                    break 'outer;
659                };
660                if pk.pid() != this_pid {
661                    continue 'outer;
662                }
663            }
664            if !ctx.filter_changeset().is_empty() {
665                ctx.filter_changeset().apply(&mut self.processor_by_pid);
666            }
667            debug_assert!(ctx.filter_changeset().is_empty());
668            pk = if let Some(Some(p)) = itr.next() {
669                p
670            } else {
671                break 'outer;
672            };
673        }
674    }
675
676    fn add_pid_filter(&mut self, ctx: &mut Ctx, this_pid: packet::Pid) {
677        let filter = ctx.construct(FilterRequest::ByPid(this_pid));
678        self.processor_by_pid.insert(this_pid, filter);
679    }
680}
681
682#[cfg(test)]
683pub(crate) mod test {
684    use bitstream_io::{BitWrite, BitWriter, BE};
685    use hex_literal::*;
686    use std::io;
687
688    use crate::demultiplex;
689    use crate::demultiplex::{Filters, NullPacketFilter, PacketFilter};
690    use crate::packet;
691    use crate::packet::{Packet, Pid};
692    use crate::psi;
693    use crate::psi::WholeSectionSyntaxPayloadParser;
694    use bitstream_io::BigEndian;
695
696    pub struct CountPacketFilter {
697        count: u64,
698    }
699    impl PacketFilter for CountPacketFilter {
700        type Ctx = NullDemuxContext;
701
702        fn consume(&mut self, _ctx: &mut Self::Ctx, _pk: &Packet<'_>) {
703            self.count += 1;
704        }
705    }
706
707    packet_filter_switch! {
708        NullFilterSwitch<NullDemuxContext> {
709            Pat: demultiplex::PatPacketFilter<NullDemuxContext>,
710            Pmt: demultiplex::PmtPacketFilter<NullDemuxContext>,
711            Nul: demultiplex::NullPacketFilter<NullDemuxContext>,
712            Count: CountPacketFilter,
713        }
714    }
715    demux_context!(NullDemuxContext, NullFilterSwitch);
716    impl NullDemuxContext {
717        fn do_construct(&mut self, req: demultiplex::FilterRequest<'_, '_>) -> NullFilterSwitch {
718            match req {
719                demultiplex::FilterRequest::ByPid(psi::pat::PAT_PID) => {
720                    NullFilterSwitch::Pat(demultiplex::PatPacketFilter::default())
721                }
722                demultiplex::FilterRequest::ByPid(_) => {
723                    NullFilterSwitch::Nul(demultiplex::NullPacketFilter::default())
724                }
725                demultiplex::FilterRequest::ByStream { .. } => {
726                    NullFilterSwitch::Nul(demultiplex::NullPacketFilter::default())
727                }
728                demultiplex::FilterRequest::Pmt {
729                    pid,
730                    program_number,
731                } => NullFilterSwitch::Pmt(demultiplex::PmtPacketFilter::new(pid, program_number)),
732                demultiplex::FilterRequest::Nit { .. } => {
733                    NullFilterSwitch::Nul(demultiplex::NullPacketFilter::default())
734                }
735            }
736        }
737    }
738
739    #[test]
740    fn demux_empty() {
741        let mut ctx = NullDemuxContext::new();
742        let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
743        deplex.push(&mut ctx, &[0x0; 0][..]);
744    }
745
746    #[test]
747    fn pat() {
748        // this PAT data references a program with PID 480
749        let buf = hex!("474000150000B00D0001C100000001E1E02D507804FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF");
750        let mut ctx = NullDemuxContext::new();
751        let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
752        deplex.push(&mut ctx, &buf[..]);
753        // check that a processor was registered for the program referenced by the PAT
754        assert!(deplex.processor_by_pid.contains(Pid::new(480)));
755    }
756
757    #[test]
758    fn pat_no_existing_program() {
759        let mut processor = demultiplex::PatProcessor::default();
760        let section = vec![
761            0, 0, 0, // common header
762            // table syntax header
763            0x0D, 0x00, 0b00000001, 0xC1, 0x00, // PAT section
764            0, 1, // program_number
765            0, 101, // pid
766            0, 0, 0, 0, // CRC (incorrect!)
767        ];
768        let header = psi::SectionCommonHeader::new(&section[..psi::SectionCommonHeader::SIZE]);
769        let table_syntax_header =
770            psi::TableSyntaxHeader::new(&section[psi::SectionCommonHeader::SIZE..]);
771        let mut ctx = NullDemuxContext::new();
772        processor.section(&mut ctx, &header, &table_syntax_header, &section[..]);
773        let mut changes = ctx.changeset.updates.into_iter();
774        if let Some(demultiplex::FilterChange::Insert(pid, _)) = changes.next() {
775            assert_eq!(packet::Pid::new(101), pid);
776        } else {
777            panic!();
778        }
779    }
780
781    #[test]
782    fn pat_remove_existing_program() {
783        let mut ctx = NullDemuxContext::new();
784        let mut processor = demultiplex::PatProcessor::default();
785        {
786            let section = vec![
787                0, 0, 0, // common header
788                // table syntax header
789                0x0D, 0x00, 0b00000001, 0xC1, 0x00,
790                // PAT with a single program; next version of the  table removes this,
791                0, 1, // program_number
792                0, 101, // pid
793                0, 0, 0, 0, // CRC (incorrect)
794            ];
795            let header = psi::SectionCommonHeader::new(&section[..psi::SectionCommonHeader::SIZE]);
796            let table_syntax_header =
797                psi::TableSyntaxHeader::new(&section[psi::SectionCommonHeader::SIZE..]);
798            processor.section(&mut ctx, &header, &table_syntax_header, &section[..]);
799        }
800        ctx.changeset.updates.clear();
801        {
802            let section = vec![
803                0, 0, 0, // common header
804                // table syntax header
805                0x0D, 0x00, 0b00000011, 0xC1, 0x00, // new version!
806                // empty PMT - simulate removal of PID 101
807                0, 0, 0, 0, // CRC (incorrect)
808            ];
809            let header = psi::SectionCommonHeader::new(&section[..psi::SectionCommonHeader::SIZE]);
810            let table_syntax_header =
811                psi::TableSyntaxHeader::new(&section[psi::SectionCommonHeader::SIZE..]);
812            processor.section(&mut ctx, &header, &table_syntax_header, &section[..]);
813        }
814        let mut changes = ctx.changeset.updates.into_iter();
815        if let Some(demultiplex::FilterChange::Remove(pid)) = changes.next() {
816            assert_eq!(packet::Pid::new(101), pid);
817        } else {
818            panic!();
819        }
820    }
821
822    pub(crate) fn make_test_data<F>(builder: F) -> Vec<u8>
823    where
824        F: Fn(&mut BitWriter<Vec<u8>, BE>) -> Result<(), io::Error>,
825    {
826        let data: Vec<u8> = Vec::new();
827        let mut w = BitWriter::endian(data, BigEndian);
828        builder(&mut w).unwrap();
829        w.into_writer()
830    }
831
832    #[test]
833    fn pmt_new_stream() {
834        // TODO arrange for the filter table to already contain an entry for PID 101
835        let pid = packet::Pid::new(101);
836        let program_number = 1001;
837        let mut processor = demultiplex::PmtProcessor::new(pid, program_number);
838        let section = make_test_data(|w| {
839            // common section header,
840            w.write(8, 0x02)?; // table_id
841            w.write_bit(true)?; // section_syntax_indicator
842            w.write_bit(false)?; // private_indicator
843            w.write(2, 3)?; // reserved
844            w.write(12, 20)?; // section_length
845
846            // section syntax header,
847            w.write(16, 0)?; // id
848            w.write(2, 3)?; // reserved
849            w.write(5, 0)?; // version
850            w.write(1, 1)?; // current_next_indicator
851            w.write(8, 0)?; // section_number
852            w.write(8, 0)?; // last_section_number
853
854            // PMT section payload
855            w.write(3, 7)?; // reserved
856            w.write(13, 123)?; // pcr_pid
857            w.write(4, 15)?; // reserved
858            w.write(12, 0)?; // program_info_length
859
860            // program_info_length=0, so no descriptors follow; straight into stream info
861            w.write(8, 0)?; // stream_type
862            w.write(3, 7)?; // reserved
863            w.write(13, 201)?; // elementary_pid
864            w.write(4, 15)?; // reserved
865            w.write(12, 6)?; // es_info_length
866
867            // and now, two made-up descriptors which need to fill up es_info_length-bytes
868            w.write(8, 0)?; // descriptor_tag
869            w.write(8, 1)?; // descriptor_length
870            w.write(8, 0)?; // made-up descriptor data not following any spec
871
872            // second descriptor
873            w.write(8, 0)?; // descriptor_tag
874            w.write(8, 1)?; // descriptor_length
875            w.write(8, 0)?; // made-up descriptor data not following any spec
876            w.write(32, 0) // CRC (incorrect)
877        });
878        let header = psi::SectionCommonHeader::new(&section[..psi::SectionCommonHeader::SIZE]);
879        let table_syntax_header =
880            psi::TableSyntaxHeader::new(&section[psi::SectionCommonHeader::SIZE..]);
881        let mut ctx = NullDemuxContext::new();
882        processor.section(&mut ctx, &header, &table_syntax_header, &section[..]);
883        let mut changes = ctx.changeset.updates.into_iter();
884        if let Some(demultiplex::FilterChange::Insert(pid, _)) = changes.next() {
885            assert_eq!(packet::Pid::new(201), pid);
886        } else {
887            panic!();
888        }
889    }
890
891    #[test]
892    fn filters() {
893        let mut filters = Filters::<NullPacketFilter<NullDemuxContext>>::default();
894        assert!(!filters.contains(Pid::PAT));
895        assert!(filters.get(Pid::PAT).is_none());
896        filters.insert(Pid::PAT, NullPacketFilter::default());
897        assert!(filters.contains(Pid::PAT));
898        assert!(filters.get(Pid::PAT).is_some());
899        filters.remove(Pid::PAT);
900        assert!(!filters.contains(Pid::PAT));
901        assert!(filters.get(Pid::PAT).is_none());
902    }
903
904    #[test]
905    fn bad_sync() {
906        let mut buf = hex!("474000150000B00D0001C100000001E1E02D507804FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
907            .to_vec();
908        buf[0] = 0; // owerwrite the sync byte with an invalid valud
909        let mut ctx = NullDemuxContext::new();
910        let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
911        deplex.push(&mut ctx, &buf[..]);
912        // while we don't assert anything yet, the code should not panic
913    }
914
915    #[test]
916    fn unknown_pid() {
917        let mut buf = hex!("474000150000B00D0001C100000001E1E02D507804FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
918            .to_vec();
919        let new_pid: u16 = 123;
920        assert_eq!(new_pid & 0b1110_0000_0000_0000, 0);
921        buf[1] = buf[1] & 0b1110_0000 | ((new_pid >> 8) as u8);
922        buf[2] = (new_pid & 0xff) as u8;
923        // a single sync-byte - too short since a full 188 byte packet is expected
924        let mut ctx = NullDemuxContext::new();
925        let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
926        assert!(!deplex.processor_by_pid.contains(Pid::new(new_pid)));
927        deplex.push(&mut ctx, &buf[..]);
928        // discovery of a packet with a new PID should case a processor to be registered for that
929        // PID
930        assert!(deplex.processor_by_pid.contains(Pid::new(new_pid)));
931    }
932
933    #[test]
934    fn ignore_scrambled_packet() {
935        let mut ctx = NullDemuxContext::new();
936        let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
937        let count = CountPacketFilter { count: 0 };
938        deplex
939            .processor_by_pid
940            .insert(Pid::STUFFING, NullFilterSwitch::Count(count));
941
942        let mut buf = hex!("47400015FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
943            .to_vec();
944        let new_pid: u16 = Pid::STUFFING.into();
945        assert_eq!(new_pid & 0b1110_0000_0000_0000, 0);
946        buf[1] = buf[1] & 0b1110_0000 | ((new_pid >> 8) as u8);
947        buf[2] = (new_pid & 0xff) as u8;
948
949        deplex.push(&mut ctx, &buf);
950        // the packet was unscrambled, so we expect call-count to increase by 1
951        if let Some(NullFilterSwitch::Count(count)) = deplex.processor_by_pid.get(Pid::STUFFING) {
952            assert_eq!(count.count, 1);
953        } else {
954            unreachable!()
955        }
956
957        let new_transport_scrambling_control = 1;
958        buf[3] = buf[3] & 0b0011_1111 | (new_transport_scrambling_control << 6);
959
960        deplex.push(&mut ctx, &buf);
961        // the packet was scrambled, so we don't expect the packet filter to have been called a
962        // second time - count should remain at 1
963        if let Some(NullFilterSwitch::Count(count)) = deplex.processor_by_pid.get(Pid::STUFFING) {
964            assert_eq!(count.count, 1);
965        } else {
966            unreachable!()
967        }
968    }
969
970    #[test]
971    fn ignore_error_packet() {
972        let _ = env_logger::builder()
973            .filter_level(log::LevelFilter::Warn)
974            .is_test(true)
975            .try_init();
976        let mut ctx = NullDemuxContext::new();
977        let mut deplex = demultiplex::Demultiplex::new(&mut ctx);
978        let count = CountPacketFilter { count: 0 };
979        deplex
980            .processor_by_pid
981            .insert(Pid::STUFFING, NullFilterSwitch::Count(count));
982
983        let mut buf = hex!("47400015FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")
984            .to_vec();
985        let new_pid: u16 = Pid::STUFFING.into();
986        assert_eq!(new_pid & 0b1110_0000_0000_0000, 0);
987        buf[1] = buf[1] & 0b1110_0000 | ((new_pid >> 8) as u8);
988        buf[2] = (new_pid & 0xff) as u8;
989
990        deplex.push(&mut ctx, &buf);
991        // the packet was good, so we expect call-count to increase by 1
992        if let Some(NullFilterSwitch::Count(count)) = deplex.processor_by_pid.get(Pid::STUFFING) {
993            assert_eq!(count.count, 1);
994        } else {
995            unreachable!()
996        }
997
998        // set the transport_error_indicator
999        buf[1] = buf[1] | 0b1000_0000;
1000
1001        deplex.push(&mut ctx, &buf);
1002        // the packet has error_indicator, so we don't expect the packet filter to have been called
1003        // a second time - count should remain at 1
1004        if let Some(NullFilterSwitch::Count(count)) = deplex.processor_by_pid.get(Pid::STUFFING) {
1005            assert_eq!(count.count, 1);
1006        } else {
1007            unreachable!()
1008        }
1009    }
1010}