1use std::collections::{BTreeMap, HashMap};
2use std::sync::Arc;
3
4use crate::descriptors::DescriptorRegistry;
5use crate::section::Section;
6use crate::tables::eit;
7use dvb_common::Parse;
8
9use super::{
10 CollectError, CollectResult, CompleteSectionSet, ParsedDescriptorLoop, SectionSetKey,
11 SectionSetMeta,
12};
13
14pub const DEFAULT_MAX_LOGICAL_KEYS: usize = 256;
26
27#[derive(Debug)]
41pub struct EitCollector {
42 sections: HashMap<EitSectionSetKey, PartialEitSectionSet>,
43 schedules: HashMap<EitLogicalKey, PartialEitSchedule>,
44 max_logical_keys: usize,
45}
46
47impl Default for EitCollector {
48 fn default() -> Self {
49 Self {
50 sections: HashMap::new(),
51 schedules: HashMap::new(),
52 max_logical_keys: DEFAULT_MAX_LOGICAL_KEYS,
53 }
54 }
55}
56
57impl EitCollector {
58 #[must_use]
61 pub fn new() -> Self {
62 Self::default()
63 }
64
65 #[must_use]
71 pub fn with_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
72 self.max_logical_keys = max_logical_keys;
73 self
74 }
75
76 pub fn push_section(&mut self, bytes: impl AsRef<[u8]>) -> CollectResult<Option<CompletedEit>> {
88 self.push_section_with_pid(None, bytes)
89 }
90
91 pub fn push_section_with_pid(
93 &mut self,
94 pid: Option<u16>,
95 bytes: impl AsRef<[u8]>,
96 ) -> CollectResult<Option<CompletedEit>> {
97 let raw = bytes.as_ref();
98 let section = Section::parse(raw)?;
99 if !section.section_syntax_indicator {
100 return Err(CollectError::ShortFormSection {
101 table_id: section.table_id,
102 });
103 }
104 if section.section_number > section.last_section_number {
105 return Err(CollectError::SectionNumberOutOfRange {
106 table_id: section.table_id,
107 section_number: section.section_number,
108 last_section_number: section.last_section_number,
109 });
110 }
111 section.validate_crc(raw)?;
112
113 let eit = eit::EitSection::parse(raw)?;
114 let logical_key = EitLogicalKey {
115 pid,
116 kind: eit.kind,
117 service_id: eit.service_id,
118 transport_stream_id: eit.transport_stream_id,
119 original_network_id: eit.original_network_id,
120 current_next_indicator: eit.current_next_indicator,
121 };
122 let key = EitSectionSetKey {
123 logical_key,
124 table_id: eit.table_id,
125 };
126 let meta = EitSectionSetMeta {
127 key,
128 version_number: eit.version_number,
129 last_section_number: eit.last_section_number,
130 };
131 let bytes: Arc<[u8]> = Arc::from(raw);
132
133 if !self.sections.contains_key(&key) && self.sections.len() >= self.max_logical_keys {
135 return Ok(None);
136 }
137
138 let partial = self
139 .sections
140 .entry(key)
141 .or_insert_with(|| PartialEitSectionSet::new(meta));
142 if partial.meta.version_number != meta.version_number
143 || partial.meta.last_section_number != meta.last_section_number
144 {
145 partial.reset(meta);
146 }
147
148 partial.insert(eit.section_number, bytes)?;
149 let complete = match partial.to_complete() {
150 Some(complete) => complete,
151 None => return Ok(None),
152 };
153
154 match eit.kind {
155 eit::EitKind::PresentFollowingActual | eit::EitKind::PresentFollowingOther => {
156 partial.emitted = true;
157 Ok(Some(CompletedEit::PresentFollowing(complete)))
158 }
159 eit::EitKind::ScheduleActual | eit::EitKind::ScheduleOther => {
160 let first_table_id = match eit.kind {
161 eit::EitKind::ScheduleActual => eit::TABLE_ID_SCHEDULE_ACTUAL_FIRST,
162 eit::EitKind::ScheduleOther => eit::TABLE_ID_SCHEDULE_OTHER_FIRST,
163 _ => unreachable!("matched schedule kind above"),
164 };
165 if eit.table_id < first_table_id || eit.table_id > eit.last_table_id {
166 return Err(CollectError::EitTableIdOutOfRange {
167 table_id: eit.table_id,
168 first_table_id,
169 last_table_id: eit.last_table_id,
170 });
171 }
172
173 if !self.schedules.contains_key(&logical_key)
175 && self.schedules.len() >= self.max_logical_keys
176 {
177 return Ok(None);
178 }
179
180 partial.emitted = true;
181
182 let schedule_meta = EitScheduleMeta {
183 key: logical_key,
184 first_table_id,
185 last_table_id: eit.last_table_id,
186 };
187 let schedule = self
188 .schedules
189 .entry(logical_key)
190 .or_insert_with(|| PartialEitSchedule::new(schedule_meta));
191 if schedule.meta.last_table_id != schedule_meta.last_table_id {
192 schedule.reset(schedule_meta);
193 }
194 schedule.insert(eit.table_id, complete);
195 if let Some(complete) = schedule.to_complete() {
196 schedule.emitted = true;
197 Ok(Some(CompletedEit::Schedule(complete)))
198 } else {
199 Ok(None)
200 }
201 }
202 }
203 }
204
205 pub fn clear(&mut self) {
211 self.sections.clear();
212 self.schedules.clear();
213 }
214
215 pub fn retain_logical<F>(&mut self, mut keep: F)
221 where
222 F: FnMut(&EitLogicalKey) -> bool,
223 {
224 self.sections.retain(|key, _| keep(&key.logical_key));
225 self.schedules.retain(|key, _| keep(key));
226 }
227
228 #[must_use]
230 pub fn section_set_len(&self) -> usize {
231 self.sections.len()
232 }
233
234 #[must_use]
236 pub fn schedule_len(&self) -> usize {
237 self.schedules.len()
238 }
239}
240
241#[derive(Debug, Clone)]
243#[non_exhaustive]
244pub enum CompletedEit {
245 PresentFollowing(CompleteSectionSet),
247 Schedule(CompleteEitSchedule),
249}
250
251impl CompletedEit {
252 pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
254 self.tables_with_registry(None)
255 }
256
257 pub fn tables_with_registry<'a>(
259 &'a self,
260 registry: Option<&'a DescriptorRegistry>,
261 ) -> crate::Result<Vec<CompleteEit<'a>>> {
262 match self {
263 Self::PresentFollowing(set) => Ok(vec![CompleteEit::parse(set, registry)?]),
264 Self::Schedule(schedule) => schedule.tables_with_registry(registry),
265 }
266 }
267}
268
269#[non_exhaustive]
271#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
272pub struct EitLogicalKey {
273 pub pid: Option<u16>,
275 pub kind: eit::EitKind,
277 pub service_id: u16,
279 pub transport_stream_id: u16,
281 pub original_network_id: u16,
283 pub current_next_indicator: bool,
285}
286
287#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
288struct EitSectionSetKey {
289 logical_key: EitLogicalKey,
290 table_id: u8,
291}
292
293#[derive(Debug, Clone, Copy, PartialEq, Eq)]
294struct EitScheduleMeta {
295 key: EitLogicalKey,
296 first_table_id: u8,
297 last_table_id: u8,
298}
299
300#[derive(Debug, Clone, Copy, PartialEq, Eq)]
301struct EitSectionSetMeta {
302 key: EitSectionSetKey,
303 version_number: u8,
304 last_section_number: u8,
305}
306
307#[derive(Debug)]
308struct PartialEitSectionSet {
309 meta: EitSectionSetMeta,
310 slots: Vec<Option<Arc<[u8]>>>,
311 filled: usize,
312 emitted: bool,
313}
314
315impl PartialEitSectionSet {
316 fn new(meta: EitSectionSetMeta) -> Self {
317 let len = meta.last_section_number as usize + 1;
318 Self {
319 meta,
320 slots: vec![None; len],
321 filled: 0,
322 emitted: false,
323 }
324 }
325
326 fn reset(&mut self, meta: EitSectionSetMeta) {
327 *self = Self::new(meta);
328 }
329
330 fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
331 let index = section_number as usize;
332 if let Some(existing) = &self.slots[index] {
333 if existing.as_ref() == bytes.as_ref() {
334 return Ok(false);
335 }
336 return Err(CollectError::ConflictingSection {
337 table_id: self.meta.key.table_id,
338 section_number,
339 });
340 }
341
342 self.slots[index] = Some(bytes);
343 self.filled += 1;
344 self.emitted = false;
345 Ok(true)
346 }
347
348 fn complete(&self) -> bool {
349 self.filled == self.slots.len()
350 }
351
352 fn to_complete(&self) -> Option<CompleteSectionSet> {
353 if !self.complete() || self.emitted {
354 return None;
355 }
356
357 let sections = self
358 .slots
359 .iter()
360 .map(|slot| {
361 slot.as_ref()
362 .expect("complete EIT set has no holes")
363 .clone()
364 })
365 .collect();
366 Some(CompleteSectionSet {
367 meta: SectionSetMeta {
368 key: SectionSetKey {
369 pid: self.meta.key.logical_key.pid,
370 table_id: self.meta.key.table_id,
371 extension_id: self.meta.key.logical_key.service_id,
372 current_next_indicator: self.meta.key.logical_key.current_next_indicator,
373 },
374 version_number: self.meta.version_number,
375 last_section_number: self.meta.last_section_number,
376 },
377 sections,
378 })
379 }
380}
381
382#[derive(Debug)]
383struct PartialEitSchedule {
384 meta: EitScheduleMeta,
385 table_sets: BTreeMap<u8, CompleteSectionSet>,
386 emitted: bool,
387}
388
389impl PartialEitSchedule {
390 fn new(meta: EitScheduleMeta) -> Self {
391 Self {
392 meta,
393 table_sets: BTreeMap::new(),
394 emitted: false,
395 }
396 }
397
398 fn reset(&mut self, meta: EitScheduleMeta) {
399 *self = Self::new(meta);
400 }
401
402 fn insert(&mut self, table_id: u8, set: CompleteSectionSet) {
403 self.table_sets.insert(table_id, set);
404 self.emitted = false;
405 }
406
407 fn complete(&self) -> bool {
408 (self.meta.first_table_id..=self.meta.last_table_id)
409 .all(|table_id| self.table_sets.contains_key(&table_id))
410 }
411
412 fn to_complete(&self) -> Option<CompleteEitSchedule> {
413 if !self.complete() || self.emitted {
414 return None;
415 }
416 let table_sets = (self.meta.first_table_id..=self.meta.last_table_id)
417 .map(|table_id| {
418 self.table_sets
419 .get(&table_id)
420 .expect("complete EIT schedule has no missing table IDs")
421 .clone()
422 })
423 .collect();
424 Some(CompleteEitSchedule {
425 first_table_id: self.meta.first_table_id,
426 last_table_id: self.meta.last_table_id,
427 table_sets,
428 })
429 }
430}
431
432#[derive(Debug, Clone)]
435pub struct CompleteEitSchedule {
436 first_table_id: u8,
437 last_table_id: u8,
438 table_sets: Vec<CompleteSectionSet>,
439}
440
441impl CompleteEitSchedule {
442 #[must_use]
444 pub const fn first_table_id(&self) -> u8 {
445 self.first_table_id
446 }
447
448 #[must_use]
450 pub const fn last_table_id(&self) -> u8 {
451 self.last_table_id
452 }
453
454 #[must_use]
456 pub fn table_sets(&self) -> &[CompleteSectionSet] {
457 &self.table_sets
458 }
459
460 pub fn table_versions(&self) -> impl ExactSizeIterator<Item = (u8, u8)> + '_ {
465 self.table_sets
466 .iter()
467 .map(|set| (set.meta().key.table_id, set.meta().version_number))
468 }
469
470 pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
472 self.tables_with_registry(None)
473 }
474
475 pub fn tables_with_registry<'a>(
478 &'a self,
479 registry: Option<&'a DescriptorRegistry>,
480 ) -> crate::Result<Vec<CompleteEit<'a>>> {
481 self.table_sets
482 .iter()
483 .map(|set| CompleteEit::parse(set, registry))
484 .collect()
485 }
486}
487
488#[derive(Debug)]
490#[non_exhaustive]
491pub struct CompleteEitEvent<'a> {
492 pub event_id: u16,
494 pub start_time_raw: [u8; 5],
496 pub duration_raw: [u8; 3],
498 pub running_status: u8,
500 pub free_ca_mode: bool,
502 pub descriptors: ParsedDescriptorLoop<'a>,
504}
505
506impl CompleteEitEvent<'_> {
507 #[must_use]
511 pub fn duration(&self) -> Option<core::time::Duration> {
512 dvb_common::time::decode_bcd_duration(self.duration_raw)
513 }
514
515 #[cfg(feature = "chrono")]
520 #[must_use]
521 pub fn start_time(&self) -> Option<chrono::DateTime<chrono::Utc>> {
522 dvb_common::time::decode_mjd_bcd_utc(self.start_time_raw)
523 }
524}
525
526#[derive(Debug)]
532#[non_exhaustive]
533pub struct CompleteEit<'a> {
534 pub kind: eit::EitKind,
536 pub table_id: u8,
538 pub service_id: u16,
540 pub version_number: u8,
542 pub current_next_indicator: bool,
544 pub transport_stream_id: u16,
546 pub original_network_id: u16,
548 pub segment_last_section_number: u8,
550 pub last_table_id: u8,
552 pub events: Vec<CompleteEitEvent<'a>>,
554}
555
556impl<'a> CompleteEit<'a> {
557 pub(crate) fn parse(
558 set: &'a CompleteSectionSet,
559 registry: Option<&'a DescriptorRegistry>,
560 ) -> crate::Result<Self> {
561 let sections: Vec<eit::EitSection<'a>> = set.parse_sections()?;
562 let first = sections.first().ok_or(crate::Error::BufferTooShort {
563 need: 1,
564 have: 0,
565 what: "CompleteEit sections",
566 })?;
567 let mut events = Vec::new();
568 for section in §ions {
569 events.extend(section.events.iter().map(|event| CompleteEitEvent {
570 event_id: event.event_id,
571 start_time_raw: event.start_time_raw,
572 duration_raw: event.duration_raw,
573 running_status: event.running_status,
574 free_ca_mode: event.free_ca_mode,
575 descriptors: ParsedDescriptorLoop::parse(event.descriptors, registry),
576 }));
577 }
578 Ok(Self {
579 kind: first.kind,
580 table_id: first.table_id,
581 service_id: first.service_id,
582 version_number: first.version_number,
583 current_next_indicator: first.current_next_indicator,
584 transport_stream_id: first.transport_stream_id,
585 original_network_id: first.original_network_id,
586 segment_last_section_number: first.segment_last_section_number,
587 last_table_id: first.last_table_id,
588 events,
589 })
590 }
591}