use std::collections::BTreeMap;
use std::error::Error;
use std::fmt;
use std::io::{self, Read, Seek, SeekFrom};
#[cfg(feature = "async")]
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use super::{MuxEventCursor, MuxPlan, MuxSampleEvent, MuxTrackConfig, MuxTrackKind};
#[cfg(feature = "async")]
use crate::async_io::{AsyncReadForward, AsyncReadSeek};
/// Descriptive data about the track a sample belongs to.
///
/// Values are copied from a `MuxTrackConfig` when a reader is built with
/// track configs; readers built without configs attach no track metadata.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct SampleTrackMetadata {
// Track kind as reported by the track config.
kind: MuxTrackKind,
// Three-byte language tag as reported by the track config
// (presumably an ISO 639-2 code — confirm against `MuxTrackConfig`).
language: [u8; 3],
}
impl SampleTrackMetadata {
/// Returns the kind of the track.
pub const fn kind(&self) -> MuxTrackKind {
self.kind
}
/// Returns the track's three-byte language tag.
pub const fn language(&self) -> [u8; 3] {
self.language
}
}
/// Everything a reader reports about one sample besides its payload bytes.
///
/// All values are copied from the plan's sample event when the sample is
/// read; the struct is a plain `Copy` snapshot.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct SampleMetadata {
// Index of the source (within the reader's source slice) the bytes come from.
source_index: usize,
// Identifier of the track the sample belongs to.
track_id: u32,
// Track description looked up by `track_id`; `None` when the reader has no
// config for that track.
track: Option<SampleTrackMetadata>,
// Decode timestamp (units are presumably the track timescale — confirm).
decode_time: u64,
// Signed offset from decode time to composition time, same units as above.
composition_time_offset: i32,
// Sample duration, same units as `decode_time`.
duration: u32,
// Byte offset of the sample inside its source.
data_offset: u64,
// Number of payload bytes the sample occupies.
data_size: u32,
// Offset assigned to the sample by the plan (presumably its position in the
// muxed output — confirm against `MuxPlan`).
output_offset: u64,
// Whether the plan marks this sample as a sync sample.
is_sync_sample: bool,
}
impl SampleMetadata {
    /// Index of the source the sample bytes are read from.
    pub const fn source_index(&self) -> usize {
        self.source_index
    }

    /// Identifier of the track the sample belongs to.
    pub const fn track_id(&self) -> u32 {
        self.track_id
    }

    /// Track description, when one was available for [`Self::track_id`].
    pub const fn track(&self) -> Option<SampleTrackMetadata> {
        self.track
    }

    /// Decode timestamp of the sample.
    pub const fn decode_time(&self) -> u64 {
        self.decode_time
    }

    /// Signed offset from the decode time to the composition time.
    pub const fn composition_time_offset(&self) -> i32 {
        self.composition_time_offset
    }

    /// Duration of the sample.
    pub const fn duration(&self) -> u32 {
        self.duration
    }

    /// Byte offset of the sample inside its source.
    pub const fn data_offset(&self) -> u64 {
        self.data_offset
    }

    /// Number of payload bytes in the sample.
    pub const fn data_size(&self) -> u32 {
        self.data_size
    }

    /// Offset the plan assigned to the sample.
    pub const fn output_offset(&self) -> u64 {
        self.output_offset
    }

    /// Offset one past the sample's last byte: `output_offset + data_size`.
    ///
    /// The sum saturates at `u64::MAX` instead of panicking (debug builds) or
    /// wrapping around (release builds) on overflow.
    pub const fn output_end_offset(&self) -> u64 {
        // `as` is a lossless u32 -> u64 widening; trait-based `u64::from` is
        // not usable here because this is a `const fn`.
        self.output_offset.saturating_add(self.data_size as u64)
    }

    /// Decode time at which the sample ends: `decode_time + duration`.
    ///
    /// The sum saturates at `u64::MAX` instead of panicking (debug builds) or
    /// wrapping around (release builds) on overflow.
    pub const fn decode_end_time(&self) -> u64 {
        // Lossless u32 -> u64 widening, see `output_end_offset`.
        self.decode_time.saturating_add(self.duration as u64)
    }

    /// Whether the plan marks this sample as a sync sample.
    pub const fn is_sync_sample(&self) -> bool {
        self.is_sync_sample
    }
}
/// One sample as produced by a reader: its metadata plus the payload bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SamplePacket {
// Description of the sample taken from the plan.
metadata: SampleMetadata,
// Payload bytes read from the source.
bytes: Vec<u8>,
}
impl SamplePacket {
/// Borrows the sample's metadata.
pub const fn metadata(&self) -> &SampleMetadata {
&self.metadata
}
/// Borrows the sample's payload bytes.
pub fn bytes(&self) -> &[u8] {
&self.bytes
}
/// Consumes the packet and returns its metadata and payload buffer.
pub fn into_parts(self) -> (SampleMetadata, Vec<u8>) {
(self.metadata, self.bytes)
}
}
/// Failures reported by the sample readers in this module.
#[derive(Debug)]
pub enum SampleReaderError {
/// A sample size (or the offset it leads to) cannot be represented on this
/// platform.
SampleSizeOverflow { size: u64 },
/// The plan refers to a source index outside the slice of sources handed
/// to the reader.
MissingSourceIndex {
source_index: usize,
source_count: usize,
},
/// A forward-only source would have to move backward to reach the next
/// sample.
NonMonotonicSourceOffset {
source_index: usize,
previous_offset: u64,
next_offset: u64,
},
/// A forward-only source ended while skipping ahead to the next sample.
IncompleteAdvance {
source_index: usize,
expected_offset: u64,
actual_offset: u64,
},
/// A source ended before a full sample could be read.
IncompleteSample {
source_index: usize,
expected_size: u64,
actual_size: u64,
},
/// An underlying I/O operation failed.
Io(io::Error),
}
/// Renders every failure as one human-readable sentence.
impl fmt::Display for SampleReaderError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // I/O failures print exactly as the wrapped error does.
            Self::Io(error) => write!(f, "{error}"),

            // Failures while reading a sample's payload.
            Self::IncompleteSample {
                source_index,
                expected_size,
                actual_size,
            } => write!(
                f,
                "source index {source_index} produced {actual_size} bytes for one sample, expected {expected_size}"
            ),
            Self::SampleSizeOverflow { size } => write!(
                f,
                "planned sample size {size} does not fit in memory on this platform"
            ),

            // Failures while positioning a source.
            Self::IncompleteAdvance {
                source_index,
                expected_offset,
                actual_offset,
            } => write!(
                f,
                "source index {source_index} ended while advancing to offset {expected_offset}; only reached {actual_offset}"
            ),
            Self::NonMonotonicSourceOffset {
                source_index,
                previous_offset,
                next_offset,
            } => write!(
                f,
                "source index {source_index} would need to move backward from offset {previous_offset} to {next_offset}"
            ),

            // The plan and the provided sources disagree.
            Self::MissingSourceIndex {
                source_index,
                source_count,
            } => write!(
                f,
                "sample plan referenced source index {source_index}, but only {source_count} sources were provided"
            ),
        }
    }
}
impl Error for SampleReaderError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Io(error) => Some(error),
_ => None,
}
}
}
impl From<io::Error> for SampleReaderError {
fn from(error: io::Error) -> Self {
Self::Io(error)
}
}
/// Reads the samples of a [`MuxPlan`] from seekable sources, in plan order.
///
/// Each sample is fetched by seeking its source to the sample's data offset
/// and reading the planned number of bytes.
pub struct PlannedSampleReader<'a, R> {
// Sources addressed by the plan's source indices.
sources: &'a mut [R],
// Cursor over the plan's event graph; yields the sample events to read.
cursor: MuxEventCursor<'a>,
// Track descriptions keyed by track id; empty unless track configs were given.
track_metadata: BTreeMap<u32, SampleTrackMetadata>,
}
impl<'a, R> PlannedSampleReader<'a, R>
where
R: Read + Seek,
{
pub fn new(sources: &'a mut [R], plan: &'a MuxPlan) -> Self {
Self {
sources,
cursor: plan.event_graph().cursor(),
track_metadata: BTreeMap::new(),
}
}
pub fn new_with_track_configs(
sources: &'a mut [R],
plan: &'a MuxPlan,
track_configs: &[MuxTrackConfig],
) -> Self {
Self {
sources,
cursor: plan.event_graph().cursor(),
track_metadata: build_track_metadata(track_configs),
}
}
pub fn next_sample(&mut self) -> Result<Option<SamplePacket>, SampleReaderError> {
let mut bytes = Vec::new();
let Some(metadata) = self.next_sample_into(&mut bytes)? else {
return Ok(None);
};
Ok(Some(SamplePacket { metadata, bytes }))
}
pub fn next_sample_into(
&mut self,
bytes: &mut Vec<u8>,
) -> Result<Option<SampleMetadata>, SampleReaderError> {
let Some(event) = next_sample_event(&mut self.cursor) else {
return Ok(None);
};
let staged = event.planned_item().staged();
let source_count = self.sources.len();
let Some(source) = self.sources.get_mut(staged.source_index()) else {
return Err(SampleReaderError::MissingSourceIndex {
source_index: staged.source_index(),
source_count,
});
};
source.seek(SeekFrom::Start(staged.data_offset()))?;
read_sample_bytes_into(
source,
staged.source_index(),
u64::from(staged.data_size()),
bytes,
)?;
Ok(Some(metadata_from_sample_event(
event,
&self.track_metadata,
)))
}
}
/// Reads the samples of a [`MuxPlan`] from forward-only sources, in plan order.
///
/// Sources are never rewound: bytes between samples are read and discarded,
/// so each source's samples must appear in the plan at non-decreasing offsets.
pub struct ProgressiveSampleReader<'a, R> {
// Sources addressed by the plan's source indices.
sources: &'a mut [R],
// Cursor over the plan's event graph; yields the sample events to read.
cursor: MuxEventCursor<'a>,
// Track descriptions keyed by track id; empty unless track configs were given.
track_metadata: BTreeMap<u32, SampleTrackMetadata>,
// Number of bytes consumed so far from each source; one entry per source.
source_offsets: Vec<u64>,
// Scratch space for bytes that are read only to be skipped.
advance_buffer: Vec<u8>,
}
impl<'a, R> ProgressiveSampleReader<'a, R>
where
R: Read,
{
pub fn new(sources: &'a mut [R], plan: &'a MuxPlan) -> Self {
Self {
source_offsets: vec![0_u64; sources.len()],
sources,
cursor: plan.event_graph().cursor(),
track_metadata: BTreeMap::new(),
advance_buffer: vec![0_u8; 16 * 1024],
}
}
pub fn new_with_track_configs(
sources: &'a mut [R],
plan: &'a MuxPlan,
track_configs: &[MuxTrackConfig],
) -> Self {
Self {
source_offsets: vec![0_u64; sources.len()],
sources,
cursor: plan.event_graph().cursor(),
track_metadata: build_track_metadata(track_configs),
advance_buffer: vec![0_u8; 16 * 1024],
}
}
pub fn next_sample(&mut self) -> Result<Option<SamplePacket>, SampleReaderError> {
let mut bytes = Vec::new();
let Some(metadata) = self.next_sample_into(&mut bytes)? else {
return Ok(None);
};
Ok(Some(SamplePacket { metadata, bytes }))
}
pub fn next_sample_into(
&mut self,
bytes: &mut Vec<u8>,
) -> Result<Option<SampleMetadata>, SampleReaderError> {
let Some(event) = next_sample_event(&mut self.cursor) else {
return Ok(None);
};
let staged = event.planned_item().staged();
let source_count = self.sources.len();
let Some(source) = self.sources.get_mut(staged.source_index()) else {
return Err(SampleReaderError::MissingSourceIndex {
source_index: staged.source_index(),
source_count,
});
};
let source_offset = self.source_offsets.get_mut(staged.source_index()).unwrap();
advance_progressive_source(
source,
staged.source_index(),
source_offset,
staged.data_offset(),
&mut self.advance_buffer,
)?;
read_progressive_sample_into(
source,
staged.source_index(),
source_offset,
u64::from(staged.data_size()),
bytes,
)?;
Ok(Some(metadata_from_sample_event(
event,
&self.track_metadata,
)))
}
}
/// Asynchronous counterpart of [`PlannedSampleReader`]: reads the samples of
/// a [`MuxPlan`] from seekable async sources, in plan order.
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(all(feature = "mux", feature = "async"))))]
pub struct AsyncPlannedSampleReader<'a, R> {
// Sources addressed by the plan's source indices.
sources: &'a mut [R],
// Cursor over the plan's event graph; yields the sample events to read.
cursor: MuxEventCursor<'a>,
// Track descriptions keyed by track id; empty unless track configs were given.
track_metadata: BTreeMap<u32, SampleTrackMetadata>,
}
#[cfg(feature = "async")]
impl<'a, R> AsyncPlannedSampleReader<'a, R>
where
    R: AsyncReadSeek,
{
    /// Creates a reader over `plan` that attaches no track metadata to samples.
    pub fn new(sources: &'a mut [R], plan: &'a MuxPlan) -> Self {
        // An empty config list produces an empty track table.
        Self::new_with_track_configs(sources, plan, &[])
    }

    /// Creates a reader over `plan` whose samples carry the kind and language
    /// of their track, taken from `track_configs`.
    pub fn new_with_track_configs(
        sources: &'a mut [R],
        plan: &'a MuxPlan,
        track_configs: &[MuxTrackConfig],
    ) -> Self {
        let cursor = plan.event_graph().cursor();
        let track_metadata = build_track_metadata(track_configs);
        Self {
            sources,
            cursor,
            track_metadata,
        }
    }

    /// Reads the next planned sample into a freshly allocated packet.
    ///
    /// Returns `Ok(None)` once the plan has no further samples.
    ///
    /// # Errors
    ///
    /// See [`Self::next_sample_into`].
    pub async fn next_sample(&mut self) -> Result<Option<SamplePacket>, SampleReaderError> {
        let mut bytes = Vec::new();
        let metadata = self.next_sample_into(&mut bytes).await?;
        Ok(metadata.map(|metadata| SamplePacket { metadata, bytes }))
    }

    /// Reads the next planned sample into `bytes`, reusing its allocation.
    ///
    /// On success `bytes` holds exactly the sample payload. Returns `Ok(None)`
    /// and leaves `bytes` untouched once the plan has no further samples.
    ///
    /// # Errors
    ///
    /// Fails when the plan names a source index that was not provided, when
    /// seeking or reading fails, or when the source ends before the whole
    /// sample was read. The sample event is taken from the cursor before any
    /// I/O happens, so a failed sample is not retried by the next call.
    pub async fn next_sample_into(
        &mut self,
        bytes: &mut Vec<u8>,
    ) -> Result<Option<SampleMetadata>, SampleReaderError> {
        let event = match next_sample_event(&mut self.cursor) {
            Some(event) => event,
            None => return Ok(None),
        };
        let staged = event.planned_item().staged();
        let source_index = staged.source_index();
        let source_count = self.sources.len();
        let source = self
            .sources
            .get_mut(source_index)
            .ok_or(SampleReaderError::MissingSourceIndex {
                source_index,
                source_count,
            })?;

        // Position the source at the sample, then pull exactly its bytes.
        source.seek(SeekFrom::Start(staged.data_offset())).await?;
        let size = u64::from(staged.data_size());
        read_sample_bytes_into_async(source, source_index, size, bytes).await?;

        let metadata = metadata_from_sample_event(event, &self.track_metadata);
        Ok(Some(metadata))
    }
}
/// Asynchronous counterpart of [`ProgressiveSampleReader`]: reads the samples
/// of a [`MuxPlan`] from forward-only async sources, in plan order.
///
/// Sources are never rewound: bytes between samples are read and discarded,
/// so each source's samples must appear in the plan at non-decreasing offsets.
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(all(feature = "mux", feature = "async"))))]
pub struct AsyncProgressiveSampleReader<'a, R> {
// Sources addressed by the plan's source indices.
sources: &'a mut [R],
// Cursor over the plan's event graph; yields the sample events to read.
cursor: MuxEventCursor<'a>,
// Track descriptions keyed by track id; empty unless track configs were given.
track_metadata: BTreeMap<u32, SampleTrackMetadata>,
// Number of bytes consumed so far from each source; one entry per source.
source_offsets: Vec<u64>,
// Scratch space for bytes that are read only to be skipped.
advance_buffer: Vec<u8>,
}
#[cfg(feature = "async")]
impl<'a, R> AsyncProgressiveSampleReader<'a, R>
where
    R: AsyncReadForward,
{
    /// Size of the scratch buffer used to read and discard skipped bytes.
    const ADVANCE_BUFFER_LEN: usize = 16 * 1024;

    /// Creates a reader over `plan` that attaches no track metadata to samples.
    ///
    /// Every source is assumed to be positioned at offset 0.
    pub fn new(sources: &'a mut [R], plan: &'a MuxPlan) -> Self {
        // An empty config list produces an empty track table.
        Self::new_with_track_configs(sources, plan, &[])
    }

    /// Creates a reader over `plan` whose samples carry the kind and language
    /// of their track, taken from `track_configs`.
    ///
    /// Every source is assumed to be positioned at offset 0.
    pub fn new_with_track_configs(
        sources: &'a mut [R],
        plan: &'a MuxPlan,
        track_configs: &[MuxTrackConfig],
    ) -> Self {
        Self {
            // One tracked offset per source, all starting at the beginning.
            source_offsets: vec![0_u64; sources.len()],
            sources,
            cursor: plan.event_graph().cursor(),
            track_metadata: build_track_metadata(track_configs),
            advance_buffer: vec![0_u8; Self::ADVANCE_BUFFER_LEN],
        }
    }

    /// Reads the next planned sample into a freshly allocated packet.
    ///
    /// Returns `Ok(None)` once the plan has no further samples.
    ///
    /// # Errors
    ///
    /// See [`Self::next_sample_into`].
    pub async fn next_sample(&mut self) -> Result<Option<SamplePacket>, SampleReaderError> {
        let mut bytes = Vec::new();
        let metadata = self.next_sample_into(&mut bytes).await?;
        Ok(metadata.map(|metadata| SamplePacket { metadata, bytes }))
    }

    /// Reads the next planned sample into `bytes`, reusing its allocation.
    ///
    /// The source is first advanced to the sample's data offset by reading and
    /// discarding bytes, then the sample payload is read. Returns `Ok(None)`
    /// and leaves `bytes` untouched once the plan has no further samples.
    ///
    /// # Errors
    ///
    /// * [`SampleReaderError::MissingSourceIndex`] when the plan names a
    ///   source that was not provided.
    /// * [`SampleReaderError::NonMonotonicSourceOffset`] when the sample lies
    ///   before the bytes already consumed from its source.
    /// * [`SampleReaderError::IncompleteAdvance`] or
    ///   [`SampleReaderError::IncompleteSample`] when the source ends early.
    /// * [`SampleReaderError::SampleSizeOverflow`] or
    ///   [`SampleReaderError::Io`] as reported while reading.
    ///
    /// The sample event is taken from the cursor before any I/O happens, so a
    /// failed sample is not retried by the next call.
    pub async fn next_sample_into(
        &mut self,
        bytes: &mut Vec<u8>,
    ) -> Result<Option<SampleMetadata>, SampleReaderError> {
        let Some(event) = next_sample_event(&mut self.cursor) else {
            return Ok(None);
        };
        let staged = event.planned_item().staged();
        let source_index = staged.source_index();
        let source_count = self.sources.len();

        // Look up the source and its tracked offset together so that a bad
        // index is reported as an error on either lookup instead of panicking.
        let (Some(source), Some(source_offset)) = (
            self.sources.get_mut(source_index),
            self.source_offsets.get_mut(source_index),
        ) else {
            return Err(SampleReaderError::MissingSourceIndex {
                source_index,
                source_count,
            });
        };

        advance_progressive_source_async(
            source,
            source_index,
            source_offset,
            staged.data_offset(),
            &mut self.advance_buffer,
        )
        .await?;
        read_progressive_sample_into_async(
            source,
            source_index,
            source_offset,
            u64::from(staged.data_size()),
            bytes,
        )
        .await?;
        Ok(Some(metadata_from_sample_event(
            event,
            &self.track_metadata,
        )))
    }
}
/// Asks `cursor` for its next sample event.
///
/// Returns exactly what `MuxEventCursor::next_sample` yields; the readers in
/// this file treat `None` as "no more samples in the plan".
fn next_sample_event<'a>(cursor: &mut MuxEventCursor<'a>) -> Option<&'a MuxSampleEvent> {
cursor.next_sample()
}
/// Indexes the kind and language of every configured track by track id.
///
/// When several configs share a track id, the last one in `track_configs`
/// wins.
fn build_track_metadata(track_configs: &[MuxTrackConfig]) -> BTreeMap<u32, SampleTrackMetadata> {
    let mut by_track_id = BTreeMap::new();
    for config in track_configs {
        let track_id = config.track_id();
        let metadata = SampleTrackMetadata {
            kind: config.kind(),
            language: config.language(),
        };
        by_track_id.insert(track_id, metadata);
    }
    by_track_id
}
/// Builds the public metadata snapshot for one planned sample event.
///
/// Sample fields come from the event's staged item, the output offset from
/// its planned item, and the track description from `track_metadata` (absent
/// when the sample's track id has no entry).
fn metadata_from_sample_event(
    event: &MuxSampleEvent,
    track_metadata: &BTreeMap<u32, SampleTrackMetadata>,
) -> SampleMetadata {
    let planned = event.planned_item();
    let staged = planned.staged();
    let track_id = staged.track_id();
    let track = track_metadata.get(&track_id).copied();

    SampleMetadata {
        source_index: staged.source_index(),
        track_id,
        track,
        decode_time: staged.decode_time(),
        composition_time_offset: staged.composition_time_offset(),
        duration: staged.duration(),
        data_offset: staged.data_offset(),
        data_size: staged.data_size(),
        output_offset: planned.output_offset(),
        is_sync_sample: staged.is_sync_sample(),
    }
}
fn read_sample_bytes_into<R>(
source: &mut R,
source_index: usize,
size: u64,
bytes: &mut Vec<u8>,
) -> Result<(), SampleReaderError>
where
R: Read,
{
let len = usize::try_from(size).map_err(|_| SampleReaderError::SampleSizeOverflow { size })?;
bytes.clear();
bytes.resize(len, 0);
let mut copied = 0_usize;
while copied < len {
let read = match source.read(&mut bytes[copied..]) {
Ok(read) => read,
Err(error) => {
bytes.truncate(copied);
return Err(SampleReaderError::Io(error));
}
};
if read == 0 {
bytes.truncate(copied);
return Err(SampleReaderError::IncompleteSample {
source_index,
expected_size: size,
actual_size: copied as u64,
});
}
copied += read;
}
Ok(())
}
fn advance_progressive_source<R>(
source: &mut R,
source_index: usize,
current_offset: &mut u64,
target_offset: u64,
buffer: &mut [u8],
) -> Result<(), SampleReaderError>
where
R: Read,
{
if target_offset < *current_offset {
return Err(SampleReaderError::NonMonotonicSourceOffset {
source_index,
previous_offset: *current_offset,
next_offset: target_offset,
});
}
let mut remaining = target_offset - *current_offset;
while remaining > 0 {
let chunk_len = remaining.min(buffer.len() as u64) as usize;
let read = source.read(&mut buffer[..chunk_len])?;
if read == 0 {
return Err(SampleReaderError::IncompleteAdvance {
source_index,
expected_offset: target_offset,
actual_offset: *current_offset,
});
}
*current_offset += read as u64;
remaining -= read as u64;
}
Ok(())
}
fn read_progressive_sample_into<R>(
source: &mut R,
source_index: usize,
current_offset: &mut u64,
size: u64,
bytes: &mut Vec<u8>,
) -> Result<(), SampleReaderError>
where
R: Read,
{
let len = usize::try_from(size).map_err(|_| SampleReaderError::SampleSizeOverflow { size })?;
bytes.clear();
bytes.resize(len, 0);
let mut copied = 0_usize;
while copied < len {
let read = match source.read(&mut bytes[copied..]) {
Ok(read) => read,
Err(error) => {
bytes.truncate(copied);
return Err(SampleReaderError::Io(error));
}
};
if read == 0 {
bytes.truncate(copied);
return Err(SampleReaderError::IncompleteSample {
source_index,
expected_size: size,
actual_size: copied as u64,
});
}
copied += read;
}
*current_offset = current_offset
.checked_add(size)
.ok_or(SampleReaderError::SampleSizeOverflow { size })?;
Ok(())
}
/// Reads exactly `size` bytes from an async `source` into `bytes`.
///
/// `bytes` is cleared and resized first, so its previous contents are lost
/// but its allocation is reused. On failure `bytes` is truncated to the bytes
/// that were actually read.
///
/// # Errors
///
/// * [`SampleReaderError::SampleSizeOverflow`] when `size` does not fit in
///   `usize`; nothing is read and `bytes` is left untouched.
/// * [`SampleReaderError::IncompleteSample`] when the source ends before
///   `size` bytes were read; `source_index` is only used for this report.
/// * [`SampleReaderError::Io`] when a read fails.
#[cfg(feature = "async")]
async fn read_sample_bytes_into_async<R>(
    source: &mut R,
    source_index: usize,
    size: u64,
    bytes: &mut Vec<u8>,
) -> Result<(), SampleReaderError>
where
    R: AsyncReadForward,
{
    let Ok(len) = usize::try_from(size) else {
        return Err(SampleReaderError::SampleSizeOverflow { size });
    };
    bytes.clear();
    bytes.resize(len, 0);

    let mut filled = 0_usize;
    while filled < len {
        match source.read(&mut bytes[filled..]).await {
            // A zero-length read into a non-empty buffer means end of stream.
            Ok(0) => {
                bytes.truncate(filled);
                return Err(SampleReaderError::IncompleteSample {
                    source_index,
                    expected_size: size,
                    actual_size: filled as u64,
                });
            }
            Ok(count) => filled += count,
            Err(error) => {
                bytes.truncate(filled);
                return Err(SampleReaderError::Io(error));
            }
        }
    }
    Ok(())
}
/// Moves a forward-only async `source` from `*current_offset` to
/// `target_offset` by reading the bytes in between into `buffer` and
/// discarding them.
///
/// `*current_offset` is advanced after every successful read.
///
/// # Errors
///
/// * [`SampleReaderError::NonMonotonicSourceOffset`] when `target_offset`
///   lies before `*current_offset`; nothing is read.
/// * [`SampleReaderError::IncompleteAdvance`] when the source ends before
///   `target_offset` is reached.
/// * [`SampleReaderError::Io`] when a read fails.
#[cfg(feature = "async")]
async fn advance_progressive_source_async<R>(
    source: &mut R,
    source_index: usize,
    current_offset: &mut u64,
    target_offset: u64,
    buffer: &mut [u8],
) -> Result<(), SampleReaderError>
where
    R: AsyncReadForward,
{
    // `checked_sub` fails exactly when the target lies behind the current
    // position, which a forward-only source cannot reach.
    let Some(mut remaining) = target_offset.checked_sub(*current_offset) else {
        return Err(SampleReaderError::NonMonotonicSourceOffset {
            source_index,
            previous_offset: *current_offset,
            next_offset: target_offset,
        });
    };

    while remaining != 0 {
        // The minimum never exceeds `buffer.len()`, so narrowing back to
        // `usize` cannot truncate.
        let chunk_len = remaining.min(buffer.len() as u64) as usize;
        let scratch = &mut buffer[..chunk_len];
        let consumed = source.read(scratch).await?;
        if consumed == 0 {
            return Err(SampleReaderError::IncompleteAdvance {
                source_index,
                expected_offset: target_offset,
                actual_offset: *current_offset,
            });
        }
        let consumed = consumed as u64;
        *current_offset += consumed;
        remaining -= consumed;
    }
    Ok(())
}
/// Reads exactly `size` bytes from a forward-only async `source` into `bytes`
/// and advances `*current_offset` by the number of bytes consumed.
///
/// `bytes` is cleared and resized first, so its previous contents are lost
/// but its allocation is reused. On failure `bytes` is truncated to the bytes
/// that were actually read.
///
/// `*current_offset` is updated after every successful read, so it keeps
/// matching the stream position even when the sample cannot be completed.
///
/// # Errors
///
/// * [`SampleReaderError::SampleSizeOverflow`] when `size` does not fit in
///   `usize` or `*current_offset + size` does not fit in `u64`; this is
///   checked up front, so nothing is read and `bytes` is left untouched.
/// * [`SampleReaderError::IncompleteSample`] when the source ends before
///   `size` bytes were read; `source_index` is only used for this report.
/// * [`SampleReaderError::Io`] when a read fails.
#[cfg(feature = "async")]
async fn read_progressive_sample_into_async<R>(
    source: &mut R,
    source_index: usize,
    current_offset: &mut u64,
    size: u64,
    bytes: &mut Vec<u8>,
) -> Result<(), SampleReaderError>
where
    R: AsyncReadForward,
{
    let len = usize::try_from(size).map_err(|_| SampleReaderError::SampleSizeOverflow { size })?;
    // Reject an unrepresentable end offset before consuming any bytes, so a
    // failure here leaves the stream and the tracked offset in agreement.
    if current_offset.checked_add(size).is_none() {
        return Err(SampleReaderError::SampleSizeOverflow { size });
    }
    bytes.clear();
    bytes.resize(len, 0);
    let mut copied = 0_usize;
    while copied < len {
        match source.read(&mut bytes[copied..]).await {
            // A zero-length read into a non-empty buffer means end of stream.
            Ok(0) => {
                bytes.truncate(copied);
                return Err(SampleReaderError::IncompleteSample {
                    source_index,
                    expected_size: size,
                    actual_size: copied as u64,
                });
            }
            Ok(read) => {
                copied += read;
                // Stays within the end offset validated above as long as the
                // source never reports more bytes than it was asked for.
                *current_offset += read as u64;
            }
            Err(error) => {
                bytes.truncate(copied);
                return Err(SampleReaderError::Io(error));
            }
        }
    }
    Ok(())
}