Skip to main content

kithara_stream/
stream.rs

1#![forbid(unsafe_code)]
2
3use std::{
4    error::Error as StdError,
5    fmt,
6    future::Future,
7    io::{self, Error as IoError, ErrorKind, Read, Seek, SeekFrom},
8    num::NonZeroUsize,
9    ops::Range,
10    sync::Arc,
11};
12
13use kithara_platform::{MaybeSend, MaybeSync, thread::yield_now, time::Duration, tokio::task};
14use kithara_storage::WaitOutcome;
15use kithara_test_utils::kithara;
16
/// Hard failure returned by [`Stream::try_read`]: the underlying
/// source reported a genuine I/O error.
///
/// Transient conditions — a pending seek, data that is not ready yet,
/// a variant change, a retry request — are deliberately **not**
/// modelled as errors. They travel in [`StreamReadOutcome::Pending`]
/// alongside a typed [`PendingReason`]; this enum is reserved for
/// real source failures.
#[derive(Debug)]
#[non_exhaustive]
pub enum StreamReadError {
    /// An error reported by the underlying [`Source`], carried as an
    /// [`IoError`].
    Source(IoError),
}
30
31/// Outcome of a [`Stream::try_read`] call.
32///
33/// Mirrors the [`ReadOutcome`] shape from
34/// [`Source::read_at`](crate::Source::read_at), but extends each variant
35/// with the authoritative `byte_position` from the stream's
36/// [`Timeline`] for callers that don't want to read it back themselves.
37/// `Bytes` carries a [`NonZeroUsize`] count — the type system
38/// guarantees forward progress.
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40pub enum StreamReadOutcome {
41    /// Stream produced `count` bytes. `byte_position` is the new byte
42    /// offset **after** the read.
43    Bytes {
44        count: NonZeroUsize,
45        byte_position: u64,
46    },
47    /// No progress this call. See [`PendingReason`] for the precise
48    /// cause and required caller action.
49    Pending(PendingReason),
50    /// Natural end of stream. `byte_position` is the offset where EOF
51    /// was observed (typically the source length).
52    Eof { byte_position: u64 },
53}
54
/// Structured error raised by [`Stream::seek`] when an absolute byte
/// target falls beyond the stream's known length.
///
/// It is attached as the payload of an `io::Error` of kind
/// [`ErrorKind::InvalidInput`], which lets consumers such as Symphonia
/// carry it through their own error chain. Decoders can downcast to
/// get the numbers back and classify the failure as caller-side: the
/// requested target is invalid for this stream, as opposed to decoder
/// state corruption.
#[derive(Debug, Clone, Copy)]
pub struct StreamSeekPastEof {
    /// Stream position at the time the seek was attempted.
    pub current_pos: u64,
    /// Known stream length that the target exceeded.
    pub len: u64,
    /// Requested absolute target position.
    pub new_pos: u64,
}

impl fmt::Display for StreamSeekPastEof {
    /// Formats as `seek past EOF: new_pos=… len=… current_pos=…`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            current_pos,
            len,
            new_pos,
        } = self;
        write!(
            f,
            "seek past EOF: new_pos={new_pos} len={len} current_pos={current_pos}"
        )
    }
}

impl StdError for StreamSeekPastEof {}
81
82impl fmt::Display for StreamReadError {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        match self {
85            Self::Source(e) => write!(f, "source error: {e}"),
86        }
87    }
88}
89
90impl StdError for StreamReadError {
91    // ast-grep-ignore: idioms.match-self-conversion
92    fn source(&self) -> Option<&(dyn StdError + 'static)> {
93        match self {
94            Self::Source(e) => Some(e),
95        }
96    }
97}
98
99/// Typed payload of an `io::Error` (kind [`ErrorKind::Interrupted`])
100/// emitted by `impl Read for Stream` when the underlying source could
101/// not satisfy the read this call. Both `SeekPending` and
102/// `NotReady`/`Retry` surface as `Interrupted` so demuxers (notably
103/// Symphonia's fragmented MP4 reader) treat the pause as a transient
104/// cooperative interruption and let `kithara-decode::is_seek_pending_io`
105/// classify the failure correctly — the previous `WouldBlock` mapping
106/// was treated as a hard "would block" by Symphonia's seek path and
107/// corrupted the demuxer cursor on partial reads. Carries the
108/// [`PendingReason`] verbatim plus a snapshot of source/timeline state
109/// at the wrap site, so callers downcasting from `io::Error` recover
110/// both *what* stalled and *why* without having to instrument their
111/// own decoder.
112#[derive(Debug, Clone, Copy)]
113pub struct StreamPending {
114    pub len: Option<u64>,
115    pub reason: PendingReason,
116    pub phase: SourcePhase,
117    pub flushing: bool,
118    pub variant_fence: bool,
119    pub epoch: u64,
120    pub pos: u64,
121    pub want: usize,
122}
123
124impl fmt::Display for StreamPending {
125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126        write!(
127            f,
128            "{}: pos={} want={} len={:?} phase={:?} epoch={} flushing={} variant_fence={}",
129            self.reason,
130            self.pos,
131            self.want,
132            self.len,
133            self.phase,
134            self.epoch,
135            self.flushing,
136            self.variant_fence,
137        )
138    }
139}
140
141impl StdError for StreamPending {}
142
/// Marker error for a cross-variant boundary; it is not retriable.
///
/// `impl Read for Stream` wraps it in an `io::Error` when the
/// underlying source has fenced on a variant change. Decoders that
/// consume the stream through `std::io::Read` (the Symphonia chain
/// walker) downcast to this type to get the exact classification
/// instead of matching on message strings.
#[derive(Debug, Clone, Copy)]
pub struct VariantChangeError;

impl fmt::Display for VariantChangeError {
    /// Emits a fixed, human-readable description of the condition.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "variant change: decoder recreation required")
    }
}

impl StdError for VariantChangeError {}
158
159use crate::{
160    MediaInfo, SourcePhase, SourceSeekAnchor, Timeline,
161    error::{SourceError, StreamError},
162    source::{NotReadyCause, PendingReason, ReadOutcome, Source},
163};
164
165/// Defines a stream type and how to create it.
166///
167/// This trait is implemented by marker types (`Hls`, `File`) in their respective crates.
168/// The implementation provides the config type and source type.
169///
170/// On wasm32, `Send`/`Sync` bounds are relaxed via [`MaybeSend`]/[`MaybeSync`].
171pub trait StreamType: MaybeSend + 'static {
172    /// Configuration for this stream type.
173    type Config: Default + MaybeSend;
174
175    /// Event bus type carried by the stream config.
176    ///
177    /// Concrete stream types set this to `kithara_events::EventBus`.
178    /// `Audio::new()` constrains `T::Events = EventBus` to extract it.
179    type Events: Clone + MaybeSend + MaybeSync + 'static;
180
181    /// Source implementing `Source`.
182    type Source: Source;
183
184    /// Create the source from configuration.
185    ///
186    /// May also start background tasks (downloader) internally.
187    fn create(config: Self::Config) -> impl Future<Output = Result<Self::Source, SourceError>>;
188
189    /// Extract the event bus from config (if set).
190    ///
191    /// Used by `Audio::new()` to share a single bus across the stream
192    /// and the audio pipeline.
193    fn event_bus(config: &Self::Config) -> Option<Self::Events> {
194        let _ = config;
195        None
196    }
197}
198
199/// Generic audio stream with sync `Read + Seek`.
200///
201/// `T` is a marker type defining the stream source (`Hls`, `File`, etc.).
202/// Stream holds the source directly and implements `Read + Seek` by calling
203/// `Source::wait_range()` and `Source::read_at()`.
204pub struct Stream<T: StreamType> {
205    source: T::Source,
206}
207
208impl<T: StreamType> Stream<T> {
209    /// Create a new stream from configuration.
210    ///
211    /// # Errors
212    ///
213    /// Returns an error if the underlying stream source cannot be created.
214    pub async fn new(config: T::Config) -> Result<Self, SourceError> {
215        let source = T::create(config).await?;
216        task::yield_now().await;
217        Ok(Self { source })
218    }
219
220    pub fn is_empty(&self) -> Option<bool> {
221        self.len().map(|len| len == 0)
222    }
223
224    /// Get current read position.
225    pub fn position(&self) -> u64 {
226        self.source.position()
227    }
228
229    /// Resolve a deterministic time-based seek anchor.
230    ///
231    /// Returns `None` for sources without segmented time mapping.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error when the source failed to resolve the anchor.
236    pub fn seek_time_anchor(
237        &mut self,
238        position: Duration,
239    ) -> Result<Option<SourceSeekAnchor>, io::Error> {
240        self.source
241            .seek_time_anchor(position)
242            .map_err(|e| IoError::other(e.to_string()))
243    }
244
245    /// Get shared reference to inner source.
246    pub fn source(&self) -> &T::Source {
247        &self.source
248    }
249
250    /// Get stream timeline.
251    pub fn timeline(&self) -> Timeline {
252        self.source.timeline()
253    }
254
255    delegate::delegate! {
256        to self.source {
257            /// Overall source readiness at current position.
258            pub fn phase(&self) -> SourcePhase;
259            /// Point-in-time readiness for a specific byte range.
260            pub fn phase_at(&self, range: Range<u64>) -> SourcePhase;
261            /// Get current media info if known.
262            pub fn media_info(&self) -> Option<MediaInfo>;
263            /// Runtime ABR handle — `Some` for adaptive sources (HLS).
264            pub fn abr_handle(&self) -> Option<kithara_abr::AbrHandle>;
265            /// Current variant metadata — `Some` for adaptive sources (HLS).
266            pub fn current_variant(&self) -> Option<kithara_events::VariantInfo>;
267            /// Get total length if known.
268            pub fn len(&self) -> Option<u64>;
269            /// Get current segment byte range (for segmented sources like HLS).
270            /// Transitional — removed in Plan 06.
271            pub fn current_segment_range(&self) -> Option<Range<u64>>;
272            /// Header byte range for decoder recreate after a format change.
273            /// Transitional — removed in Plan 06.
274            ///
275            /// # Errors
276            ///
277            /// `Err(SourceError::FormatChangeNotApplicable)` for non-HLS
278            /// sources or HLS variants activated with `served_from > 0`
279            /// (init prefix unreachable via Stream reads).
280            pub fn format_change_segment_range(&self) -> crate::error::StreamResult<Range<u64>>;
281            /// Clear variant fence, allowing reads from the next variant.
282            pub fn clear_variant_fence(&mut self);
283            /// `true` while a cross-variant fence keeps `read_at` / `wait_range`
284            /// short-circuited to `Pending(VariantChange)` / `Interrupted`.
285            pub fn has_variant_change_pending(&self) -> bool;
286            /// Set seek epoch for stale request invalidation.
287            pub fn set_seek_epoch(&mut self, seek_epoch: u64);
288            /// Wake any blocked `wait_range()` calls.
289            pub fn notify_waiting(&self);
290            /// Create a lock-free callback for waking blocked `wait_range()`.
291            pub fn make_notify_fn(&self) -> Option<Box<dyn Fn() + Send + Sync>>;
292            /// Commit the actual post-seek landing after `decoder.seek(...)`.
293            pub fn commit_seek_landing(&mut self, anchor: Option<SourceSeekAnchor>);
294            /// Build a fresh reader-side hooks instance from the inner source.
295            pub fn take_reader_hooks(&mut self) -> Option<crate::SharedHooks>;
296            /// Optional segment-layout handle for segment-aware decoders.
297            pub fn as_segment_layout(&self) -> Option<Arc<dyn crate::SegmentLayout>>;
298            /// Absolute byte-position set — used by [`Stream::seek`] callers
299            /// and audio FSM landings. Forwards to the source's atomic cursor.
300            pub fn set_position(&self, pos: u64);
301        }
302    }
303}
304
impl<T: StreamType> Stream<T> {
    /// Maximum `wait_range` retries before returning
    /// `Pending(NotReady)` to the caller. Each retry takes
    /// `WAIT_RANGE_TIMEOUT` (10ms), so 50 iterations ≈ 500ms.
    /// Prevents the hang detector from firing when data is
    /// legitimately not yet available (e.g. encrypted HLS startup).
    const MAX_WAIT_SPINS: u32 = 50;

    /// `Seek` calls `wait_range(new_pos..new_pos+1)` to prime metadata
    /// (so `source.len()` answers before the seek-past-EOF check). A
    /// hard cap is required because a broken downloader would otherwise
    /// hang the seek forever; on the happy path local metadata resolves
    /// well under this budget. Matches the read path's total budget
    /// (`WAIT_RANGE_TIMEOUT * MAX_WAIT_SPINS`).
    const SEEK_WAIT_TIMEOUT: Duration = Duration::from_millis(500);

    /// Short timeout keeps the audio worker responsive for round-robin
    /// between tracks. At 44100Hz stereo with 4096-sample chunks, one chunk
    /// lasts ~46ms. A 10ms budget gives the worker time to serve other
    /// tracks and still refill the ringbuf before the audio callback drains it.
    const WAIT_RANGE_TIMEOUT: Duration = Duration::from_millis(10);

    /// Typed read — returns a [`StreamReadOutcome`] discriminating
    /// progress (`Bytes` with [`NonZeroUsize`]) from non-progress
    /// (`Pending` with a typed [`PendingReason`]) and natural EOF.
    /// Only genuine source I/O failures surface as
    /// [`StreamReadError::Source`]. `impl Read for Stream` wraps this
    /// outcome for `std::io::Read` consumers.
    ///
    /// An empty `buf` is answered immediately with `Eof` at the current
    /// position, without touching the source's wait/read path.
    ///
    /// # Errors
    ///
    /// Returns [`StreamReadError::Source`] when `wait_range` fails with
    /// anything other than `SourceError::WaitBudgetExceeded`, or when
    /// `read_at` fails. In both cases the original error is flattened
    /// to its message via `to_string()` and wrapped with
    /// `IoError::other`.
    #[cfg_attr(feature = "perf", hotpath::measure)]
    #[kithara::hang_watchdog]
    pub fn try_read(&mut self, buf: &mut [u8]) -> Result<StreamReadOutcome, StreamReadError> {
        if buf.is_empty() {
            return Ok(StreamReadOutcome::Eof {
                byte_position: self.source.position(),
            });
        }

        // Counts consecutive unsuccessful waits; reset once a wait
        // reports `Ready` (see below).
        let mut wait_spins = 0u32;

        loop {
            // Timeline handle, seek epoch and position are re-sampled on
            // every iteration, so each retry works from current state.
            let timeline = self.source.timeline();
            let read_epoch = timeline.seek_epoch();
            let pos = self.source.position();
            let range = pos..pos.saturating_add(buf.len() as u64);

            // WHY: `wait_range` returns `Interrupted` when the source has a
            // variant change pending (see `has_variant_change_pending`).
            // Checking the fence first reports it as `Pending(VariantChange)`
            // instead of letting the `Interrupted` arm below spin and
            // eventually misreport it as `NotReady(WaitInterrupted)`.
            if self.source.has_variant_change_pending() {
                return Ok(StreamReadOutcome::Pending(PendingReason::VariantChange));
            }

            let wait_result = self
                .source
                .wait_range(range, Some(Self::WAIT_RANGE_TIMEOUT));
            let wait_outcome = match wait_result {
                Ok(outcome) => outcome,
                Err(StreamError::Source(SourceError::WaitBudgetExceeded)) => {
                    // The per-call wait budget ran out. If the timeline is
                    // flushing or the seek epoch moved while waiting, the
                    // read is stale: report `SeekPending` instead of retrying.
                    if timeline.is_flushing() || timeline.seek_epoch() != read_epoch {
                        return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
                    }
                    wait_spins += 1;
                    if wait_spins >= Self::MAX_WAIT_SPINS {
                        return Ok(StreamReadOutcome::Pending(PendingReason::NotReady(
                            NotReadyCause::WaitBudgetExhausted,
                        )));
                    }
                    // `hang_tick!` is not defined in this file — presumably
                    // supplied by `#[kithara::hang_watchdog]`; TODO confirm.
                    hang_tick!();
                    yield_now();
                    continue;
                }
                Err(e) => {
                    return Err(StreamReadError::Source(IoError::other(e.to_string())));
                }
            };
            match wait_outcome {
                WaitOutcome::Ready => {}
                WaitOutcome::Eof => {
                    return Ok(StreamReadOutcome::Eof { byte_position: pos });
                }
                WaitOutcome::Interrupted => {
                    // Interrupted while not flushing: retry up to
                    // `MAX_WAIT_SPINS` times, then give up with `NotReady`.
                    // NOTE(review): unlike the `WaitBudgetExceeded` arm, this
                    // arm does not compare the seek epoch — confirm intended.
                    if !timeline.is_flushing() {
                        wait_spins += 1;
                        if wait_spins >= Self::MAX_WAIT_SPINS {
                            return Ok(StreamReadOutcome::Pending(PendingReason::NotReady(
                                NotReadyCause::WaitInterrupted,
                            )));
                        }
                        hang_tick!();
                        yield_now();
                        continue;
                    }
                    // Interrupted while flushing is reported as a pending seek.
                    return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
                }
            }

            // The range is ready: start a fresh spin budget for any later wait.
            wait_spins = 0;

            // Epoch moved between sampling and the end of the wait: do not
            // read at the stale position.
            if timeline.seek_epoch() != read_epoch {
                return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
            }

            match self
                .source
                .read_at(pos, buf)
                .map_err(|e| StreamReadError::Source(IoError::other(e.to_string())))?
            {
                ReadOutcome::Bytes(count) => {
                    // Epoch moved during the read: report `SeekPending`
                    // without advancing the position.
                    if timeline.seek_epoch() != read_epoch {
                        return Ok(StreamReadOutcome::Pending(PendingReason::SeekPending));
                    }
                    hang_reset!();
                    // Recorded with the offset the read started at, before
                    // the source position is advanced.
                    timeline.set_segment_position(pos);
                    self.source.advance(count.get() as u64);
                    let new_pos = self.source.position();
                    return Ok(StreamReadOutcome::Bytes {
                        count,
                        byte_position: new_pos,
                    });
                }
                ReadOutcome::Eof => {
                    return Ok(StreamReadOutcome::Eof { byte_position: pos });
                }
                ReadOutcome::Pending(PendingReason::Retry) => {
                    // `Retry` loops straight back; it is not counted in
                    // `wait_spins`, so `MAX_WAIT_SPINS` does not bound it.
                    hang_tick!();
                    yield_now();
                    continue;
                }
                ReadOutcome::Pending(reason) => {
                    // Any other pending reason is handed to the caller as-is.
                    return Ok(StreamReadOutcome::Pending(reason));
                }
            }
        }
    }
}
438
439impl<T: StreamType> Read for Stream<T> {
440    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
441        match self.try_read(buf) {
442            Ok(StreamReadOutcome::Bytes { count, .. }) => Ok(count.get()),
443            Ok(StreamReadOutcome::Eof { .. }) => Ok(0),
444            Ok(StreamReadOutcome::Pending(reason @ PendingReason::SeekPending)) => {
445                Err(IoError::new(ErrorKind::Interrupted, reason))
446            }
447            Ok(StreamReadOutcome::Pending(
448                reason @ (PendingReason::NotReady(_) | PendingReason::Retry),
449            )) => Err(IoError::new(
450                ErrorKind::Interrupted,
451                self.snapshot_pending(reason, buf.len()),
452            )),
453            Ok(StreamReadOutcome::Pending(PendingReason::VariantChange)) => {
454                Err(IoError::other(VariantChangeError))
455            }
456            Err(StreamReadError::Source(e)) => Err(e),
457        }
458    }
459}
460
461impl<T: StreamType> Stream<T> {
462    /// Build a typed [`StreamPending`] payload for a
463    /// `Pending(NotReady|Retry)` surfaced through `impl Read`. Pulls
464    /// live source/timeline state at the moment of the wrap so the
465    /// resulting `io::Error` carries the real reason ("data not ready
466    /// (`wait_budget_exhausted`): pos=N len=M phase=… epoch=E flushing=…
467    /// `variant_fence`=…") instead of a bare "data not ready". Decoders
468    /// downcast on `StreamPending` to recover the typed [`PendingReason`].
469    fn snapshot_pending(&self, reason: PendingReason, want: usize) -> StreamPending {
470        let pos = self.source.position();
471        let len = self.source.len();
472        let phase = self.source.phase_at(pos..pos.saturating_add(want as u64));
473        let timeline = self.source.timeline();
474        StreamPending {
475            reason,
476            pos,
477            want,
478            len,
479            phase,
480            epoch: timeline.seek_epoch(),
481            flushing: timeline.is_flushing(),
482            variant_fence: self.source.has_variant_change_pending(),
483        }
484    }
485}
486
487impl<T: StreamType> Seek for Stream<T> {
488    #[cfg_attr(feature = "perf", hotpath::measure)]
489    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
490        let current = self.source.position();
491
492        let new_pos: i128 = match pos {
493            SeekFrom::Start(p) => i128::from(p),
494            SeekFrom::Current(delta) => i128::from(current).saturating_add(i128::from(delta)),
495            SeekFrom::End(delta) => {
496                if self.source.len().is_none() {
497                    let _ = self.source.wait_range(0..1, None);
498                }
499                let Some(len) = self.source.len() else {
500                    return Err(IoError::new(
501                        ErrorKind::Unsupported,
502                        "seek from end requires known length",
503                    ));
504                };
505                i128::from(len).saturating_add(i128::from(delta))
506            }
507        };
508
509        if new_pos < 0 {
510            return Err(IoError::new(
511                ErrorKind::InvalidInput,
512                "negative seek position",
513            ));
514        }
515
516        let new_pos = u64::try_from(new_pos).unwrap_or(u64::MAX);
517
518        let wait_range = match self.source.format_change_segment_range() {
519            Ok(range) if range.start == new_pos => range,
520            _ => new_pos..new_pos.saturating_add(1),
521        };
522        let _ = self
523            .source
524            .wait_range(wait_range, Some(Self::SEEK_WAIT_TIMEOUT));
525
526        if let Some(len) = self.source.len()
527            && new_pos > len
528        {
529            return Err(IoError::new(
530                ErrorKind::InvalidInput,
531                StreamSeekPastEof {
532                    new_pos,
533                    len,
534                    current_pos: current,
535                },
536            ));
537        }
538
539        self.source.set_position(new_pos);
540        Ok(new_pos)
541    }
542}
543
#[cfg(test)]
mod tests {
    use std::{
        collections::VecDeque,
        sync::{
            Arc,
            atomic::{AtomicU64, Ordering},
        },
    };

    use kithara_storage::WaitOutcome;
    use kithara_test_utils::kithara;

    use super::*;
    use crate::{ReadOutcome, Source, SourcePhase};

    /// One scripted `read_at` step: either serve up to `N` bytes sliced
    /// from the source's `data`, or report a terminal `Eof`. Pending
    /// conditions are driven through the timeline (`initiate_seek`) and
    /// the scripted wait outcomes, never through this read script.
    #[derive(Clone, Copy)]
    enum ScriptRead {
        Data(usize),
        Eof,
    }

    /// Wrap a non-zero byte count in `ReadOutcome::Bytes`; panics on zero.
    fn bytes(count: usize) -> ReadOutcome {
        NonZeroUsize::new(count)
            .map(ReadOutcome::Bytes)
            .expect("BUG: ScriptSource::bytes invariant — count must be > 0")
    }

    /// Source whose `wait_range` / `read_at` results are played back from
    /// queues supplied by the test.
    struct ScriptSource {
        position: Arc<AtomicU64>,
        anchor: Option<SourceSeekAnchor>,
        timeline: Timeline,
        data: Vec<u8>,
        reads: VecDeque<ScriptRead>,
        waits: VecDeque<WaitOutcome>,
    }

    impl ScriptSource {
        /// Start at position 0 with no anchor and the given scripts.
        fn new(
            timeline: Timeline,
            waits: impl IntoIterator<Item = WaitOutcome>,
            reads: impl IntoIterator<Item = ScriptRead>,
            data: Vec<u8>,
        ) -> Self {
            let waits = waits.into_iter().collect();
            let reads = reads.into_iter().collect();
            Self {
                position: Arc::new(AtomicU64::new(0)),
                anchor: None,
                timeline,
                data,
                reads,
                waits,
            }
        }
    }

    impl Source for ScriptSource {
        fn advance(&self, n: u64) {
            self.position.fetch_add(n, Ordering::AcqRel);
        }

        fn len(&self) -> Option<u64> {
            Some(self.data.len() as u64)
        }

        fn phase_at(&self, _range: Range<u64>) -> SourcePhase {
            SourcePhase::Waiting
        }

        fn position(&self) -> u64 {
            self.position.load(Ordering::Acquire)
        }

        fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> crate::StreamResult<ReadOutcome> {
            // An exhausted script behaves like a scripted `Eof`.
            let requested = match self.reads.pop_front() {
                Some(ScriptRead::Data(n)) => n,
                Some(ScriptRead::Eof) | None => return Ok(ReadOutcome::Eof),
            };
            let Ok(start) = usize::try_from(offset) else {
                return Ok(ReadOutcome::Eof);
            };

            // Clamp to what the data and the caller's buffer can hold.
            let end = (start + requested).min(self.data.len());
            let available = end.saturating_sub(start).min(buf.len());
            if available == 0 {
                return Ok(ReadOutcome::Eof);
            }

            buf[..available].copy_from_slice(&self.data[start..start + available]);
            Ok(bytes(available))
        }

        fn seek_time_anchor(
            &mut self,
            _position: Duration,
        ) -> crate::StreamResult<Option<SourceSeekAnchor>> {
            Ok(self.anchor)
        }

        fn set_position(&self, pos: u64) {
            self.position.store(pos, Ordering::Release);
        }

        fn timeline(&self) -> Timeline {
            self.timeline.clone()
        }

        fn wait_range(
            &mut self,
            _range: Range<u64>,
            _timeout: Option<Duration>,
        ) -> crate::StreamResult<WaitOutcome> {
            // Once the script runs dry every wait reports `Ready`.
            let outcome = self.waits.pop_front().unwrap_or(WaitOutcome::Ready);
            Ok(outcome)
        }
    }

    /// Marker type pairing `Stream` with [`ScriptSource`]. Tests build
    /// the stream by hand; `create` is never expected to succeed.
    struct DummyType;

    impl StreamType for DummyType {
        type Config = ();
        type Events = ();
        type Source = ScriptSource;

        async fn create(_config: Self::Config) -> Result<Self::Source, SourceError> {
            Err(SourceError::other(IoError::other("not used in unit tests")))
        }
    }

    /// Marker type pairing `Stream` with [`SeekDuringWaitSource`].
    struct SeekDuringWaitType;

    impl StreamType for SeekDuringWaitType {
        type Config = ();
        type Events = ();
        type Source = SeekDuringWaitSource;

        async fn create(_config: Self::Config) -> Result<Self::Source, SourceError> {
            Err(SourceError::other(IoError::other("not used in unit tests")))
        }
    }

    /// Source that initiates a seek on the shared timeline from inside
    /// every `wait_range` call and counts how often `read_at` is reached.
    struct SeekDuringWaitSource {
        position: Arc<AtomicU64>,
        timeline: Timeline,
        read_calls: usize,
    }

    impl Source for SeekDuringWaitSource {
        fn advance(&self, n: u64) {
            self.position.fetch_add(n, Ordering::AcqRel);
        }

        fn len(&self) -> Option<u64> {
            Some(4)
        }

        fn phase_at(&self, _range: Range<u64>) -> SourcePhase {
            SourcePhase::Ready
        }

        fn position(&self) -> u64 {
            self.position.load(Ordering::Acquire)
        }

        fn read_at(&mut self, _offset: u64, _buf: &mut [u8]) -> crate::StreamResult<ReadOutcome> {
            self.read_calls += 1;
            Ok(bytes(4))
        }

        fn set_position(&self, pos: u64) {
            self.position.store(pos, Ordering::Release);
        }

        fn timeline(&self) -> Timeline {
            self.timeline.clone()
        }

        fn wait_range(
            &mut self,
            _range: Range<u64>,
            _timeout: Option<Duration>,
        ) -> crate::StreamResult<WaitOutcome> {
            // Simulate a seek arriving while the reader is waiting.
            let _ = self.timeline.initiate_seek(Duration::from_millis(10));
            Ok(WaitOutcome::Ready)
        }
    }

    #[kithara::test]
    fn read_retries_interrupted_when_not_flushing() {
        let timeline = Timeline::new();
        let scripted = ScriptSource::new(
            timeline.clone(),
            [WaitOutcome::Interrupted, WaitOutcome::Ready],
            [ScriptRead::Data(4)],
            b"ABCD".to_vec(),
        );
        let mut stream = Stream::<DummyType> { source: scripted };
        let mut out = [0u8; 4];

        let read = stream
            .read(&mut out)
            .expect("BUG: read must succeed after the explicit retry in this test scenario");

        assert_eq!(read, 4);
        assert_eq!(&out, b"ABCD");
    }

    #[kithara::test]
    fn try_read_returns_seek_pending_when_flushing() {
        let timeline = Timeline::new();
        let _ = timeline.initiate_seek(Duration::from_millis(10));
        let scripted = ScriptSource::new(timeline.clone(), [WaitOutcome::Interrupted], [], vec![]);
        let mut stream = Stream::<DummyType> { source: scripted };
        let mut out = [0u8; 4];

        let result = stream
            .try_read(&mut out)
            .expect("BUG: seek-pending is a status return; not a hard error in this test");

        assert!(matches!(
            result,
            StreamReadOutcome::Pending(PendingReason::SeekPending)
        ));
    }

    #[kithara::test]
    fn try_read_returns_seek_pending_when_epoch_changes_after_wait() {
        let timeline = Timeline::new();
        let seeking = SeekDuringWaitSource {
            position: Arc::new(AtomicU64::new(0)),
            timeline: timeline.clone(),
            read_calls: 0,
        };
        let mut stream = Stream::<SeekDuringWaitType> { source: seeking };
        let mut out = [0u8; 4];

        let result = stream
            .try_read(&mut out)
            .expect("BUG: seek-pending is a status return; not a hard error in this test");

        assert!(matches!(
            result,
            StreamReadOutcome::Pending(PendingReason::SeekPending)
        ));
        // The stale read must be abandoned before touching the source.
        assert_eq!(stream.source.read_calls, 0);
        assert_eq!(stream.position(), 0);
    }

    #[kithara::test]
    fn seek_updates_position() {
        let timeline = Timeline::new();
        let scripted = ScriptSource::new(timeline.clone(), [], [], b"ABCDE".to_vec());
        let mut stream = Stream::<DummyType> { source: scripted };

        let landed = stream
            .seek(SeekFrom::Start(3))
            .expect("BUG: seek to a position within the test stream must succeed");

        assert_eq!(landed, 3);
        assert_eq!(stream.position(), 3);
    }

    #[kithara::test]
    fn seek_time_anchor_does_not_move_position() {
        let timeline = Timeline::new();
        let mut scripted = ScriptSource::new(timeline.clone(), [], [], b"ABCDE".to_vec());
        scripted.set_position(11);
        scripted.anchor = Some(SourceSeekAnchor {
            byte_offset: 3,
            segment_start: Duration::from_secs(8),
            segment_end: Some(Duration::from_secs(12)),
            segment_index: Some(2),
            variant_index: Some(1),
        });
        let mut stream = Stream::<DummyType> { source: scripted };

        let resolved = stream
            .seek_time_anchor(Duration::from_millis(8_500))
            .expect("BUG: anchor resolution must succeed for the constructed test stream")
            .expect("BUG: stream must return the resolved anchor in this test");

        assert_eq!(resolved.byte_offset, 3);
        assert_eq!(
            stream.position(),
            11,
            "anchor resolution must not eagerly commit stream position"
        );
    }
}