Skip to main content

dvb_si/collect/
mod.rs

1//! Multi-section table collection.
2//!
3//! Section parsers in [`crate::tables`] describe one wire section. This module
4//! adds the next layer up: collect all sections in `0..=last_section_number`
5//! for one logical version, then expose a complete table view.
6//!
7//! Collectors validate long-form section CRCs before retaining bytes. If the
8//! input already came from [`crate::demux::SiDemux`], that validation has
9//! already happened; direct section-byte callers get the same guard here.
10//!
11//! A collector error describes the section that was just pushed, not the whole
12//! stream. Long-running consumers should normally log/drop that section and
13//! continue feeding later sections; previous valid collector state is retained.
14
15use alloc::collections::BTreeMap;
16use alloc::sync::Arc;
17use alloc::vec;
18use alloc::vec::Vec;
19
20use crate::descriptors::{AnyDescriptor, DescriptorLoop, DescriptorRegistry};
21use broadcast_common::Parse;
22use mpeg_ts::section::Section;
23
24mod bat;
25mod eit;
26mod nit;
27mod sdt;
28
29pub use bat::*;
30pub use eit::*;
31pub use nit::*;
32pub use sdt::*;
33
/// Default cap on the number of logical keys retained by
/// [`SectionSetCollector`].
///
/// 256 concurrent collections is generous while bounding a hostile stream that
/// rotates table_id / extension / current_next_indicator across PIDs to force
/// unbounded map growth. The cap is applied to the collector's key map, whose
/// entries stay in place after their section set completes, so completed sets
/// count against the cap as well as in-progress ones. When the map is full,
/// incoming sections for new keys are skipped until
/// [`clear`](SectionSetCollector::clear) frees capacity; sections for keys
/// already in the map are still accepted.
pub const DEFAULT_MAX_PARTIAL_KEYS: usize = 256;
43
/// Result alias for collection operations, with [`CollectError`] as the error
/// type.
pub type CollectResult<T> = core::result::Result<T, CollectError>;
46
/// Errors returned by multi-section collectors.
///
/// These errors are scoped to the current input section. They usually mean
/// "skip this section and keep going", especially on live streams where a
/// broadcaster may mutate section bytes without bumping `version_number`.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum CollectError {
    /// The section bytes did not parse as a generic PSI/SI section.
    ///
    /// Errors converted from `crate::Error` and from `mpeg_ts::Error` both
    /// land in this variant, so failures propagated with `?` from section
    /// parsing and CRC validation are reported here.
    #[error("section parse failed: {0}")]
    Section(#[from] crate::Error),

    /// A short-form section was fed to a multi-section collector.
    ///
    /// Raised when `section_syntax_indicator` is clear.
    #[error(
        "table_id {table_id:#04x} is a short-form section and cannot be multi-section collected"
    )]
    ShortFormSection {
        /// Raw table_id byte.
        table_id: u8,
    },

    /// `section_number` was outside the advertised section range, i.e. greater
    /// than `last_section_number`.
    #[error(
        "section_number {section_number} exceeds last_section_number {last_section_number} for table_id {table_id:#04x}"
    )]
    SectionNumberOutOfRange {
        /// Raw table_id byte.
        table_id: u8,
        /// Section number carried by the section.
        section_number: u8,
        /// Last section number carried by the section.
        last_section_number: u8,
    },

    /// A slot already contained different bytes for the same version.
    ///
    /// The previously stored bytes are kept; the incoming section is rejected.
    #[error("conflicting bytes for table_id {table_id:#04x} section {section_number}")]
    ConflictingSection {
        /// Raw table_id byte.
        table_id: u8,
        /// Section slot that conflicted.
        section_number: u8,
    },

    /// An EIT schedule section advertised an impossible table-id range.
    #[error(
        "EIT schedule table_id {table_id:#04x} is outside advertised range {first_table_id:#04x}..={last_table_id:#04x}"
    )]
    EitTableIdOutOfRange {
        /// Incoming EIT schedule table_id.
        table_id: u8,
        /// First table_id for this schedule kind.
        first_table_id: u8,
        /// Advertised last_table_id.
        last_table_id: u8,
    },
}
103
104impl From<mpeg_ts::Error> for CollectError {
105    fn from(e: mpeg_ts::Error) -> Self {
106        CollectError::Section(crate::Error::from(e))
107    }
108}
109
/// Logical key for one section sequence.
///
/// The key deliberately excludes `version_number` and `section_number`. Version
/// changes reset a collection; section numbers index into that collection.
///
/// The type is totally ordered because [`SectionSetCollector`] uses it as a
/// `BTreeMap` key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[non_exhaustive]
pub struct SectionSetKey {
    /// Optional PID context supplied by the caller; `None` when the section
    /// was pushed without PID context.
    pub pid: Option<u16>,
    /// Raw `table_id`.
    pub table_id: u8,
    /// Long-form `table_id_extension`.
    pub extension_id: u16,
    /// `current_next_indicator`.
    pub current_next_indicator: bool,
}
126
/// Metadata shared by every section in a complete section set.
///
/// A pushed section whose `version_number` or `last_section_number` differs
/// from the retained metadata for the same key restarts that key's collection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct SectionSetMeta {
    /// Logical section-set key.
    pub key: SectionSetKey,
    /// 5-bit `version_number`.
    pub version_number: u8,
    /// Last section number for this set; the set holds
    /// `last_section_number + 1` sections.
    pub last_section_number: u8,
}
138
/// Collection state for one logical key at one version.
#[derive(Debug)]
struct PartialSectionSet {
    /// Key, version and last section number this state was created for.
    meta: SectionSetMeta,
    /// One slot per section number in `0..=last_section_number`; `None` until
    /// that section has been stored.
    slots: Vec<Option<Arc<[u8]>>>,
    /// Number of slots currently holding bytes.
    filled: usize,
    /// Set by the collector once the complete set has been handed out, so the
    /// same version is not reported twice.
    emitted: bool,
}
146
147impl PartialSectionSet {
148    fn new(meta: SectionSetMeta) -> Self {
149        let len = meta.last_section_number as usize + 1;
150        Self {
151            meta,
152            slots: vec![None; len],
153            filled: 0,
154            emitted: false,
155        }
156    }
157
158    fn reset(&mut self, meta: SectionSetMeta) {
159        *self = Self::new(meta);
160    }
161
162    fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
163        let index = section_number as usize;
164        if let Some(existing) = &self.slots[index] {
165            if existing.as_ref() == bytes.as_ref() {
166                return Ok(false);
167            }
168            return Err(CollectError::ConflictingSection {
169                table_id: self.meta.key.table_id,
170                section_number,
171            });
172        }
173
174        self.slots[index] = Some(bytes);
175        self.filled += 1;
176        self.emitted = false;
177        Ok(true)
178    }
179
180    fn complete(&self) -> bool {
181        self.filled == self.slots.len()
182    }
183
184    fn to_complete(&self) -> Option<CompleteSectionSet> {
185        if !self.complete() || self.emitted {
186            return None;
187        }
188
189        let sections = self
190            .slots
191            .iter()
192            .map(|slot| slot.as_ref().expect("complete set has no holes").clone())
193            .collect();
194        Some(CompleteSectionSet {
195            meta: self.meta,
196            sections,
197        })
198    }
199}
200
/// Generic collector for long-form `section_number`/`last_section_number`
/// sequences.
///
/// The constructor [`SectionSetCollector::new`] uses the default cap
/// [`DEFAULT_MAX_PARTIAL_KEYS`]; the cap is configurable via
/// [`with_max_partial_keys`](Self::with_max_partial_keys).
#[derive(Debug)]
pub struct SectionSetCollector {
    /// Collection state per logical key. Entries are only removed by `clear`.
    partial: BTreeMap<SectionSetKey, PartialSectionSet>,
    /// Maximum number of keys `partial` may hold before new keys are skipped.
    max_partial_keys: usize,
}
212
213impl Default for SectionSetCollector {
214    fn default() -> Self {
215        Self {
216            partial: BTreeMap::new(),
217            max_partial_keys: DEFAULT_MAX_PARTIAL_KEYS,
218        }
219    }
220}
221
222impl SectionSetCollector {
223    /// Create an empty collector with the default cap
224    /// ([`DEFAULT_MAX_PARTIAL_KEYS`]).
225    #[must_use]
226    pub fn new() -> Self {
227        Self::default()
228    }
229
230    /// Replace the partial-key cap (default [`DEFAULT_MAX_PARTIAL_KEYS`]).
231    /// Sections for new keys are skipped when the map is full, until
232    /// [`clear`](Self::clear) frees capacity.
233    #[must_use]
234    pub fn with_max_partial_keys(mut self, max_partial_keys: usize) -> Self {
235        self.max_partial_keys = max_partial_keys;
236        self
237    }
238
239    /// Push one complete section. Returns `Some` only when the logical section
240    /// set has become complete for the first time at this version.
241    ///
242    /// # Errors
243    ///
244    /// Returns a [`CollectError`] if the bytes are not a valid long-form
245    /// section or if the section set becomes internally inconsistent. Treat the
246    /// error as applying to this section only unless your application wants
247    /// strict stream-fail behavior.
248    pub fn push_section(
249        &mut self,
250        bytes: impl AsRef<[u8]>,
251    ) -> CollectResult<Option<CompleteSectionSet>> {
252        self.push_section_with_pid(None, bytes)
253    }
254
255    /// Push one complete section with PID context.
256    ///
257    /// The PID is folded into the section-set key so tables with identical
258    /// table id/extension on different PIDs do not collide.
259    pub fn push_section_with_pid(
260        &mut self,
261        pid: Option<u16>,
262        bytes: impl AsRef<[u8]>,
263    ) -> CollectResult<Option<CompleteSectionSet>> {
264        let raw = bytes.as_ref();
265        let section = Section::parse(raw)?;
266        if !section.section_syntax_indicator {
267            return Err(CollectError::ShortFormSection {
268                table_id: section.table_id,
269            });
270        }
271        if section.section_number > section.last_section_number {
272            return Err(CollectError::SectionNumberOutOfRange {
273                table_id: section.table_id,
274                section_number: section.section_number,
275                last_section_number: section.last_section_number,
276            });
277        }
278        section.validate_crc(raw)?;
279
280        let key = SectionSetKey {
281            pid,
282            table_id: section.table_id,
283            extension_id: section.extension_id,
284            current_next_indicator: section.current_next_indicator,
285        };
286        let meta = SectionSetMeta {
287            key,
288            version_number: section.version_number,
289            last_section_number: section.last_section_number,
290        };
291        let bytes: Arc<[u8]> = Arc::from(raw);
292
293        // Cap check: skip new keys when the map is full
294        if !self.partial.contains_key(&key) && self.partial.len() >= self.max_partial_keys {
295            return Ok(None);
296        }
297
298        let partial = self
299            .partial
300            .entry(key)
301            .or_insert_with(|| PartialSectionSet::new(meta));
302
303        if partial.meta.version_number != meta.version_number
304            || partial.meta.last_section_number != meta.last_section_number
305        {
306            partial.reset(meta);
307        }
308
309        partial.insert(section.section_number, bytes)?;
310        let complete = partial.to_complete();
311        if complete.is_some() {
312            partial.emitted = true;
313        }
314        Ok(complete)
315    }
316
317    /// Drop all retained partial section sets.
318    pub fn clear(&mut self) {
319        self.partial.clear();
320    }
321
322    /// Number of retained partial section-set states.
323    #[must_use]
324    pub fn len(&self) -> usize {
325        self.partial.len()
326    }
327
328    /// Whether the collector currently has no retained state.
329    #[must_use]
330    pub fn is_empty(&self) -> bool {
331        self.partial.is_empty()
332    }
333}
334
/// A complete owned set of original section bytes for one logical section
/// sequence.
#[derive(Debug, Clone)]
pub struct CompleteSectionSet {
    /// Key, version and last section number shared by every section.
    meta: SectionSetMeta,
    /// Raw section bytes, indexed by `section_number`.
    sections: Vec<Arc<[u8]>>,
}
342
/// Generic complete table view for one collected section set.
///
/// This is the all-table escape hatch: every long-form PSI/SI table with
/// `section_number`/`last_section_number` can be collected into a
/// [`CompleteSectionSet`] and parsed as `CompleteTable<T>`. Table-specific
/// complete views such as [`CompleteNit`] add flattened convenience fields where
/// the logical table shape is useful.
#[derive(Debug)]
pub struct CompleteTable<T> {
    /// Metadata copied from the originating section set.
    meta: SectionSetMeta,
    /// One parsed value per section, in section-number order.
    sections: Vec<T>,
}
355
356impl<T> CompleteTable<T> {
357    /// Metadata shared by the section set.
358    #[must_use]
359    pub const fn meta(&self) -> SectionSetMeta {
360        self.meta
361    }
362
363    /// Parsed sections in section-number order.
364    #[must_use]
365    pub fn sections(&self) -> &[T] {
366        &self.sections
367    }
368
369    /// Consume the complete table and return the parsed sections.
370    #[must_use]
371    pub fn into_sections(self) -> Vec<T> {
372        self.sections
373    }
374}
375
376impl CompleteSectionSet {
377    /// Metadata shared by the section set.
378    #[must_use]
379    pub const fn meta(&self) -> SectionSetMeta {
380        self.meta
381    }
382
383    /// Complete section bytes in section-number order.
384    #[must_use]
385    pub fn section_bytes(&self) -> impl ExactSizeIterator<Item = &[u8]> {
386        self.sections.iter().map(AsRef::as_ref)
387    }
388
389    /// Parse every section in this set as `T`.
390    ///
391    /// The parsed values borrow from this [`CompleteSectionSet`], so callers can
392    /// retain the set and use borrowed typed views without copying table loops.
393    pub fn parse_sections<'a, T>(&'a self) -> crate::Result<Vec<T>>
394    where
395        T: Parse<'a, Error = crate::Error>,
396    {
397        self.section_bytes().map(T::parse).collect()
398    }
399
400    /// Parse this set as a generic complete table.
401    ///
402    /// Use this for any long-form table that does not need a specialised
403    /// flattened logical view.
404    pub fn table<'a, T>(&'a self) -> crate::Result<CompleteTable<T>>
405    where
406        T: Parse<'a, Error = crate::Error>,
407    {
408        Ok(CompleteTable {
409            meta: self.meta,
410            sections: self.parse_sections()?,
411        })
412    }
413
414    /// Build a complete NIT view from this section set.
415    pub fn nit(&self) -> crate::Result<CompleteNit<'_>> {
416        CompleteNit::parse(self, None)
417    }
418
419    /// Build a complete NIT view using a descriptor registry.
420    pub fn nit_with_registry<'a>(
421        &'a self,
422        registry: &'a DescriptorRegistry,
423    ) -> crate::Result<CompleteNit<'a>> {
424        CompleteNit::parse(self, Some(registry))
425    }
426
427    /// Build a complete BAT view from this section set.
428    pub fn bat(&self) -> crate::Result<CompleteBat<'_>> {
429        CompleteBat::parse(self, None)
430    }
431
432    /// Build a complete BAT view using a descriptor registry.
433    pub fn bat_with_registry<'a>(
434        &'a self,
435        registry: &'a DescriptorRegistry,
436    ) -> crate::Result<CompleteBat<'a>> {
437        CompleteBat::parse(self, Some(registry))
438    }
439
440    /// Build a complete SDT view from this section set.
441    pub fn sdt(&self) -> crate::Result<CompleteSdt<'_>> {
442        CompleteSdt::parse(self, None)
443    }
444
445    /// Build a complete SDT view using a descriptor registry.
446    pub fn sdt_with_registry<'a>(
447        &'a self,
448        registry: &'a DescriptorRegistry,
449    ) -> crate::Result<CompleteSdt<'a>> {
450        CompleteSdt::parse(self, Some(registry))
451    }
452
453    /// Build a complete EIT view from this section set.
454    pub fn eit(&self) -> crate::Result<CompleteEit<'_>> {
455        CompleteEit::parse(self, None)
456    }
457
458    /// Build a complete EIT view using a descriptor registry.
459    pub fn eit_with_registry<'a>(
460        &'a self,
461        registry: &'a DescriptorRegistry,
462    ) -> crate::Result<CompleteEit<'a>> {
463        CompleteEit::parse(self, Some(registry))
464    }
465}
466
/// Parsed descriptor loop retaining the raw bytes and the typed descriptor
/// results.
#[derive(Debug)]
pub struct ParsedDescriptorLoop<'a> {
    /// The untouched descriptor loop the results were parsed from.
    raw: DescriptorLoop<'a>,
    /// One parse result per descriptor; failures are kept as `Err` entries
    /// rather than aborting the loop.
    descriptors: Vec<crate::Result<AnyDescriptor<'a>>>,
}
474
475impl<'a> ParsedDescriptorLoop<'a> {
476    pub(crate) fn parse(raw: DescriptorLoop<'a>, registry: Option<&'a DescriptorRegistry>) -> Self {
477        let descriptors = match registry {
478            Some(registry) => registry.parse_loop(raw.raw()).collect(),
479            None => raw.iter().collect(),
480        };
481        Self { raw, descriptors }
482    }
483
484    /// Raw descriptor-loop bytes.
485    ///
486    /// Use `raw().iter_with_extensions(&desc_reg, &ext_reg)` to recover custom
487    /// extension bodies from a `Complete*` view.
488    #[must_use]
489    pub const fn raw(&self) -> DescriptorLoop<'a> {
490        self.raw
491    }
492
493    /// Typed descriptor parse results in wire order.
494    pub fn descriptors(&self) -> &[crate::Result<AnyDescriptor<'a>>] {
495        &self.descriptors
496    }
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    const TEST_TABLE_ID: u8 = 0x42;

    /// Build a 12-byte long-form section with an empty payload, version 0,
    /// `current_next_indicator` set, and a valid CRC.
    fn numbered_section(
        extension_id: u16,
        section_number: u8,
        last_section_number: u8,
    ) -> Vec<u8> {
        let section_length: u16 = 9; // 5 (ext_header) + 0 (payload) + 4 (crc)
        let [ext_hi, ext_lo] = extension_id.to_be_bytes();
        let mut buf = vec![
            TEST_TABLE_ID,
            0xB0 | ((section_length >> 8) as u8 & 0x0F),
            (section_length & 0xFF) as u8,
            ext_hi,
            ext_lo,
            0xC1,
            section_number,
            last_section_number,
        ];
        let crc = broadcast_common::crc32_mpeg2::compute(&buf);
        buf.extend_from_slice(&crc.to_be_bytes());
        buf
    }

    /// Single-section table: section 0 of 0.
    fn min_section(extension_id: u16) -> Vec<u8> {
        numbered_section(extension_id, 0, 0)
    }

    #[test]
    fn collect_single_section_is_complete() {
        let mut collector = SectionSetCollector::new();
        let section = min_section(0);
        let outcome = collector.push_section(&section).unwrap();
        assert!(outcome.is_some());
        assert_eq!(collector.len(), 1);
    }

    #[test]
    fn partial_keys_cap_skips_new_keys() {
        let mut collector = SectionSetCollector::new().with_max_partial_keys(3);

        // Three distinct extension IDs fill the cap.
        for eid in 0..3u16 {
            let outcome = collector.push_section(&min_section(eid)).unwrap();
            assert!(
                outcome.is_some(),
                "single-section set for eid {eid} completes"
            );
        }
        assert_eq!(collector.len(), 3);

        // A fourth distinct key is skipped because the cap is full.
        let fourth = min_section(3);
        let outcome = collector.push_section(&fourth).unwrap();
        assert!(outcome.is_none(), "new key beyond cap must be skipped");
        assert_eq!(collector.len(), 3);

        // Clearing frees space, so the fourth key can now enter.
        collector.clear();
        assert!(collector.is_empty());
        let outcome = collector.push_section(&fourth).unwrap();
        assert!(outcome.is_some());
        assert_eq!(collector.len(), 1);
    }

    #[test]
    fn partial_keys_cap_does_not_skip_existing_key() {
        let mut collector = SectionSetCollector::new().with_max_partial_keys(1);

        // Section 0 of a two-section set occupies the only available key.
        let first = numbered_section(0xAB, 0, 1);
        let outcome = collector.push_section(&first).unwrap();
        assert!(outcome.is_none(), "incomplete section set yields None");

        // Section 1 shares that key, so the full cap must not reject it.
        let second = numbered_section(0xAB, 1, 1);
        let outcome = collector.push_section(&second).unwrap();
        assert!(
            outcome.is_some(),
            "existing key must NOT be skipped when cap full"
        );
        assert_eq!(collector.len(), 1);
    }
}