Skip to main content

dvb_si/collect/
mod.rs

1//! Multi-section table collection.
2//!
3//! Section parsers in [`crate::tables`] describe one wire section. This module
4//! adds the next layer up: collect all sections in `0..=last_section_number`
5//! for one logical version, then expose a complete table view.
6//!
7//! Collectors validate long-form section CRCs before retaining bytes. If the
8//! input already came from [`crate::demux::SiDemux`], that validation has
9//! already happened; direct section-byte callers get the same guard here.
10//!
11//! A collector error describes the section that was just pushed, not the whole
12//! stream. Long-running consumers should normally log/drop that section and
13//! continue feeding later sections; previous valid collector state is retained.
14
15use alloc::collections::BTreeMap;
16use alloc::sync::Arc;
17use alloc::vec;
18use alloc::vec::Vec;
19
20use crate::descriptors::{AnyDescriptor, DescriptorLoop, DescriptorRegistry};
21use crate::section::Section;
22use dvb_common::Parse;
23
24mod bat;
25mod eit;
26mod nit;
27mod sdt;
28
29pub use bat::*;
30pub use eit::*;
31pub use nit::*;
32pub use sdt::*;
33
/// Default cap on the number of logical keys retained by
/// [`SectionSetCollector`].
///
/// 256 concurrent collections is generous while bounding a hostile stream that
/// rotates table_id / extension / current_next_indicator across PIDs to force
/// unbounded map growth. The cap is applied to the partial-sections map. That
/// map also keeps the entry of a set after it has completed, so completed sets
/// count towards the cap as well as in-progress ones. When the map is full,
/// incoming sections for new keys are skipped until
/// [`clear`](SectionSetCollector::clear) frees capacity.
pub const DEFAULT_MAX_PARTIAL_KEYS: usize = 256;
43
/// Result alias for collection operations; the error type is always
/// [`CollectError`].
pub type CollectResult<T> = core::result::Result<T, CollectError>;
46
/// Errors returned by multi-section collectors.
///
/// These errors are scoped to the current input section. They usually mean
/// "skip this section and keep going", especially on live streams where a
/// broadcaster may mutate section bytes without bumping `version_number`.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum CollectError {
    /// The section bytes did not parse as a generic PSI/SI section.
    ///
    /// Any [`crate::Error`] propagated with `?` inside the collectors is
    /// converted into this variant, which includes errors returned by the CRC
    /// validation step.
    #[error("section parse failed: {0}")]
    Section(#[from] crate::Error),

    /// A short-form section was fed to a multi-section collector.
    ///
    /// Raised when `section_syntax_indicator` is not set on the parsed section.
    #[error(
        "table_id {table_id:#04x} is a short-form section and cannot be multi-section collected"
    )]
    ShortFormSection {
        /// Raw table_id byte.
        table_id: u8,
    },

    /// `section_number` was outside the advertised section range, i.e. greater
    /// than the `last_section_number` carried by the same section.
    #[error(
        "section_number {section_number} exceeds last_section_number {last_section_number} for table_id {table_id:#04x}"
    )]
    SectionNumberOutOfRange {
        /// Raw table_id byte.
        table_id: u8,
        /// Section number carried by the section.
        section_number: u8,
        /// Last section number carried by the section.
        last_section_number: u8,
    },

    /// A slot already contained different bytes for the same version.
    ///
    /// The previously stored bytes are kept; the incoming section is rejected.
    #[error("conflicting bytes for table_id {table_id:#04x} section {section_number}")]
    ConflictingSection {
        /// Raw table_id byte.
        table_id: u8,
        /// Section slot that conflicted.
        section_number: u8,
    },

    /// An EIT schedule section advertised an impossible table-id range.
    #[error(
        "EIT schedule table_id {table_id:#04x} is outside advertised range {first_table_id:#04x}..={last_table_id:#04x}"
    )]
    EitTableIdOutOfRange {
        /// Incoming EIT schedule table_id.
        table_id: u8,
        /// First table_id for this schedule kind.
        first_table_id: u8,
        /// Advertised last_table_id.
        last_table_id: u8,
    },
}
103
/// Logical key for one section sequence.
///
/// The key deliberately excludes `version_number` and `section_number`. Version
/// changes reset a collection; section numbers index into that collection.
///
/// The derived `Ord` is what lets the key be used in the collector's
/// `BTreeMap`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[non_exhaustive]
pub struct SectionSetKey {
    /// Optional PID context supplied by the caller. `None` when the section
    /// was pushed without PID context.
    pub pid: Option<u16>,
    /// Raw `table_id`.
    pub table_id: u8,
    /// Long-form `table_id_extension`.
    pub extension_id: u16,
    /// `current_next_indicator`.
    pub current_next_indicator: bool,
}
120
/// Metadata shared by every section in a complete section set.
///
/// A change in either `version_number` or `last_section_number` for the same
/// key makes the collector start that key's collection over.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct SectionSetMeta {
    /// Logical section-set key.
    pub key: SectionSetKey,
    /// 5-bit `version_number`.
    pub version_number: u8,
    /// Last section number for this set; the set holds
    /// `last_section_number + 1` sections.
    pub last_section_number: u8,
}
132
/// Collection state for one [`SectionSetKey`] at one version.
#[derive(Debug)]
struct PartialSectionSet {
    /// Key, version and section count of the set currently being collected.
    meta: SectionSetMeta,
    /// One slot per section number in `0..=last_section_number`; `None` until
    /// that section has been stored.
    slots: Vec<Option<Arc<[u8]>>>,
    /// Number of slots that currently hold bytes.
    filled: usize,
    /// Whether the complete set has already been handed out, so that repeated
    /// sections do not yield it a second time.
    emitted: bool,
}
140
141impl PartialSectionSet {
142    fn new(meta: SectionSetMeta) -> Self {
143        let len = meta.last_section_number as usize + 1;
144        Self {
145            meta,
146            slots: vec![None; len],
147            filled: 0,
148            emitted: false,
149        }
150    }
151
152    fn reset(&mut self, meta: SectionSetMeta) {
153        *self = Self::new(meta);
154    }
155
156    fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
157        let index = section_number as usize;
158        if let Some(existing) = &self.slots[index] {
159            if existing.as_ref() == bytes.as_ref() {
160                return Ok(false);
161            }
162            return Err(CollectError::ConflictingSection {
163                table_id: self.meta.key.table_id,
164                section_number,
165            });
166        }
167
168        self.slots[index] = Some(bytes);
169        self.filled += 1;
170        self.emitted = false;
171        Ok(true)
172    }
173
174    fn complete(&self) -> bool {
175        self.filled == self.slots.len()
176    }
177
178    fn to_complete(&self) -> Option<CompleteSectionSet> {
179        if !self.complete() || self.emitted {
180            return None;
181        }
182
183        let sections = self
184            .slots
185            .iter()
186            .map(|slot| slot.as_ref().expect("complete set has no holes").clone())
187            .collect();
188        Some(CompleteSectionSet {
189            meta: self.meta,
190            sections,
191        })
192    }
193}
194
/// Generic collector for long-form `section_number`/`last_section_number`
/// sequences.
///
/// The constructor [`SectionSetCollector::new`] uses the default cap
/// [`DEFAULT_MAX_PARTIAL_KEYS`]; the cap is configurable via
/// [`with_max_partial_keys`](Self::with_max_partial_keys).
#[derive(Debug)]
pub struct SectionSetCollector {
    /// Collection state per logical key. Entries stay in the map after their
    /// set has completed and are only dropped by `clear`.
    partial: BTreeMap<SectionSetKey, PartialSectionSet>,
    /// Maximum number of keys `partial` may hold before sections for new keys
    /// are skipped.
    max_partial_keys: usize,
}
206
207impl Default for SectionSetCollector {
208    fn default() -> Self {
209        Self {
210            partial: BTreeMap::new(),
211            max_partial_keys: DEFAULT_MAX_PARTIAL_KEYS,
212        }
213    }
214}
215
216impl SectionSetCollector {
217    /// Create an empty collector with the default cap
218    /// ([`DEFAULT_MAX_PARTIAL_KEYS`]).
219    #[must_use]
220    pub fn new() -> Self {
221        Self::default()
222    }
223
224    /// Replace the partial-key cap (default [`DEFAULT_MAX_PARTIAL_KEYS`]).
225    /// Sections for new keys are skipped when the map is full, until
226    /// [`clear`](Self::clear) frees capacity.
227    #[must_use]
228    pub fn with_max_partial_keys(mut self, max_partial_keys: usize) -> Self {
229        self.max_partial_keys = max_partial_keys;
230        self
231    }
232
233    /// Push one complete section. Returns `Some` only when the logical section
234    /// set has become complete for the first time at this version.
235    ///
236    /// # Errors
237    ///
238    /// Returns a [`CollectError`] if the bytes are not a valid long-form
239    /// section or if the section set becomes internally inconsistent. Treat the
240    /// error as applying to this section only unless your application wants
241    /// strict stream-fail behavior.
242    pub fn push_section(
243        &mut self,
244        bytes: impl AsRef<[u8]>,
245    ) -> CollectResult<Option<CompleteSectionSet>> {
246        self.push_section_with_pid(None, bytes)
247    }
248
249    /// Push one complete section with PID context.
250    ///
251    /// The PID is folded into the section-set key so tables with identical
252    /// table id/extension on different PIDs do not collide.
253    pub fn push_section_with_pid(
254        &mut self,
255        pid: Option<u16>,
256        bytes: impl AsRef<[u8]>,
257    ) -> CollectResult<Option<CompleteSectionSet>> {
258        let raw = bytes.as_ref();
259        let section = Section::parse(raw)?;
260        if !section.section_syntax_indicator {
261            return Err(CollectError::ShortFormSection {
262                table_id: section.table_id,
263            });
264        }
265        if section.section_number > section.last_section_number {
266            return Err(CollectError::SectionNumberOutOfRange {
267                table_id: section.table_id,
268                section_number: section.section_number,
269                last_section_number: section.last_section_number,
270            });
271        }
272        section.validate_crc(raw)?;
273
274        let key = SectionSetKey {
275            pid,
276            table_id: section.table_id,
277            extension_id: section.extension_id,
278            current_next_indicator: section.current_next_indicator,
279        };
280        let meta = SectionSetMeta {
281            key,
282            version_number: section.version_number,
283            last_section_number: section.last_section_number,
284        };
285        let bytes: Arc<[u8]> = Arc::from(raw);
286
287        // Cap check: skip new keys when the map is full
288        if !self.partial.contains_key(&key) && self.partial.len() >= self.max_partial_keys {
289            return Ok(None);
290        }
291
292        let partial = self
293            .partial
294            .entry(key)
295            .or_insert_with(|| PartialSectionSet::new(meta));
296
297        if partial.meta.version_number != meta.version_number
298            || partial.meta.last_section_number != meta.last_section_number
299        {
300            partial.reset(meta);
301        }
302
303        partial.insert(section.section_number, bytes)?;
304        let complete = partial.to_complete();
305        if complete.is_some() {
306            partial.emitted = true;
307        }
308        Ok(complete)
309    }
310
311    /// Drop all retained partial section sets.
312    pub fn clear(&mut self) {
313        self.partial.clear();
314    }
315
316    /// Number of retained partial section-set states.
317    #[must_use]
318    pub fn len(&self) -> usize {
319        self.partial.len()
320    }
321
322    /// Whether the collector currently has no retained state.
323    #[must_use]
324    pub fn is_empty(&self) -> bool {
325        self.partial.is_empty()
326    }
327}
328
/// A complete owned set of original section bytes for one logical section
/// sequence.
///
/// Cloning is cheap with respect to section data: the bytes are held behind
/// `Arc`s, so a clone copies the vector of handles, not the section bytes.
#[derive(Debug, Clone)]
pub struct CompleteSectionSet {
    /// Key, version and section count shared by all sections.
    meta: SectionSetMeta,
    /// Raw section bytes, indexed by `section_number`.
    sections: Vec<Arc<[u8]>>,
}
336
/// Generic complete table view for one collected section set.
///
/// This is the all-table escape hatch: every long-form PSI/SI table with
/// `section_number`/`last_section_number` can be collected into a
/// [`CompleteSectionSet`] and parsed as `CompleteTable<T>`. Table-specific
/// complete views such as [`CompleteNit`] add flattened convenience fields where
/// the logical table shape is useful.
#[derive(Debug)]
pub struct CompleteTable<T> {
    /// Metadata copied from the originating section set.
    meta: SectionSetMeta,
    /// One parsed value per section, in section-number order.
    sections: Vec<T>,
}
349
350impl<T> CompleteTable<T> {
351    /// Metadata shared by the section set.
352    #[must_use]
353    pub const fn meta(&self) -> SectionSetMeta {
354        self.meta
355    }
356
357    /// Parsed sections in section-number order.
358    #[must_use]
359    pub fn sections(&self) -> &[T] {
360        &self.sections
361    }
362
363    /// Consume the complete table and return the parsed sections.
364    #[must_use]
365    pub fn into_sections(self) -> Vec<T> {
366        self.sections
367    }
368}
369
370impl CompleteSectionSet {
371    /// Metadata shared by the section set.
372    #[must_use]
373    pub const fn meta(&self) -> SectionSetMeta {
374        self.meta
375    }
376
377    /// Complete section bytes in section-number order.
378    #[must_use]
379    pub fn section_bytes(&self) -> impl ExactSizeIterator<Item = &[u8]> {
380        self.sections.iter().map(AsRef::as_ref)
381    }
382
383    /// Parse every section in this set as `T`.
384    ///
385    /// The parsed values borrow from this [`CompleteSectionSet`], so callers can
386    /// retain the set and use borrowed typed views without copying table loops.
387    pub fn parse_sections<'a, T>(&'a self) -> crate::Result<Vec<T>>
388    where
389        T: Parse<'a, Error = crate::Error>,
390    {
391        self.section_bytes().map(T::parse).collect()
392    }
393
394    /// Parse this set as a generic complete table.
395    ///
396    /// Use this for any long-form table that does not need a specialised
397    /// flattened logical view.
398    pub fn table<'a, T>(&'a self) -> crate::Result<CompleteTable<T>>
399    where
400        T: Parse<'a, Error = crate::Error>,
401    {
402        Ok(CompleteTable {
403            meta: self.meta,
404            sections: self.parse_sections()?,
405        })
406    }
407
408    /// Build a complete NIT view from this section set.
409    pub fn nit(&self) -> crate::Result<CompleteNit<'_>> {
410        CompleteNit::parse(self, None)
411    }
412
413    /// Build a complete NIT view using a descriptor registry.
414    pub fn nit_with_registry<'a>(
415        &'a self,
416        registry: &'a DescriptorRegistry,
417    ) -> crate::Result<CompleteNit<'a>> {
418        CompleteNit::parse(self, Some(registry))
419    }
420
421    /// Build a complete BAT view from this section set.
422    pub fn bat(&self) -> crate::Result<CompleteBat<'_>> {
423        CompleteBat::parse(self, None)
424    }
425
426    /// Build a complete BAT view using a descriptor registry.
427    pub fn bat_with_registry<'a>(
428        &'a self,
429        registry: &'a DescriptorRegistry,
430    ) -> crate::Result<CompleteBat<'a>> {
431        CompleteBat::parse(self, Some(registry))
432    }
433
434    /// Build a complete SDT view from this section set.
435    pub fn sdt(&self) -> crate::Result<CompleteSdt<'_>> {
436        CompleteSdt::parse(self, None)
437    }
438
439    /// Build a complete SDT view using a descriptor registry.
440    pub fn sdt_with_registry<'a>(
441        &'a self,
442        registry: &'a DescriptorRegistry,
443    ) -> crate::Result<CompleteSdt<'a>> {
444        CompleteSdt::parse(self, Some(registry))
445    }
446
447    /// Build a complete EIT view from this section set.
448    pub fn eit(&self) -> crate::Result<CompleteEit<'_>> {
449        CompleteEit::parse(self, None)
450    }
451
452    /// Build a complete EIT view using a descriptor registry.
453    pub fn eit_with_registry<'a>(
454        &'a self,
455        registry: &'a DescriptorRegistry,
456    ) -> crate::Result<CompleteEit<'a>> {
457        CompleteEit::parse(self, Some(registry))
458    }
459}
460
/// Parsed descriptor loop retaining the raw bytes and the typed descriptor
/// results.
#[derive(Debug)]
pub struct ParsedDescriptorLoop<'a> {
    /// The untouched descriptor loop the results were parsed from.
    raw: DescriptorLoop<'a>,
    /// One parse result per descriptor; failures are kept as `Err` entries
    /// rather than aborting the whole loop.
    descriptors: Vec<crate::Result<AnyDescriptor<'a>>>,
}
468
469impl<'a> ParsedDescriptorLoop<'a> {
470    pub(crate) fn parse(raw: DescriptorLoop<'a>, registry: Option<&'a DescriptorRegistry>) -> Self {
471        let descriptors = match registry {
472            Some(registry) => registry.parse_loop(raw.raw()).collect(),
473            None => raw.iter().collect(),
474        };
475        Self { raw, descriptors }
476    }
477
478    /// Raw descriptor-loop bytes.
479    ///
480    /// Use `raw().iter_with_extensions(&desc_reg, &ext_reg)` to recover custom
481    /// extension bodies from a `Complete*` view.
482    #[must_use]
483    pub const fn raw(&self) -> DescriptorLoop<'a> {
484        self.raw
485    }
486
487    /// Typed descriptor parse results in wire order.
488    pub fn descriptors(&self) -> &[crate::Result<AnyDescriptor<'a>>] {
489        &self.descriptors
490    }
491}
492
#[cfg(test)]
mod tests {
    use super::*;

    const TEST_TABLE_ID: u8 = 0x42;

    /// Build a 12-byte long-form section with an empty payload, the given
    /// section numbering, and a valid CRC.
    fn section_with(extension_id: u16, section_number: u8, last_section_number: u8) -> Vec<u8> {
        // section_length = 5 (ext_header) + 0 (payload) + 4 (crc)
        let section_length: u16 = 9;
        let [length_hi, length_lo] = section_length.to_be_bytes();
        let [extension_hi, extension_lo] = extension_id.to_be_bytes();

        let mut section = vec![
            TEST_TABLE_ID,
            0xB0 | (length_hi & 0x0F),
            length_lo,
            extension_hi,
            extension_lo,
            0xC1,
            section_number,
            last_section_number,
        ];
        // The CRC covers the 8 header bytes and is appended big-endian.
        let crc = dvb_common::crc32_mpeg2::compute(&section);
        section.extend_from_slice(&crc.to_be_bytes());
        section
    }

    /// Single-section set (section 0 of 0) for `extension_id`.
    fn min_section(extension_id: u16) -> Vec<u8> {
        section_with(extension_id, 0, 0)
    }

    #[test]
    fn collect_single_section_is_complete() {
        let mut collector = SectionSetCollector::new();
        let section = min_section(0);

        let outcome = collector.push_section(&section).unwrap();

        assert!(outcome.is_some());
        assert_eq!(collector.len(), 1);
    }

    #[test]
    fn partial_keys_cap_skips_new_keys() {
        let mut collector = SectionSetCollector::new().with_max_partial_keys(3);

        // Three distinct extension IDs fill the cap exactly.
        for eid in 0..3u16 {
            let outcome = collector.push_section(min_section(eid)).unwrap();
            assert!(
                outcome.is_some(),
                "single-section set for eid {eid} completes"
            );
        }
        assert_eq!(collector.len(), 3);

        // A fourth distinct key arrives while the map is full: skipped.
        let fourth = min_section(3);
        let outcome = collector.push_section(&fourth).unwrap();
        assert!(outcome.is_none(), "new key beyond cap must be skipped");
        assert_eq!(collector.len(), 3);

        // After clearing, the same key is admitted.
        collector.clear();
        assert!(collector.is_empty());
        let outcome = collector.push_section(&fourth).unwrap();
        assert!(outcome.is_some());
        assert_eq!(collector.len(), 1);
    }

    #[test]
    fn partial_keys_cap_does_not_skip_existing_key() {
        let mut collector = SectionSetCollector::new().with_max_partial_keys(1);

        // Section 0 of a two-section set occupies the only available key.
        let first = section_with(0xAB, 0, 1);
        let outcome = collector.push_section(&first).unwrap();
        assert!(outcome.is_none(), "incomplete section set yields None");

        // Section 1 of the same set: the map is full, but the key is already
        // present, so the section must be accepted and complete the set.
        let second = section_with(0xAB, 1, 1);
        let outcome = collector.push_section(&second).unwrap();
        assert!(
            outcome.is_some(),
            "existing key must NOT be skipped when cap full"
        );
        assert_eq!(collector.len(), 1);
    }
}
584}