// dvb_si/collect/eit.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use crate::descriptors::DescriptorRegistry;
5use crate::section::Section;
6use crate::tables::eit;
7use crate::tables::RunningStatus;
8use dvb_common::Parse;
9
10use super::{
11    CollectError, CollectResult, CompleteSectionSet, ParsedDescriptorLoop, SectionSetKey,
12    SectionSetMeta,
13};
14
/// Default cap on the number of in-progress entries retained by each of
/// [`EitCollector`]'s two internal maps.
///
/// The cap is applied independently to the section-set map and the schedule
/// map; each is limited to `max_logical_keys` entries. Note the units differ:
///
/// * the schedule map holds one entry per [`EitLogicalKey`];
/// * the section-set map holds one entry per logical key *and* `table_id`, so
///   a schedule that spans several table IDs consumes one section-set entry
///   per table ID.
///
/// 256 concurrent collections is generous — a real DVB network has at most a
/// few dozen services per transponder — while bounding a hostile stream that
/// rotates `original_network_id` / `transport_stream_id` / `service_id` (or
/// `current_next_indicator`) to force unbounded map growth. When a map is
/// full, incoming sections for new keys are skipped until
/// [`clear`](EitCollector::clear) or
/// [`retain_logical`](EitCollector::retain_logical) frees capacity.
pub const DEFAULT_MAX_LOGICAL_KEYS: usize = 256;
27
28/// EIT-specific collector.
29///
30/// Present/following EITs complete as one normal section set. Schedule EITs
31/// complete only when every schedule table_id from the kind's first table_id
32/// through the advertised `last_table_id` has completed its own section set.
33///
34/// # Memory bounds
35///
36/// The collector is bounded by [`DEFAULT_MAX_LOGICAL_KEYS`] (configurable via
37/// [`with_max_logical_keys`](Self::with_max_logical_keys)). When the sections
38/// or schedules map is full, incoming sections for new keys are skipped until
39/// space frees — the same skip-until-space policy as
40/// [`crate::carousel::ModuleReassembler`].
41#[derive(Debug)]
42pub struct EitCollector {
43    sections: HashMap<EitSectionSetKey, PartialEitSectionSet>,
44    schedules: HashMap<EitLogicalKey, PartialEitSchedule>,
45    max_logical_keys: usize,
46}
47
48impl Default for EitCollector {
49    fn default() -> Self {
50        Self {
51            sections: HashMap::new(),
52            schedules: HashMap::new(),
53            max_logical_keys: DEFAULT_MAX_LOGICAL_KEYS,
54        }
55    }
56}
57
58impl EitCollector {
59    /// Create an empty EIT collector with the default cap
60    /// ([`DEFAULT_MAX_LOGICAL_KEYS`]).
61    #[must_use]
62    pub fn new() -> Self {
63        Self::default()
64    }
65
66    /// Replace the logical-key cap (default [`DEFAULT_MAX_LOGICAL_KEYS`]).
67    /// The cap is applied independently to the sections and schedules maps.
68    /// Sections for new keys are skipped when the relevant map is full, until
69    /// [`clear`](Self::clear) or [`retain_logical`](Self::retain_logical)
70    /// frees capacity.
71    #[must_use]
72    pub fn with_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
73        self.max_logical_keys = max_logical_keys;
74        self
75    }
76
77    /// Push one complete EIT section.
78    ///
79    /// Returns `Some` for a completed present/following table or a completed
80    /// schedule table-id range.
81    ///
82    /// # Errors
83    ///
84    /// Returns a [`CollectError`] if the incoming section is malformed,
85    /// inconsistent with already retained bytes, or not an EIT section. Treat
86    /// the error as applying to this section only unless your application wants
87    /// strict stream-fail behavior.
88    pub fn push_section(&mut self, bytes: impl AsRef<[u8]>) -> CollectResult<Option<CompletedEit>> {
89        self.push_section_with_pid(None, bytes)
90    }
91
92    /// Push one complete EIT section with PID context.
93    pub fn push_section_with_pid(
94        &mut self,
95        pid: Option<u16>,
96        bytes: impl AsRef<[u8]>,
97    ) -> CollectResult<Option<CompletedEit>> {
98        let raw = bytes.as_ref();
99        let section = Section::parse(raw)?;
100        if !section.section_syntax_indicator {
101            return Err(CollectError::ShortFormSection {
102                table_id: section.table_id,
103            });
104        }
105        if section.section_number > section.last_section_number {
106            return Err(CollectError::SectionNumberOutOfRange {
107                table_id: section.table_id,
108                section_number: section.section_number,
109                last_section_number: section.last_section_number,
110            });
111        }
112        section.validate_crc(raw)?;
113
114        let eit = eit::EitSection::parse(raw)?;
115        let logical_key = EitLogicalKey {
116            pid,
117            kind: eit.kind,
118            service_id: eit.service_id,
119            transport_stream_id: eit.transport_stream_id,
120            original_network_id: eit.original_network_id,
121            current_next_indicator: eit.current_next_indicator,
122        };
123        let key = EitSectionSetKey {
124            logical_key,
125            table_id: eit.table_id,
126        };
127        let meta = EitSectionSetMeta {
128            key,
129            version_number: eit.version_number,
130            last_section_number: eit.last_section_number,
131        };
132        let bytes: Arc<[u8]> = Arc::from(raw);
133
134        // Cap check: sections map
135        if !self.sections.contains_key(&key) && self.sections.len() >= self.max_logical_keys {
136            return Ok(None);
137        }
138
139        let partial = self
140            .sections
141            .entry(key)
142            .or_insert_with(|| PartialEitSectionSet::new(meta));
143        if partial.meta.version_number != meta.version_number
144            || partial.meta.last_section_number != meta.last_section_number
145        {
146            partial.reset(meta);
147        }
148
149        partial.insert(eit.section_number, bytes)?;
150        let complete = match partial.to_complete() {
151            Some(complete) => complete,
152            None => return Ok(None),
153        };
154
155        match eit.kind {
156            eit::EitKind::PresentFollowingActual | eit::EitKind::PresentFollowingOther => {
157                partial.emitted = true;
158                Ok(Some(CompletedEit::PresentFollowing(complete)))
159            }
160            eit::EitKind::ScheduleActual | eit::EitKind::ScheduleOther => {
161                let first_table_id = match eit.kind {
162                    eit::EitKind::ScheduleActual => eit::TABLE_ID_SCHEDULE_ACTUAL_FIRST,
163                    eit::EitKind::ScheduleOther => eit::TABLE_ID_SCHEDULE_OTHER_FIRST,
164                    _ => unreachable!("matched schedule kind above"),
165                };
166                if eit.table_id < first_table_id || eit.table_id > eit.last_table_id {
167                    return Err(CollectError::EitTableIdOutOfRange {
168                        table_id: eit.table_id,
169                        first_table_id,
170                        last_table_id: eit.last_table_id,
171                    });
172                }
173
174                // Cap check: schedules map (before marking the section set emitted)
175                if !self.schedules.contains_key(&logical_key)
176                    && self.schedules.len() >= self.max_logical_keys
177                {
178                    return Ok(None);
179                }
180
181                partial.emitted = true;
182
183                let schedule_meta = EitScheduleMeta {
184                    key: logical_key,
185                    first_table_id,
186                    last_table_id: eit.last_table_id,
187                };
188                let schedule = self
189                    .schedules
190                    .entry(logical_key)
191                    .or_insert_with(|| PartialEitSchedule::new(schedule_meta));
192                if schedule.meta.last_table_id != schedule_meta.last_table_id {
193                    schedule.reset(schedule_meta);
194                }
195                schedule.insert(eit.table_id, complete);
196                if let Some(complete) = schedule.to_complete() {
197                    schedule.emitted = true;
198                    Ok(Some(CompletedEit::Schedule(complete)))
199                } else {
200                    Ok(None)
201                }
202            }
203        }
204    }
205
206    /// Drop all retained EIT partial and completed schedule state.
207    ///
208    /// Long-running receivers that collect EPG data continuously can call this
209    /// at an application-defined carousel boundary if they do not need older
210    /// schedule state.
211    pub fn clear(&mut self) {
212        self.sections.clear();
213        self.schedules.clear();
214    }
215
216    /// Retain only logical EIT keys accepted by `keep`.
217    ///
218    /// This is the explicit pruning hook for long-running EIT schedule
219    /// collection. Both in-progress section sets and completed schedule ranges
220    /// for rejected keys are removed.
221    pub fn retain_logical<F>(&mut self, mut keep: F)
222    where
223        F: FnMut(&EitLogicalKey) -> bool,
224    {
225        self.sections.retain(|key, _| keep(&key.logical_key));
226        self.schedules.retain(|key, _| keep(key));
227    }
228
229    /// Number of retained EIT section-set states.
230    #[must_use]
231    pub fn section_set_len(&self) -> usize {
232        self.sections.len()
233    }
234
235    /// Number of retained EIT logical schedule states.
236    #[must_use]
237    pub fn schedule_len(&self) -> usize {
238        self.schedules.len()
239    }
240}
241
242/// Completed EIT collection result.
243#[derive(Debug, Clone)]
244#[non_exhaustive]
245pub enum CompletedEit {
246    /// One completed present/following EIT section set.
247    PresentFollowing(CompleteSectionSet),
248    /// A completed schedule EIT range spanning one or more table IDs.
249    Schedule(CompleteEitSchedule),
250}
251
252impl CompletedEit {
253    /// Parse the completed EIT table(s) without a descriptor registry.
254    pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
255        self.tables_with_registry(None)
256    }
257
258    /// Parse the completed EIT table(s) with an optional descriptor registry.
259    pub fn tables_with_registry<'a>(
260        &'a self,
261        registry: Option<&'a DescriptorRegistry>,
262    ) -> crate::Result<Vec<CompleteEit<'a>>> {
263        match self {
264            Self::PresentFollowing(set) => Ok(vec![CompleteEit::parse(set, registry)?]),
265            Self::Schedule(schedule) => schedule.tables_with_registry(registry),
266        }
267    }
268}
269
270/// Logical EIT table key used by [`EitCollector`].
271#[non_exhaustive]
272#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
273pub struct EitLogicalKey {
274    /// Optional PID context supplied by the caller.
275    pub pid: Option<u16>,
276    /// EIT kind derived from table_id.
277    pub kind: eit::EitKind,
278    /// service_id.
279    pub service_id: u16,
280    /// transport_stream_id.
281    pub transport_stream_id: u16,
282    /// original_network_id.
283    pub original_network_id: u16,
284    /// current_next_indicator.
285    pub current_next_indicator: bool,
286}
287
288#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
289struct EitSectionSetKey {
290    logical_key: EitLogicalKey,
291    table_id: u8,
292}
293
294#[derive(Debug, Clone, Copy, PartialEq, Eq)]
295struct EitScheduleMeta {
296    key: EitLogicalKey,
297    first_table_id: u8,
298    last_table_id: u8,
299}
300
301#[derive(Debug, Clone, Copy, PartialEq, Eq)]
302struct EitSectionSetMeta {
303    key: EitSectionSetKey,
304    version_number: u8,
305    last_section_number: u8,
306}
307
308#[derive(Debug)]
309struct PartialEitSectionSet {
310    meta: EitSectionSetMeta,
311    slots: Vec<Option<Arc<[u8]>>>,
312    filled: usize,
313    emitted: bool,
314}
315
316impl PartialEitSectionSet {
317    fn new(meta: EitSectionSetMeta) -> Self {
318        let len = meta.last_section_number as usize + 1;
319        Self {
320            meta,
321            slots: vec![None; len],
322            filled: 0,
323            emitted: false,
324        }
325    }
326
327    fn reset(&mut self, meta: EitSectionSetMeta) {
328        *self = Self::new(meta);
329    }
330
331    fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
332        let index = section_number as usize;
333        if let Some(existing) = &self.slots[index] {
334            if existing.as_ref() == bytes.as_ref() {
335                return Ok(false);
336            }
337            return Err(CollectError::ConflictingSection {
338                table_id: self.meta.key.table_id,
339                section_number,
340            });
341        }
342
343        self.slots[index] = Some(bytes);
344        self.filled += 1;
345        self.emitted = false;
346        Ok(true)
347    }
348
349    fn complete(&self) -> bool {
350        self.filled == self.slots.len()
351    }
352
353    fn to_complete(&self) -> Option<CompleteSectionSet> {
354        if !self.complete() || self.emitted {
355            return None;
356        }
357
358        let sections = self
359            .slots
360            .iter()
361            .map(|slot| {
362                slot.as_ref()
363                    .expect("complete EIT set has no holes")
364                    .clone()
365            })
366            .collect();
367        Some(CompleteSectionSet {
368            meta: SectionSetMeta {
369                key: SectionSetKey {
370                    pid: self.meta.key.logical_key.pid,
371                    table_id: self.meta.key.table_id,
372                    extension_id: self.meta.key.logical_key.service_id,
373                    current_next_indicator: self.meta.key.logical_key.current_next_indicator,
374                },
375                version_number: self.meta.version_number,
376                last_section_number: self.meta.last_section_number,
377            },
378            sections,
379        })
380    }
381}
382
383#[derive(Debug)]
384struct PartialEitSchedule {
385    meta: EitScheduleMeta,
386    table_sets: BTreeMap<u8, CompleteSectionSet>,
387    emitted: bool,
388}
389
390impl PartialEitSchedule {
391    fn new(meta: EitScheduleMeta) -> Self {
392        Self {
393            meta,
394            table_sets: BTreeMap::new(),
395            emitted: false,
396        }
397    }
398
399    fn reset(&mut self, meta: EitScheduleMeta) {
400        *self = Self::new(meta);
401    }
402
403    fn insert(&mut self, table_id: u8, set: CompleteSectionSet) {
404        self.table_sets.insert(table_id, set);
405        self.emitted = false;
406    }
407
408    fn complete(&self) -> bool {
409        (self.meta.first_table_id..=self.meta.last_table_id)
410            .all(|table_id| self.table_sets.contains_key(&table_id))
411    }
412
413    fn to_complete(&self) -> Option<CompleteEitSchedule> {
414        if !self.complete() || self.emitted {
415            return None;
416        }
417        let table_sets = (self.meta.first_table_id..=self.meta.last_table_id)
418            .map(|table_id| {
419                self.table_sets
420                    .get(&table_id)
421                    .expect("complete EIT schedule has no missing table IDs")
422                    .clone()
423            })
424            .collect();
425        Some(CompleteEitSchedule {
426            first_table_id: self.meta.first_table_id,
427            last_table_id: self.meta.last_table_id,
428            table_sets,
429        })
430    }
431}
432
433/// Completed EIT schedule spanning all schedule table IDs through
434/// `last_table_id`.
435#[derive(Debug, Clone)]
436pub struct CompleteEitSchedule {
437    first_table_id: u8,
438    last_table_id: u8,
439    table_sets: Vec<CompleteSectionSet>,
440}
441
442impl CompleteEitSchedule {
443    /// First schedule table_id in this range.
444    #[must_use]
445    pub const fn first_table_id(&self) -> u8 {
446        self.first_table_id
447    }
448
449    /// Last schedule table_id in this range.
450    #[must_use]
451    pub const fn last_table_id(&self) -> u8 {
452        self.last_table_id
453    }
454
455    /// Completed section sets, one per schedule table_id in order.
456    #[must_use]
457    pub fn table_sets(&self) -> &[CompleteSectionSet] {
458        &self.table_sets
459    }
460
461    /// Per-table_id 5-bit version numbers in schedule table_id order.
462    ///
463    /// DVB EIT schedule sub-tables version independently, so there is no single
464    /// schedule-wide version number.
465    pub fn table_versions(&self) -> impl ExactSizeIterator<Item = (u8, u8)> + '_ {
466        self.table_sets
467            .iter()
468            .map(|set| (set.meta().key.table_id, set.meta().version_number))
469    }
470
471    /// Parse each completed schedule table-id set.
472    pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
473        self.tables_with_registry(None)
474    }
475
476    /// Parse each completed schedule table-id set with an optional descriptor
477    /// registry.
478    pub fn tables_with_registry<'a>(
479        &'a self,
480        registry: Option<&'a DescriptorRegistry>,
481    ) -> crate::Result<Vec<CompleteEit<'a>>> {
482        self.table_sets
483            .iter()
484            .map(|set| CompleteEit::parse(set, registry))
485            .collect()
486    }
487}
488
489/// Event entry in a complete EIT.
490#[derive(Debug)]
491#[non_exhaustive]
492pub struct CompleteEitEvent<'a> {
493    /// 16-bit event_id.
494    pub event_id: u16,
495    /// 40-bit start time.
496    pub start_time_raw: [u8; 5],
497    /// 24-bit duration.
498    pub duration_raw: [u8; 3],
499    /// 3-bit running status (EN 300 468 Table 6).
500    pub running_status: RunningStatus,
501    /// free_CA_mode.
502    pub free_ca_mode: bool,
503    /// Typed descriptor loop for this event.
504    pub descriptors: ParsedDescriptorLoop<'a>,
505}
506
507impl CompleteEitEvent<'_> {
508    /// Decode the 24-bit BCD `duration` (HHMMSS) to a [`core::time::Duration`].
509    ///
510    /// Returns `None` if the BCD nibbles are out of range.
511    #[must_use]
512    pub fn duration(&self) -> Option<core::time::Duration> {
513        dvb_common::time::decode_bcd_duration(self.duration_raw)
514    }
515
516    /// Decode `start_time_raw` (16-bit MJD + 24-bit BCD UTC) to a UTC datetime.
517    ///
518    /// Returns `None` if the date/time fields are out of range. MJD→calendar
519    /// conversion per ETSI EN 300 468 Annex C.
520    #[cfg(feature = "chrono")]
521    #[must_use]
522    pub fn start_time(&self) -> Option<chrono::DateTime<chrono::Utc>> {
523        dvb_common::time::decode_mjd_bcd_utc(self.start_time_raw)
524    }
525}
526
527/// Complete EIT for one exact table_id/extension section sequence.
528///
529/// EIT schedule collection across `last_table_id` is intentionally represented
530/// as multiple complete section sets: one per schedule table_id. That preserves
531/// the DVB schedule sub-table structure while still exposing flattened events.
532#[derive(Debug)]
533#[non_exhaustive]
534pub struct CompleteEit<'a> {
535    /// Variant based on table_id.
536    pub kind: eit::EitKind,
537    /// Raw table_id byte.
538    pub table_id: u8,
539    /// service_id.
540    pub service_id: u16,
541    /// 5-bit version_number.
542    pub version_number: u8,
543    /// current_next_indicator bit.
544    pub current_next_indicator: bool,
545    /// transport_stream_id.
546    pub transport_stream_id: u16,
547    /// original_network_id.
548    pub original_network_id: u16,
549    /// segment_last_section_number from section 0.
550    pub segment_last_section_number: u8,
551    /// last_table_id.
552    pub last_table_id: u8,
553    /// Events from all sections in wire order.
554    pub events: Vec<CompleteEitEvent<'a>>,
555}
556
557impl<'a> CompleteEit<'a> {
558    pub(crate) fn parse(
559        set: &'a CompleteSectionSet,
560        registry: Option<&'a DescriptorRegistry>,
561    ) -> crate::Result<Self> {
562        let sections: Vec<eit::EitSection<'a>> = set.parse_sections()?;
563        let first = sections.first().ok_or(crate::Error::BufferTooShort {
564            need: 1,
565            have: 0,
566            what: "CompleteEit sections",
567        })?;
568        let mut events = Vec::new();
569        for section in &sections {
570            events.extend(section.events.iter().map(|event| CompleteEitEvent {
571                event_id: event.event_id,
572                start_time_raw: event.start_time_raw,
573                duration_raw: event.duration_raw,
574                running_status: event.running_status,
575                free_ca_mode: event.free_ca_mode,
576                descriptors: ParsedDescriptorLoop::parse(event.descriptors, registry),
577            }));
578        }
579        Ok(Self {
580            kind: first.kind,
581            table_id: first.table_id,
582            service_id: first.service_id,
583            version_number: first.version_number,
584            current_next_indicator: first.current_next_indicator,
585            transport_stream_id: first.transport_stream_id,
586            original_network_id: first.original_network_id,
587            segment_last_section_number: first.segment_last_section_number,
588            last_table_id: first.last_table_id,
589            events,
590        })
591    }
592}