#![forbid(unsafe_code)]
use std::{
error::Error as StdError,
fmt,
future::Future,
io::{self, Error as IoError, ErrorKind, Read, Seek, SeekFrom},
num::NonZeroUsize,
ops::Range,
sync::Arc,
};
use kithara_platform::{MaybeSend, MaybeSync, thread::yield_now, time::Duration, tokio::task};
use kithara_storage::WaitOutcome;
use kithara_test_utils::kithara;
#[derive(Debug)]
#[non_exhaustive]
pub enum StreamReadError {
Source(IoError),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamReadOutcome {
Bytes {
count: NonZeroUsize,
byte_position: u64,
},
Pending(PendingReason),
Eof { byte_position: u64 },
}
#[derive(Debug, Clone, Copy)]
pub struct StreamSeekPastEof {
pub current_pos: u64,
pub len: u64,
pub new_pos: u64,
}
impl fmt::Display for StreamSeekPastEof {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"seek past EOF: new_pos={} len={} current_pos={}",
self.new_pos, self.len, self.current_pos
)
}
}
impl StdError for StreamSeekPastEof {}
impl fmt::Display for StreamReadError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Source(e) => write!(f, "source error: {e}"),
}
}
}
impl StdError for StreamReadError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
Self::Source(e) => Some(e),
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct StreamPending {
pub len: Option<u64>,
pub reason: PendingReason,
pub phase: SourcePhase,
pub flushing: bool,
pub variant_fence: bool,
pub epoch: u64,
pub pos: u64,
pub want: usize,
}
impl fmt::Display for StreamPending {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}: pos={} want={} len={:?} phase={:?} epoch={} flushing={} variant_fence={}",
self.reason,
self.pos,
self.want,
self.len,
self.phase,
self.epoch,
self.flushing,
self.variant_fence,
)
}
}
impl StdError for StreamPending {}
#[derive(Debug, Clone, Copy)]
pub struct VariantChangeError;
impl fmt::Display for VariantChangeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("variant change: decoder recreation required")
}
}
impl StdError for VariantChangeError {}
use crate::{
MediaInfo, SourcePhase, SourceSeekAnchor, Timeline,
error::{SourceError, StreamError},
source::{NotReadyCause, PendingReason, ReadOutcome, Source},
};
pub trait StreamType: MaybeSend + 'static {
type Config: Default + MaybeSend;
type Events: Clone + MaybeSend + MaybeSync + 'static;
type Source: Source;
fn create(config: Self::Config) -> impl Future<Output = Result<Self::Source, SourceError>>;
fn event_bus(config: &Self::Config) -> Option<Self::Events> {
let _ = config;
None
}
}
pub struct Stream<T: StreamType> {
source: T::Source,
}
impl<T: StreamType> Stream<T> {
pub async fn new(config: T::Config) -> Result<Self, SourceError> {
let source = T::create(config).await?;
task::yield_now().await;
Ok(Self { source })
}
pub fn is_empty(&self) -> Option<bool> {
self.len().map(|len| len == 0)
}
pub fn position(&self) -> u64 {
self.source.position()
}
pub fn seek_time_anchor(
&mut self,
position: Duration,
) -> Result<Option<SourceSeekAnchor>, io::Error> {
self.source
.seek_time_anchor(position)
.map_err(|e| IoError::other(e.to_string()))
}
pub fn source(&self) -> &T::Source {
&self.source
}
pub fn timeline(&self) -> Timeline {
self.source.timeline()
}
delegate::delegate! {
to self.source {
pub fn phase(&self) -> SourcePhase;
pub fn phase_at(&self, range: Range<u64>) -> SourcePhase;
pub fn media_info(&self) -> Option<MediaInfo>;
pub fn abr_handle(&self) -> Option<kithara_abr::AbrHandle>;
pub fn current_variant(&self) -> Option<kithara_events::VariantInfo>;
pub fn len(&self) -> Option<u64>;
pub fn current_segment_range(&self) -> Option<Range<u64>>;
pub fn format_change_segment_range(&self) -> crate::error::StreamResult<Range<u64>>;
pub fn clear_variant_fence(&mut self);
pub fn has_variant_change_pending(&self) -> bool;
pub fn set_seek_epoch(&mut self, seek_epoch: u64);
pub fn notify_waiting(&self);
pub fn make_notify_fn(&self) -> Option<Box<dyn Fn() + Send + Sync>>;
pub fn commit_seek_landing(&mut self, anchor: Option<SourceSeekAnchor>);
pub fn take_reader_hooks(&mut self) -> Option<crate::SharedHooks>;
pub fn as_segment_layout(&self) -> Option<Arc<dyn crate::SegmentLayout>>;
pub fn set_position(&self, pos: u64);
}
}
}
impl<T: StreamType> Stream<T> {
const MAX_WAIT_SPINS: u32 = 50;
const SEEK_WAIT_TIMEOUT: Duration = Duration::from_millis(500);
const WAIT_RANGE_TIMEOUT: Duration = Duration::from_millis(10);
#[cfg_attr(feature = "perf", hotpath::measure)]
#[kithara::hang_watchdog]
pub fn try_read(&mut self, buf: &mut [u8]) -> Result<StreamReadOutcome, StreamReadError> {
if buf.is_empty() {
return Ok(StreamReadOutcome::Eof {
byte_position: self.source.position(),
});
}
let mut wait_spins = 0u32;
loop {
let timeline = self.source.timeline();
let read_epoch = timeline.seek_epoch();
let pos = self.source.position();
let range = pos..pos.saturating_add(buf.len() as u64);
if self.source.has_variant_change_pending() {
return Ok(StreamReadOutcome::Pending(PendingReason::VariantChange));
}
let wait_result = self
.source
.wait_range(range, Some(Self::WAIT_RANGE_TIMEOUT));
let wait_outcome = match wait_result {
Ok(outcome) => outcome,
Err(StreamError::Source(SourceError::WaitBudgetExceeded)) => {
if timeline.is_flushing() || timeline.seek_epoch() != read_epoch {
return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
}
wait_spins += 1;
if wait_spins >= Self::MAX_WAIT_SPINS {
return Ok(StreamReadOutcome::Pending(PendingReason::NotReady(
NotReadyCause::WaitBudgetExhausted,
)));
}
hang_tick!();
yield_now();
continue;
}
Err(e) => {
return Err(StreamReadError::Source(IoError::other(e.to_string())));
}
};
match wait_outcome {
WaitOutcome::Ready => {}
WaitOutcome::Eof => {
return Ok(StreamReadOutcome::Eof { byte_position: pos });
}
WaitOutcome::Interrupted => {
if !timeline.is_flushing() {
wait_spins += 1;
if wait_spins >= Self::MAX_WAIT_SPINS {
return Ok(StreamReadOutcome::Pending(PendingReason::NotReady(
NotReadyCause::WaitInterrupted,
)));
}
hang_tick!();
yield_now();
continue;
}
return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
}
}
wait_spins = 0;
if timeline.seek_epoch() != read_epoch {
return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
}
match self
.source
.read_at(pos, buf)
.map_err(|e| StreamReadError::Source(IoError::other(e.to_string())))?
{
ReadOutcome::Bytes(count) => {
if timeline.seek_epoch() != read_epoch {
return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
}
hang_reset!();
timeline.set_segment_position(pos);
self.source.advance(count.get() as u64);
let new_pos = self.source.position();
return Ok(StreamReadOutcome::Bytes {
count,
byte_position: new_pos,
});
}
ReadOutcome::Eof => {
return Ok(StreamReadOutcome::Eof { byte_position: pos });
}
ReadOutcome::Pending(PendingReason::Retry) => {
hang_tick!();
yield_now();
continue;
}
ReadOutcome::Pending(reason) => {
return Ok(StreamReadOutcome::Pending(reason));
}
}
}
}
}
impl<T: StreamType> Read for Stream<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.try_read(buf) {
Ok(StreamReadOutcome::Bytes { count, .. }) => Ok(count.get()),
Ok(StreamReadOutcome::Eof { .. }) => Ok(0),
Ok(StreamReadOutcome::Pending(reason @ PendingReason::SeekPending)) => {
Err(IoError::new(ErrorKind::Interrupted, reason))
}
Ok(StreamReadOutcome::Pending(
reason @ (PendingReason::NotReady(_) | PendingReason::Retry),
)) => Err(IoError::new(
ErrorKind::Interrupted,
self.snapshot_pending(reason, buf.len()),
)),
Ok(StreamReadOutcome::Pending(PendingReason::VariantChange)) => {
Err(IoError::other(VariantChangeError))
}
Err(StreamReadError::Source(e)) => Err(e),
}
}
}
impl<T: StreamType> Stream<T> {
fn snapshot_pending(&self, reason: PendingReason, want: usize) -> StreamPending {
let pos = self.source.position();
let len = self.source.len();
let phase = self.source.phase_at(pos..pos.saturating_add(want as u64));
let timeline = self.source.timeline();
StreamPending {
reason,
pos,
want,
len,
phase,
epoch: timeline.seek_epoch(),
flushing: timeline.is_flushing(),
variant_fence: self.source.has_variant_change_pending(),
}
}
}
impl<T: StreamType> Seek for Stream<T> {
#[cfg_attr(feature = "perf", hotpath::measure)]
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
let current = self.source.position();
let new_pos: i128 = match pos {
SeekFrom::Start(p) => i128::from(p),
SeekFrom::Current(delta) => i128::from(current).saturating_add(i128::from(delta)),
SeekFrom::End(delta) => {
if self.source.len().is_none() {
let _ = self.source.wait_range(0..1, None);
}
let Some(len) = self.source.len() else {
return Err(IoError::new(
ErrorKind::Unsupported,
"seek from end requires known length",
));
};
i128::from(len).saturating_add(i128::from(delta))
}
};
if new_pos < 0 {
return Err(IoError::new(
ErrorKind::InvalidInput,
"negative seek position",
));
}
let new_pos = u64::try_from(new_pos).unwrap_or(u64::MAX);
let wait_range = match self.source.format_change_segment_range() {
Ok(range) if range.start == new_pos => range,
_ => new_pos..new_pos.saturating_add(1),
};
let _ = self
.source
.wait_range(wait_range, Some(Self::SEEK_WAIT_TIMEOUT));
if let Some(len) = self.source.len()
&& new_pos > len
{
return Err(IoError::new(
ErrorKind::InvalidInput,
StreamSeekPastEof {
new_pos,
len,
current_pos: current,
},
));
}
self.source.set_position(new_pos);
Ok(new_pos)
}
}
#[cfg(test)]
mod tests {
use std::{
collections::VecDeque,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};
use kithara_storage::WaitOutcome;
use kithara_test_utils::kithara;
use super::*;
use crate::{ReadOutcome, Source, SourcePhase};
#[derive(Clone, Copy)]
enum ScriptRead {
Data(usize),
Eof,
}
fn bytes(count: usize) -> ReadOutcome {
let nz = NonZeroUsize::new(count)
.expect("BUG: ScriptSource::bytes invariant — count must be > 0");
ReadOutcome::Bytes(nz)
}
struct ScriptSource {
position: Arc<AtomicU64>,
anchor: Option<SourceSeekAnchor>,
timeline: Timeline,
data: Vec<u8>,
reads: VecDeque<ScriptRead>,
waits: VecDeque<WaitOutcome>,
}
impl ScriptSource {
fn new(
timeline: Timeline,
waits: impl IntoIterator<Item = WaitOutcome>,
reads: impl IntoIterator<Item = ScriptRead>,
data: Vec<u8>,
) -> Self {
Self {
timeline,
data,
position: Arc::new(AtomicU64::new(0)),
anchor: None,
reads: reads.into_iter().collect(),
waits: waits.into_iter().collect(),
}
}
}
impl Source for ScriptSource {
fn advance(&self, n: u64) {
self.position.fetch_add(n, Ordering::AcqRel);
}
fn len(&self) -> Option<u64> {
Some(self.data.len() as u64)
}
fn phase_at(&self, _range: Range<u64>) -> SourcePhase {
SourcePhase::Waiting
}
fn position(&self) -> u64 {
self.position.load(Ordering::Acquire)
}
fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> crate::StreamResult<ReadOutcome> {
let step = self.reads.pop_front().unwrap_or(ScriptRead::Eof);
match step {
ScriptRead::Eof => Ok(ReadOutcome::Eof),
ScriptRead::Data(n) => {
let Ok(start) = usize::try_from(offset) else {
return Ok(ReadOutcome::Eof);
};
let end = (start + n).min(self.data.len());
let bytes_count = end.saturating_sub(start).min(buf.len());
if bytes_count == 0 {
return Ok(ReadOutcome::Eof);
}
buf[..bytes_count].copy_from_slice(&self.data[start..start + bytes_count]);
Ok(bytes(bytes_count))
}
}
}
fn seek_time_anchor(
&mut self,
_position: Duration,
) -> crate::StreamResult<Option<SourceSeekAnchor>> {
Ok(self.anchor)
}
fn set_position(&self, pos: u64) {
self.position.store(pos, Ordering::Release);
}
fn timeline(&self) -> Timeline {
self.timeline.clone()
}
fn wait_range(
&mut self,
_range: Range<u64>,
_timeout: Option<Duration>,
) -> crate::StreamResult<WaitOutcome> {
Ok(self.waits.pop_front().unwrap_or(WaitOutcome::Ready))
}
}
struct DummyType;
impl StreamType for DummyType {
type Config = ();
type Events = ();
type Source = ScriptSource;
async fn create(_config: Self::Config) -> Result<Self::Source, SourceError> {
Err(SourceError::other(IoError::other("not used in unit tests")))
}
}
struct SeekDuringWaitType;
impl StreamType for SeekDuringWaitType {
type Config = ();
type Events = ();
type Source = SeekDuringWaitSource;
async fn create(_config: Self::Config) -> Result<Self::Source, SourceError> {
Err(SourceError::other(IoError::other("not used in unit tests")))
}
}
struct SeekDuringWaitSource {
position: Arc<AtomicU64>,
timeline: Timeline,
read_calls: usize,
}
impl Source for SeekDuringWaitSource {
fn advance(&self, n: u64) {
self.position.fetch_add(n, Ordering::AcqRel);
}
fn len(&self) -> Option<u64> {
Some(4)
}
fn phase_at(&self, _range: Range<u64>) -> SourcePhase {
SourcePhase::Ready
}
fn position(&self) -> u64 {
self.position.load(Ordering::Acquire)
}
fn read_at(&mut self, _offset: u64, _buf: &mut [u8]) -> crate::StreamResult<ReadOutcome> {
self.read_calls += 1;
Ok(bytes(4))
}
fn set_position(&self, pos: u64) {
self.position.store(pos, Ordering::Release);
}
fn timeline(&self) -> Timeline {
self.timeline.clone()
}
fn wait_range(
&mut self,
_range: Range<u64>,
_timeout: Option<Duration>,
) -> crate::StreamResult<WaitOutcome> {
let _ = self.timeline.initiate_seek(Duration::from_millis(10));
Ok(WaitOutcome::Ready)
}
}
#[kithara::test]
fn read_retries_interrupted_when_not_flushing() {
let timeline = Timeline::new();
let source = ScriptSource::new(
timeline.clone(),
[WaitOutcome::Interrupted, WaitOutcome::Ready],
[ScriptRead::Data(4)],
b"ABCD".to_vec(),
);
let mut stream = Stream::<DummyType> { source };
let mut buf = [0u8; 4];
let n = stream
.read(&mut buf)
.expect("BUG: read must succeed after the explicit retry in this test scenario");
assert_eq!(n, 4);
assert_eq!(&buf, b"ABCD");
}
#[kithara::test]
fn try_read_returns_seek_pending_when_flushing() {
let timeline = Timeline::new();
let _ = timeline.initiate_seek(Duration::from_millis(10));
let source = ScriptSource::new(timeline.clone(), [WaitOutcome::Interrupted], [], vec![]);
let mut stream = Stream::<DummyType> { source };
let mut buf = [0u8; 4];
let outcome = stream
.try_read(&mut buf)
.expect("BUG: seek-pending is a status return; not a hard error in this test");
assert!(matches!(
outcome,
StreamReadOutcome::Pending(PendingReason::SeekPending)
));
}
#[kithara::test]
fn try_read_returns_seek_pending_when_epoch_changes_after_wait() {
let timeline = Timeline::new();
let source = SeekDuringWaitSource {
timeline: timeline.clone(),
position: Arc::new(AtomicU64::new(0)),
read_calls: 0,
};
let mut stream = Stream::<SeekDuringWaitType> { source };
let mut buf = [0u8; 4];
let outcome = stream
.try_read(&mut buf)
.expect("BUG: seek-pending is a status return; not a hard error in this test");
assert!(matches!(
outcome,
StreamReadOutcome::Pending(PendingReason::SeekPending)
));
assert_eq!(stream.source.read_calls, 0);
assert_eq!(stream.position(), 0);
}
#[kithara::test]
fn seek_updates_position() {
let timeline = Timeline::new();
let source = ScriptSource::new(timeline.clone(), [], [], b"ABCDE".to_vec());
let mut stream = Stream::<DummyType> { source };
let pos = stream
.seek(SeekFrom::Start(3))
.expect("BUG: seek to a position within the test stream must succeed");
assert_eq!(pos, 3);
assert_eq!(stream.position(), 3);
}
#[kithara::test]
fn seek_time_anchor_does_not_move_position() {
let timeline = Timeline::new();
let mut source = ScriptSource::new(timeline.clone(), [], [], b"ABCDE".to_vec());
source.set_position(11);
source.anchor = Some(SourceSeekAnchor {
byte_offset: 3,
segment_start: Duration::from_secs(8),
segment_end: Some(Duration::from_secs(12)),
segment_index: Some(2),
variant_index: Some(1),
});
let mut stream = Stream::<DummyType> { source };
let anchor = stream
.seek_time_anchor(Duration::from_millis(8_500))
.expect("BUG: anchor resolution must succeed for the constructed test stream")
.expect("BUG: stream must return the resolved anchor in this test");
assert_eq!(anchor.byte_offset, 3);
assert_eq!(
stream.position(),
11,
"anchor resolution must not eagerly commit stream position"
);
}
}