#![forbid(unsafe_code)]
use std::{error::Error as StdError, fmt, num::NonZeroUsize, ops::Range, sync::Arc};
use kithara_events::VariantInfo;
use kithara_platform::{MaybeSend, MaybeSync, time::Duration};
use kithara_storage::WaitOutcome;
use kithara_test_utils::kithara;
use crate::{
Timeline,
error::{SourceError, StreamError, StreamResult},
media::MediaInfo,
};
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct SegmentDescriptor {
pub decode_time: Duration,
pub duration: Duration,
pub byte_range: Range<u64>,
pub segment_index: u32,
pub variant_index: usize,
}
impl SegmentDescriptor {
#[must_use]
pub fn new(
byte_range: Range<u64>,
decode_time: Duration,
duration: Duration,
segment_index: u32,
variant_index: usize,
) -> Self {
Self {
decode_time,
duration,
byte_range,
segment_index,
variant_index,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum SourcePhase {
Cancelled,
Eof,
Ready,
Seeking,
#[default]
Waiting,
WaitingDemand,
WaitingMetadata,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PendingReason {
SeekPending,
NotReady(NotReadyCause),
VariantChange,
Retry,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum NotReadyCause {
WaitBudgetExhausted,
WaitInterrupted,
SourcePending,
}
impl fmt::Display for NotReadyCause {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::WaitBudgetExhausted => "wait budget exhausted",
Self::WaitInterrupted => "wait interrupted, no flush",
Self::SourcePending => "source returned pending after wait ready",
})
}
}
impl fmt::Display for PendingReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::SeekPending => f.write_str("seek pending"),
Self::NotReady(cause) => write!(f, "data not ready ({cause})"),
Self::VariantChange => f.write_str("variant change: decoder recreation required"),
Self::Retry => f.write_str("resource evicted, retry wait_range"),
}
}
}
impl StdError for PendingReason {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadOutcome {
Bytes(NonZeroUsize),
Pending(PendingReason),
Eof,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, bon::Builder)]
#[non_exhaustive]
pub struct SourceSeekAnchor {
#[builder(default)]
pub segment_start: Duration,
pub segment_end: Option<Duration>,
pub segment_index: Option<u32>,
pub variant_index: Option<usize>,
#[builder(default)]
pub byte_offset: u64,
}
#[kithara::mock(api = SourceMock)]
pub trait Source: MaybeSend + MaybeSync + 'static {
fn abr_handle(&self) -> Option<kithara_abr::AbrHandle> {
None
}
fn advance(&self, n: u64);
fn as_segment_layout(&self) -> Option<Arc<dyn SegmentLayout>> {
None
}
fn clear_variant_fence(&mut self) {}
fn commit_seek_landing(&mut self, _anchor: Option<SourceSeekAnchor>) {}
fn current_segment_range(&self) -> Option<Range<u64>> {
None
}
fn current_variant(&self) -> Option<VariantInfo> {
None
}
fn format_change_segment_range(&self) -> StreamResult<Range<u64>> {
Err(StreamError::Source(SourceError::FormatChangeNotApplicable))
}
fn has_variant_change_pending(&self) -> bool {
false
}
fn is_empty(&self) -> bool {
self.len().is_none_or(|n| n == 0)
}
fn len(&self) -> Option<u64>;
fn make_notify_fn(&self) -> Option<Box<dyn Fn() + Send + Sync>> {
None
}
fn media_info(&self) -> Option<MediaInfo> {
None
}
fn notify_waiting(&self) {}
fn phase(&self) -> SourcePhase {
let pos = self.position();
self.phase_at(pos..pos.saturating_add(1))
}
fn phase_at(&self, range: Range<u64>) -> SourcePhase;
fn position(&self) -> u64;
fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> StreamResult<ReadOutcome>;
fn seek_time_anchor(&mut self, _position: Duration) -> StreamResult<Option<SourceSeekAnchor>> {
Ok(None)
}
fn set_position(&self, pos: u64);
fn set_seek_epoch(&mut self, _seek_epoch: u64) {}
fn take_reader_hooks(&mut self) -> Option<crate::SharedHooks> {
None
}
fn timeline(&self) -> Timeline;
fn wait_range(
&mut self,
range: Range<u64>,
timeout: Option<Duration>,
) -> StreamResult<WaitOutcome>;
}
pub trait SegmentLayout: Send + Sync + 'static {
fn init_segment_range(&self) -> Range<u64>;
fn is_empty(&self) -> bool {
self.len().is_none_or(|n| n == 0)
}
fn len(&self) -> Option<u64>;
fn segment_after_byte(&self, byte_offset: u64) -> Option<SegmentDescriptor>;
fn segment_at_byte(&self, _byte_offset: u64) -> Option<SegmentDescriptor> {
None
}
fn segment_at_index(&self, _segment_index: u32) -> Option<SegmentDescriptor> {
None
}
fn segment_at_time(&self, t: Duration) -> Option<SegmentDescriptor>;
fn segment_count(&self) -> Option<u32>;
}
#[cfg(test)]
mod tests {
use kithara_test_utils::kithara;
use super::*;
#[kithara::test]
fn test_source_trait_object_safety() {
fn _accepts_source<S: Source>(_s: S) {}
}
#[kithara::test]
fn source_phase_defaults_to_waiting() {
assert_eq!(SourcePhase::default(), SourcePhase::Waiting);
}
#[kithara::test]
fn phase_default_delegates_to_phase_at() {
use std::sync::atomic::{AtomicU64, Ordering};
struct ReadySource {
timeline: Timeline,
position: Arc<AtomicU64>,
}
impl Source for ReadySource {
fn timeline(&self) -> Timeline {
self.timeline.clone()
}
fn wait_range(
&mut self,
_range: Range<u64>,
_timeout: Option<Duration>,
) -> StreamResult<WaitOutcome> {
Ok(WaitOutcome::Ready)
}
fn read_at(&mut self, _offset: u64, _buf: &mut [u8]) -> StreamResult<ReadOutcome> {
Ok(ReadOutcome::Eof)
}
fn phase_at(&self, _range: Range<u64>) -> SourcePhase {
SourcePhase::Ready
}
fn len(&self) -> Option<u64> {
Some(100)
}
fn position(&self) -> u64 {
self.position.load(Ordering::Acquire)
}
fn advance(&self, n: u64) {
self.position.fetch_add(n, Ordering::AcqRel);
}
fn set_position(&self, pos: u64) {
self.position.store(pos, Ordering::Release);
}
}
let source = ReadySource {
timeline: Timeline::new(),
position: Arc::new(AtomicU64::new(0)),
};
assert_eq!(source.phase(), SourcePhase::Ready);
}
}