Skip to main content

dvb_si/collect/
mod.rs

//! Multi-section table collection.
//!
//! Section parsers in [`crate::tables`] describe one wire section. This module
//! adds the next layer up: collect all sections in `0..=last_section_number`
//! for one logical version, then expose a complete table view.
//!
//! Collectors validate long-form section CRCs before retaining bytes. If the
//! input already came from [`crate::demux::SiDemux`], that validation has
//! already happened; direct section-byte callers get the same guard here.
//!
//! A collector error describes the section that was just pushed, not the whole
//! stream. Long-running consumers should normally log/drop that section and
//! continue feeding later sections; previous valid collector state is retained.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use crate::descriptors::{AnyDescriptor, DescriptorLoop, DescriptorRegistry};
19use crate::section::Section;
20use dvb_common::Parse;
21
22mod bat;
23mod eit;
24mod nit;
25mod sdt;
26
27pub use bat::*;
28pub use eit::*;
29pub use nit::*;
30pub use sdt::*;
31
/// Default upper bound on the number of logical keys a
/// [`SectionSetCollector`] retains at once.
///
/// 256 concurrent collections is generous while still bounding a hostile
/// stream that rotates table_id / extension / current_next_indicator across
/// PIDs to force unbounded map growth. The bound applies to the collector's
/// partial-sections map; entries stay in that map after their set completes,
/// so completed keys count towards the bound as well. Once the map is full,
/// sections for unseen keys are skipped until
/// [`clear`](SectionSetCollector::clear) frees capacity.
pub const DEFAULT_MAX_PARTIAL_KEYS: usize = 256;
41
42/// Result alias for collection operations.
43pub type CollectResult<T> = core::result::Result<T, CollectError>;
44
45/// Errors returned by multi-section collectors.
46///
47/// These errors are scoped to the current input section. They usually mean
48/// "skip this section and keep going", especially on live streams where a
49/// broadcaster may mutate section bytes without bumping `version_number`.
50#[derive(Debug, thiserror::Error)]
51#[non_exhaustive]
52pub enum CollectError {
53    /// The section bytes did not parse as a generic PSI/SI section.
54    #[error("section parse failed: {0}")]
55    Section(#[from] crate::Error),
56
57    /// A short-form section was fed to a multi-section collector.
58    #[error(
59        "table_id {table_id:#04x} is a short-form section and cannot be multi-section collected"
60    )]
61    ShortFormSection {
62        /// Raw table_id byte.
63        table_id: u8,
64    },
65
66    /// `section_number` was outside the advertised section range.
67    #[error(
68        "section_number {section_number} exceeds last_section_number {last_section_number} for table_id {table_id:#04x}"
69    )]
70    SectionNumberOutOfRange {
71        /// Raw table_id byte.
72        table_id: u8,
73        /// Section number carried by the section.
74        section_number: u8,
75        /// Last section number carried by the section.
76        last_section_number: u8,
77    },
78
79    /// A slot already contained different bytes for the same version.
80    #[error("conflicting bytes for table_id {table_id:#04x} section {section_number}")]
81    ConflictingSection {
82        /// Raw table_id byte.
83        table_id: u8,
84        /// Section slot that conflicted.
85        section_number: u8,
86    },
87
88    /// An EIT schedule section advertised an impossible table-id range.
89    #[error(
90        "EIT schedule table_id {table_id:#04x} is outside advertised range {first_table_id:#04x}..={last_table_id:#04x}"
91    )]
92    EitTableIdOutOfRange {
93        /// Incoming EIT schedule table_id.
94        table_id: u8,
95        /// First table_id for this schedule kind.
96        first_table_id: u8,
97        /// Advertised last_table_id.
98        last_table_id: u8,
99    },
100}
101
/// Identity of one logical section sequence.
///
/// `version_number` and `section_number` are intentionally not part of the
/// key: a new version resets the collection stored under the key, and the
/// section number selects a slot inside that collection.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[non_exhaustive]
pub struct SectionSetKey {
    /// PID the section arrived on, when the caller supplied one.
    pub pid: Option<u16>,
    /// Raw `table_id` byte.
    pub table_id: u8,
    /// Long-form `table_id_extension` value.
    pub extension_id: u16,
    /// Value of the `current_next_indicator` flag.
    pub current_next_indicator: bool,
}
118
119/// Metadata shared by every section in a complete section set.
120#[derive(Debug, Clone, Copy, PartialEq, Eq)]
121#[non_exhaustive]
122pub struct SectionSetMeta {
123    /// Logical section-set key.
124    pub key: SectionSetKey,
125    /// 5-bit `version_number`.
126    pub version_number: u8,
127    /// Last section number for this set.
128    pub last_section_number: u8,
129}
130
131#[derive(Debug)]
132struct PartialSectionSet {
133    meta: SectionSetMeta,
134    slots: Vec<Option<Arc<[u8]>>>,
135    filled: usize,
136    emitted: bool,
137}
138
139impl PartialSectionSet {
140    fn new(meta: SectionSetMeta) -> Self {
141        let len = meta.last_section_number as usize + 1;
142        Self {
143            meta,
144            slots: vec![None; len],
145            filled: 0,
146            emitted: false,
147        }
148    }
149
150    fn reset(&mut self, meta: SectionSetMeta) {
151        *self = Self::new(meta);
152    }
153
154    fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
155        let index = section_number as usize;
156        if let Some(existing) = &self.slots[index] {
157            if existing.as_ref() == bytes.as_ref() {
158                return Ok(false);
159            }
160            return Err(CollectError::ConflictingSection {
161                table_id: self.meta.key.table_id,
162                section_number,
163            });
164        }
165
166        self.slots[index] = Some(bytes);
167        self.filled += 1;
168        self.emitted = false;
169        Ok(true)
170    }
171
172    fn complete(&self) -> bool {
173        self.filled == self.slots.len()
174    }
175
176    fn to_complete(&self) -> Option<CompleteSectionSet> {
177        if !self.complete() || self.emitted {
178            return None;
179        }
180
181        let sections = self
182            .slots
183            .iter()
184            .map(|slot| slot.as_ref().expect("complete set has no holes").clone())
185            .collect();
186        Some(CompleteSectionSet {
187            meta: self.meta,
188            sections,
189        })
190    }
191}
192
193/// Generic collector for long-form `section_number`/`last_section_number`
194/// sequences.
195///
196/// The constructor [`SectionSetCollector::new`] uses the default cap
197/// [`DEFAULT_MAX_PARTIAL_KEYS`]; the cap is configurable via
198/// [`with_max_partial_keys`](Self::with_max_partial_keys).
199#[derive(Debug)]
200pub struct SectionSetCollector {
201    partial: HashMap<SectionSetKey, PartialSectionSet>,
202    max_partial_keys: usize,
203}
204
205impl Default for SectionSetCollector {
206    fn default() -> Self {
207        Self {
208            partial: HashMap::new(),
209            max_partial_keys: DEFAULT_MAX_PARTIAL_KEYS,
210        }
211    }
212}
213
214impl SectionSetCollector {
215    /// Create an empty collector with the default cap
216    /// ([`DEFAULT_MAX_PARTIAL_KEYS`]).
217    #[must_use]
218    pub fn new() -> Self {
219        Self::default()
220    }
221
222    /// Replace the partial-key cap (default [`DEFAULT_MAX_PARTIAL_KEYS`]).
223    /// Sections for new keys are skipped when the map is full, until
224    /// [`clear`](Self::clear) frees capacity.
225    #[must_use]
226    pub fn with_max_partial_keys(mut self, max_partial_keys: usize) -> Self {
227        self.max_partial_keys = max_partial_keys;
228        self
229    }
230
231    /// Push one complete section. Returns `Some` only when the logical section
232    /// set has become complete for the first time at this version.
233    ///
234    /// # Errors
235    ///
236    /// Returns a [`CollectError`] if the bytes are not a valid long-form
237    /// section or if the section set becomes internally inconsistent. Treat the
238    /// error as applying to this section only unless your application wants
239    /// strict stream-fail behavior.
240    pub fn push_section(
241        &mut self,
242        bytes: impl AsRef<[u8]>,
243    ) -> CollectResult<Option<CompleteSectionSet>> {
244        self.push_section_with_pid(None, bytes)
245    }
246
247    /// Push one complete section with PID context.
248    ///
249    /// The PID is folded into the section-set key so tables with identical
250    /// table id/extension on different PIDs do not collide.
251    pub fn push_section_with_pid(
252        &mut self,
253        pid: Option<u16>,
254        bytes: impl AsRef<[u8]>,
255    ) -> CollectResult<Option<CompleteSectionSet>> {
256        let raw = bytes.as_ref();
257        let section = Section::parse(raw)?;
258        if !section.section_syntax_indicator {
259            return Err(CollectError::ShortFormSection {
260                table_id: section.table_id,
261            });
262        }
263        if section.section_number > section.last_section_number {
264            return Err(CollectError::SectionNumberOutOfRange {
265                table_id: section.table_id,
266                section_number: section.section_number,
267                last_section_number: section.last_section_number,
268            });
269        }
270        section.validate_crc(raw)?;
271
272        let key = SectionSetKey {
273            pid,
274            table_id: section.table_id,
275            extension_id: section.extension_id,
276            current_next_indicator: section.current_next_indicator,
277        };
278        let meta = SectionSetMeta {
279            key,
280            version_number: section.version_number,
281            last_section_number: section.last_section_number,
282        };
283        let bytes: Arc<[u8]> = Arc::from(raw);
284
285        // Cap check: skip new keys when the map is full
286        if !self.partial.contains_key(&key) && self.partial.len() >= self.max_partial_keys {
287            return Ok(None);
288        }
289
290        let partial = self
291            .partial
292            .entry(key)
293            .or_insert_with(|| PartialSectionSet::new(meta));
294
295        if partial.meta.version_number != meta.version_number
296            || partial.meta.last_section_number != meta.last_section_number
297        {
298            partial.reset(meta);
299        }
300
301        partial.insert(section.section_number, bytes)?;
302        let complete = partial.to_complete();
303        if complete.is_some() {
304            partial.emitted = true;
305        }
306        Ok(complete)
307    }
308
309    /// Drop all retained partial section sets.
310    pub fn clear(&mut self) {
311        self.partial.clear();
312    }
313
314    /// Number of retained partial section-set states.
315    #[must_use]
316    pub fn len(&self) -> usize {
317        self.partial.len()
318    }
319
320    /// Whether the collector currently has no retained state.
321    #[must_use]
322    pub fn is_empty(&self) -> bool {
323        self.partial.is_empty()
324    }
325}
326
327/// A complete owned set of original section bytes for one logical section
328/// sequence.
329#[derive(Debug, Clone)]
330pub struct CompleteSectionSet {
331    meta: SectionSetMeta,
332    sections: Vec<Arc<[u8]>>,
333}
334
335/// Generic complete table view for one collected section set.
336///
337/// This is the all-table escape hatch: every long-form PSI/SI table with
338/// `section_number`/`last_section_number` can be collected into a
339/// [`CompleteSectionSet`] and parsed as `CompleteTable<T>`. Table-specific
340/// complete views such as [`CompleteNit`] add flattened convenience fields where
341/// the logical table shape is useful.
342#[derive(Debug)]
343pub struct CompleteTable<T> {
344    meta: SectionSetMeta,
345    sections: Vec<T>,
346}
347
348impl<T> CompleteTable<T> {
349    /// Metadata shared by the section set.
350    #[must_use]
351    pub const fn meta(&self) -> SectionSetMeta {
352        self.meta
353    }
354
355    /// Parsed sections in section-number order.
356    #[must_use]
357    pub fn sections(&self) -> &[T] {
358        &self.sections
359    }
360
361    /// Consume the complete table and return the parsed sections.
362    #[must_use]
363    pub fn into_sections(self) -> Vec<T> {
364        self.sections
365    }
366}
367
368impl CompleteSectionSet {
369    /// Metadata shared by the section set.
370    #[must_use]
371    pub const fn meta(&self) -> SectionSetMeta {
372        self.meta
373    }
374
375    /// Complete section bytes in section-number order.
376    #[must_use]
377    pub fn section_bytes(&self) -> impl ExactSizeIterator<Item = &[u8]> {
378        self.sections.iter().map(AsRef::as_ref)
379    }
380
381    /// Parse every section in this set as `T`.
382    ///
383    /// The parsed values borrow from this [`CompleteSectionSet`], so callers can
384    /// retain the set and use borrowed typed views without copying table loops.
385    pub fn parse_sections<'a, T>(&'a self) -> crate::Result<Vec<T>>
386    where
387        T: Parse<'a, Error = crate::Error>,
388    {
389        self.section_bytes().map(T::parse).collect()
390    }
391
392    /// Parse this set as a generic complete table.
393    ///
394    /// Use this for any long-form table that does not need a specialised
395    /// flattened logical view.
396    pub fn table<'a, T>(&'a self) -> crate::Result<CompleteTable<T>>
397    where
398        T: Parse<'a, Error = crate::Error>,
399    {
400        Ok(CompleteTable {
401            meta: self.meta,
402            sections: self.parse_sections()?,
403        })
404    }
405
406    /// Build a complete NIT view from this section set.
407    pub fn nit(&self) -> crate::Result<CompleteNit<'_>> {
408        CompleteNit::parse(self, None)
409    }
410
411    /// Build a complete NIT view using a descriptor registry.
412    pub fn nit_with_registry<'a>(
413        &'a self,
414        registry: &'a DescriptorRegistry,
415    ) -> crate::Result<CompleteNit<'a>> {
416        CompleteNit::parse(self, Some(registry))
417    }
418
419    /// Build a complete BAT view from this section set.
420    pub fn bat(&self) -> crate::Result<CompleteBat<'_>> {
421        CompleteBat::parse(self, None)
422    }
423
424    /// Build a complete BAT view using a descriptor registry.
425    pub fn bat_with_registry<'a>(
426        &'a self,
427        registry: &'a DescriptorRegistry,
428    ) -> crate::Result<CompleteBat<'a>> {
429        CompleteBat::parse(self, Some(registry))
430    }
431
432    /// Build a complete SDT view from this section set.
433    pub fn sdt(&self) -> crate::Result<CompleteSdt<'_>> {
434        CompleteSdt::parse(self, None)
435    }
436
437    /// Build a complete SDT view using a descriptor registry.
438    pub fn sdt_with_registry<'a>(
439        &'a self,
440        registry: &'a DescriptorRegistry,
441    ) -> crate::Result<CompleteSdt<'a>> {
442        CompleteSdt::parse(self, Some(registry))
443    }
444
445    /// Build a complete EIT view from this section set.
446    pub fn eit(&self) -> crate::Result<CompleteEit<'_>> {
447        CompleteEit::parse(self, None)
448    }
449
450    /// Build a complete EIT view using a descriptor registry.
451    pub fn eit_with_registry<'a>(
452        &'a self,
453        registry: &'a DescriptorRegistry,
454    ) -> crate::Result<CompleteEit<'a>> {
455        CompleteEit::parse(self, Some(registry))
456    }
457}
458
459/// Parsed descriptor loop retaining the raw bytes and the typed descriptor
460/// results.
461#[derive(Debug)]
462pub struct ParsedDescriptorLoop<'a> {
463    raw: DescriptorLoop<'a>,
464    descriptors: Vec<crate::Result<AnyDescriptor<'a>>>,
465}
466
467impl<'a> ParsedDescriptorLoop<'a> {
468    pub(crate) fn parse(raw: DescriptorLoop<'a>, registry: Option<&'a DescriptorRegistry>) -> Self {
469        let descriptors = match registry {
470            Some(registry) => registry.parse_loop(raw.raw()).collect(),
471            None => raw.iter().collect(),
472        };
473        Self { raw, descriptors }
474    }
475
476    /// Raw descriptor-loop bytes.
477    ///
478    /// Use `raw().iter_with_extensions(&desc_reg, &ext_reg)` to recover custom
479    /// extension bodies from a `Complete*` view.
480    #[must_use]
481    pub const fn raw(&self) -> DescriptorLoop<'a> {
482        self.raw
483    }
484
485    /// Typed descriptor parse results in wire order.
486    pub fn descriptors(&self) -> &[crate::Result<AnyDescriptor<'a>>] {
487        &self.descriptors
488    }
489}
490
#[cfg(test)]
mod tests {
    use super::*;

    const TEST_TABLE_ID: u8 = 0x42;

    /// Build a 12-byte long-form section with an empty payload, the given
    /// section numbering and a freshly computed CRC.
    fn build_section(extension_id: u16, section_number: u8, last_section_number: u8) -> Vec<u8> {
        let section_length: u16 = 9; // 5 (ext_header) + 0 (payload) + 4 (crc)
        let [length_hi, length_lo] = section_length.to_be_bytes();
        let [extension_hi, extension_lo] = extension_id.to_be_bytes();
        let mut buf = vec![
            TEST_TABLE_ID,
            0xB0 | (length_hi & 0x0F),
            length_lo,
            extension_hi,
            extension_lo,
            0xC1,
            section_number,
            last_section_number,
        ];
        // The CRC covers the eight header bytes and is appended big-endian.
        let crc = dvb_common::crc32_mpeg2::compute(&buf[..8]);
        buf.extend_from_slice(&crc.to_be_bytes());
        buf
    }

    /// Smallest valid set: a single section numbered 0 of 0.
    fn min_section(extension_id: u16) -> Vec<u8> {
        build_section(extension_id, 0, 0)
    }

    #[test]
    fn collect_single_section_is_complete() {
        let mut collector = SectionSetCollector::new();
        let outcome = collector.push_section(min_section(0)).unwrap();
        assert!(outcome.is_some());
        assert_eq!(collector.len(), 1);
    }

    #[test]
    fn partial_keys_cap_skips_new_keys() {
        let mut collector = SectionSetCollector::new().with_max_partial_keys(3);

        // Three distinct extension IDs use up the whole cap.
        for eid in 0..3u16 {
            let outcome = collector.push_section(min_section(eid)).unwrap();
            assert!(
                outcome.is_some(),
                "single-section set for eid {eid} completes"
            );
        }
        assert_eq!(collector.len(), 3);

        // A fourth distinct key arrives while the cap is full: skipped.
        let fourth = min_section(3);
        let outcome = collector.push_section(&fourth).unwrap();
        assert!(outcome.is_none(), "new key beyond cap must be skipped");
        assert_eq!(collector.len(), 3);

        // After clearing there is room again, so the same section is accepted.
        collector.clear();
        assert!(collector.is_empty());
        let outcome = collector.push_section(&fourth).unwrap();
        assert!(outcome.is_some());
        assert_eq!(collector.len(), 1);
    }

    #[test]
    fn partial_keys_cap_does_not_skip_existing_key() {
        let mut collector = SectionSetCollector::new().with_max_partial_keys(1);

        // Section 0 of a two-section set takes the only key the cap allows.
        let first = build_section(0xAB, 0, 1);
        let outcome = collector.push_section(&first).unwrap();
        assert!(outcome.is_none(), "incomplete section set yields None");

        // Section 1 belongs to the key that is already present, so the full
        // cap must not stop it from completing the set.
        let second = build_section(0xAB, 1, 1);
        let outcome = collector.push_section(&second).unwrap();
        assert!(
            outcome.is_some(),
            "existing key must NOT be skipped when cap full"
        );
        assert_eq!(collector.len(), 1);
    }
}