use std::collections::HashMap;
use std::sync::Arc;
use crate::descriptors::{AnyDescriptor, DescriptorLoop, DescriptorRegistry};
use crate::section::Section;
use dvb_common::Parse;
mod bat;
mod eit;
mod nit;
mod sdt;
pub use bat::*;
pub use eit::*;
pub use nit::*;
pub use sdt::*;
/// Default cap on the number of distinct [`SectionSetKey`]s a
/// [`SectionSetCollector`] tracks at once. Used by the collector's `Default`
/// impl; override it with [`SectionSetCollector::with_max_partial_keys`].
pub const DEFAULT_MAX_PARTIAL_KEYS: usize = 256;
/// Result alias for the section-set collection API; the error type is
/// always [`CollectError`].
pub type CollectResult<T> = core::result::Result<T, CollectError>;
/// Errors reported while gathering sections into complete section sets.
///
/// The enum is `#[non_exhaustive]`, so downstream `match`es need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum CollectError {
    /// A lower-level [`crate::Error`]; `?` converts it automatically through
    /// the `#[from]` conversion.
    #[error("section parse failed: {0}")]
    Section(#[from] crate::Error),

    /// The pushed section has `section_syntax_indicator` cleared, so it
    /// carries no section numbering and cannot be part of a multi-section set.
    #[error(
        "table_id {table_id:#04x} is a short-form section and cannot be multi-section collected"
    )]
    ShortFormSection {
        /// `table_id` of the rejected section.
        table_id: u8,
    },

    /// The section's `section_number` is greater than its own
    /// `last_section_number`.
    #[error(
        "section_number {section_number} exceeds last_section_number {last_section_number} for table_id {table_id:#04x}"
    )]
    SectionNumberOutOfRange {
        /// `table_id` of the rejected section.
        table_id: u8,
        /// The offending `section_number`.
        section_number: u8,
        /// The `last_section_number` it was checked against.
        last_section_number: u8,
    },

    /// A section number that is already stored for this set was received
    /// again with different bytes.
    #[error("conflicting bytes for table_id {table_id:#04x} section {section_number}")]
    ConflictingSection {
        /// `table_id` of the set being collected.
        table_id: u8,
        /// Section number whose stored bytes differ from the new ones.
        section_number: u8,
    },

    /// An EIT schedule `table_id` lies outside the advertised
    /// `first_table_id..=last_table_id` range.
    // NOTE(review): nothing in this part of the file constructs this variant;
    // presumably the `eit` submodule does — confirm there.
    #[error(
        "EIT schedule table_id {table_id:#04x} is outside advertised range {first_table_id:#04x}..={last_table_id:#04x}"
    )]
    EitTableIdOutOfRange {
        /// The out-of-range `table_id`.
        table_id: u8,
        /// Lower bound of the advertised range (inclusive).
        first_table_id: u8,
        /// Upper bound of the advertised range (inclusive).
        last_table_id: u8,
    },
}
/// Identity of one section set inside a [`SectionSetCollector`].
///
/// Sections whose four fields all match are gathered into the same set; a
/// difference in any field starts a separate set.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct SectionSetKey {
    /// PID supplied by the caller alongside the section, or `None` when the
    /// section was pushed without one.
    pub pid: Option<u16>,
    /// `table_id` of the parsed section.
    pub table_id: u8,
    /// `extension_id` of the parsed section.
    pub extension_id: u16,
    /// `current_next_indicator` of the parsed section.
    pub current_next_indicator: bool,
}
/// Header information shared by every section of one section set.
///
/// The collector restarts a set whenever a section for the same key arrives
/// with a different `version_number` or `last_section_number`.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SectionSetMeta {
    /// Key identifying which set the sections belong to.
    pub key: SectionSetKey,
    /// `version_number` of the parsed sections.
    pub version_number: u8,
    /// Highest section number of the set; a set holds
    /// `last_section_number + 1` sections.
    pub last_section_number: u8,
}
/// Collector-internal state for one section set that is still being filled
/// (or has been filled and already handed out).
#[derive(Debug)]
struct PartialSectionSet {
    /// Version / size information the stored sections were collected under.
    meta: SectionSetMeta,
    /// One entry per section number in `0..=meta.last_section_number`;
    /// `None` until that section has been received.
    slots: Vec<Option<Arc<[u8]>>>,
    /// Number of entries in `slots` that are `Some`.
    filled: usize,
    /// `true` once the completed set has been returned to the caller, so it
    /// is not returned a second time.
    emitted: bool,
}
impl PartialSectionSet {
    /// Builds an empty set with one vacant slot for every section number in
    /// `0..=meta.last_section_number`.
    fn new(meta: SectionSetMeta) -> Self {
        let slot_count = usize::from(meta.last_section_number) + 1;
        let slots = vec![None; slot_count];
        Self {
            meta,
            slots,
            filled: 0,
            emitted: false,
        }
    }

    /// Throws away everything collected so far and starts over under `meta`.
    fn reset(&mut self, meta: SectionSetMeta) {
        let fresh = Self::new(meta);
        *self = fresh;
    }

    /// Stores `bytes` as section `section_number`.
    ///
    /// Returns `Ok(true)` when the slot was vacant and is now filled, and
    /// `Ok(false)` when the slot already holds identical bytes.
    ///
    /// # Errors
    /// [`CollectError::ConflictingSection`] when the slot already holds
    /// different bytes; the stored bytes are left untouched.
    ///
    /// # Panics
    /// Panics if `section_number` is greater than `meta.last_section_number`
    /// (the slot index would be out of bounds).
    fn insert(&mut self, section_number: u8, bytes: Arc<[u8]>) -> CollectResult<bool> {
        let index = usize::from(section_number);
        match self.slots[index].as_deref() {
            // Exact repeat of what is already stored: nothing to do.
            Some(stored) if stored == &bytes[..] => return Ok(false),
            Some(_) => {
                return Err(CollectError::ConflictingSection {
                    table_id: self.meta.key.table_id,
                    section_number,
                })
            }
            None => {}
        }
        self.slots[index] = Some(bytes);
        self.filled += 1;
        // New content arrived, so any previous emission no longer covers it.
        self.emitted = false;
        Ok(true)
    }

    /// `true` when every slot holds a section.
    fn complete(&self) -> bool {
        self.slots.len() == self.filled
    }

    /// Returns the finished set, or `None` while sections are still missing
    /// or after the set has been marked as emitted.
    ///
    /// This method does not set `emitted` itself; the caller does that.
    fn to_complete(&self) -> Option<CompleteSectionSet> {
        if self.emitted || !self.complete() {
            return None;
        }
        let mut sections = Vec::with_capacity(self.slots.len());
        for slot in &self.slots {
            let bytes = slot.as_ref().expect("complete set has no holes");
            // Cheap: bumps the reference count, does not copy the section.
            sections.push(Arc::clone(bytes));
        }
        Some(CompleteSectionSet {
            meta: self.meta,
            sections,
        })
    }
}
/// Gathers individually pushed long-form sections into complete section sets.
///
/// Sections are grouped by [`SectionSetKey`]; once every section number of a
/// set has been seen, the set is returned as a [`CompleteSectionSet`].
#[derive(Debug)]
pub struct SectionSetCollector {
    /// Per-key collection state. Entries stay in the map after their set has
    /// been emitted; only [`SectionSetCollector::clear`] removes them.
    partial: HashMap<SectionSetKey, PartialSectionSet>,
    /// Once the map holds this many keys, sections for unseen keys are skipped.
    max_partial_keys: usize,
}
impl Default for SectionSetCollector {
    /// An empty collector capped at [`DEFAULT_MAX_PARTIAL_KEYS`] tracked keys.
    fn default() -> Self {
        let partial = HashMap::default();
        let max_partial_keys = DEFAULT_MAX_PARTIAL_KEYS;
        Self {
            partial,
            max_partial_keys,
        }
    }
}
impl SectionSetCollector {
    /// Creates an empty collector with the default key cap
    /// ([`DEFAULT_MAX_PARTIAL_KEYS`]).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style setter for the maximum number of distinct keys tracked.
    ///
    /// With a cap of `0` no key is ever admitted, so every push that passes
    /// validation returns `Ok(None)`.
    #[must_use]
    pub fn with_max_partial_keys(mut self, max_partial_keys: usize) -> Self {
        self.max_partial_keys = max_partial_keys;
        self
    }

    /// Pushes one section without PID information.
    ///
    /// Equivalent to [`Self::push_section_with_pid`] with `pid = None`; see
    /// that method for the return value and errors.
    pub fn push_section(
        &mut self,
        bytes: impl AsRef<[u8]>,
    ) -> CollectResult<Option<CompleteSectionSet>> {
        self.push_section_with_pid(None, bytes)
    }

    /// Pushes one section and returns the set it completes, if any.
    ///
    /// The section is parsed, checked for long-form syntax and a consistent
    /// section number, CRC-validated, and then stored under the key
    /// `(pid, table_id, extension_id, current_next_indicator)`. A section
    /// whose `version_number` or `last_section_number` differs from what is
    /// stored for its key discards that key's previous sections first.
    ///
    /// Returns `Ok(Some(set))` exactly when this push filled the last missing
    /// slot of its set. Returns `Ok(None)` when the set is still incomplete,
    /// when the section is an exact repeat of one already stored, or when the
    /// section belongs to a new key and the collector already tracks
    /// `max_partial_keys` keys (the section is then dropped).
    ///
    /// # Errors
    /// - [`CollectError::Section`] for errors propagated from
    ///   `Section::parse` or `Section::validate_crc`.
    /// - [`CollectError::ShortFormSection`] when `section_syntax_indicator`
    ///   is not set.
    /// - [`CollectError::SectionNumberOutOfRange`] when `section_number`
    ///   exceeds `last_section_number`.
    /// - [`CollectError::ConflictingSection`] when the same section number is
    ///   already stored with different bytes under the same version.
    pub fn push_section_with_pid(
        &mut self,
        pid: Option<u16>,
        bytes: impl AsRef<[u8]>,
    ) -> CollectResult<Option<CompleteSectionSet>> {
        let raw = bytes.as_ref();
        let section = Section::parse(raw)?;
        if !section.section_syntax_indicator {
            return Err(CollectError::ShortFormSection {
                table_id: section.table_id,
            });
        }
        if section.section_number > section.last_section_number {
            return Err(CollectError::SectionNumberOutOfRange {
                table_id: section.table_id,
                section_number: section.section_number,
                last_section_number: section.last_section_number,
            });
        }
        section.validate_crc(raw)?;

        let key = SectionSetKey {
            pid,
            table_id: section.table_id,
            extension_id: section.extension_id,
            current_next_indicator: section.current_next_indicator,
        };
        let meta = SectionSetMeta {
            key,
            version_number: section.version_number,
            last_section_number: section.last_section_number,
        };

        // Cap check: the cheap length test goes first so the hash lookup only
        // happens once the map is actually full. Existing keys are always
        // admitted; only unseen keys are skipped.
        if self.partial.len() >= self.max_partial_keys && !self.partial.contains_key(&key) {
            return Ok(None);
        }

        let partial = self
            .partial
            .entry(key)
            .or_insert_with(|| PartialSectionSet::new(meta));
        if partial.meta.version_number != meta.version_number
            || partial.meta.last_section_number != meta.last_section_number
        {
            // A new version (or a resized table) invalidates what was collected.
            partial.reset(meta);
        }

        // The owned copy is made only now, after every rejection path above,
        // so a skipped or invalid section never costs an allocation.
        partial.insert(section.section_number, Arc::from(raw))?;

        let complete = partial.to_complete();
        if complete.is_some() {
            // Remember the hand-out so repeats of this set yield `None`.
            partial.emitted = true;
        }
        Ok(complete)
    }

    /// Forgets every tracked set, including sets that were already emitted.
    pub fn clear(&mut self) {
        self.partial.clear();
    }

    /// Number of keys currently tracked (emitted sets still count).
    #[must_use]
    pub fn len(&self) -> usize {
        self.partial.len()
    }

    /// `true` when no key is tracked.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.partial.is_empty()
    }
}
/// A section set for which every section number has been collected.
///
/// Cloning is cheap with respect to section data: the bytes are held behind
/// `Arc`s and are shared, not copied.
#[derive(Clone, Debug)]
pub struct CompleteSectionSet {
    /// Key, version and size the sections were collected under.
    meta: SectionSetMeta,
    /// Raw section bytes, indexed by section number.
    sections: Vec<Arc<[u8]>>,
}
/// The sections of a [`CompleteSectionSet`] after each one has been parsed
/// into `T`, together with the set's metadata.
#[derive(Debug)]
pub struct CompleteTable<T> {
    /// Metadata copied from the originating section set.
    meta: SectionSetMeta,
    /// Parsed sections, in the same order as the set's raw sections.
    sections: Vec<T>,
}
impl<T> CompleteTable<T> {
    /// Metadata of the section set this table was parsed from (returned by
    /// value; [`SectionSetMeta`] is `Copy`).
    #[must_use]
    pub const fn meta(&self) -> SectionSetMeta {
        self.meta
    }

    /// Borrows the parsed sections.
    #[must_use]
    pub fn sections(&self) -> &[T] {
        self.sections.as_slice()
    }

    /// Consumes the table and returns the parsed sections, dropping the
    /// metadata.
    #[must_use]
    pub fn into_sections(self) -> Vec<T> {
        let Self { sections, .. } = self;
        sections
    }
}
impl CompleteSectionSet {
    /// Key, version and size of this set (returned by value).
    #[must_use]
    pub const fn meta(&self) -> SectionSetMeta {
        self.meta
    }

    /// Iterates over the raw bytes of every section, in stored order.
    #[must_use]
    pub fn section_bytes(&self) -> impl ExactSizeIterator<Item = &[u8]> {
        self.sections.iter().map(|section| &section[..])
    }

    /// Parses every section as `T`.
    ///
    /// # Errors
    /// Stops at, and returns, the first error produced by `T::parse`.
    pub fn parse_sections<'a, T>(&'a self) -> crate::Result<Vec<T>>
    where
        T: Parse<'a, Error = crate::Error>,
    {
        let mut parsed = Vec::with_capacity(self.sections.len());
        for bytes in self.section_bytes() {
            parsed.push(T::parse(bytes)?);
        }
        Ok(parsed)
    }

    /// Parses every section as `T` and bundles the result with this set's
    /// metadata.
    ///
    /// # Errors
    /// Same as [`Self::parse_sections`].
    pub fn table<'a, T>(&'a self) -> crate::Result<CompleteTable<T>>
    where
        T: Parse<'a, Error = crate::Error>,
    {
        let sections = self.parse_sections::<T>()?;
        Ok(CompleteTable {
            meta: self.meta,
            sections,
        })
    }

    /// Interprets this set as a NIT via `CompleteNit::parse`, without a
    /// descriptor registry.
    pub fn nit(&self) -> crate::Result<CompleteNit<'_>> {
        CompleteNit::parse(self, None)
    }

    /// Like [`Self::nit`], but passes `registry` to `CompleteNit::parse`.
    pub fn nit_with_registry<'a>(
        &'a self,
        registry: &'a DescriptorRegistry,
    ) -> crate::Result<CompleteNit<'a>> {
        CompleteNit::parse(self, Some(registry))
    }

    /// Interprets this set as a BAT via `CompleteBat::parse`, without a
    /// descriptor registry.
    pub fn bat(&self) -> crate::Result<CompleteBat<'_>> {
        CompleteBat::parse(self, None)
    }

    /// Like [`Self::bat`], but passes `registry` to `CompleteBat::parse`.
    pub fn bat_with_registry<'a>(
        &'a self,
        registry: &'a DescriptorRegistry,
    ) -> crate::Result<CompleteBat<'a>> {
        CompleteBat::parse(self, Some(registry))
    }

    /// Interprets this set as an SDT via `CompleteSdt::parse`, without a
    /// descriptor registry.
    pub fn sdt(&self) -> crate::Result<CompleteSdt<'_>> {
        CompleteSdt::parse(self, None)
    }

    /// Like [`Self::sdt`], but passes `registry` to `CompleteSdt::parse`.
    pub fn sdt_with_registry<'a>(
        &'a self,
        registry: &'a DescriptorRegistry,
    ) -> crate::Result<CompleteSdt<'a>> {
        CompleteSdt::parse(self, Some(registry))
    }

    /// Interprets this set as an EIT via `CompleteEit::parse`, without a
    /// descriptor registry.
    pub fn eit(&self) -> crate::Result<CompleteEit<'_>> {
        CompleteEit::parse(self, None)
    }

    /// Like [`Self::eit`], but passes `registry` to `CompleteEit::parse`.
    pub fn eit_with_registry<'a>(
        &'a self,
        registry: &'a DescriptorRegistry,
    ) -> crate::Result<CompleteEit<'a>> {
        CompleteEit::parse(self, Some(registry))
    }
}
/// A descriptor loop together with the outcome of parsing each of its
/// descriptors.
#[derive(Debug)]
pub struct ParsedDescriptorLoop<'a> {
    /// The descriptor loop the results were produced from.
    raw: DescriptorLoop<'a>,
    /// One parse result per item yielded while iterating the loop; failures
    /// are kept in place as `Err` entries.
    descriptors: Vec<crate::Result<AnyDescriptor<'a>>>,
}
impl<'a> ParsedDescriptorLoop<'a> {
    /// Parses every descriptor in `raw` and keeps the per-descriptor results.
    ///
    /// With `Some(registry)` the descriptors are produced by
    /// `registry.parse_loop(raw.raw())`; with `None` by `raw.iter()`. Results
    /// are collected as-is, so one failing descriptor does not abort the
    /// others.
    pub(crate) fn parse(raw: DescriptorLoop<'a>, registry: Option<&'a DescriptorRegistry>) -> Self {
        let descriptors = match registry {
            Some(registry) => registry.parse_loop(raw.raw()).collect(),
            None => raw.iter().collect(),
        };
        Self { raw, descriptors }
    }

    /// The descriptor loop this value was parsed from (returned by value).
    #[must_use]
    pub const fn raw(&self) -> DescriptorLoop<'a> {
        self.raw
    }

    /// Per-descriptor parse results, in the order they were produced.
    // `#[must_use]` matches `raw()` and every other pure accessor in this file.
    #[must_use]
    pub fn descriptors(&self) -> &[crate::Result<AnyDescriptor<'a>>] {
        &self.descriptors
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    const TEST_TABLE_ID: u8 = 0x42;

    /// Builds a 12-byte section with no payload: 8 header bytes followed by a
    /// CRC computed over those 8 bytes.
    fn build_section(extension_id: u16, section_number: u8, last_section_number: u8) -> Vec<u8> {
        // Bytes following the length field: 5 remaining header bytes + 4 CRC bytes.
        let section_length: u16 = 9;
        let [ext_hi, ext_lo] = extension_id.to_be_bytes();
        let mut buf = vec![
            TEST_TABLE_ID,
            0xB0 | ((section_length >> 8) as u8 & 0x0F),
            (section_length & 0xFF) as u8,
            ext_hi,
            ext_lo,
            0xC1,
            section_number,
            last_section_number,
        ];
        let crc = dvb_common::crc32_mpeg2::compute(&buf[..]);
        buf.extend_from_slice(&crc.to_be_bytes());
        buf
    }

    /// A set consisting of the single section 0 of 0.
    fn min_section(extension_id: u16) -> Vec<u8> {
        build_section(extension_id, 0, 0)
    }

    #[test]
    fn collect_single_section_is_complete() {
        let mut collector = SectionSetCollector::new();
        let section = min_section(0);

        let outcome = collector.push_section(&section).unwrap();

        assert!(outcome.is_some());
        assert_eq!(collector.len(), 1);
    }

    #[test]
    fn partial_keys_cap_skips_new_keys() {
        let mut collector = SectionSetCollector::new().with_max_partial_keys(3);

        // Fill the collector up to its cap with three distinct keys.
        for eid in 0..3u16 {
            let section = min_section(eid);
            let outcome = collector.push_section(&section).unwrap();
            assert!(
                outcome.is_some(),
                "single-section set for eid {eid} completes"
            );
        }
        assert_eq!(collector.len(), 3);

        // A fourth key is silently dropped while the cap is reached.
        let fourth = min_section(3);
        let outcome = collector.push_section(&fourth).unwrap();
        assert!(outcome.is_none(), "new key beyond cap must be skipped");
        assert_eq!(collector.len(), 3);

        // Clearing frees the slots, so the same section is accepted afterwards.
        collector.clear();
        assert!(collector.is_empty());
        let outcome = collector.push_section(&fourth).unwrap();
        assert!(outcome.is_some());
        assert_eq!(collector.len(), 1);
    }

    #[test]
    fn partial_keys_cap_does_not_skip_existing_key() {
        let mut collector = SectionSetCollector::new().with_max_partial_keys(1);

        // Section 0 of a two-section set occupies the only available key.
        let first = build_section(0xAB, 0, 1);
        let outcome = collector.push_section(&first).unwrap();
        assert!(outcome.is_none(), "incomplete section set yields None");

        // Section 1 belongs to the same key and must still be admitted.
        let second = build_section(0xAB, 1, 1);
        let outcome = collector.push_section(&second).unwrap();
        assert!(
            outcome.is_some(),
            "existing key must NOT be skipped when cap full"
        );
        assert_eq!(collector.len(), 1);
    }
}