1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use crate::descriptors::DescriptorRegistry;
5use crate::section::Section;
6use crate::tables::eit;
7use crate::tables::RunningStatus;
8use dvb_common::Parse;
9
10use super::{
11 CollectError, CollectResult, CompleteSectionSet, ParsedDescriptorLoop, SectionSetKey,
12 SectionSetMeta,
13};
14
15pub const DEFAULT_MAX_LOGICAL_KEYS: usize = 256;
27
28#[derive(Debug)]
42pub struct EitCollector {
43 sections: HashMap<EitSectionSetKey, PartialEitSectionSet>,
44 schedules: HashMap<EitLogicalKey, PartialEitSchedule>,
45 max_logical_keys: usize,
46}
47
48impl Default for EitCollector {
49 fn default() -> Self {
50 Self {
51 sections: HashMap::new(),
52 schedules: HashMap::new(),
53 max_logical_keys: DEFAULT_MAX_LOGICAL_KEYS,
54 }
55 }
56}
57
58impl EitCollector {
59 #[must_use]
62 pub fn new() -> Self {
63 Self::default()
64 }
65
66 #[must_use]
72 pub fn with_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
73 self.max_logical_keys = max_logical_keys;
74 self
75 }
76
77 pub fn push_section(&mut self, bytes: impl AsRef<[u8]>) -> CollectResult<Option<CompletedEit>> {
89 self.push_section_with_pid(None, bytes)
90 }
91
92 pub fn push_section_with_pid(
94 &mut self,
95 pid: Option<u16>,
96 bytes: impl AsRef<[u8]>,
97 ) -> CollectResult<Option<CompletedEit>> {
98 let raw = bytes.as_ref();
99 let section = Section::parse(raw)?;
100 if !section.section_syntax_indicator {
101 return Err(CollectError::ShortFormSection {
102 table_id: section.table_id,
103 });
104 }
105 if section.section_number > section.last_section_number {
106 return Err(CollectError::SectionNumberOutOfRange {
107 table_id: section.table_id,
108 section_number: section.section_number,
109 last_section_number: section.last_section_number,
110 });
111 }
112 section.validate_crc(raw)?;
113
114 let eit = eit::EitSection::parse(raw)?;
115 let logical_key = EitLogicalKey {
116 pid,
117 kind: eit.kind,
118 service_id: eit.service_id,
119 transport_stream_id: eit.transport_stream_id,
120 original_network_id: eit.original_network_id,
121 current_next_indicator: eit.current_next_indicator,
122 };
123 let key = EitSectionSetKey {
124 logical_key,
125 table_id: eit.table_id,
126 };
127 let meta = EitSectionSetMeta {
128 key,
129 version_number: eit.version_number,
130 last_section_number: eit.last_section_number,
131 };
132 let bytes: Arc<[u8]> = Arc::from(raw);
133
134 if !self.sections.contains_key(&key) && self.sections.len() >= self.max_logical_keys {
136 return Ok(None);
137 }
138
139 let partial = self
140 .sections
141 .entry(key)
142 .or_insert_with(|| PartialEitSectionSet::new(meta));
143 if partial.meta.version_number != meta.version_number
144 || partial.meta.last_section_number != meta.last_section_number
145 {
146 partial.reset(meta);
147 }
148
149 partial.insert(eit.section_number, bytes)?;
150 let complete = match partial.to_complete() {
151 Some(complete) => complete,
152 None => return Ok(None),
153 };
154
155 match eit.kind {
156 eit::EitKind::PresentFollowingActual | eit::EitKind::PresentFollowingOther => {
157 partial.emitted = true;
158 Ok(Some(CompletedEit::PresentFollowing(complete)))
159 }
160 eit::EitKind::ScheduleActual | eit::EitKind::ScheduleOther => {
161 let first_table_id = match eit.kind {
162 eit::EitKind::ScheduleActual => eit::TABLE_ID_SCHEDULE_ACTUAL_FIRST,
163 eit::EitKind::ScheduleOther => eit::TABLE_ID_SCHEDULE_OTHER_FIRST,
164 _ => unreachable!("matched schedule kind above"),
165 };
166 if eit.table_id < first_table_id || eit.table_id > eit.last_table_id {
167 return Err(CollectError::EitTableIdOutOfRange {
168 table_id: eit.table_id,
169 first_table_id,
170 last_table_id: eit.last_table_id,
171 });
172 }
173
174 if !self.schedules.contains_key(&logical_key)
176 && self.schedules.len() >= self.max_logical_keys
177 {
178 return Ok(None);
179 }
180
181 partial.emitted = true;
182
183 let schedule_meta = EitScheduleMeta {
184 key: logical_key,
185 first_table_id,
186 last_table_id: eit.last_table_id,
187 };
188 let schedule = self
189 .schedules
190 .entry(logical_key)
191 .or_insert_with(|| PartialEitSchedule::new(schedule_meta));
192 if schedule.meta.last_table_id != schedule_meta.last_table_id {
193 schedule.reset(schedule_meta);
194 }
195 schedule.insert(eit.table_id, complete);
196 if let Some(complete) = schedule.to_complete() {
197 schedule.emitted = true;
198 Ok(Some(CompletedEit::Schedule(complete)))
199 } else {
200 Ok(None)
201 }
202 }
203 }
204 }
205
206 pub fn clear(&mut self) {
212 self.sections.clear();
213 self.schedules.clear();
214 }
215
216 pub fn retain_logical<F>(&mut self, mut keep: F)
222 where
223 F: FnMut(&EitLogicalKey) -> bool,
224 {
225 self.sections.retain(|key, _| keep(&key.logical_key));
226 self.schedules.retain(|key, _| keep(key));
227 }
228
229 #[must_use]
231 pub fn section_set_len(&self) -> usize {
232 self.sections.len()
233 }
234
235 #[must_use]
237 pub fn schedule_len(&self) -> usize {
238 self.schedules.len()
239 }
240}
241
242#[derive(Debug, Clone)]
244#[non_exhaustive]
245pub enum CompletedEit {
246 PresentFollowing(CompleteSectionSet),
248 Schedule(CompleteEitSchedule),
250}
251
252impl CompletedEit {
253 pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
255 self.tables_with_registry(None)
256 }
257
258 pub fn tables_with_registry<'a>(
260 &'a self,
261 registry: Option<&'a DescriptorRegistry>,
262 ) -> crate::Result<Vec<CompleteEit<'a>>> {
263 match self {
264 Self::PresentFollowing(set) => Ok(vec![CompleteEit::parse(set, registry)?]),
265 Self::Schedule(schedule) => schedule.tables_with_registry(registry),
266 }
267 }
268}
269
270#[non_exhaustive]
272#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
273pub struct EitLogicalKey {
274 pub pid: Option<u16>,
276 pub kind: eit::EitKind,
278 pub service_id: u16,
280 pub transport_stream_id: u16,
282 pub original_network_id: u16,
284 pub current_next_indicator: bool,
286}
287
288#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
289struct EitSectionSetKey {
290 logical_key: EitLogicalKey,
291 table_id: u8,
292}
293
294#[derive(Debug, Clone, Copy, PartialEq, Eq)]
295struct EitScheduleMeta {
296 key: EitLogicalKey,
297 first_table_id: u8,
298 last_table_id: u8,
299}
300
301#[derive(Debug, Clone, Copy, PartialEq, Eq)]
302struct EitSectionSetMeta {
303 key: EitSectionSetKey,
304 version_number: u8,
305 last_section_number: u8,
306}
307
308#[derive(Debug)]
309struct PartialEitSectionSet {
310 meta: EitSectionSetMeta,
311 slots: Vec<Option<Arc<[u8]>>>,
312 filled: usize,
313 emitted: bool,
314}
315
316impl PartialEitSectionSet {
317 fn new(meta: EitSectionSetMeta) -> Self {
318 let len = meta.last_section_number as usize + 1;
319 Self {
320 meta,
321 slots: vec![None; len],
322 filled: 0,
323 emitted: false,
324 }
325 }
326
327 fn reset(&mut self, meta: EitSectionSetMeta) {
328 *self = Self::new(meta);
329 }
330
331 fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
332 let index = section_number as usize;
333 if let Some(existing) = &self.slots[index] {
334 if existing.as_ref() == bytes.as_ref() {
335 return Ok(false);
336 }
337 return Err(CollectError::ConflictingSection {
338 table_id: self.meta.key.table_id,
339 section_number,
340 });
341 }
342
343 self.slots[index] = Some(bytes);
344 self.filled += 1;
345 self.emitted = false;
346 Ok(true)
347 }
348
349 fn complete(&self) -> bool {
350 self.filled == self.slots.len()
351 }
352
353 fn to_complete(&self) -> Option<CompleteSectionSet> {
354 if !self.complete() || self.emitted {
355 return None;
356 }
357
358 let sections = self
359 .slots
360 .iter()
361 .map(|slot| {
362 slot.as_ref()
363 .expect("complete EIT set has no holes")
364 .clone()
365 })
366 .collect();
367 Some(CompleteSectionSet {
368 meta: SectionSetMeta {
369 key: SectionSetKey {
370 pid: self.meta.key.logical_key.pid,
371 table_id: self.meta.key.table_id,
372 extension_id: self.meta.key.logical_key.service_id,
373 current_next_indicator: self.meta.key.logical_key.current_next_indicator,
374 },
375 version_number: self.meta.version_number,
376 last_section_number: self.meta.last_section_number,
377 },
378 sections,
379 })
380 }
381}
382
383#[derive(Debug)]
384struct PartialEitSchedule {
385 meta: EitScheduleMeta,
386 table_sets: BTreeMap<u8, CompleteSectionSet>,
387 emitted: bool,
388}
389
390impl PartialEitSchedule {
391 fn new(meta: EitScheduleMeta) -> Self {
392 Self {
393 meta,
394 table_sets: BTreeMap::new(),
395 emitted: false,
396 }
397 }
398
399 fn reset(&mut self, meta: EitScheduleMeta) {
400 *self = Self::new(meta);
401 }
402
403 fn insert(&mut self, table_id: u8, set: CompleteSectionSet) {
404 self.table_sets.insert(table_id, set);
405 self.emitted = false;
406 }
407
408 fn complete(&self) -> bool {
409 (self.meta.first_table_id..=self.meta.last_table_id)
410 .all(|table_id| self.table_sets.contains_key(&table_id))
411 }
412
413 fn to_complete(&self) -> Option<CompleteEitSchedule> {
414 if !self.complete() || self.emitted {
415 return None;
416 }
417 let table_sets = (self.meta.first_table_id..=self.meta.last_table_id)
418 .map(|table_id| {
419 self.table_sets
420 .get(&table_id)
421 .expect("complete EIT schedule has no missing table IDs")
422 .clone()
423 })
424 .collect();
425 Some(CompleteEitSchedule {
426 first_table_id: self.meta.first_table_id,
427 last_table_id: self.meta.last_table_id,
428 table_sets,
429 })
430 }
431}
432
433#[derive(Debug, Clone)]
436pub struct CompleteEitSchedule {
437 first_table_id: u8,
438 last_table_id: u8,
439 table_sets: Vec<CompleteSectionSet>,
440}
441
442impl CompleteEitSchedule {
443 #[must_use]
445 pub const fn first_table_id(&self) -> u8 {
446 self.first_table_id
447 }
448
449 #[must_use]
451 pub const fn last_table_id(&self) -> u8 {
452 self.last_table_id
453 }
454
455 #[must_use]
457 pub fn table_sets(&self) -> &[CompleteSectionSet] {
458 &self.table_sets
459 }
460
461 pub fn table_versions(&self) -> impl ExactSizeIterator<Item = (u8, u8)> + '_ {
466 self.table_sets
467 .iter()
468 .map(|set| (set.meta().key.table_id, set.meta().version_number))
469 }
470
471 pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
473 self.tables_with_registry(None)
474 }
475
476 pub fn tables_with_registry<'a>(
479 &'a self,
480 registry: Option<&'a DescriptorRegistry>,
481 ) -> crate::Result<Vec<CompleteEit<'a>>> {
482 self.table_sets
483 .iter()
484 .map(|set| CompleteEit::parse(set, registry))
485 .collect()
486 }
487}
488
489#[derive(Debug)]
491#[non_exhaustive]
492pub struct CompleteEitEvent<'a> {
493 pub event_id: u16,
495 pub start_time_raw: [u8; 5],
497 pub duration_raw: [u8; 3],
499 pub running_status: RunningStatus,
501 pub free_ca_mode: bool,
503 pub descriptors: ParsedDescriptorLoop<'a>,
505}
506
507impl CompleteEitEvent<'_> {
508 #[must_use]
512 pub fn duration(&self) -> Option<core::time::Duration> {
513 dvb_common::time::decode_bcd_duration(self.duration_raw)
514 }
515
516 #[cfg(feature = "chrono")]
521 #[must_use]
522 pub fn start_time(&self) -> Option<chrono::DateTime<chrono::Utc>> {
523 dvb_common::time::decode_mjd_bcd_utc(self.start_time_raw)
524 }
525}
526
527#[derive(Debug)]
533#[non_exhaustive]
534pub struct CompleteEit<'a> {
535 pub kind: eit::EitKind,
537 pub table_id: u8,
539 pub service_id: u16,
541 pub version_number: u8,
543 pub current_next_indicator: bool,
545 pub transport_stream_id: u16,
547 pub original_network_id: u16,
549 pub segment_last_section_number: u8,
551 pub last_table_id: u8,
553 pub events: Vec<CompleteEitEvent<'a>>,
555}
556
557impl<'a> CompleteEit<'a> {
558 pub(crate) fn parse(
559 set: &'a CompleteSectionSet,
560 registry: Option<&'a DescriptorRegistry>,
561 ) -> crate::Result<Self> {
562 let sections: Vec<eit::EitSection<'a>> = set.parse_sections()?;
563 let first = sections.first().ok_or(crate::Error::BufferTooShort {
564 need: 1,
565 have: 0,
566 what: "CompleteEit sections",
567 })?;
568 let mut events = Vec::new();
569 for section in §ions {
570 events.extend(section.events.iter().map(|event| CompleteEitEvent {
571 event_id: event.event_id,
572 start_time_raw: event.start_time_raw,
573 duration_raw: event.duration_raw,
574 running_status: event.running_status,
575 free_ca_mode: event.free_ca_mode,
576 descriptors: ParsedDescriptorLoop::parse(event.descriptors, registry),
577 }));
578 }
579 Ok(Self {
580 kind: first.kind,
581 table_id: first.table_id,
582 service_id: first.service_id,
583 version_number: first.version_number,
584 current_next_indicator: first.current_next_indicator,
585 transport_stream_id: first.transport_stream_id,
586 original_network_id: first.original_network_id,
587 segment_last_section_number: first.segment_last_section_number,
588 last_table_id: first.last_table_id,
589 events,
590 })
591 }
592}