// dvb_si/collect/eit.rs

1use alloc::collections::BTreeMap;
2use alloc::sync::Arc;
3use alloc::vec;
4use alloc::vec::Vec;
5
6use crate::descriptors::DescriptorRegistry;
7use crate::section::Section;
8use crate::tables::eit;
9use crate::tables::RunningStatus;
10use dvb_common::Parse;
11
12use super::{
13    CollectError, CollectResult, CompleteSectionSet, ParsedDescriptorLoop, SectionSetKey,
14    SectionSetMeta,
15};
16
/// Default ceiling on how many in-progress entries [`EitCollector`] keeps in
/// each of its two internal maps (section sets and schedule ranges).
///
/// 256 is meant to be generous for real broadcasts while still bounding a
/// hostile stream that cycles `original_network_id` / `transport_stream_id` /
/// `service_id` (or `current_next_indicator`) to force unbounded map growth.
///
/// The limit is enforced separately for the section-set map and the schedule
/// map: each holds at most `max_logical_keys` entries. The section-set map is
/// keyed per `table_id` within a logical key, so a service carrying several
/// schedule sub-tables occupies several section-set entries.
///
/// Once a map is full, sections that would create a new entry are skipped
/// until [`clear`](EitCollector::clear) or
/// [`retain_logical`](EitCollector::retain_logical) releases capacity.
pub const DEFAULT_MAX_LOGICAL_KEYS: usize = 256;
29
30/// EIT-specific collector.
31///
32/// Present/following EITs complete as one normal section set. Schedule EITs
33/// complete only when every schedule table_id from the kind's first table_id
34/// through the advertised `last_table_id` has completed its own section set.
35///
36/// # Memory bounds
37///
38/// The collector is bounded by [`DEFAULT_MAX_LOGICAL_KEYS`] (configurable via
39/// [`with_max_logical_keys`](Self::with_max_logical_keys)). When the sections
40/// or schedules map is full, incoming sections for new keys are skipped until
41/// space frees — the same skip-until-space policy as
42/// [`crate::carousel::ModuleReassembler`].
43#[derive(Debug)]
44pub struct EitCollector {
45    sections: BTreeMap<EitSectionSetKey, PartialEitSectionSet>,
46    schedules: BTreeMap<EitLogicalKey, PartialEitSchedule>,
47    max_logical_keys: usize,
48}
49
50impl Default for EitCollector {
51    fn default() -> Self {
52        Self {
53            sections: BTreeMap::new(),
54            schedules: BTreeMap::new(),
55            max_logical_keys: DEFAULT_MAX_LOGICAL_KEYS,
56        }
57    }
58}
59
60impl EitCollector {
61    /// Create an empty EIT collector with the default cap
62    /// ([`DEFAULT_MAX_LOGICAL_KEYS`]).
63    #[must_use]
64    pub fn new() -> Self {
65        Self::default()
66    }
67
68    /// Replace the logical-key cap (default [`DEFAULT_MAX_LOGICAL_KEYS`]).
69    /// The cap is applied independently to the sections and schedules maps.
70    /// Sections for new keys are skipped when the relevant map is full, until
71    /// [`clear`](Self::clear) or [`retain_logical`](Self::retain_logical)
72    /// frees capacity.
73    #[must_use]
74    pub fn with_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
75        self.max_logical_keys = max_logical_keys;
76        self
77    }
78
79    /// Push one complete EIT section.
80    ///
81    /// Returns `Some` for a completed present/following table or a completed
82    /// schedule table-id range.
83    ///
84    /// # Errors
85    ///
86    /// Returns a [`CollectError`] if the incoming section is malformed,
87    /// inconsistent with already retained bytes, or not an EIT section. Treat
88    /// the error as applying to this section only unless your application wants
89    /// strict stream-fail behavior.
90    pub fn push_section(&mut self, bytes: impl AsRef<[u8]>) -> CollectResult<Option<CompletedEit>> {
91        self.push_section_with_pid(None, bytes)
92    }
93
94    /// Push one complete EIT section with PID context.
95    pub fn push_section_with_pid(
96        &mut self,
97        pid: Option<u16>,
98        bytes: impl AsRef<[u8]>,
99    ) -> CollectResult<Option<CompletedEit>> {
100        let raw = bytes.as_ref();
101        let section = Section::parse(raw)?;
102        if !section.section_syntax_indicator {
103            return Err(CollectError::ShortFormSection {
104                table_id: section.table_id,
105            });
106        }
107        if section.section_number > section.last_section_number {
108            return Err(CollectError::SectionNumberOutOfRange {
109                table_id: section.table_id,
110                section_number: section.section_number,
111                last_section_number: section.last_section_number,
112            });
113        }
114        section.validate_crc(raw)?;
115
116        let eit = eit::EitSection::parse(raw)?;
117        let logical_key = EitLogicalKey {
118            pid,
119            kind: eit.kind,
120            service_id: eit.service_id,
121            transport_stream_id: eit.transport_stream_id,
122            original_network_id: eit.original_network_id,
123            current_next_indicator: eit.current_next_indicator,
124        };
125        let key = EitSectionSetKey {
126            logical_key,
127            table_id: eit.table_id,
128        };
129        let meta = EitSectionSetMeta {
130            key,
131            version_number: eit.version_number,
132            last_section_number: eit.last_section_number,
133        };
134        let bytes: Arc<[u8]> = Arc::from(raw);
135
136        // Cap check: sections map
137        if !self.sections.contains_key(&key) && self.sections.len() >= self.max_logical_keys {
138            return Ok(None);
139        }
140
141        let partial = self
142            .sections
143            .entry(key)
144            .or_insert_with(|| PartialEitSectionSet::new(meta));
145        if partial.meta.version_number != meta.version_number
146            || partial.meta.last_section_number != meta.last_section_number
147        {
148            partial.reset(meta);
149        }
150
151        partial.insert(eit.section_number, bytes)?;
152        let complete = match partial.to_complete() {
153            Some(complete) => complete,
154            None => return Ok(None),
155        };
156
157        match eit.kind {
158            eit::EitKind::PresentFollowingActual | eit::EitKind::PresentFollowingOther => {
159                partial.emitted = true;
160                Ok(Some(CompletedEit::PresentFollowing(complete)))
161            }
162            eit::EitKind::ScheduleActual | eit::EitKind::ScheduleOther => {
163                let first_table_id = match eit.kind {
164                    eit::EitKind::ScheduleActual => eit::TABLE_ID_SCHEDULE_ACTUAL_FIRST,
165                    eit::EitKind::ScheduleOther => eit::TABLE_ID_SCHEDULE_OTHER_FIRST,
166                    _ => unreachable!("matched schedule kind above"),
167                };
168                if eit.table_id < first_table_id || eit.table_id > eit.last_table_id {
169                    return Err(CollectError::EitTableIdOutOfRange {
170                        table_id: eit.table_id,
171                        first_table_id,
172                        last_table_id: eit.last_table_id,
173                    });
174                }
175
176                // Cap check: schedules map (before marking the section set emitted)
177                if !self.schedules.contains_key(&logical_key)
178                    && self.schedules.len() >= self.max_logical_keys
179                {
180                    return Ok(None);
181                }
182
183                partial.emitted = true;
184
185                let schedule_meta = EitScheduleMeta {
186                    key: logical_key,
187                    first_table_id,
188                    last_table_id: eit.last_table_id,
189                };
190                let schedule = self
191                    .schedules
192                    .entry(logical_key)
193                    .or_insert_with(|| PartialEitSchedule::new(schedule_meta));
194                if schedule.meta.last_table_id != schedule_meta.last_table_id {
195                    schedule.reset(schedule_meta);
196                }
197                schedule.insert(eit.table_id, complete);
198                if let Some(complete) = schedule.to_complete() {
199                    schedule.emitted = true;
200                    Ok(Some(CompletedEit::Schedule(complete)))
201                } else {
202                    Ok(None)
203                }
204            }
205        }
206    }
207
208    /// Drop all retained EIT partial and completed schedule state.
209    ///
210    /// Long-running receivers that collect EPG data continuously can call this
211    /// at an application-defined carousel boundary if they do not need older
212    /// schedule state.
213    pub fn clear(&mut self) {
214        self.sections.clear();
215        self.schedules.clear();
216    }
217
218    /// Retain only logical EIT keys accepted by `keep`.
219    ///
220    /// This is the explicit pruning hook for long-running EIT schedule
221    /// collection. Both in-progress section sets and completed schedule ranges
222    /// for rejected keys are removed.
223    pub fn retain_logical<F>(&mut self, mut keep: F)
224    where
225        F: FnMut(&EitLogicalKey) -> bool,
226    {
227        self.sections.retain(|key, _| keep(&key.logical_key));
228        self.schedules.retain(|key, _| keep(key));
229    }
230
231    /// Number of retained EIT section-set states.
232    #[must_use]
233    pub fn section_set_len(&self) -> usize {
234        self.sections.len()
235    }
236
237    /// Number of retained EIT logical schedule states.
238    #[must_use]
239    pub fn schedule_len(&self) -> usize {
240        self.schedules.len()
241    }
242}
243
244/// Completed EIT collection result.
245#[derive(Debug, Clone)]
246#[non_exhaustive]
247pub enum CompletedEit {
248    /// One completed present/following EIT section set.
249    PresentFollowing(CompleteSectionSet),
250    /// A completed schedule EIT range spanning one or more table IDs.
251    Schedule(CompleteEitSchedule),
252}
253
254impl CompletedEit {
255    /// Parse the completed EIT table(s) without a descriptor registry.
256    pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
257        self.tables_with_registry(None)
258    }
259
260    /// Parse the completed EIT table(s) with an optional descriptor registry.
261    pub fn tables_with_registry<'a>(
262        &'a self,
263        registry: Option<&'a DescriptorRegistry>,
264    ) -> crate::Result<Vec<CompleteEit<'a>>> {
265        match self {
266            Self::PresentFollowing(set) => Ok(vec![CompleteEit::parse(set, registry)?]),
267            Self::Schedule(schedule) => schedule.tables_with_registry(registry),
268        }
269    }
270}
271
272/// Logical EIT table key used by [`EitCollector`].
273#[non_exhaustive]
274#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
275pub struct EitLogicalKey {
276    /// Optional PID context supplied by the caller.
277    pub pid: Option<u16>,
278    /// EIT kind derived from table_id.
279    pub kind: eit::EitKind,
280    /// service_id.
281    pub service_id: u16,
282    /// transport_stream_id.
283    pub transport_stream_id: u16,
284    /// original_network_id.
285    pub original_network_id: u16,
286    /// current_next_indicator.
287    pub current_next_indicator: bool,
288}
289
290#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
291struct EitSectionSetKey {
292    logical_key: EitLogicalKey,
293    table_id: u8,
294}
295
296#[derive(Debug, Clone, Copy, PartialEq, Eq)]
297struct EitScheduleMeta {
298    key: EitLogicalKey,
299    first_table_id: u8,
300    last_table_id: u8,
301}
302
303#[derive(Debug, Clone, Copy, PartialEq, Eq)]
304struct EitSectionSetMeta {
305    key: EitSectionSetKey,
306    version_number: u8,
307    last_section_number: u8,
308}
309
310#[derive(Debug)]
311struct PartialEitSectionSet {
312    meta: EitSectionSetMeta,
313    slots: Vec<Option<Arc<[u8]>>>,
314    filled: usize,
315    emitted: bool,
316}
317
318impl PartialEitSectionSet {
319    fn new(meta: EitSectionSetMeta) -> Self {
320        let len = meta.last_section_number as usize + 1;
321        Self {
322            meta,
323            slots: vec![None; len],
324            filled: 0,
325            emitted: false,
326        }
327    }
328
329    fn reset(&mut self, meta: EitSectionSetMeta) {
330        *self = Self::new(meta);
331    }
332
333    fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
334        let index = section_number as usize;
335        if let Some(existing) = &self.slots[index] {
336            if existing.as_ref() == bytes.as_ref() {
337                return Ok(false);
338            }
339            return Err(CollectError::ConflictingSection {
340                table_id: self.meta.key.table_id,
341                section_number,
342            });
343        }
344
345        self.slots[index] = Some(bytes);
346        self.filled += 1;
347        self.emitted = false;
348        Ok(true)
349    }
350
351    fn complete(&self) -> bool {
352        self.filled == self.slots.len()
353    }
354
355    fn to_complete(&self) -> Option<CompleteSectionSet> {
356        if !self.complete() || self.emitted {
357            return None;
358        }
359
360        let sections = self
361            .slots
362            .iter()
363            .map(|slot| {
364                slot.as_ref()
365                    .expect("complete EIT set has no holes")
366                    .clone()
367            })
368            .collect();
369        Some(CompleteSectionSet {
370            meta: SectionSetMeta {
371                key: SectionSetKey {
372                    pid: self.meta.key.logical_key.pid,
373                    table_id: self.meta.key.table_id,
374                    extension_id: self.meta.key.logical_key.service_id,
375                    current_next_indicator: self.meta.key.logical_key.current_next_indicator,
376                },
377                version_number: self.meta.version_number,
378                last_section_number: self.meta.last_section_number,
379            },
380            sections,
381        })
382    }
383}
384
385#[derive(Debug)]
386struct PartialEitSchedule {
387    meta: EitScheduleMeta,
388    table_sets: BTreeMap<u8, CompleteSectionSet>,
389    emitted: bool,
390}
391
392impl PartialEitSchedule {
393    fn new(meta: EitScheduleMeta) -> Self {
394        Self {
395            meta,
396            table_sets: BTreeMap::new(),
397            emitted: false,
398        }
399    }
400
401    fn reset(&mut self, meta: EitScheduleMeta) {
402        *self = Self::new(meta);
403    }
404
405    fn insert(&mut self, table_id: u8, set: CompleteSectionSet) {
406        self.table_sets.insert(table_id, set);
407        self.emitted = false;
408    }
409
410    fn complete(&self) -> bool {
411        (self.meta.first_table_id..=self.meta.last_table_id)
412            .all(|table_id| self.table_sets.contains_key(&table_id))
413    }
414
415    fn to_complete(&self) -> Option<CompleteEitSchedule> {
416        if !self.complete() || self.emitted {
417            return None;
418        }
419        let table_sets = (self.meta.first_table_id..=self.meta.last_table_id)
420            .map(|table_id| {
421                self.table_sets
422                    .get(&table_id)
423                    .expect("complete EIT schedule has no missing table IDs")
424                    .clone()
425            })
426            .collect();
427        Some(CompleteEitSchedule {
428            first_table_id: self.meta.first_table_id,
429            last_table_id: self.meta.last_table_id,
430            table_sets,
431        })
432    }
433}
434
435/// Completed EIT schedule spanning all schedule table IDs through
436/// `last_table_id`.
437#[derive(Debug, Clone)]
438pub struct CompleteEitSchedule {
439    first_table_id: u8,
440    last_table_id: u8,
441    table_sets: Vec<CompleteSectionSet>,
442}
443
444impl CompleteEitSchedule {
445    /// First schedule table_id in this range.
446    #[must_use]
447    pub const fn first_table_id(&self) -> u8 {
448        self.first_table_id
449    }
450
451    /// Last schedule table_id in this range.
452    #[must_use]
453    pub const fn last_table_id(&self) -> u8 {
454        self.last_table_id
455    }
456
457    /// Completed section sets, one per schedule table_id in order.
458    #[must_use]
459    pub fn table_sets(&self) -> &[CompleteSectionSet] {
460        &self.table_sets
461    }
462
463    /// Per-table_id 5-bit version numbers in schedule table_id order.
464    ///
465    /// DVB EIT schedule sub-tables version independently, so there is no single
466    /// schedule-wide version number.
467    pub fn table_versions(&self) -> impl ExactSizeIterator<Item = (u8, u8)> + '_ {
468        self.table_sets
469            .iter()
470            .map(|set| (set.meta().key.table_id, set.meta().version_number))
471    }
472
473    /// Parse each completed schedule table-id set.
474    pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
475        self.tables_with_registry(None)
476    }
477
478    /// Parse each completed schedule table-id set with an optional descriptor
479    /// registry.
480    pub fn tables_with_registry<'a>(
481        &'a self,
482        registry: Option<&'a DescriptorRegistry>,
483    ) -> crate::Result<Vec<CompleteEit<'a>>> {
484        self.table_sets
485            .iter()
486            .map(|set| CompleteEit::parse(set, registry))
487            .collect()
488    }
489}
490
491/// Event entry in a complete EIT.
492#[derive(Debug)]
493#[non_exhaustive]
494pub struct CompleteEitEvent<'a> {
495    /// 16-bit event_id.
496    pub event_id: u16,
497    /// 40-bit start time.
498    pub start_time_raw: [u8; 5],
499    /// 24-bit duration.
500    pub duration_raw: [u8; 3],
501    /// 3-bit running status (EN 300 468 Table 6).
502    pub running_status: RunningStatus,
503    /// free_CA_mode.
504    pub free_ca_mode: bool,
505    /// Typed descriptor loop for this event.
506    pub descriptors: ParsedDescriptorLoop<'a>,
507}
508
509impl CompleteEitEvent<'_> {
510    /// Decode the 24-bit BCD `duration` (HHMMSS) to a [`core::time::Duration`].
511    ///
512    /// Returns `None` if the BCD nibbles are out of range.
513    #[must_use]
514    pub fn duration(&self) -> Option<core::time::Duration> {
515        dvb_common::time::decode_bcd_duration(self.duration_raw)
516    }
517
518    /// Decode `start_time_raw` (16-bit MJD + 24-bit BCD UTC) to a UTC datetime.
519    ///
520    /// Returns `None` if the date/time fields are out of range. MJD→calendar
521    /// conversion per ETSI EN 300 468 Annex C.
522    #[cfg(feature = "chrono")]
523    #[must_use]
524    pub fn start_time(&self) -> Option<chrono::DateTime<chrono::Utc>> {
525        dvb_common::time::decode_mjd_bcd_utc(self.start_time_raw)
526    }
527}
528
529/// Complete EIT for one exact table_id/extension section sequence.
530///
531/// EIT schedule collection across `last_table_id` is intentionally represented
532/// as multiple complete section sets: one per schedule table_id. That preserves
533/// the DVB schedule sub-table structure while still exposing flattened events.
534#[derive(Debug)]
535#[non_exhaustive]
536pub struct CompleteEit<'a> {
537    /// Variant based on table_id.
538    pub kind: eit::EitKind,
539    /// Raw table_id byte.
540    pub table_id: u8,
541    /// service_id.
542    pub service_id: u16,
543    /// 5-bit version_number.
544    pub version_number: u8,
545    /// current_next_indicator bit.
546    pub current_next_indicator: bool,
547    /// transport_stream_id.
548    pub transport_stream_id: u16,
549    /// original_network_id.
550    pub original_network_id: u16,
551    /// segment_last_section_number from section 0.
552    pub segment_last_section_number: u8,
553    /// last_table_id.
554    pub last_table_id: u8,
555    /// Events from all sections in wire order.
556    pub events: Vec<CompleteEitEvent<'a>>,
557}
558
559impl<'a> CompleteEit<'a> {
560    pub(crate) fn parse(
561        set: &'a CompleteSectionSet,
562        registry: Option<&'a DescriptorRegistry>,
563    ) -> crate::Result<Self> {
564        let sections: Vec<eit::EitSection<'a>> = set.parse_sections()?;
565        let first = sections.first().ok_or(crate::Error::BufferTooShort {
566            need: 1,
567            have: 0,
568            what: "CompleteEit sections",
569        })?;
570        let mut events = Vec::new();
571        for section in &sections {
572            events.extend(section.events.iter().map(|event| CompleteEitEvent {
573                event_id: event.event_id,
574                start_time_raw: event.start_time_raw,
575                duration_raw: event.duration_raw,
576                running_status: event.running_status,
577                free_ca_mode: event.free_ca_mode,
578                descriptors: ParsedDescriptorLoop::parse(event.descriptors, registry),
579            }));
580        }
581        Ok(Self {
582            kind: first.kind,
583            table_id: first.table_id,
584            service_id: first.service_id,
585            version_number: first.version_number,
586            current_next_indicator: first.current_next_indicator,
587            transport_stream_id: first.transport_stream_id,
588            original_network_id: first.original_network_id,
589            segment_last_section_number: first.segment_last_section_number,
590            last_table_id: first.last_table_id,
591            events,
592        })
593    }
594}