// kithara_stream/source.rs

1#![forbid(unsafe_code)]
2
3use std::{error::Error as StdError, fmt, num::NonZeroUsize, ops::Range, sync::Arc};
4
5use kithara_events::VariantInfo;
6use kithara_platform::{MaybeSend, MaybeSync, time::Duration};
7use kithara_storage::WaitOutcome;
8use kithara_test_utils::kithara;
9
10use crate::{
11    Timeline,
12    error::{SourceError, StreamError, StreamResult},
13    media::MediaInfo,
14};
15
/// Per-segment metadata exposed by segmented sources (HLS).
///
/// The type is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal and must go through [`SegmentDescriptor::new`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct SegmentDescriptor {
    /// Absolute decode time at the start of this segment (cumulative
    /// EXTINF over preceding segments).
    pub decode_time: Duration,
    /// Segment duration (EXTINF).
    pub duration: Duration,
    /// Byte range in the source's virtual stream.
    pub byte_range: Range<u64>,
    /// Segment index within the variant.
    pub segment_index: u32,
    /// Variant the descriptor was resolved against.
    pub variant_index: usize,
}

impl SegmentDescriptor {
    /// Builds a descriptor from its five components.
    ///
    /// Each argument is stored verbatim in the field of the same name; no
    /// validation is performed. Note that the argument order starts with
    /// `byte_range`, which differs from the field declaration order.
    #[must_use]
    pub fn new(
        byte_range: Range<u64>,
        decode_time: Duration,
        duration: Duration,
        segment_index: u32,
        variant_index: usize,
    ) -> Self {
        Self {
            decode_time,
            duration,
            byte_range,
            segment_index,
            variant_index,
        }
    }
}
51
/// Phase of a source's wait/read lifecycle.
///
/// Each `Source` implementation returns the current phase from its
/// `phase()` method — a point-in-time snapshot for external observers
/// (audio pipeline, tracing, UI).
///
/// [`SourcePhase::Waiting`] is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum SourcePhase {
    /// Cancelled — terminal, source will not produce more data.
    Cancelled,
    /// End of stream reached.
    Eof,
    /// Requested range is available for non-blocking read.
    Ready,
    /// Active seek in progress — decoder should be interrupted.
    Seeking,
    /// Default: data not yet available, no specific sub-state.
    #[default]
    Waiting,
    /// On-demand request already in flight for this seek epoch.
    WaitingDemand,
    /// Metadata lookup needed before data can be requested.
    WaitingMetadata,
}
76
/// Reason a [`ReadOutcome::Pending`] was returned — i.e. why the source
/// did not make progress this call. Each variant maps to a distinct
/// caller action; there is no overlap and no string-matching required.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PendingReason {
    /// A seek is pending (consumer flagged the timeline). The caller
    /// must abort the current read and let the seek apply — do **not**
    /// retry from the same byte offset.
    SeekPending,
    /// Data is not yet available at the requested range. Transient —
    /// caller may retry after backoff. The inner [`NotReadyCause`] tells
    /// which point in the read pipeline failed to make progress (wait
    /// budget exhausted, wait interrupted, source-side pending).
    NotReady(NotReadyCause),
    /// Source crossed a variant boundary at this offset. Caller must
    /// recreate the decoder and call
    /// [`Source::clear_variant_fence`] before reads succeed. Zero bytes
    /// were touched — the fence fires BEFORE any data is read.
    VariantChange,
    /// Resource was evicted between [`Source::wait_range`] (metadata
    /// ready) and [`Source::read_at`] (actual I/O). Caller should
    /// retry from `wait_range`, not from the same byte offset.
    Retry,
}

/// Concrete cause for a [`PendingReason::NotReady`].
///
/// Carried as the typed payload of `NotReady` so the `io::Error` that
/// `impl Read for Stream` produces names the real stall site without
/// requiring decoder-side instrumentation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum NotReadyCause {
    /// `wait_range` returned `WaitBudgetExceeded` for `MAX_WAIT_SPINS`
    /// iterations — the source kept signalling "not yet" past the read
    /// budget. Typical when a fetch is slower than the read deadline.
    WaitBudgetExhausted,
    /// `wait_range` returned `Interrupted` without an active flush, also
    /// past the spin budget — the downloader woke us but range still
    /// wasn't satisfied. Typical sign of a flapping ABR/eviction race.
    WaitInterrupted,
    /// `wait_range` reported ready but `read_at` then returned `Pending`
    /// with a non-`Retry` reason — surfaced verbatim from the source.
    SourcePending,
}

/// Short, human-readable description of the stall site. The text is
/// embedded by [`PendingReason`]'s `Display` for the `NotReady` case.
impl fmt::Display for NotReadyCause {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::WaitBudgetExhausted => "wait budget exhausted",
            Self::WaitInterrupted => "wait interrupted, no flush",
            Self::SourcePending => "source returned pending after wait ready",
        })
    }
}

/// Human-readable description of why a read made no progress.
///
/// `NotReady` renders as `data not ready (<cause>)`, delegating the
/// parenthesised part to [`NotReadyCause`]'s `Display`.
impl fmt::Display for PendingReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::SeekPending => f.write_str("seek pending"),
            Self::NotReady(cause) => write!(f, "data not ready ({cause})"),
            Self::VariantChange => f.write_str("variant change: decoder recreation required"),
            Self::Retry => f.write_str("resource evicted, retry wait_range"),
        }
    }
}

/// Lets a `PendingReason` be carried wherever a `std::error::Error` is
/// expected; it has no underlying `source()`.
impl StdError for PendingReason {}
146
147/// Outcome of a [`Source::read_at`] call.
148///
149/// Each variant has distinct caller semantics — there is no
150/// overload of a numeric zero. `Bytes` carries a typed
151/// [`NonZeroUsize`] so the type system guarantees forward progress;
152/// `Pending` carries an explicit [`PendingReason`]; `Eof` is terminal.
153#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub enum ReadOutcome {
155    /// Source produced `count` bytes (`count > 0` by construction).
156    Bytes(NonZeroUsize),
157    /// Source did not make progress this call. See [`PendingReason`]
158    /// for the precise cause and required caller action.
159    Pending(PendingReason),
160    /// Natural end of stream — no more bytes will ever come from this
161    /// source at this offset.
162    Eof,
163}
164
165/// Time-first seek anchor resolved by a segmented source.
166///
167/// Represents a deterministic mapping from target playback time to a byte
168/// position and segment context inside the source.
169#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, bon::Builder)]
170#[non_exhaustive]
171pub struct SourceSeekAnchor {
172    #[builder(default)]
173    pub segment_start: Duration,
174    pub segment_end: Option<Duration>,
175    pub segment_index: Option<u32>,
176    pub variant_index: Option<usize>,
177    #[builder(default)]
178    pub byte_offset: u64,
179}
180
181/// Sync random-access source.
182///
183/// Provides sync interface for waiting and reading data at arbitrary offsets.
184/// Reader wraps this directly to provide `Read + Seek`.
185///
186/// Methods take `&mut self` to allow sources to maintain internal state
187/// (e.g., progress tracking, segment index updates).
188#[kithara::mock(api = SourceMock)]
189pub trait Source: MaybeSend + MaybeSync + 'static {
190    /// Current ABR handle for runtime mode/bandwidth control.
191    ///
192    /// Adaptive sources (HLS) return the peer's `AbrHandle` so callers —
193    /// queue, FFI, UI — can switch variant or cap bandwidth mid-playback.
194    /// Non-adaptive sources (File) keep the default `None`.
195    fn abr_handle(&self) -> Option<kithara_abr::AbrHandle> {
196        None
197    }
198
199    /// Advance the byte cursor by `n` bytes after a successful read.
200    fn advance(&self, n: u64);
201
202    /// Optional shared segment-layout handle for segment-aware decoders.
203    ///
204    /// Segment-aware decoders (fMP4 segment demuxer) call this once at
205    /// open to grab a lock-free, Arc-shareable view over the segment
206    /// table — independent of the byte cursor passed to the decoder
207    /// through `Read + Seek`. Default `None` for non-segmented sources.
208    fn as_segment_layout(&self) -> Option<Arc<dyn SegmentLayout>> {
209        None
210    }
211
212    /// Clear variant fence, allowing reads from the next variant.
213    ///
214    /// Called when the decoder is recreated after ABR switch.
215    /// Default no-op for non-HLS sources.
216    fn clear_variant_fence(&mut self) {}
217
218    /// Commit the actual post-seek landing after `decoder.seek(...)`.
219    ///
220    /// Segmented sources can use this hook to reconcile source-local state
221    /// with the authoritative landed reader position in [`Timeline`].
222    ///
223    /// Default no-op for sources that do not need post-seek reconciliation.
224    fn commit_seek_landing(&mut self, _anchor: Option<SourceSeekAnchor>) {}
225
226    /// Current segment byte range (HLS-only).
227    ///
228    /// Transitional — removed in Plan 06 once the audio FSM consumes
229    /// segment boundaries through [`SegmentLayout`].
230    fn current_segment_range(&self) -> Option<Range<u64>> {
231        None
232    }
233
234    /// Current variant's full metadata. Adaptive sources (HLS) return
235    /// the live `VariantInfo` for the active variant — pulled from the
236    /// peer on every call so the UI never sees a stale label. Non-adaptive
237    /// sources keep the default `None`.
238    fn current_variant(&self) -> Option<VariantInfo> {
239        None
240    }
241
242    /// Byte range of the header (init segment or first served segment)
243    /// the decoder must read to re-establish container state after a
244    /// format change (HLS ABR cross-codec switch).
245    ///
246    /// Returns `Ok(range)` — header byte range that `apply_format_change`
247    /// seeks to and the decoder factory's probe reads.
248    ///
249    /// # Errors
250    ///
251    /// `Err(SourceError::FormatChangeNotApplicable)` — source has no
252    /// HLS-style format-change recovery (file source — default impl) or
253    /// the active HLS variant was activated with `served_from > 0` so
254    /// the init prefix lives outside the served virtual byte range.
255    /// Callers should fall back to a non-init recovery anchor (e.g.
256    /// the current segment boundary).
257    ///
258    /// Transitional — removed in Plan 06.
259    fn format_change_segment_range(&self) -> StreamResult<Range<u64>> {
260        Err(StreamError::Source(SourceError::FormatChangeNotApplicable))
261    }
262
263    /// `true` if a cross-variant transition is in-flight and `read_at` /
264    /// `wait_range` are short-circuited to `Pending(VariantChange)` /
265    /// `Interrupted` until the decoder acks the switch via
266    /// `clear_variant_fence` (HLS) or equivalent.
267    ///
268    /// Sources without a variant fence keep the default `false`. Used by
269    /// the audio decode loop to break out of `Ok(Pending(_))` retry spin
270    /// when Symphonia / other demuxers absorb the underlying
271    /// `VariantChangeError` and surface only an opaque pending — without
272    /// this polled check the loop would yield forever while the fence
273    /// stays closed waiting for a recreate that never starts.
274    fn has_variant_change_pending(&self) -> bool {
275        false
276    }
277
278    /// Whether the source currently reports zero bytes. Default mirrors
279    /// `self.len()` returning `0` (or being unknown — both are treated as
280    /// "no readable bytes yet" for the conventional `len`/`is_empty` pair).
281    fn is_empty(&self) -> bool {
282        self.len().is_none_or(|n| n == 0)
283    }
284
285    /// Total length if known.
286    ///
287    /// Streaming sources may block briefly until the HTTP response headers
288    /// arrive (Content-Length discovery).
289    fn len(&self) -> Option<u64>;
290
291    /// Create a callback that wakes blocked `wait_range()` without holding
292    /// the `SharedStream` mutex.
293    ///
294    /// The returned closure captures only the underlying condvar/notify
295    /// primitive, so calling it from the main thread cannot deadlock even
296    /// when the worker thread holds the `SharedStream` lock inside `read()`.
297    ///
298    /// Default returns `None` (no blocking waits to wake).
299    fn make_notify_fn(&self) -> Option<Box<dyn Fn() + Send + Sync>> {
300        None
301    }
302
303    /// Get media info if available.
304    fn media_info(&self) -> Option<MediaInfo> {
305        None
306    }
307
308    /// Wake any blocked `wait_range()` calls.
309    ///
310    /// Called after `Timeline::initiate_seek()` to ensure immediate response
311    /// from threads sleeping on condvars. Default no-op for sources without
312    /// blocking waits.
313    fn notify_waiting(&self) {}
314
315    /// Overall source readiness at the current timeline position.
316    ///
317    /// Uses the source's internal knowledge of chunk/segment boundaries
318    /// to determine if the next read operation can proceed without blocking.
319    ///
320    /// Unlike `phase_at(range)` which checks a specific byte range,
321    /// this method lets the source decide the appropriate granularity.
322    ///
323    /// Default checks a single byte at the current position.
324    /// HLS overrides with segment-aware logic, File with 32KB-window logic.
325    fn phase(&self) -> SourcePhase {
326        let pos = self.position();
327        self.phase_at(pos..pos.saturating_add(1))
328    }
329
330    /// Point-in-time snapshot of the source phase for the given range.
331    ///
332    /// Returns the current [`SourcePhase`] without blocking. Used internally
333    /// by `wait_range()` implementations for fast-path dispatch.
334    fn phase_at(&self, range: Range<u64>) -> SourcePhase;
335
336    /// Current byte position in the source's virtual byte space.
337    ///
338    /// HLS delegates to active variant; file owns its own atomic cursor.
339    fn position(&self) -> u64;
340
341    /// Read data at offset into buffer.
342    ///
343    /// Returns [`ReadOutcome::Bytes`] with a non-zero byte count on
344    /// progress, [`ReadOutcome::Pending`] with a typed
345    /// [`PendingReason`] when no progress is possible this call (seek
346    /// pending, variant fence, eviction), or [`ReadOutcome::Eof`] at
347    /// natural end-of-stream.
348    ///
349    /// # Errors
350    ///
351    /// Returns an error if the read fails or the source is in an invalid state.
352    fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> StreamResult<ReadOutcome>;
353
354    /// Resolve `position` to a source-specific seek anchor.
355    ///
356    /// Segmented sources (HLS) should map time to a deterministic segment
357    /// boundary and byte offset. Non-segmented sources return `Ok(None)`.
358    ///
359    /// The caller is expected to set stream position to `byte_offset` and
360    /// perform decoder reset/recreation using this anchor.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error when the source cannot resolve the anchor.
365    fn seek_time_anchor(&mut self, _position: Duration) -> StreamResult<Option<SourceSeekAnchor>> {
366        Ok(None)
367    }
368
369    /// Absolute set of the byte cursor — used by [`Stream::seek`] and
370    /// post-seek landings. Sources implement this via the same atomic
371    /// cursor that backs [`Self::position`] / [`Self::advance`].
372    fn set_position(&self, pos: u64);
373
374    /// Set current seek epoch for stale request invalidation.
375    ///
376    /// HLS uses this to drop in-flight network/segment requests that belong
377    /// to previous seeks. Non-seek-aware sources keep the default no-op.
378    fn set_seek_epoch(&mut self, _seek_epoch: u64) {}
379
380    /// Build a fresh reader-side hooks instance.
381    ///
382    /// Returned by Source-impls that want to expose reader-side events
383    /// (`HlsSource`, `FileSource`). The audio pipeline takes the hook
384    /// at decoder creation/recreation time and threads it into the
385    /// `HookedDecoder` wrapper. Default `None` keeps mock and test
386    /// sources unhooked.
387    ///
388    /// `take_*` is a misnomer: each call must return a **fresh** hook
389    /// instance, because decoder recreation (ABR / format change)
390    /// rebuilds the wrapper and the new hook needs a clean state
391    /// cursor.
392    fn take_reader_hooks(&mut self) -> Option<crate::SharedHooks> {
393        None
394    }
395
396    /// Get shared playback timeline.
397    ///
398    /// Timeline is the single source of truth for playback state across all
399    /// stream types (segmented and non-segmented). Sources own their
400    /// Timeline and hand out cheap Arc clones to downstream consumers
401    /// (reader, audio FSM, Downloader peers).
402    fn timeline(&self) -> Timeline;
403
404    /// Wait for data in range to be available.
405    ///
406    /// `timeout` is the maximum wait time before returning an
407    /// implementation-defined non-ready outcome (typically a typed
408    /// "budget exceeded" error). Pass `None` to wait until the range
409    /// is ready or the source's internal cancel signal fires — used
410    /// for [`Stream::seek`](crate::Stream::seek), where giving up on
411    /// a timer would silently drop the seek under slow connections.
412    /// `Some(WAIT_RANGE_TIMEOUT)` is the cooperative-yield path used
413    /// by the audio worker's read loop.
414    ///
415    /// # Errors
416    ///
417    /// Returns an error if the wait is cancelled or the underlying storage fails.
418    fn wait_range(
419        &mut self,
420        range: Range<u64>,
421        timeout: Option<Duration>,
422    ) -> StreamResult<WaitOutcome>;
423}
424
425/// Segment-table view exposed by segmented sources (HLS, fragmented
426/// file-mp4).
427///
428/// Carries the segment metadata that segment-aware decoders need to
429/// route reads — `init_segment_range` (ftyp+moov / `EXT-X-MAP`),
430/// `segment_at_time`, `segment_after_byte`, `segment_count`, and total
431/// `len`. Has no I/O surface: the byte cursor is the decoder's
432/// `Read + Seek` handle, queried independently. Sources that aren't
433/// segment-aware return `None` from [`Source::as_segment_layout`].
434pub trait SegmentLayout: Send + Sync + 'static {
435    /// Init segment range (e.g. ftyp+moov from `EXT-X-MAP`) for the
436    /// current layout variant. Returns an **empty** range (`0..0`) when
437    /// the layout has no init segment (raw TS/AAC/MPEG-ES) or when the
438    /// active variant has not yet announced one. Callers that require an
439    /// init must check `Range::is_empty()` — distinguishing "no init"
440    /// from "init at offset 0..0" is unsupported because every init we
441    /// emit is non-empty by construction.
442    fn init_segment_range(&self) -> Range<u64>;
443
444    /// Whether the layout currently reports zero bytes. `len()` is `Option`
445    /// because some segmented sources do not know their total upfront, so
446    /// emptiness defaults to "len is `None` or `Some(0)`".
447    fn is_empty(&self) -> bool {
448        self.len().is_none_or(|n| n == 0)
449    }
450
451    /// Total byte length across all segments. Used to compute total
452    /// duration when the source can't provide a direct value.
453    fn len(&self) -> Option<u64>;
454
455    /// Next segment whose byte range starts at or after `byte_offset`.
456    /// Used for sequential play after the current segment is consumed.
457    fn segment_after_byte(&self, byte_offset: u64) -> Option<SegmentDescriptor>;
458
459    /// Segment whose `byte_range` covers `byte_offset`. Default `None`
460    /// keeps non-segmented sources transparent.
461    fn segment_at_byte(&self, _byte_offset: u64) -> Option<SegmentDescriptor> {
462        None
463    }
464
465    /// Descriptor for the segment at `segment_index` in the current
466    /// layout variant. Used by demuxers to re-resolve a cursor's
467    /// `byte_range` against the live layout — without this, a DRM
468    /// post-decrypt size shrink (PKCS7 padding stripped) between cursor
469    /// setup and the actual read leaves `state.range` pointing past
470    /// the segment's real end and `HlsSource::read_at` splices bytes
471    /// from the next segment onto the buffer's tail. Returns `None`
472    /// for non-segmented sources or for indices outside the current
473    /// layout's range.
474    fn segment_at_index(&self, _segment_index: u32) -> Option<SegmentDescriptor> {
475        None
476    }
477
478    /// Locate the segment whose `[decode_time, decode_time + duration)`
479    /// covers `t`. Resolves against the source's *current layout
480    /// variant* — same variant `init_segment_range` describes.
481    fn segment_at_time(&self, t: Duration) -> Option<SegmentDescriptor>;
482
483    /// Total number of segments in the current layout variant.
484    fn segment_count(&self) -> Option<u32>;
485}
486
#[cfg(test)]
mod tests {
    use kithara_test_utils::kithara;

    use super::*;

    // Compile-time smoke check: `Source` must be usable as a generic bound.
    // NOTE(review): despite the name, no `dyn Source` is formed here, so
    // object safety itself is not exercised — confirm whether a
    // `&dyn Source` check was intended.
    #[kithara::test]
    fn test_source_trait_object_safety() {
        fn _accepts_source<S: Source>(_s: S) {}
    }

    // `#[default]` on the enum must keep pointing at `Waiting`.
    #[kithara::test]
    fn source_phase_defaults_to_waiting() {
        assert_eq!(SourcePhase::default(), SourcePhase::Waiting);
    }

    // The provided `Source::phase` must forward to `phase_at`; the stub
    // below answers `Ready` for every range, so `phase()` must too.
    #[kithara::test]
    fn phase_default_delegates_to_phase_at() {
        use std::sync::atomic::{AtomicU64, Ordering};

        // Minimal `Source` that implements only the required methods.
        struct ReadySource {
            timeline: Timeline,
            position: Arc<AtomicU64>,
        }
        impl Source for ReadySource {
            fn timeline(&self) -> Timeline {
                self.timeline.clone()
            }
            fn wait_range(
                &mut self,
                _range: Range<u64>,
                _timeout: Option<Duration>,
            ) -> StreamResult<WaitOutcome> {
                Ok(WaitOutcome::Ready)
            }
            fn read_at(&mut self, _offset: u64, _buf: &mut [u8]) -> StreamResult<ReadOutcome> {
                Ok(ReadOutcome::Eof)
            }
            fn phase_at(&self, _range: Range<u64>) -> SourcePhase {
                SourcePhase::Ready
            }
            fn len(&self) -> Option<u64> {
                Some(100)
            }
            fn position(&self) -> u64 {
                self.position.load(Ordering::Acquire)
            }
            fn advance(&self, n: u64) {
                self.position.fetch_add(n, Ordering::AcqRel);
            }
            fn set_position(&self, pos: u64) {
                self.position.store(pos, Ordering::Release);
            }
        }
        let source = ReadySource {
            timeline: Timeline::new(),
            position: Arc::new(AtomicU64::new(0)),
        };
        assert_eq!(source.phase(), SourcePhase::Ready);
    }
}