use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use crate::descriptors::DescriptorRegistry;
use crate::section::Section;
use crate::tables::eit;
use dvb_common::Parse;
use super::{
CollectError, CollectResult, CompleteSectionSet, ParsedDescriptorLoop, SectionSetKey,
SectionSetMeta,
};
/// Default value of the collector's `max_logical_keys` limit, used when an
/// `EitCollector` is created through `Default` / `new`.
pub const DEFAULT_MAX_LOGICAL_KEYS: usize = 256;
/// Accumulates individual EIT sections and reports when a present/following
/// section set or a whole schedule has been fully gathered.
#[derive(Debug)]
pub struct EitCollector {
/// In-progress section sets, one entry per (logical key, table ID) pair.
sections: HashMap<EitSectionSetKey, PartialEitSectionSet>,
/// In-progress schedules, one entry per logical key; each gathers the
/// completed section sets of its table-ID range.
schedules: HashMap<EitLogicalKey, PartialEitSchedule>,
/// Upper bound on the number of entries in `sections` and, separately, in
/// `schedules`. Input that would create an entry beyond it is dropped.
max_logical_keys: usize,
}
impl Default for EitCollector {
    /// Builds a collector that tracks nothing yet and is capped at
    /// `DEFAULT_MAX_LOGICAL_KEYS` entries.
    fn default() -> Self {
        let sections = HashMap::new();
        let schedules = HashMap::new();
        Self {
            sections,
            schedules,
            max_logical_keys: DEFAULT_MAX_LOGICAL_KEYS,
        }
    }
}
impl EitCollector {
    /// Creates an empty collector with the default capacity limit
    /// (`DEFAULT_MAX_LOGICAL_KEYS`).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style setter for the capacity limit.
    ///
    /// The limit is checked independently against the number of tracked
    /// section sets and the number of tracked schedules.
    #[must_use]
    pub fn with_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
        self.max_logical_keys = max_logical_keys;
        self
    }

    /// Feeds one raw section into the collector without PID information.
    ///
    /// Equivalent to [`Self::push_section_with_pid`] with `pid = None`.
    ///
    /// # Errors
    ///
    /// See [`Self::push_section_with_pid`].
    pub fn push_section(&mut self, bytes: impl AsRef<[u8]>) -> CollectResult<Option<CompletedEit>> {
        self.push_section_with_pid(None, bytes)
    }

    /// Feeds one raw section into the collector.
    ///
    /// `pid` becomes part of the logical key, so sections pushed with
    /// different `pid` values are collected separately.
    ///
    /// Returns:
    /// * `Ok(Some(CompletedEit::PresentFollowing(..)))` when this section
    ///   completes a present/following section set;
    /// * `Ok(Some(CompletedEit::Schedule(..)))` when this section completes a
    ///   schedule table and every table ID of the schedule's range now has a
    ///   complete section set;
    /// * `Ok(None)` otherwise: nothing is complete yet, the result was already
    ///   emitted, or the section was dropped because the capacity limit is
    ///   reached.
    ///
    /// # Errors
    ///
    /// * `CollectError::ShortFormSection` if `section_syntax_indicator` is not set.
    /// * `CollectError::SectionNumberOutOfRange` if `section_number` exceeds
    ///   `last_section_number`.
    /// * `CollectError::ConflictingSection` if a different payload was already
    ///   stored for the same section number of the same version.
    /// * `CollectError::EitTableIdOutOfRange` if a completed schedule table has
    ///   a table ID outside `first_table_id..=last_table_id`.
    /// * Any error propagated from section parsing, CRC validation, or EIT
    ///   parsing.
    pub fn push_section_with_pid(
        &mut self,
        pid: Option<u16>,
        bytes: impl AsRef<[u8]>,
    ) -> CollectResult<Option<CompletedEit>> {
        let raw = bytes.as_ref();
        let section = Section::parse(raw)?;
        if !section.section_syntax_indicator {
            return Err(CollectError::ShortFormSection {
                table_id: section.table_id,
            });
        }
        if section.section_number > section.last_section_number {
            return Err(CollectError::SectionNumberOutOfRange {
                table_id: section.table_id,
                section_number: section.section_number,
                last_section_number: section.last_section_number,
            });
        }
        section.validate_crc(raw)?;
        let eit = eit::EitSection::parse(raw)?;

        let logical_key = EitLogicalKey {
            pid,
            kind: eit.kind,
            service_id: eit.service_id,
            transport_stream_id: eit.transport_stream_id,
            original_network_id: eit.original_network_id,
            current_next_indicator: eit.current_next_indicator,
        };
        let key = EitSectionSetKey {
            logical_key,
            table_id: eit.table_id,
        };
        let meta = EitSectionSetMeta {
            key,
            version_number: eit.version_number,
            last_section_number: eit.last_section_number,
        };

        // Single lookup: reuse an existing entry, or create one unless the
        // map is already at capacity (in which case the section is dropped).
        // NOTE(review): the limit named "logical keys" is compared against the
        // number of section sets (logical key + table ID) here — confirm intended.
        let sections_full = self.sections.len() >= self.max_logical_keys;
        let partial = match self.sections.entry(key) {
            std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
            std::collections::hash_map::Entry::Vacant(_) if sections_full => return Ok(None),
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(PartialEitSectionSet::new(meta))
            }
        };

        // A new version or a different section count invalidates everything
        // gathered so far for this table.
        if partial.meta.version_number != meta.version_number
            || partial.meta.last_section_number != meta.last_section_number
        {
            partial.reset(meta);
        }

        // The payload is copied only now, after the capacity check, so dropped
        // sections never pay for an allocation.
        partial.insert(eit.section_number, Arc::from(raw))?;

        let complete = match partial.to_complete() {
            Some(set) => set,
            None => return Ok(None),
        };

        // Present/following sets are emitted directly; schedule sets continue
        // below with the first table ID of their range.
        let first_table_id = match eit.kind {
            eit::EitKind::PresentFollowingActual | eit::EitKind::PresentFollowingOther => {
                partial.emitted = true;
                return Ok(Some(CompletedEit::PresentFollowing(complete)));
            }
            eit::EitKind::ScheduleActual => eit::TABLE_ID_SCHEDULE_ACTUAL_FIRST,
            eit::EitKind::ScheduleOther => eit::TABLE_ID_SCHEDULE_OTHER_FIRST,
        };

        // NOTE(review): the table-ID range is only validated once the section
        // set is complete, so sections with an out-of-range table ID are still
        // stored above. `emitted` stays false on this path, so the error is
        // reported again if a duplicate section arrives.
        if eit.table_id < first_table_id || eit.table_id > eit.last_table_id {
            return Err(CollectError::EitTableIdOutOfRange {
                table_id: eit.table_id,
                first_table_id,
                last_table_id: eit.last_table_id,
            });
        }

        let schedule_meta = EitScheduleMeta {
            key: logical_key,
            first_table_id,
            last_table_id: eit.last_table_id,
        };
        let schedules_full = self.schedules.len() >= self.max_logical_keys;
        let schedule = match self.schedules.entry(logical_key) {
            std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
            // Dropped without marking the section set as emitted, so it can be
            // offered again once there is room.
            std::collections::hash_map::Entry::Vacant(_) if schedules_full => return Ok(None),
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(PartialEitSchedule::new(schedule_meta))
            }
        };
        partial.emitted = true;

        // A different last table ID changes the schedule's range; start over.
        if schedule.meta.last_table_id != schedule_meta.last_table_id {
            schedule.reset(schedule_meta);
        }
        schedule.insert(eit.table_id, complete);

        if let Some(complete) = schedule.to_complete() {
            schedule.emitted = true;
            Ok(Some(CompletedEit::Schedule(complete)))
        } else {
            Ok(None)
        }
    }

    /// Discards all partially collected section sets and schedules.
    pub fn clear(&mut self) {
        self.sections.clear();
        self.schedules.clear();
    }

    /// Keeps only the state whose logical key satisfies `keep`.
    ///
    /// The predicate is evaluated once per tracked section set and once per
    /// tracked schedule, so it may be called several times for the same key.
    pub fn retain_logical<F>(&mut self, mut keep: F)
    where
        F: FnMut(&EitLogicalKey) -> bool,
    {
        self.sections.retain(|key, _| keep(&key.logical_key));
        self.schedules.retain(|key, _| keep(key));
    }

    /// Number of section sets currently tracked.
    #[must_use]
    pub fn section_set_len(&self) -> usize {
        self.sections.len()
    }

    /// Number of schedules currently tracked.
    #[must_use]
    pub fn schedule_len(&self) -> usize {
        self.schedules.len()
    }
}
/// Result handed back by `EitCollector` once enough sections were gathered.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum CompletedEit {
/// A complete present/following section set (a single table ID).
PresentFollowing(CompleteSectionSet),
/// A complete schedule: one complete section set for every table ID in the
/// schedule's table-ID range.
Schedule(CompleteEitSchedule),
}
impl CompletedEit {
    /// Parses the collected sections into tables without a descriptor registry.
    ///
    /// Shorthand for [`Self::tables_with_registry`] with `None`.
    pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
        self.tables_with_registry(None)
    }

    /// Parses the collected sections into tables, handing `registry` to the
    /// descriptor-loop parser.
    ///
    /// A present/following result yields exactly one table; a schedule result
    /// yields one table per collected section set.
    pub fn tables_with_registry<'a>(
        &'a self,
        registry: Option<&'a DescriptorRegistry>,
    ) -> crate::Result<Vec<CompleteEit<'a>>> {
        match self {
            Self::Schedule(schedule) => schedule.tables_with_registry(registry),
            Self::PresentFollowing(set) => {
                let table = CompleteEit::parse(set, registry)?;
                Ok(vec![table])
            }
        }
    }
}
/// Identifies one logical EIT stream inside the collector. Sections whose
/// values differ in any of these fields are collected separately.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EitLogicalKey {
/// PID supplied by the caller when pushing the section; `None` when the
/// section was pushed without PID information.
pub pid: Option<u16>,
/// EIT kind reported by the parsed section.
pub kind: eit::EitKind,
/// `service_id` of the parsed section.
pub service_id: u16,
/// `transport_stream_id` of the parsed section.
pub transport_stream_id: u16,
/// `original_network_id` of the parsed section.
pub original_network_id: u16,
/// `current_next_indicator` of the parsed section.
pub current_next_indicator: bool,
}
/// Map key for one section set: a logical key narrowed down to a single
/// table ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct EitSectionSetKey {
/// Logical stream the section set belongs to.
logical_key: EitLogicalKey,
/// Table ID of the sections in this set.
table_id: u8,
}
/// Describes the table-ID range a schedule has to cover before it is complete.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct EitScheduleMeta {
/// Logical stream the schedule belongs to.
key: EitLogicalKey,
/// First table ID of the range (inclusive).
first_table_id: u8,
/// Last table ID of the range (inclusive), as reported by the sections.
last_table_id: u8,
}
/// Describes the shape of one section set; a change in version or section
/// count makes the collector restart the set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct EitSectionSetMeta {
/// Key the section set is stored under.
key: EitSectionSetKey,
/// Version number shared by all sections of the set.
version_number: u8,
/// Highest section number of the set; the set holds
/// `last_section_number + 1` slots.
last_section_number: u8,
}
/// Section set that is still being gathered.
#[derive(Debug)]
struct PartialEitSectionSet {
/// Key, version and section count this set was created for.
meta: EitSectionSetMeta,
/// One slot per section number; `None` until that section has arrived.
slots: Vec<Option<Arc<[u8]>>>,
/// Number of slots that are currently `Some`.
filled: usize,
/// Set by the collector after the complete set has been handed out, so the
/// same set is not reported twice; cleared whenever a new section is stored.
emitted: bool,
}
impl PartialEitSectionSet {
    /// Creates an empty set with `last_section_number + 1` unfilled slots.
    fn new(meta: EitSectionSetMeta) -> Self {
        let len = usize::from(meta.last_section_number) + 1;
        Self {
            meta,
            slots: vec![None; len],
            filled: 0,
            emitted: false,
        }
    }

    /// Throws away everything gathered so far and starts over for `meta`.
    fn reset(&mut self, meta: EitSectionSetMeta) {
        *self = Self::new(meta);
    }

    /// Stores `bytes` as section `section_number`.
    ///
    /// Returns `Ok(true)` when the section was newly stored and `Ok(false)`
    /// when an identical section was already present.
    ///
    /// # Errors
    ///
    /// * `CollectError::SectionNumberOutOfRange` if `section_number` has no
    ///   slot in this set (previously this would have panicked on indexing).
    /// * `CollectError::ConflictingSection` if the slot already holds a
    ///   different payload.
    fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
        let table_id = self.meta.key.table_id;
        let last_section_number = self.meta.last_section_number;

        // Checked access: callers validate the number first, but a mismatch
        // must surface as an error rather than a panic.
        let slot = match self.slots.get_mut(usize::from(section_number)) {
            Some(slot) => slot,
            None => {
                return Err(CollectError::SectionNumberOutOfRange {
                    table_id,
                    section_number,
                    last_section_number,
                });
            }
        };

        if let Some(existing) = slot.as_ref() {
            if existing.as_ref() == bytes.as_ref() {
                // Exact repeat of a section we already hold: nothing changes.
                return Ok(false);
            }
            return Err(CollectError::ConflictingSection {
                table_id,
                section_number,
            });
        }

        *slot = Some(bytes);
        self.filled += 1;
        // New content arrived, so a completed set may be reported (again).
        self.emitted = false;
        Ok(true)
    }

    /// Returns `true` once every slot holds a section.
    ///
    /// NOTE(review): completeness requires every section number in
    /// `0..=last_section_number`; confirm this is intended for schedule
    /// tables whose section numbering may be sparse.
    fn complete(&self) -> bool {
        self.filled == self.slots.len()
    }

    /// Builds the finished section set, or `None` if the set is incomplete or
    /// has already been emitted.
    ///
    /// The section payloads are shared with this set (reference-counted
    /// clones), not copied.
    fn to_complete(&self) -> Option<CompleteSectionSet> {
        if !self.complete() || self.emitted {
            return None;
        }
        let sections = self
            .slots
            .iter()
            .map(|slot| Arc::clone(slot.as_ref().expect("complete EIT set has no holes")))
            .collect();
        let logical_key = &self.meta.key.logical_key;
        Some(CompleteSectionSet {
            meta: SectionSetMeta {
                key: SectionSetKey {
                    pid: logical_key.pid,
                    table_id: self.meta.key.table_id,
                    // The generic key's extension field carries the service ID.
                    extension_id: logical_key.service_id,
                    current_next_indicator: logical_key.current_next_indicator,
                },
                version_number: self.meta.version_number,
                last_section_number: self.meta.last_section_number,
            },
            sections,
        })
    }
}
/// Schedule that is still waiting for some of its tables.
#[derive(Debug)]
struct PartialEitSchedule {
/// Logical key and table-ID range this schedule has to cover.
meta: EitScheduleMeta,
/// Completed section sets gathered so far, keyed by table ID.
table_sets: BTreeMap<u8, CompleteSectionSet>,
/// Set by the collector after the complete schedule has been handed out;
/// cleared whenever a table set is inserted.
emitted: bool,
}
impl PartialEitSchedule {
    /// Starts an empty schedule for the table-ID range described by `meta`.
    fn new(meta: EitScheduleMeta) -> Self {
        Self {
            emitted: false,
            table_sets: BTreeMap::new(),
            meta,
        }
    }

    /// Drops every gathered table set and restarts with `meta`.
    fn reset(&mut self, meta: EitScheduleMeta) {
        *self = Self::new(meta);
    }

    /// Stores (or replaces) the completed section set for `table_id` and
    /// re-arms emission.
    fn insert(&mut self, table_id: u8, set: CompleteSectionSet) {
        self.emitted = false;
        self.table_sets.insert(table_id, set);
    }

    /// Returns `true` when no table ID in `first_table_id..=last_table_id`
    /// is missing.
    fn complete(&self) -> bool {
        let mut table_ids = self.meta.first_table_id..=self.meta.last_table_id;
        !table_ids.any(|table_id| !self.table_sets.contains_key(&table_id))
    }

    /// Builds the finished schedule in ascending table-ID order, or `None` if
    /// it is incomplete or was already emitted.
    fn to_complete(&self) -> Option<CompleteEitSchedule> {
        if self.emitted || !self.complete() {
            return None;
        }
        let first_table_id = self.meta.first_table_id;
        let last_table_id = self.meta.last_table_id;
        let mut table_sets = Vec::new();
        for table_id in first_table_id..=last_table_id {
            let set = self
                .table_sets
                .get(&table_id)
                .expect("complete EIT schedule has no missing table IDs");
            table_sets.push(set.clone());
        }
        Some(CompleteEitSchedule {
            first_table_id,
            last_table_id,
            table_sets,
        })
    }
}
/// A fully gathered schedule: one complete section set per table ID of the
/// range `first_table_id..=last_table_id`.
#[derive(Debug, Clone)]
pub struct CompleteEitSchedule {
/// First table ID of the covered range (inclusive).
first_table_id: u8,
/// Last table ID of the covered range (inclusive).
last_table_id: u8,
/// Section sets in ascending table-ID order.
table_sets: Vec<CompleteSectionSet>,
}
impl CompleteEitSchedule {
    /// First table ID covered by this schedule.
    #[must_use]
    pub const fn first_table_id(&self) -> u8 {
        self.first_table_id
    }

    /// Last table ID covered by this schedule.
    #[must_use]
    pub const fn last_table_id(&self) -> u8 {
        self.last_table_id
    }

    /// The collected section sets, one per table ID.
    #[must_use]
    pub fn table_sets(&self) -> &[CompleteSectionSet] {
        self.table_sets.as_slice()
    }

    /// Iterates over `(table_id, version_number)` pairs, one per section set.
    pub fn table_versions(&self) -> impl ExactSizeIterator<Item = (u8, u8)> + '_ {
        self.table_sets.iter().map(|set| {
            let meta = set.meta();
            (meta.key.table_id, meta.version_number)
        })
    }

    /// Parses every section set into a table without a descriptor registry.
    ///
    /// Shorthand for [`Self::tables_with_registry`] with `None`.
    pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
        self.tables_with_registry(None)
    }

    /// Parses every section set into a table, handing `registry` to the
    /// descriptor-loop parser. Stops at the first section set that fails to
    /// parse and returns that error.
    pub fn tables_with_registry<'a>(
        &'a self,
        registry: Option<&'a DescriptorRegistry>,
    ) -> crate::Result<Vec<CompleteEit<'a>>> {
        let mut tables = Vec::with_capacity(self.table_sets.len());
        for set in &self.table_sets {
            tables.push(CompleteEit::parse(set, registry)?);
        }
        Ok(tables)
    }
}
/// One event of a reassembled EIT, with its descriptor loop parsed.
#[derive(Debug)]
#[non_exhaustive]
pub struct CompleteEitEvent<'a> {
/// `event_id` copied from the parsed section event.
pub event_id: u16,
/// Undecoded 5-byte start time; see `start_time()` for the decoded form.
pub start_time_raw: [u8; 5],
/// Undecoded 3-byte duration; see `duration()` for the decoded form.
pub duration_raw: [u8; 3],
/// `running_status` copied from the parsed section event.
pub running_status: u8,
/// `free_ca_mode` copied from the parsed section event.
pub free_ca_mode: bool,
/// Descriptor loop of the event, parsed with the registry given to the
/// table parser (if any).
pub descriptors: ParsedDescriptorLoop<'a>,
}
impl CompleteEitEvent<'_> {
    /// Decodes `duration_raw` via `dvb_common::time::decode_bcd_duration`.
    ///
    /// `None` is whatever that helper reports for bytes it cannot decode
    /// (presumably invalid BCD digits — verify against the helper).
    #[must_use]
    pub fn duration(&self) -> Option<core::time::Duration> {
        let raw = self.duration_raw;
        dvb_common::time::decode_bcd_duration(raw)
    }

    /// Decodes `start_time_raw` via `dvb_common::time::decode_mjd_bcd_utc`.
    ///
    /// Only available with the `chrono` feature. `None` is whatever that
    /// helper reports for bytes it cannot decode.
    #[cfg(feature = "chrono")]
    #[must_use]
    pub fn start_time(&self) -> Option<chrono::DateTime<chrono::Utc>> {
        let raw = self.start_time_raw;
        dvb_common::time::decode_mjd_bcd_utc(raw)
    }
}
/// One reassembled EIT table: header fields taken from the first section of
/// the set plus the events of all its sections.
#[derive(Debug)]
#[non_exhaustive]
pub struct CompleteEit<'a> {
/// EIT kind of the first section.
pub kind: eit::EitKind,
/// Table ID of the first section.
pub table_id: u8,
/// Service ID of the first section.
pub service_id: u16,
/// Version number of the first section.
pub version_number: u8,
/// `current_next_indicator` of the first section.
pub current_next_indicator: bool,
/// Transport stream ID of the first section.
pub transport_stream_id: u16,
/// Original network ID of the first section.
pub original_network_id: u16,
/// `segment_last_section_number` of the first section only; other sections
/// of the set may carry different values.
pub segment_last_section_number: u8,
/// `last_table_id` of the first section.
pub last_table_id: u8,
/// Events of all sections, concatenated in section order.
pub events: Vec<CompleteEitEvent<'a>>,
}
impl<'a> CompleteEit<'a> {
    /// Parses every section of `set` and merges them into a single table.
    ///
    /// Header fields are taken from the first section; the events of all
    /// sections are concatenated in section order, each with its descriptor
    /// loop parsed using `registry` (if given).
    ///
    /// # Errors
    ///
    /// Returns any error from `set.parse_sections()`, or
    /// `crate::Error::BufferTooShort` if the set contains no sections.
    pub(crate) fn parse(
        set: &'a CompleteSectionSet,
        registry: Option<&'a DescriptorRegistry>,
    ) -> crate::Result<Self> {
        let sections: Vec<eit::EitSection<'a>> = set.parse_sections()?;
        let first = sections.first().ok_or(crate::Error::BufferTooShort {
            need: 1,
            have: 0,
            what: "CompleteEit sections",
        })?;

        let mut events = Vec::new();
        // Fixed: the loop source was the mis-encoded text `§ions`
        // (an HTML-entity mangling of `&sections`), which does not compile.
        for section in &sections {
            events.extend(section.events.iter().map(|event| CompleteEitEvent {
                event_id: event.event_id,
                start_time_raw: event.start_time_raw,
                duration_raw: event.duration_raw,
                running_status: event.running_status,
                free_ca_mode: event.free_ca_mode,
                descriptors: ParsedDescriptorLoop::parse(event.descriptors, registry),
            }));
        }

        Ok(Self {
            kind: first.kind,
            table_id: first.table_id,
            service_id: first.service_id,
            version_number: first.version_number,
            current_next_indicator: first.current_next_indicator,
            transport_stream_id: first.transport_stream_id,
            original_network_id: first.original_network_id,
            segment_last_section_number: first.segment_last_section_number,
            last_table_id: first.last_table_id,
            events,
        })
    }
}