Skip to main content

dvb_si/collect/
eit.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use crate::descriptors::DescriptorRegistry;
5use crate::section::Section;
6use crate::tables::eit;
7use dvb_common::Parse;
8
9use super::{
10    CollectError, CollectResult, CompleteSectionSet, ParsedDescriptorLoop, SectionSetKey,
11    SectionSetMeta,
12};
13
14/// Default cap on the number of in-progress logical keys (section sets +
15/// schedule ranges) retained by [`EitCollector`].
16///
17/// 256 concurrent collections is generous — a real DVB network has at most a
18/// few dozen services per transponder — while bounding a hostile stream that
19/// rotates `original_network_id` / `transport_stream_id` / `service_id` (or
20/// `current_next_indicator`) to force unbounded map growth. The cap is applied
21/// independently to the sections map and the schedules map; each is limited to
22/// `max_logical_keys` entries. When a map is full, incoming sections for new
23/// keys are skipped until [`clear`](EitCollector::clear) or
24/// [`retain_logical`](EitCollector::retain_logical) frees capacity.
25pub const DEFAULT_MAX_LOGICAL_KEYS: usize = 256;
26
27/// EIT-specific collector.
28///
29/// Present/following EITs complete as one normal section set. Schedule EITs
30/// complete only when every schedule table_id from the kind's first table_id
31/// through the advertised `last_table_id` has completed its own section set.
32///
33/// # Memory bounds
34///
35/// The collector is bounded by [`DEFAULT_MAX_LOGICAL_KEYS`] (configurable via
36/// [`with_max_logical_keys`](Self::with_max_logical_keys)). When the sections
37/// or schedules map is full, incoming sections for new keys are skipped until
38/// space frees — the same skip-until-space policy as
39/// [`crate::carousel::ModuleReassembler`].
40#[derive(Debug)]
41pub struct EitCollector {
42    sections: HashMap<EitSectionSetKey, PartialEitSectionSet>,
43    schedules: HashMap<EitLogicalKey, PartialEitSchedule>,
44    max_logical_keys: usize,
45}
46
47impl Default for EitCollector {
48    fn default() -> Self {
49        Self {
50            sections: HashMap::new(),
51            schedules: HashMap::new(),
52            max_logical_keys: DEFAULT_MAX_LOGICAL_KEYS,
53        }
54    }
55}
56
57impl EitCollector {
58    /// Create an empty EIT collector with the default cap
59    /// ([`DEFAULT_MAX_LOGICAL_KEYS`]).
60    #[must_use]
61    pub fn new() -> Self {
62        Self::default()
63    }
64
65    /// Replace the logical-key cap (default [`DEFAULT_MAX_LOGICAL_KEYS`]).
66    /// The cap is applied independently to the sections and schedules maps.
67    /// Sections for new keys are skipped when the relevant map is full, until
68    /// [`clear`](Self::clear) or [`retain_logical`](Self::retain_logical)
69    /// frees capacity.
70    #[must_use]
71    pub fn with_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
72        self.max_logical_keys = max_logical_keys;
73        self
74    }
75
76    /// Push one complete EIT section.
77    ///
78    /// Returns `Some` for a completed present/following table or a completed
79    /// schedule table-id range.
80    ///
81    /// # Errors
82    ///
83    /// Returns a [`CollectError`] if the incoming section is malformed,
84    /// inconsistent with already retained bytes, or not an EIT section. Treat
85    /// the error as applying to this section only unless your application wants
86    /// strict stream-fail behavior.
87    pub fn push_section(&mut self, bytes: impl AsRef<[u8]>) -> CollectResult<Option<CompletedEit>> {
88        self.push_section_with_pid(None, bytes)
89    }
90
91    /// Push one complete EIT section with PID context.
92    pub fn push_section_with_pid(
93        &mut self,
94        pid: Option<u16>,
95        bytes: impl AsRef<[u8]>,
96    ) -> CollectResult<Option<CompletedEit>> {
97        let raw = bytes.as_ref();
98        let section = Section::parse(raw)?;
99        if !section.section_syntax_indicator {
100            return Err(CollectError::ShortFormSection {
101                table_id: section.table_id,
102            });
103        }
104        if section.section_number > section.last_section_number {
105            return Err(CollectError::SectionNumberOutOfRange {
106                table_id: section.table_id,
107                section_number: section.section_number,
108                last_section_number: section.last_section_number,
109            });
110        }
111        section.validate_crc(raw)?;
112
113        let eit = eit::EitSection::parse(raw)?;
114        let logical_key = EitLogicalKey {
115            pid,
116            kind: eit.kind,
117            service_id: eit.service_id,
118            transport_stream_id: eit.transport_stream_id,
119            original_network_id: eit.original_network_id,
120            current_next_indicator: eit.current_next_indicator,
121        };
122        let key = EitSectionSetKey {
123            logical_key,
124            table_id: eit.table_id,
125        };
126        let meta = EitSectionSetMeta {
127            key,
128            version_number: eit.version_number,
129            last_section_number: eit.last_section_number,
130        };
131        let bytes: Arc<[u8]> = Arc::from(raw);
132
133        // Cap check: sections map
134        if !self.sections.contains_key(&key) && self.sections.len() >= self.max_logical_keys {
135            return Ok(None);
136        }
137
138        let partial = self
139            .sections
140            .entry(key)
141            .or_insert_with(|| PartialEitSectionSet::new(meta));
142        if partial.meta.version_number != meta.version_number
143            || partial.meta.last_section_number != meta.last_section_number
144        {
145            partial.reset(meta);
146        }
147
148        partial.insert(eit.section_number, bytes)?;
149        let complete = match partial.to_complete() {
150            Some(complete) => complete,
151            None => return Ok(None),
152        };
153
154        match eit.kind {
155            eit::EitKind::PresentFollowingActual | eit::EitKind::PresentFollowingOther => {
156                partial.emitted = true;
157                Ok(Some(CompletedEit::PresentFollowing(complete)))
158            }
159            eit::EitKind::ScheduleActual | eit::EitKind::ScheduleOther => {
160                let first_table_id = match eit.kind {
161                    eit::EitKind::ScheduleActual => eit::TABLE_ID_SCHEDULE_ACTUAL_FIRST,
162                    eit::EitKind::ScheduleOther => eit::TABLE_ID_SCHEDULE_OTHER_FIRST,
163                    _ => unreachable!("matched schedule kind above"),
164                };
165                if eit.table_id < first_table_id || eit.table_id > eit.last_table_id {
166                    return Err(CollectError::EitTableIdOutOfRange {
167                        table_id: eit.table_id,
168                        first_table_id,
169                        last_table_id: eit.last_table_id,
170                    });
171                }
172
173                // Cap check: schedules map (before marking the section set emitted)
174                if !self.schedules.contains_key(&logical_key)
175                    && self.schedules.len() >= self.max_logical_keys
176                {
177                    return Ok(None);
178                }
179
180                partial.emitted = true;
181
182                let schedule_meta = EitScheduleMeta {
183                    key: logical_key,
184                    first_table_id,
185                    last_table_id: eit.last_table_id,
186                };
187                let schedule = self
188                    .schedules
189                    .entry(logical_key)
190                    .or_insert_with(|| PartialEitSchedule::new(schedule_meta));
191                if schedule.meta.last_table_id != schedule_meta.last_table_id {
192                    schedule.reset(schedule_meta);
193                }
194                schedule.insert(eit.table_id, complete);
195                if let Some(complete) = schedule.to_complete() {
196                    schedule.emitted = true;
197                    Ok(Some(CompletedEit::Schedule(complete)))
198                } else {
199                    Ok(None)
200                }
201            }
202        }
203    }
204
205    /// Drop all retained EIT partial and completed schedule state.
206    ///
207    /// Long-running receivers that collect EPG data continuously can call this
208    /// at an application-defined carousel boundary if they do not need older
209    /// schedule state.
210    pub fn clear(&mut self) {
211        self.sections.clear();
212        self.schedules.clear();
213    }
214
215    /// Retain only logical EIT keys accepted by `keep`.
216    ///
217    /// This is the explicit pruning hook for long-running EIT schedule
218    /// collection. Both in-progress section sets and completed schedule ranges
219    /// for rejected keys are removed.
220    pub fn retain_logical<F>(&mut self, mut keep: F)
221    where
222        F: FnMut(&EitLogicalKey) -> bool,
223    {
224        self.sections.retain(|key, _| keep(&key.logical_key));
225        self.schedules.retain(|key, _| keep(key));
226    }
227
228    /// Number of retained EIT section-set states.
229    #[must_use]
230    pub fn section_set_len(&self) -> usize {
231        self.sections.len()
232    }
233
234    /// Number of retained EIT logical schedule states.
235    #[must_use]
236    pub fn schedule_len(&self) -> usize {
237        self.schedules.len()
238    }
239}
240
241/// Completed EIT collection result.
242#[derive(Debug, Clone)]
243#[non_exhaustive]
244pub enum CompletedEit {
245    /// One completed present/following EIT section set.
246    PresentFollowing(CompleteSectionSet),
247    /// A completed schedule EIT range spanning one or more table IDs.
248    Schedule(CompleteEitSchedule),
249}
250
251impl CompletedEit {
252    /// Parse the completed EIT table(s) without a descriptor registry.
253    pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
254        self.tables_with_registry(None)
255    }
256
257    /// Parse the completed EIT table(s) with an optional descriptor registry.
258    pub fn tables_with_registry<'a>(
259        &'a self,
260        registry: Option<&'a DescriptorRegistry>,
261    ) -> crate::Result<Vec<CompleteEit<'a>>> {
262        match self {
263            Self::PresentFollowing(set) => Ok(vec![CompleteEit::parse(set, registry)?]),
264            Self::Schedule(schedule) => schedule.tables_with_registry(registry),
265        }
266    }
267}
268
269/// Logical EIT table key used by [`EitCollector`].
270#[non_exhaustive]
271#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
272pub struct EitLogicalKey {
273    /// Optional PID context supplied by the caller.
274    pub pid: Option<u16>,
275    /// EIT kind derived from table_id.
276    pub kind: eit::EitKind,
277    /// service_id.
278    pub service_id: u16,
279    /// transport_stream_id.
280    pub transport_stream_id: u16,
281    /// original_network_id.
282    pub original_network_id: u16,
283    /// current_next_indicator.
284    pub current_next_indicator: bool,
285}
286
287#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
288struct EitSectionSetKey {
289    logical_key: EitLogicalKey,
290    table_id: u8,
291}
292
293#[derive(Debug, Clone, Copy, PartialEq, Eq)]
294struct EitScheduleMeta {
295    key: EitLogicalKey,
296    first_table_id: u8,
297    last_table_id: u8,
298}
299
300#[derive(Debug, Clone, Copy, PartialEq, Eq)]
301struct EitSectionSetMeta {
302    key: EitSectionSetKey,
303    version_number: u8,
304    last_section_number: u8,
305}
306
307#[derive(Debug)]
308struct PartialEitSectionSet {
309    meta: EitSectionSetMeta,
310    slots: Vec<Option<Arc<[u8]>>>,
311    filled: usize,
312    emitted: bool,
313}
314
315impl PartialEitSectionSet {
316    fn new(meta: EitSectionSetMeta) -> Self {
317        let len = meta.last_section_number as usize + 1;
318        Self {
319            meta,
320            slots: vec![None; len],
321            filled: 0,
322            emitted: false,
323        }
324    }
325
326    fn reset(&mut self, meta: EitSectionSetMeta) {
327        *self = Self::new(meta);
328    }
329
330    fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
331        let index = section_number as usize;
332        if let Some(existing) = &self.slots[index] {
333            if existing.as_ref() == bytes.as_ref() {
334                return Ok(false);
335            }
336            return Err(CollectError::ConflictingSection {
337                table_id: self.meta.key.table_id,
338                section_number,
339            });
340        }
341
342        self.slots[index] = Some(bytes);
343        self.filled += 1;
344        self.emitted = false;
345        Ok(true)
346    }
347
348    fn complete(&self) -> bool {
349        self.filled == self.slots.len()
350    }
351
352    fn to_complete(&self) -> Option<CompleteSectionSet> {
353        if !self.complete() || self.emitted {
354            return None;
355        }
356
357        let sections = self
358            .slots
359            .iter()
360            .map(|slot| {
361                slot.as_ref()
362                    .expect("complete EIT set has no holes")
363                    .clone()
364            })
365            .collect();
366        Some(CompleteSectionSet {
367            meta: SectionSetMeta {
368                key: SectionSetKey {
369                    pid: self.meta.key.logical_key.pid,
370                    table_id: self.meta.key.table_id,
371                    extension_id: self.meta.key.logical_key.service_id,
372                    current_next_indicator: self.meta.key.logical_key.current_next_indicator,
373                },
374                version_number: self.meta.version_number,
375                last_section_number: self.meta.last_section_number,
376            },
377            sections,
378        })
379    }
380}
381
382#[derive(Debug)]
383struct PartialEitSchedule {
384    meta: EitScheduleMeta,
385    table_sets: BTreeMap<u8, CompleteSectionSet>,
386    emitted: bool,
387}
388
389impl PartialEitSchedule {
390    fn new(meta: EitScheduleMeta) -> Self {
391        Self {
392            meta,
393            table_sets: BTreeMap::new(),
394            emitted: false,
395        }
396    }
397
398    fn reset(&mut self, meta: EitScheduleMeta) {
399        *self = Self::new(meta);
400    }
401
402    fn insert(&mut self, table_id: u8, set: CompleteSectionSet) {
403        self.table_sets.insert(table_id, set);
404        self.emitted = false;
405    }
406
407    fn complete(&self) -> bool {
408        (self.meta.first_table_id..=self.meta.last_table_id)
409            .all(|table_id| self.table_sets.contains_key(&table_id))
410    }
411
412    fn to_complete(&self) -> Option<CompleteEitSchedule> {
413        if !self.complete() || self.emitted {
414            return None;
415        }
416        let table_sets = (self.meta.first_table_id..=self.meta.last_table_id)
417            .map(|table_id| {
418                self.table_sets
419                    .get(&table_id)
420                    .expect("complete EIT schedule has no missing table IDs")
421                    .clone()
422            })
423            .collect();
424        Some(CompleteEitSchedule {
425            first_table_id: self.meta.first_table_id,
426            last_table_id: self.meta.last_table_id,
427            table_sets,
428        })
429    }
430}
431
432/// Completed EIT schedule spanning all schedule table IDs through
433/// `last_table_id`.
434#[derive(Debug, Clone)]
435pub struct CompleteEitSchedule {
436    first_table_id: u8,
437    last_table_id: u8,
438    table_sets: Vec<CompleteSectionSet>,
439}
440
441impl CompleteEitSchedule {
442    /// First schedule table_id in this range.
443    #[must_use]
444    pub const fn first_table_id(&self) -> u8 {
445        self.first_table_id
446    }
447
448    /// Last schedule table_id in this range.
449    #[must_use]
450    pub const fn last_table_id(&self) -> u8 {
451        self.last_table_id
452    }
453
454    /// Completed section sets, one per schedule table_id in order.
455    #[must_use]
456    pub fn table_sets(&self) -> &[CompleteSectionSet] {
457        &self.table_sets
458    }
459
460    /// Per-table_id 5-bit version numbers in schedule table_id order.
461    ///
462    /// DVB EIT schedule sub-tables version independently, so there is no single
463    /// schedule-wide version number.
464    pub fn table_versions(&self) -> impl ExactSizeIterator<Item = (u8, u8)> + '_ {
465        self.table_sets
466            .iter()
467            .map(|set| (set.meta().key.table_id, set.meta().version_number))
468    }
469
470    /// Parse each completed schedule table-id set.
471    pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
472        self.tables_with_registry(None)
473    }
474
475    /// Parse each completed schedule table-id set with an optional descriptor
476    /// registry.
477    pub fn tables_with_registry<'a>(
478        &'a self,
479        registry: Option<&'a DescriptorRegistry>,
480    ) -> crate::Result<Vec<CompleteEit<'a>>> {
481        self.table_sets
482            .iter()
483            .map(|set| CompleteEit::parse(set, registry))
484            .collect()
485    }
486}
487
488/// Event entry in a complete EIT.
489#[derive(Debug)]
490#[non_exhaustive]
491pub struct CompleteEitEvent<'a> {
492    /// 16-bit event_id.
493    pub event_id: u16,
494    /// 40-bit start time.
495    pub start_time_raw: [u8; 5],
496    /// 24-bit duration.
497    pub duration_raw: [u8; 3],
498    /// 3-bit running status.
499    pub running_status: u8,
500    /// free_CA_mode.
501    pub free_ca_mode: bool,
502    /// Typed descriptor loop for this event.
503    pub descriptors: ParsedDescriptorLoop<'a>,
504}
505
506impl CompleteEitEvent<'_> {
507    /// Decode the 24-bit BCD `duration` (HHMMSS) to a [`core::time::Duration`].
508    ///
509    /// Returns `None` if the BCD nibbles are out of range.
510    #[must_use]
511    pub fn duration(&self) -> Option<core::time::Duration> {
512        dvb_common::time::decode_bcd_duration(self.duration_raw)
513    }
514
515    /// Decode `start_time_raw` (16-bit MJD + 24-bit BCD UTC) to a UTC datetime.
516    ///
517    /// Returns `None` if the date/time fields are out of range. MJD→calendar
518    /// conversion per ETSI EN 300 468 Annex C.
519    #[cfg(feature = "chrono")]
520    #[must_use]
521    pub fn start_time(&self) -> Option<chrono::DateTime<chrono::Utc>> {
522        dvb_common::time::decode_mjd_bcd_utc(self.start_time_raw)
523    }
524}
525
526/// Complete EIT for one exact table_id/extension section sequence.
527///
528/// EIT schedule collection across `last_table_id` is intentionally represented
529/// as multiple complete section sets: one per schedule table_id. That preserves
530/// the DVB schedule sub-table structure while still exposing flattened events.
531#[derive(Debug)]
532#[non_exhaustive]
533pub struct CompleteEit<'a> {
534    /// Variant based on table_id.
535    pub kind: eit::EitKind,
536    /// Raw table_id byte.
537    pub table_id: u8,
538    /// service_id.
539    pub service_id: u16,
540    /// 5-bit version_number.
541    pub version_number: u8,
542    /// current_next_indicator bit.
543    pub current_next_indicator: bool,
544    /// transport_stream_id.
545    pub transport_stream_id: u16,
546    /// original_network_id.
547    pub original_network_id: u16,
548    /// segment_last_section_number from section 0.
549    pub segment_last_section_number: u8,
550    /// last_table_id.
551    pub last_table_id: u8,
552    /// Events from all sections in wire order.
553    pub events: Vec<CompleteEitEvent<'a>>,
554}
555
556impl<'a> CompleteEit<'a> {
557    pub(crate) fn parse(
558        set: &'a CompleteSectionSet,
559        registry: Option<&'a DescriptorRegistry>,
560    ) -> crate::Result<Self> {
561        let sections: Vec<eit::EitSection<'a>> = set.parse_sections()?;
562        let first = sections.first().ok_or(crate::Error::BufferTooShort {
563            need: 1,
564            have: 0,
565            what: "CompleteEit sections",
566        })?;
567        let mut events = Vec::new();
568        for section in &sections {
569            events.extend(section.events.iter().map(|event| CompleteEitEvent {
570                event_id: event.event_id,
571                start_time_raw: event.start_time_raw,
572                duration_raw: event.duration_raw,
573                running_status: event.running_status,
574                free_ca_mode: event.free_ca_mode,
575                descriptors: ParsedDescriptorLoop::parse(event.descriptors, registry),
576            }));
577        }
578        Ok(Self {
579            kind: first.kind,
580            table_id: first.table_id,
581            service_id: first.service_id,
582            version_number: first.version_number,
583            current_next_indicator: first.current_next_indicator,
584            transport_stream_id: first.transport_stream_id,
585            original_network_id: first.original_network_id,
586            segment_last_section_number: first.segment_last_section_number,
587            last_table_id: first.last_table_id,
588            events,
589        })
590    }
591}