use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use crate::descriptors::{AnyDescriptor, DescriptorLoop, DescriptorRegistry};
use crate::section::Section;
use crate::tables::{bat, eit, nit, sdt};
use dvb_common::Parse;
pub type CollectResult<T> = core::result::Result<T, CollectError>;
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum CollectError {
#[error("section parse failed: {0}")]
Section(#[from] crate::Error),
#[error(
"table_id {table_id:#04x} is a short-form section and cannot be multi-section collected"
)]
ShortFormSection {
table_id: u8,
},
#[error(
"section_number {section_number} exceeds last_section_number {last_section_number} for table_id {table_id:#04x}"
)]
SectionNumberOutOfRange {
table_id: u8,
section_number: u8,
last_section_number: u8,
},
#[error("conflicting bytes for table_id {table_id:#04x} section {section_number}")]
ConflictingSection {
table_id: u8,
section_number: u8,
},
#[error(
"EIT schedule table_id {table_id:#04x} is outside advertised range {first_table_id:#04x}..={last_table_id:#04x}"
)]
EitTableIdOutOfRange {
table_id: u8,
first_table_id: u8,
last_table_id: u8,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SectionSetKey {
pub pid: Option<u16>,
pub table_id: u8,
pub extension_id: u16,
pub current_next_indicator: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SectionSetMeta {
pub key: SectionSetKey,
pub version_number: u8,
pub last_section_number: u8,
}
#[derive(Debug)]
struct PartialSectionSet {
meta: SectionSetMeta,
slots: Vec<Option<Arc<[u8]>>>,
filled: usize,
emitted: bool,
}
impl PartialSectionSet {
fn new(meta: SectionSetMeta) -> Self {
let len = meta.last_section_number as usize + 1;
Self {
meta,
slots: vec![None; len],
filled: 0,
emitted: false,
}
}
fn reset(&mut self, meta: SectionSetMeta) {
*self = Self::new(meta);
}
fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
let index = section_number as usize;
if let Some(existing) = &self.slots[index] {
if existing.as_ref() == bytes.as_ref() {
return Ok(false);
}
return Err(CollectError::ConflictingSection {
table_id: self.meta.key.table_id,
section_number,
});
}
self.slots[index] = Some(bytes);
self.filled += 1;
self.emitted = false;
Ok(true)
}
fn complete(&self) -> bool {
self.filled == self.slots.len()
}
fn to_complete(&self) -> Option<CompleteSectionSet> {
if !self.complete() || self.emitted {
return None;
}
let sections = self
.slots
.iter()
.map(|slot| slot.as_ref().expect("complete set has no holes").clone())
.collect();
Some(CompleteSectionSet {
meta: self.meta,
sections,
})
}
}
#[derive(Debug, Default)]
pub struct SectionSetCollector {
partial: HashMap<SectionSetKey, PartialSectionSet>,
}
#[derive(Debug, Default)]
pub struct EitCollector {
sections: HashMap<EitSectionSetKey, PartialEitSectionSet>,
schedules: HashMap<EitLogicalKey, PartialEitSchedule>,
}
impl EitCollector {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn push_section(&mut self, bytes: impl AsRef<[u8]>) -> CollectResult<Option<CompletedEit>> {
self.push_section_with_pid(None, bytes)
}
pub fn push_section_with_pid(
&mut self,
pid: Option<u16>,
bytes: impl AsRef<[u8]>,
) -> CollectResult<Option<CompletedEit>> {
let raw = bytes.as_ref();
let section = Section::parse(raw)?;
if !section.section_syntax_indicator {
return Err(CollectError::ShortFormSection {
table_id: section.table_id,
});
}
if section.section_number > section.last_section_number {
return Err(CollectError::SectionNumberOutOfRange {
table_id: section.table_id,
section_number: section.section_number,
last_section_number: section.last_section_number,
});
}
section.validate_crc(raw)?;
let eit = eit::EitSection::parse(raw)?;
let logical_key = EitLogicalKey {
pid,
kind: eit.kind,
service_id: eit.service_id,
transport_stream_id: eit.transport_stream_id,
original_network_id: eit.original_network_id,
current_next_indicator: eit.current_next_indicator,
};
let key = EitSectionSetKey {
logical_key,
table_id: eit.table_id,
};
let meta = EitSectionSetMeta {
key,
version_number: eit.version_number,
last_section_number: eit.last_section_number,
};
let bytes: Arc<[u8]> = Arc::from(raw);
let partial = self
.sections
.entry(key)
.or_insert_with(|| PartialEitSectionSet::new(meta));
if partial.meta.version_number != meta.version_number
|| partial.meta.last_section_number != meta.last_section_number
{
partial.reset(meta);
}
partial.insert(eit.section_number, bytes)?;
let complete = match partial.to_complete() {
Some(complete) => complete,
None => return Ok(None),
};
partial.emitted = true;
match eit.kind {
eit::EitKind::PresentFollowingActual | eit::EitKind::PresentFollowingOther => {
Ok(Some(CompletedEit::PresentFollowing(complete)))
}
eit::EitKind::ScheduleActual | eit::EitKind::ScheduleOther => {
let first_table_id = match eit.kind {
eit::EitKind::ScheduleActual => eit::TABLE_ID_SCHEDULE_ACTUAL_FIRST,
eit::EitKind::ScheduleOther => eit::TABLE_ID_SCHEDULE_OTHER_FIRST,
_ => unreachable!("matched schedule kind above"),
};
if eit.table_id < first_table_id || eit.table_id > eit.last_table_id {
return Err(CollectError::EitTableIdOutOfRange {
table_id: eit.table_id,
first_table_id,
last_table_id: eit.last_table_id,
});
}
let meta = EitScheduleMeta {
key: logical_key,
first_table_id,
last_table_id: eit.last_table_id,
};
let schedule = self
.schedules
.entry(logical_key)
.or_insert_with(|| PartialEitSchedule::new(meta));
if schedule.meta.last_table_id != meta.last_table_id {
schedule.reset(meta);
}
schedule.insert(eit.table_id, complete);
if let Some(complete) = schedule.to_complete() {
schedule.emitted = true;
Ok(Some(CompletedEit::Schedule(complete)))
} else {
Ok(None)
}
}
}
}
pub fn clear(&mut self) {
self.sections.clear();
self.schedules.clear();
}
pub fn retain_logical<F>(&mut self, mut keep: F)
where
F: FnMut(&EitLogicalKey) -> bool,
{
self.sections.retain(|key, _| keep(&key.logical_key));
self.schedules.retain(|key, _| keep(key));
}
#[must_use]
pub fn section_set_len(&self) -> usize {
self.sections.len()
}
#[must_use]
pub fn schedule_len(&self) -> usize {
self.schedules.len()
}
}
#[derive(Debug, Clone)]
pub enum CompletedEit {
PresentFollowing(CompleteSectionSet),
Schedule(CompleteEitSchedule),
}
impl CompletedEit {
pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
self.tables_with_registry(None)
}
pub fn tables_with_registry<'a>(
&'a self,
registry: Option<&'a DescriptorRegistry>,
) -> crate::Result<Vec<CompleteEit<'a>>> {
match self {
Self::PresentFollowing(set) => Ok(vec![CompleteEit::parse(set, registry)?]),
Self::Schedule(schedule) => schedule.tables_with_registry(registry),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EitLogicalKey {
pub pid: Option<u16>,
pub kind: eit::EitKind,
pub service_id: u16,
pub transport_stream_id: u16,
pub original_network_id: u16,
pub current_next_indicator: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct EitSectionSetKey {
logical_key: EitLogicalKey,
table_id: u8,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct EitScheduleMeta {
key: EitLogicalKey,
first_table_id: u8,
last_table_id: u8,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct EitSectionSetMeta {
key: EitSectionSetKey,
version_number: u8,
last_section_number: u8,
}
#[derive(Debug)]
struct PartialEitSectionSet {
meta: EitSectionSetMeta,
slots: Vec<Option<Arc<[u8]>>>,
filled: usize,
emitted: bool,
}
impl PartialEitSectionSet {
fn new(meta: EitSectionSetMeta) -> Self {
let len = meta.last_section_number as usize + 1;
Self {
meta,
slots: vec![None; len],
filled: 0,
emitted: false,
}
}
fn reset(&mut self, meta: EitSectionSetMeta) {
*self = Self::new(meta);
}
fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
let index = section_number as usize;
if let Some(existing) = &self.slots[index] {
if existing.as_ref() == bytes.as_ref() {
return Ok(false);
}
return Err(CollectError::ConflictingSection {
table_id: self.meta.key.table_id,
section_number,
});
}
self.slots[index] = Some(bytes);
self.filled += 1;
self.emitted = false;
Ok(true)
}
fn complete(&self) -> bool {
self.filled == self.slots.len()
}
fn to_complete(&self) -> Option<CompleteSectionSet> {
if !self.complete() || self.emitted {
return None;
}
let sections = self
.slots
.iter()
.map(|slot| {
slot.as_ref()
.expect("complete EIT set has no holes")
.clone()
})
.collect();
Some(CompleteSectionSet {
meta: SectionSetMeta {
key: SectionSetKey {
pid: self.meta.key.logical_key.pid,
table_id: self.meta.key.table_id,
extension_id: self.meta.key.logical_key.service_id,
current_next_indicator: self.meta.key.logical_key.current_next_indicator,
},
version_number: self.meta.version_number,
last_section_number: self.meta.last_section_number,
},
sections,
})
}
}
#[derive(Debug)]
struct PartialEitSchedule {
meta: EitScheduleMeta,
table_sets: BTreeMap<u8, CompleteSectionSet>,
emitted: bool,
}
impl PartialEitSchedule {
fn new(meta: EitScheduleMeta) -> Self {
Self {
meta,
table_sets: BTreeMap::new(),
emitted: false,
}
}
fn reset(&mut self, meta: EitScheduleMeta) {
*self = Self::new(meta);
}
fn insert(&mut self, table_id: u8, set: CompleteSectionSet) {
self.table_sets.insert(table_id, set);
self.emitted = false;
}
fn complete(&self) -> bool {
(self.meta.first_table_id..=self.meta.last_table_id)
.all(|table_id| self.table_sets.contains_key(&table_id))
}
fn to_complete(&self) -> Option<CompleteEitSchedule> {
if !self.complete() || self.emitted {
return None;
}
let table_sets = (self.meta.first_table_id..=self.meta.last_table_id)
.map(|table_id| {
self.table_sets
.get(&table_id)
.expect("complete EIT schedule has no missing table IDs")
.clone()
})
.collect();
Some(CompleteEitSchedule {
first_table_id: self.meta.first_table_id,
last_table_id: self.meta.last_table_id,
table_sets,
})
}
}
#[derive(Debug, Clone)]
pub struct CompleteEitSchedule {
first_table_id: u8,
last_table_id: u8,
table_sets: Vec<CompleteSectionSet>,
}
impl CompleteEitSchedule {
#[must_use]
pub const fn first_table_id(&self) -> u8 {
self.first_table_id
}
#[must_use]
pub const fn last_table_id(&self) -> u8 {
self.last_table_id
}
#[must_use]
pub fn table_sets(&self) -> &[CompleteSectionSet] {
&self.table_sets
}
pub fn table_versions(&self) -> impl ExactSizeIterator<Item = (u8, u8)> + '_ {
self.table_sets
.iter()
.map(|set| (set.meta.key.table_id, set.meta.version_number))
}
pub fn tables(&self) -> crate::Result<Vec<CompleteEit<'_>>> {
self.tables_with_registry(None)
}
pub fn tables_with_registry<'a>(
&'a self,
registry: Option<&'a DescriptorRegistry>,
) -> crate::Result<Vec<CompleteEit<'a>>> {
self.table_sets
.iter()
.map(|set| CompleteEit::parse(set, registry))
.collect()
}
}
impl SectionSetCollector {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn push_section(
&mut self,
bytes: impl AsRef<[u8]>,
) -> CollectResult<Option<CompleteSectionSet>> {
self.push_section_with_pid(None, bytes)
}
pub fn push_section_with_pid(
&mut self,
pid: Option<u16>,
bytes: impl AsRef<[u8]>,
) -> CollectResult<Option<CompleteSectionSet>> {
let raw = bytes.as_ref();
let section = Section::parse(raw)?;
if !section.section_syntax_indicator {
return Err(CollectError::ShortFormSection {
table_id: section.table_id,
});
}
if section.section_number > section.last_section_number {
return Err(CollectError::SectionNumberOutOfRange {
table_id: section.table_id,
section_number: section.section_number,
last_section_number: section.last_section_number,
});
}
section.validate_crc(raw)?;
let key = SectionSetKey {
pid,
table_id: section.table_id,
extension_id: section.extension_id,
current_next_indicator: section.current_next_indicator,
};
let meta = SectionSetMeta {
key,
version_number: section.version_number,
last_section_number: section.last_section_number,
};
let bytes: Arc<[u8]> = Arc::from(raw);
let partial = self
.partial
.entry(key)
.or_insert_with(|| PartialSectionSet::new(meta));
if partial.meta.version_number != meta.version_number
|| partial.meta.last_section_number != meta.last_section_number
{
partial.reset(meta);
}
partial.insert(section.section_number, bytes)?;
let complete = partial.to_complete();
if complete.is_some() {
partial.emitted = true;
}
Ok(complete)
}
pub fn clear(&mut self) {
self.partial.clear();
}
#[must_use]
pub fn len(&self) -> usize {
self.partial.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.partial.is_empty()
}
}
#[derive(Debug, Clone)]
pub struct CompleteSectionSet {
meta: SectionSetMeta,
sections: Vec<Arc<[u8]>>,
}
#[derive(Debug)]
pub struct CompleteTable<T> {
meta: SectionSetMeta,
sections: Vec<T>,
}
impl<T> CompleteTable<T> {
#[must_use]
pub const fn meta(&self) -> SectionSetMeta {
self.meta
}
#[must_use]
pub fn sections(&self) -> &[T] {
&self.sections
}
#[must_use]
pub fn into_sections(self) -> Vec<T> {
self.sections
}
}
impl CompleteSectionSet {
#[must_use]
pub const fn meta(&self) -> SectionSetMeta {
self.meta
}
#[must_use]
pub fn section_bytes(&self) -> impl ExactSizeIterator<Item = &[u8]> {
self.sections.iter().map(AsRef::as_ref)
}
pub fn parse_sections<'a, T>(&'a self) -> crate::Result<Vec<T>>
where
T: Parse<'a, Error = crate::Error>,
{
self.section_bytes().map(T::parse).collect()
}
pub fn table<'a, T>(&'a self) -> crate::Result<CompleteTable<T>>
where
T: Parse<'a, Error = crate::Error>,
{
Ok(CompleteTable {
meta: self.meta,
sections: self.parse_sections()?,
})
}
pub fn nit(&self) -> crate::Result<CompleteNit<'_>> {
CompleteNit::parse(self, None)
}
pub fn nit_with_registry<'a>(
&'a self,
registry: &'a DescriptorRegistry,
) -> crate::Result<CompleteNit<'a>> {
CompleteNit::parse(self, Some(registry))
}
pub fn bat(&self) -> crate::Result<CompleteBat<'_>> {
CompleteBat::parse(self, None)
}
pub fn bat_with_registry<'a>(
&'a self,
registry: &'a DescriptorRegistry,
) -> crate::Result<CompleteBat<'a>> {
CompleteBat::parse(self, Some(registry))
}
pub fn sdt(&self) -> crate::Result<CompleteSdt<'_>> {
CompleteSdt::parse(self, None)
}
pub fn sdt_with_registry<'a>(
&'a self,
registry: &'a DescriptorRegistry,
) -> crate::Result<CompleteSdt<'a>> {
CompleteSdt::parse(self, Some(registry))
}
pub fn eit(&self) -> crate::Result<CompleteEit<'_>> {
CompleteEit::parse(self, None)
}
pub fn eit_with_registry<'a>(
&'a self,
registry: &'a DescriptorRegistry,
) -> crate::Result<CompleteEit<'a>> {
CompleteEit::parse(self, Some(registry))
}
}
#[derive(Debug)]
pub struct ParsedDescriptorLoop<'a> {
raw: DescriptorLoop<'a>,
descriptors: Vec<crate::Result<AnyDescriptor<'a>>>,
}
impl<'a> ParsedDescriptorLoop<'a> {
fn parse(raw: DescriptorLoop<'a>, registry: Option<&'a DescriptorRegistry>) -> Self {
let descriptors = match registry {
Some(registry) => registry.parse_loop(raw.raw()).collect(),
None => raw.iter().collect(),
};
Self { raw, descriptors }
}
#[must_use]
pub const fn raw(&self) -> DescriptorLoop<'a> {
self.raw
}
pub fn descriptors(&self) -> &[crate::Result<AnyDescriptor<'a>>] {
&self.descriptors
}
}
#[derive(Debug)]
pub struct CompleteNitTransportStream<'a> {
pub transport_stream_id: u16,
pub original_network_id: u16,
pub descriptors: ParsedDescriptorLoop<'a>,
}
#[derive(Debug)]
pub struct CompleteNit<'a> {
pub kind: nit::NitKind,
pub network_id: u16,
pub version_number: u8,
pub current_next_indicator: bool,
pub network_descriptors: ParsedDescriptorLoop<'a>,
pub transport_streams: Vec<CompleteNitTransportStream<'a>>,
}
impl<'a> CompleteNit<'a> {
fn parse(
set: &'a CompleteSectionSet,
registry: Option<&'a DescriptorRegistry>,
) -> crate::Result<Self> {
let sections: Vec<nit::NitSection<'a>> = set.parse_sections()?;
let first = sections.first().ok_or(crate::Error::BufferTooShort {
need: 1,
have: 0,
what: "CompleteNit sections",
})?;
let mut transport_streams = Vec::new();
for section in §ions {
transport_streams.extend(section.transport_streams.iter().map(|ts| {
CompleteNitTransportStream {
transport_stream_id: ts.transport_stream_id,
original_network_id: ts.original_network_id,
descriptors: ParsedDescriptorLoop::parse(ts.descriptors, registry),
}
}));
}
Ok(Self {
kind: first.kind,
network_id: first.network_id,
version_number: first.version_number,
current_next_indicator: first.current_next_indicator,
network_descriptors: ParsedDescriptorLoop::parse(first.network_descriptors, registry),
transport_streams,
})
}
}
#[derive(Debug)]
pub struct CompleteBatTransportStream<'a> {
pub transport_stream_id: u16,
pub original_network_id: u16,
pub descriptors: ParsedDescriptorLoop<'a>,
}
#[derive(Debug)]
pub struct CompleteBat<'a> {
pub bouquet_id: u16,
pub version_number: u8,
pub current_next_indicator: bool,
pub bouquet_descriptors: ParsedDescriptorLoop<'a>,
pub transport_streams: Vec<CompleteBatTransportStream<'a>>,
}
impl<'a> CompleteBat<'a> {
fn parse(
set: &'a CompleteSectionSet,
registry: Option<&'a DescriptorRegistry>,
) -> crate::Result<Self> {
let sections: Vec<bat::BatSection<'a>> = set.parse_sections()?;
let first = sections.first().ok_or(crate::Error::BufferTooShort {
need: 1,
have: 0,
what: "CompleteBat sections",
})?;
let mut transport_streams = Vec::new();
for section in §ions {
transport_streams.extend(section.transport_streams.iter().map(|ts| {
CompleteBatTransportStream {
transport_stream_id: ts.transport_stream_id,
original_network_id: ts.original_network_id,
descriptors: ParsedDescriptorLoop::parse(ts.descriptors, registry),
}
}));
}
Ok(Self {
bouquet_id: first.bouquet_id,
version_number: first.version_number,
current_next_indicator: first.current_next_indicator,
bouquet_descriptors: ParsedDescriptorLoop::parse(first.bouquet_descriptors, registry),
transport_streams,
})
}
}
#[derive(Debug)]
pub struct CompleteSdtService<'a> {
pub service_id: u16,
pub eit_schedule_flag: bool,
pub eit_present_following_flag: bool,
pub running_status: u8,
pub free_ca_mode: bool,
pub descriptors: ParsedDescriptorLoop<'a>,
}
#[derive(Debug)]
pub struct CompleteSdt<'a> {
pub kind: sdt::SdtKind,
pub transport_stream_id: u16,
pub version_number: u8,
pub current_next_indicator: bool,
pub original_network_id: u16,
pub services: Vec<CompleteSdtService<'a>>,
}
impl<'a> CompleteSdt<'a> {
fn parse(
set: &'a CompleteSectionSet,
registry: Option<&'a DescriptorRegistry>,
) -> crate::Result<Self> {
let sections: Vec<sdt::SdtSection<'a>> = set.parse_sections()?;
let first = sections.first().ok_or(crate::Error::BufferTooShort {
need: 1,
have: 0,
what: "CompleteSdt sections",
})?;
let mut services = Vec::new();
for section in §ions {
services.extend(section.services.iter().map(|svc| CompleteSdtService {
service_id: svc.service_id,
eit_schedule_flag: svc.eit_schedule_flag,
eit_present_following_flag: svc.eit_present_following_flag,
running_status: svc.running_status,
free_ca_mode: svc.free_ca_mode,
descriptors: ParsedDescriptorLoop::parse(svc.descriptors, registry),
}));
}
Ok(Self {
kind: first.kind,
transport_stream_id: first.transport_stream_id,
version_number: first.version_number,
current_next_indicator: first.current_next_indicator,
original_network_id: first.original_network_id,
services,
})
}
}
#[derive(Debug)]
pub struct CompleteEitEvent<'a> {
pub event_id: u16,
pub start_time_raw: [u8; 5],
pub duration_raw: [u8; 3],
pub running_status: u8,
pub free_ca_mode: bool,
pub descriptors: ParsedDescriptorLoop<'a>,
}
#[derive(Debug)]
pub struct CompleteEit<'a> {
pub kind: eit::EitKind,
pub table_id: u8,
pub service_id: u16,
pub version_number: u8,
pub current_next_indicator: bool,
pub transport_stream_id: u16,
pub original_network_id: u16,
pub segment_last_section_number: u8,
pub last_table_id: u8,
pub events: Vec<CompleteEitEvent<'a>>,
}
impl<'a> CompleteEit<'a> {
fn parse(
set: &'a CompleteSectionSet,
registry: Option<&'a DescriptorRegistry>,
) -> crate::Result<Self> {
let sections: Vec<eit::EitSection<'a>> = set.parse_sections()?;
let first = sections.first().ok_or(crate::Error::BufferTooShort {
need: 1,
have: 0,
what: "CompleteEit sections",
})?;
let mut events = Vec::new();
for section in §ions {
events.extend(section.events.iter().map(|event| CompleteEitEvent {
event_id: event.event_id,
start_time_raw: event.start_time_raw,
duration_raw: event.duration_raw,
running_status: event.running_status,
free_ca_mode: event.free_ca_mode,
descriptors: ParsedDescriptorLoop::parse(event.descriptors, registry),
}));
}
Ok(Self {
kind: first.kind,
table_id: first.table_id,
service_id: first.service_id,
version_number: first.version_number,
current_next_indicator: first.current_next_indicator,
transport_stream_id: first.transport_stream_id,
original_network_id: first.original_network_id,
segment_last_section_number: first.segment_last_section_number,
last_table_id: first.last_table_id,
events,
})
}
}