1use alloc::collections::BTreeMap;
2use alloc::sync::Arc;
3use alloc::vec;
4use alloc::vec::Vec;
5
6use crate::descriptors::DescriptorRegistry;
7use crate::section::Section;
8use crate::tables::eit;
9use crate::tables::RunningStatus;
10use dvb_common::Parse;
11
12use super::{
13 CollectError, CollectResult, CompleteSectionSet, ParsedDescriptorLoop, SectionSetKey,
14 SectionSetMeta,
15};
16
17pub const DEFAULT_MAX_LOGICAL_KEYS: usize = 256;
29
30#[derive(Debug)]
44pub struct EitCollector {
45 sections: BTreeMap<EitSectionSetKey, PartialEitSectionSet>,
46 schedules: BTreeMap<EitLogicalKey, PartialEitSchedule>,
47 max_logical_keys: usize,
48}
49
50impl Default for EitCollector {
51 fn default() -> Self {
52 Self {
53 sections: BTreeMap::new(),
54 schedules: BTreeMap::new(),
55 max_logical_keys: DEFAULT_MAX_LOGICAL_KEYS,
56 }
57 }
58}
59
60impl EitCollector {
61 #[must_use]
64 pub fn new() -> Self {
65 Self::default()
66 }
67
68 #[must_use]
74 pub fn with_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
75 self.max_logical_keys = max_logical_keys;
76 self
77 }
78
79 pub fn push_section(&mut self, bytes: impl AsRef<[u8]>) -> CollectResult<Option<CompletedEit>> {
91 self.push_section_with_pid(None, bytes)
92 }
93
94 pub fn push_section_with_pid(
96 &mut self,
97 pid: Option<u16>,
98 bytes: impl AsRef<[u8]>,
99 ) -> CollectResult<Option<CompletedEit>> {
100 let raw = bytes.as_ref();
101 let section = Section::parse(raw)?;
102 if !section.section_syntax_indicator {
103 return Err(CollectError::ShortFormSection {
104 table_id: section.table_id,
105 });
106 }
107 if section.section_number > section.last_section_number {
108 return Err(CollectError::SectionNumberOutOfRange {
109 table_id: section.table_id,
110 section_number: section.section_number,
111 last_section_number: section.last_section_number,
112 });
113 }
114 section.validate_crc(raw)?;
115
116 let eit = eit::EitSection::parse(raw)?;
117 let logical_key = EitLogicalKey {
118 pid,
119 kind: eit.kind,
120 service_id: eit.service_id,
121 transport_stream_id: eit.transport_stream_id,
122 original_network_id: eit.original_network_id,
123 current_next_indicator: eit.current_next_indicator,
124 };
125 let key = EitSectionSetKey {
126 logical_key,
127 table_id: eit.table_id,
128 };
129 let meta = EitSectionSetMeta {
130 key,
131 version_number: eit.version_number,
132 last_section_number: eit.last_section_number,
133 };
134 let bytes: Arc<[u8]> = Arc::from(raw);
135
136 if !self.sections.contains_key(&key) && self.sections.len() >= self.max_logical_keys {
138 return Ok(None);
139 }
140
141 let partial = self
142 .sections
143 .entry(key)
144 .or_insert_with(|| PartialEitSectionSet::new(meta));
145 if partial.meta.version_number != meta.version_number
146 || partial.meta.last_section_number != meta.last_section_number
147 {
148 partial.reset(meta);
149 }
150
151 partial.insert(eit.section_number, bytes)?;
152 let complete = match partial.to_complete() {
153 Some(complete) => complete,
154 None => return Ok(None),
155 };
156
157 match eit.kind {
158 eit::EitKind::PresentFollowingActual | eit::EitKind::PresentFollowingOther => {
159 partial.emitted = true;
160 Ok(Some(CompletedEit::PresentFollowing(complete)))
161 }
162 eit::EitKind::ScheduleActual | eit::EitKind::ScheduleOther => {
163 let first_table_id = match eit.kind {
164 eit::EitKind::ScheduleActual => eit::TABLE_ID_SCHEDULE_ACTUAL_FIRST,
165 eit::EitKind::ScheduleOther => eit::TABLE_ID_SCHEDULE_OTHER_FIRST,
166 _ => unreachable!("matched schedule kind above"),
167 };
168 if eit.table_id < first_table_id || eit.table_id > eit.last_table_id {
169 return Err(CollectError::EitTableIdOutOfRange {
170 table_id: eit.table_id,
171 first_table_id,
172 last_table_id: eit.last_table_id,
173 });
174 }
175
176 if !self.schedules.contains_key(&logical_key)
178 && self.schedules.len() >= self.max_logical_keys
179 {
180 return Ok(None);
181 }
182
183 partial.emitted = true;
184
185 let schedule_meta = EitScheduleMeta {
186 key: logical_key,
187 first_table_id,
188 last_table_id: eit.last_table_id,
189 };
190 let schedule = self
191 .schedules
192 .entry(logical_key)
193 .or_insert_with(|| PartialEitSchedule::new(schedule_meta));
194 if schedule.meta.last_table_id != schedule_meta.last_table_id {
195 schedule.reset(schedule_meta);
196 }
197 schedule.insert(eit.table_id, complete);
198 if let Some(complete) = schedule.to_complete() {
199 schedule.emitted = true;
200 Ok(Some(CompletedEit::Schedule(complete)))
201 } else {
202 Ok(None)
203 }
204 }
205 }
206 }
207
208 pub fn clear(&mut self) {
214 self.sections.clear();
215 self.schedules.clear();
216 }
217
218 pub fn retain_logical<F>(&mut self, mut keep: F)
224 where
225 F: FnMut(&EitLogicalKey) -> bool,
226 {
227 self.sections.retain(|key, _| keep(&key.logical_key));
228 self.schedules.retain(|key, _| keep(key));
229 }
230
231 #[must_use]
233 pub fn section_set_len(&self) -> usize {
234 self.sections.len()
235 }
236
237 #[must_use]
239 pub fn schedule_len(&self) -> usize {
240 self.schedules.len()
241 }
242}
243
244#[derive(Debug, Clone)]
246#[non_exhaustive]
247pub enum CompletedEit {
248 PresentFollowing(CompleteSectionSet),
250 Schedule(CompleteEitSchedule),
252}
253
254impl CompletedEit {
255 pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
257 self.tables_with_registry(None)
258 }
259
260 pub fn tables_with_registry<'a>(
262 &'a self,
263 registry: Option<&'a DescriptorRegistry>,
264 ) -> crate::Result<Vec<CompleteEit<'a>>> {
265 match self {
266 Self::PresentFollowing(set) => Ok(vec![CompleteEit::parse(set, registry)?]),
267 Self::Schedule(schedule) => schedule.tables_with_registry(registry),
268 }
269 }
270}
271
272#[non_exhaustive]
274#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
275pub struct EitLogicalKey {
276 pub pid: Option<u16>,
278 pub kind: eit::EitKind,
280 pub service_id: u16,
282 pub transport_stream_id: u16,
284 pub original_network_id: u16,
286 pub current_next_indicator: bool,
288}
289
290#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
291struct EitSectionSetKey {
292 logical_key: EitLogicalKey,
293 table_id: u8,
294}
295
296#[derive(Debug, Clone, Copy, PartialEq, Eq)]
297struct EitScheduleMeta {
298 key: EitLogicalKey,
299 first_table_id: u8,
300 last_table_id: u8,
301}
302
303#[derive(Debug, Clone, Copy, PartialEq, Eq)]
304struct EitSectionSetMeta {
305 key: EitSectionSetKey,
306 version_number: u8,
307 last_section_number: u8,
308}
309
310#[derive(Debug)]
311struct PartialEitSectionSet {
312 meta: EitSectionSetMeta,
313 slots: Vec<Option<Arc<[u8]>>>,
314 filled: usize,
315 emitted: bool,
316}
317
318impl PartialEitSectionSet {
319 fn new(meta: EitSectionSetMeta) -> Self {
320 let len = meta.last_section_number as usize + 1;
321 Self {
322 meta,
323 slots: vec![None; len],
324 filled: 0,
325 emitted: false,
326 }
327 }
328
329 fn reset(&mut self, meta: EitSectionSetMeta) {
330 *self = Self::new(meta);
331 }
332
333 fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
334 let index = section_number as usize;
335 if let Some(existing) = &self.slots[index] {
336 if existing.as_ref() == bytes.as_ref() {
337 return Ok(false);
338 }
339 return Err(CollectError::ConflictingSection {
340 table_id: self.meta.key.table_id,
341 section_number,
342 });
343 }
344
345 self.slots[index] = Some(bytes);
346 self.filled += 1;
347 self.emitted = false;
348 Ok(true)
349 }
350
351 fn complete(&self) -> bool {
352 self.filled == self.slots.len()
353 }
354
355 fn to_complete(&self) -> Option<CompleteSectionSet> {
356 if !self.complete() || self.emitted {
357 return None;
358 }
359
360 let sections = self
361 .slots
362 .iter()
363 .map(|slot| {
364 slot.as_ref()
365 .expect("complete EIT set has no holes")
366 .clone()
367 })
368 .collect();
369 Some(CompleteSectionSet {
370 meta: SectionSetMeta {
371 key: SectionSetKey {
372 pid: self.meta.key.logical_key.pid,
373 table_id: self.meta.key.table_id,
374 extension_id: self.meta.key.logical_key.service_id,
375 current_next_indicator: self.meta.key.logical_key.current_next_indicator,
376 },
377 version_number: self.meta.version_number,
378 last_section_number: self.meta.last_section_number,
379 },
380 sections,
381 })
382 }
383}
384
385#[derive(Debug)]
386struct PartialEitSchedule {
387 meta: EitScheduleMeta,
388 table_sets: BTreeMap<u8, CompleteSectionSet>,
389 emitted: bool,
390}
391
392impl PartialEitSchedule {
393 fn new(meta: EitScheduleMeta) -> Self {
394 Self {
395 meta,
396 table_sets: BTreeMap::new(),
397 emitted: false,
398 }
399 }
400
401 fn reset(&mut self, meta: EitScheduleMeta) {
402 *self = Self::new(meta);
403 }
404
405 fn insert(&mut self, table_id: u8, set: CompleteSectionSet) {
406 self.table_sets.insert(table_id, set);
407 self.emitted = false;
408 }
409
410 fn complete(&self) -> bool {
411 (self.meta.first_table_id..=self.meta.last_table_id)
412 .all(|table_id| self.table_sets.contains_key(&table_id))
413 }
414
415 fn to_complete(&self) -> Option<CompleteEitSchedule> {
416 if !self.complete() || self.emitted {
417 return None;
418 }
419 let table_sets = (self.meta.first_table_id..=self.meta.last_table_id)
420 .map(|table_id| {
421 self.table_sets
422 .get(&table_id)
423 .expect("complete EIT schedule has no missing table IDs")
424 .clone()
425 })
426 .collect();
427 Some(CompleteEitSchedule {
428 first_table_id: self.meta.first_table_id,
429 last_table_id: self.meta.last_table_id,
430 table_sets,
431 })
432 }
433}
434
435#[derive(Debug, Clone)]
438pub struct CompleteEitSchedule {
439 first_table_id: u8,
440 last_table_id: u8,
441 table_sets: Vec<CompleteSectionSet>,
442}
443
444impl CompleteEitSchedule {
445 #[must_use]
447 pub const fn first_table_id(&self) -> u8 {
448 self.first_table_id
449 }
450
451 #[must_use]
453 pub const fn last_table_id(&self) -> u8 {
454 self.last_table_id
455 }
456
457 #[must_use]
459 pub fn table_sets(&self) -> &[CompleteSectionSet] {
460 &self.table_sets
461 }
462
463 pub fn table_versions(&self) -> impl ExactSizeIterator<Item = (u8, u8)> + '_ {
468 self.table_sets
469 .iter()
470 .map(|set| (set.meta().key.table_id, set.meta().version_number))
471 }
472
473 pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
475 self.tables_with_registry(None)
476 }
477
478 pub fn tables_with_registry<'a>(
481 &'a self,
482 registry: Option<&'a DescriptorRegistry>,
483 ) -> crate::Result<Vec<CompleteEit<'a>>> {
484 self.table_sets
485 .iter()
486 .map(|set| CompleteEit::parse(set, registry))
487 .collect()
488 }
489}
490
491#[derive(Debug)]
493#[non_exhaustive]
494pub struct CompleteEitEvent<'a> {
495 pub event_id: u16,
497 pub start_time_raw: [u8; 5],
499 pub duration_raw: [u8; 3],
501 pub running_status: RunningStatus,
503 pub free_ca_mode: bool,
505 pub descriptors: ParsedDescriptorLoop<'a>,
507}
508
509impl CompleteEitEvent<'_> {
510 #[must_use]
514 pub fn duration(&self) -> Option<core::time::Duration> {
515 dvb_common::time::decode_bcd_duration(self.duration_raw)
516 }
517
518 #[cfg(feature = "chrono")]
523 #[must_use]
524 pub fn start_time(&self) -> Option<chrono::DateTime<chrono::Utc>> {
525 dvb_common::time::decode_mjd_bcd_utc(self.start_time_raw)
526 }
527}
528
529#[derive(Debug)]
535#[non_exhaustive]
536pub struct CompleteEit<'a> {
537 pub kind: eit::EitKind,
539 pub table_id: u8,
541 pub service_id: u16,
543 pub version_number: u8,
545 pub current_next_indicator: bool,
547 pub transport_stream_id: u16,
549 pub original_network_id: u16,
551 pub segment_last_section_number: u8,
553 pub last_table_id: u8,
555 pub events: Vec<CompleteEitEvent<'a>>,
557}
558
559impl<'a> CompleteEit<'a> {
560 pub(crate) fn parse(
561 set: &'a CompleteSectionSet,
562 registry: Option<&'a DescriptorRegistry>,
563 ) -> crate::Result<Self> {
564 let sections: Vec<eit::EitSection<'a>> = set.parse_sections()?;
565 let first = sections.first().ok_or(crate::Error::BufferTooShort {
566 need: 1,
567 have: 0,
568 what: "CompleteEit sections",
569 })?;
570 let mut events = Vec::new();
571 for section in §ions {
572 events.extend(section.events.iter().map(|event| CompleteEitEvent {
573 event_id: event.event_id,
574 start_time_raw: event.start_time_raw,
575 duration_raw: event.duration_raw,
576 running_status: event.running_status,
577 free_ca_mode: event.free_ca_mode,
578 descriptors: ParsedDescriptorLoop::parse(event.descriptors, registry),
579 }));
580 }
581 Ok(Self {
582 kind: first.kind,
583 table_id: first.table_id,
584 service_id: first.service_id,
585 version_number: first.version_number,
586 current_next_indicator: first.current_next_indicator,
587 transport_stream_id: first.transport_stream_id,
588 original_network_id: first.original_network_id,
589 segment_last_section_number: first.segment_last_section_number,
590 last_table_id: first.last_table_id,
591 events,
592 })
593 }
594}