kithara_stream/source.rs
1#![forbid(unsafe_code)]
2
3use std::{error::Error as StdError, fmt, num::NonZeroUsize, ops::Range, sync::Arc};
4
5use kithara_events::VariantInfo;
6use kithara_platform::{MaybeSend, MaybeSync, time::Duration};
7use kithara_storage::WaitOutcome;
8use kithara_test_utils::kithara;
9
10use crate::{
11 Timeline,
12 error::{SourceError, StreamError, StreamResult},
13 media::MediaInfo,
14};
15
/// Metadata describing a single segment of a segmented source (HLS).
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct SegmentDescriptor {
    /// Absolute decode time at which the segment begins, i.e. the sum of
    /// the EXTINF durations of all preceding segments.
    pub decode_time: Duration,
    /// Length of the segment in time (its EXTINF value).
    pub duration: Duration,
    /// Span the segment occupies in the source's virtual byte stream.
    pub byte_range: Range<u64>,
    /// Position of the segment within its variant.
    pub segment_index: u32,
    /// Variant against which this descriptor was resolved.
    pub variant_index: usize,
}

impl SegmentDescriptor {
    /// Assembles a descriptor from its individual parts.
    ///
    /// The argument order (`byte_range` first) differs from the field
    /// declaration order; each argument is stored unchanged in the field
    /// of the same name.
    #[must_use]
    pub fn new(
        byte_range: Range<u64>,
        decode_time: Duration,
        duration: Duration,
        segment_index: u32,
        variant_index: usize,
    ) -> Self {
        Self {
            byte_range,
            decode_time,
            duration,
            segment_index,
            variant_index,
        }
    }
}
51
/// Where a source currently stands in its wait/read lifecycle.
///
/// Every `Source` implementation reports this from `phase()` as a
/// point-in-time snapshot intended for outside observers (audio
/// pipeline, tracing, UI).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum SourcePhase {
    /// Terminal: the source was cancelled and will produce no more data.
    Cancelled,
    /// The end of the stream has been reached.
    Eof,
    /// The requested range can be read without blocking.
    Ready,
    /// A seek is actively in progress; the decoder should be interrupted.
    Seeking,
    /// Data is not available yet and no more specific sub-state applies.
    /// This is the default phase.
    #[default]
    Waiting,
    /// An on-demand request for this seek epoch is already in flight.
    WaitingDemand,
    /// A metadata lookup must complete before data can be requested.
    WaitingMetadata,
}
76
77/// Reason a [`ReadOutcome::Pending`] was returned — i.e. why the source
78/// did not make progress this call. Each variant maps to a distinct
79/// caller action; there is no overlap and no string-matching required.
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81#[non_exhaustive]
82pub enum PendingReason {
83 /// A seek is pending (consumer flagged the timeline). The caller
84 /// must abort the current read and let the seek apply — do **not**
85 /// retry from the same byte offset.
86 SeekPending,
87 /// Data is not yet available at the requested range. Transient —
88 /// caller may retry after backoff. The inner [`NotReadyCause`] tells
89 /// which point in the read pipeline failed to make progress (wait
90 /// budget exhausted, wait interrupted, source-side pending).
91 NotReady(NotReadyCause),
92 /// Source crossed a variant boundary at this offset. Caller must
93 /// recreate the decoder and call
94 /// [`Source::clear_variant_fence`] before reads succeed. Zero bytes
95 /// were touched — the fence fires BEFORE any data is read.
96 VariantChange,
97 /// Resource was evicted between [`Source::wait_range`] (metadata
98 /// ready) and [`Source::read_at`] (actual I/O). Caller should
99 /// retry from `wait_range`, not from the same byte offset.
100 Retry,
101}
102
/// Specific cause attached to a [`PendingReason::NotReady`].
///
/// It travels as the typed payload of `NotReady`, which lets the
/// `io::Error` produced by `impl Read for Stream` name the actual stall
/// site without any decoder-side instrumentation.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum NotReadyCause {
    /// `wait_range` answered `WaitBudgetExceeded` for `MAX_WAIT_SPINS`
    /// iterations: the source kept reporting "not yet" beyond the read
    /// budget. Typically a fetch that is slower than the read deadline.
    WaitBudgetExhausted,
    /// `wait_range` answered `Interrupted` with no flush active, again
    /// beyond the spin budget: the downloader woke the reader but the
    /// range was still unsatisfied. Typically a flapping ABR/eviction
    /// race.
    WaitInterrupted,
    /// `wait_range` said ready, yet `read_at` then returned `Pending`
    /// with a reason other than `Retry`; passed through verbatim from
    /// the source.
    SourcePending,
}

impl fmt::Display for NotReadyCause {
    /// Writes a short, fixed, human-readable label for the cause.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::WaitBudgetExhausted => "wait budget exhausted",
            Self::WaitInterrupted => "wait interrupted, no flush",
            Self::SourcePending => "source returned pending after wait ready",
        };
        f.write_str(label)
    }
}
133
134impl fmt::Display for PendingReason {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 match self {
137 Self::SeekPending => f.write_str("seek pending"),
138 Self::NotReady(cause) => write!(f, "data not ready ({cause})"),
139 Self::VariantChange => f.write_str("variant change: decoder recreation required"),
140 Self::Retry => f.write_str("resource evicted, retry wait_range"),
141 }
142 }
143}
144
145impl StdError for PendingReason {}
146
147/// Outcome of a [`Source::read_at`] call.
148///
149/// Each variant has distinct caller semantics — there is no
150/// overload of a numeric zero. `Bytes` carries a typed
151/// [`NonZeroUsize`] so the type system guarantees forward progress;
152/// `Pending` carries an explicit [`PendingReason`]; `Eof` is terminal.
153#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub enum ReadOutcome {
155 /// Source produced `count` bytes (`count > 0` by construction).
156 Bytes(NonZeroUsize),
157 /// Source did not make progress this call. See [`PendingReason`]
158 /// for the precise cause and required caller action.
159 Pending(PendingReason),
160 /// Natural end of stream — no more bytes will ever come from this
161 /// source at this offset.
162 Eof,
163}
164
165/// Time-first seek anchor resolved by a segmented source.
166///
167/// Represents a deterministic mapping from target playback time to a byte
168/// position and segment context inside the source.
169#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, bon::Builder)]
170#[non_exhaustive]
171pub struct SourceSeekAnchor {
172 #[builder(default)]
173 pub segment_start: Duration,
174 pub segment_end: Option<Duration>,
175 pub segment_index: Option<u32>,
176 pub variant_index: Option<usize>,
177 #[builder(default)]
178 pub byte_offset: u64,
179}
180
181/// Sync random-access source.
182///
183/// Provides sync interface for waiting and reading data at arbitrary offsets.
184/// Reader wraps this directly to provide `Read + Seek`.
185///
186/// Methods take `&mut self` to allow sources to maintain internal state
187/// (e.g., progress tracking, segment index updates).
188#[kithara::mock(api = SourceMock)]
189pub trait Source: MaybeSend + MaybeSync + 'static {
190 /// Current ABR handle for runtime mode/bandwidth control.
191 ///
192 /// Adaptive sources (HLS) return the peer's `AbrHandle` so callers —
193 /// queue, FFI, UI — can switch variant or cap bandwidth mid-playback.
194 /// Non-adaptive sources (File) keep the default `None`.
195 fn abr_handle(&self) -> Option<kithara_abr::AbrHandle> {
196 None
197 }
198
199 /// Advance the byte cursor by `n` bytes after a successful read.
200 fn advance(&self, n: u64);
201
202 /// Optional shared segment-layout handle for segment-aware decoders.
203 ///
204 /// Segment-aware decoders (fMP4 segment demuxer) call this once at
205 /// open to grab a lock-free, Arc-shareable view over the segment
206 /// table — independent of the byte cursor passed to the decoder
207 /// through `Read + Seek`. Default `None` for non-segmented sources.
208 fn as_segment_layout(&self) -> Option<Arc<dyn SegmentLayout>> {
209 None
210 }
211
212 /// Clear variant fence, allowing reads from the next variant.
213 ///
214 /// Called when the decoder is recreated after ABR switch.
215 /// Default no-op for non-HLS sources.
216 fn clear_variant_fence(&mut self) {}
217
218 /// Commit the actual post-seek landing after `decoder.seek(...)`.
219 ///
220 /// Segmented sources can use this hook to reconcile source-local state
221 /// with the authoritative landed reader position in [`Timeline`].
222 ///
223 /// Default no-op for sources that do not need post-seek reconciliation.
224 fn commit_seek_landing(&mut self, _anchor: Option<SourceSeekAnchor>) {}
225
226 /// Current segment byte range (HLS-only).
227 ///
228 /// Transitional — removed in Plan 06 once the audio FSM consumes
229 /// segment boundaries through [`SegmentLayout`].
230 fn current_segment_range(&self) -> Option<Range<u64>> {
231 None
232 }
233
234 /// Current variant's full metadata. Adaptive sources (HLS) return
235 /// the live `VariantInfo` for the active variant — pulled from the
236 /// peer on every call so the UI never sees a stale label. Non-adaptive
237 /// sources keep the default `None`.
238 fn current_variant(&self) -> Option<VariantInfo> {
239 None
240 }
241
242 /// Byte range of the header (init segment or first served segment)
243 /// the decoder must read to re-establish container state after a
244 /// format change (HLS ABR cross-codec switch).
245 ///
246 /// Returns `Ok(range)` — header byte range that `apply_format_change`
247 /// seeks to and the decoder factory's probe reads.
248 ///
249 /// # Errors
250 ///
251 /// `Err(SourceError::FormatChangeNotApplicable)` — source has no
252 /// HLS-style format-change recovery (file source — default impl) or
253 /// the active HLS variant was activated with `served_from > 0` so
254 /// the init prefix lives outside the served virtual byte range.
255 /// Callers should fall back to a non-init recovery anchor (e.g.
256 /// the current segment boundary).
257 ///
258 /// Transitional — removed in Plan 06.
259 fn format_change_segment_range(&self) -> StreamResult<Range<u64>> {
260 Err(StreamError::Source(SourceError::FormatChangeNotApplicable))
261 }
262
263 /// `true` if a cross-variant transition is in-flight and `read_at` /
264 /// `wait_range` are short-circuited to `Pending(VariantChange)` /
265 /// `Interrupted` until the decoder acks the switch via
266 /// `clear_variant_fence` (HLS) or equivalent.
267 ///
268 /// Sources without a variant fence keep the default `false`. Used by
269 /// the audio decode loop to break out of `Ok(Pending(_))` retry spin
270 /// when Symphonia / other demuxers absorb the underlying
271 /// `VariantChangeError` and surface only an opaque pending — without
272 /// this polled check the loop would yield forever while the fence
273 /// stays closed waiting for a recreate that never starts.
274 fn has_variant_change_pending(&self) -> bool {
275 false
276 }
277
278 /// Whether the source currently reports zero bytes. Default mirrors
279 /// `self.len()` returning `0` (or being unknown — both are treated as
280 /// "no readable bytes yet" for the conventional `len`/`is_empty` pair).
281 fn is_empty(&self) -> bool {
282 self.len().is_none_or(|n| n == 0)
283 }
284
285 /// Total length if known.
286 ///
287 /// Streaming sources may block briefly until the HTTP response headers
288 /// arrive (Content-Length discovery).
289 fn len(&self) -> Option<u64>;
290
291 /// Create a callback that wakes blocked `wait_range()` without holding
292 /// the `SharedStream` mutex.
293 ///
294 /// The returned closure captures only the underlying condvar/notify
295 /// primitive, so calling it from the main thread cannot deadlock even
296 /// when the worker thread holds the `SharedStream` lock inside `read()`.
297 ///
298 /// Default returns `None` (no blocking waits to wake).
299 fn make_notify_fn(&self) -> Option<Box<dyn Fn() + Send + Sync>> {
300 None
301 }
302
303 /// Get media info if available.
304 fn media_info(&self) -> Option<MediaInfo> {
305 None
306 }
307
308 /// Wake any blocked `wait_range()` calls.
309 ///
310 /// Called after `Timeline::initiate_seek()` to ensure immediate response
311 /// from threads sleeping on condvars. Default no-op for sources without
312 /// blocking waits.
313 fn notify_waiting(&self) {}
314
315 /// Overall source readiness at the current timeline position.
316 ///
317 /// Uses the source's internal knowledge of chunk/segment boundaries
318 /// to determine if the next read operation can proceed without blocking.
319 ///
320 /// Unlike `phase_at(range)` which checks a specific byte range,
321 /// this method lets the source decide the appropriate granularity.
322 ///
323 /// Default checks a single byte at the current position.
324 /// HLS overrides with segment-aware logic, File with 32KB-window logic.
325 fn phase(&self) -> SourcePhase {
326 let pos = self.position();
327 self.phase_at(pos..pos.saturating_add(1))
328 }
329
330 /// Point-in-time snapshot of the source phase for the given range.
331 ///
332 /// Returns the current [`SourcePhase`] without blocking. Used internally
333 /// by `wait_range()` implementations for fast-path dispatch.
334 fn phase_at(&self, range: Range<u64>) -> SourcePhase;
335
336 /// Current byte position in the source's virtual byte space.
337 ///
338 /// HLS delegates to active variant; file owns its own atomic cursor.
339 fn position(&self) -> u64;
340
341 /// Read data at offset into buffer.
342 ///
343 /// Returns [`ReadOutcome::Bytes`] with a non-zero byte count on
344 /// progress, [`ReadOutcome::Pending`] with a typed
345 /// [`PendingReason`] when no progress is possible this call (seek
346 /// pending, variant fence, eviction), or [`ReadOutcome::Eof`] at
347 /// natural end-of-stream.
348 ///
349 /// # Errors
350 ///
351 /// Returns an error if the read fails or the source is in an invalid state.
352 fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> StreamResult<ReadOutcome>;
353
354 /// Resolve `position` to a source-specific seek anchor.
355 ///
356 /// Segmented sources (HLS) should map time to a deterministic segment
357 /// boundary and byte offset. Non-segmented sources return `Ok(None)`.
358 ///
359 /// The caller is expected to set stream position to `byte_offset` and
360 /// perform decoder reset/recreation using this anchor.
361 ///
362 /// # Errors
363 ///
364 /// Returns an error when the source cannot resolve the anchor.
365 fn seek_time_anchor(&mut self, _position: Duration) -> StreamResult<Option<SourceSeekAnchor>> {
366 Ok(None)
367 }
368
369 /// Absolute set of the byte cursor — used by [`Stream::seek`] and
370 /// post-seek landings. Sources implement this via the same atomic
371 /// cursor that backs [`Self::position`] / [`Self::advance`].
372 fn set_position(&self, pos: u64);
373
374 /// Set current seek epoch for stale request invalidation.
375 ///
376 /// HLS uses this to drop in-flight network/segment requests that belong
377 /// to previous seeks. Non-seek-aware sources keep the default no-op.
378 fn set_seek_epoch(&mut self, _seek_epoch: u64) {}
379
380 /// Build a fresh reader-side hooks instance.
381 ///
382 /// Returned by Source-impls that want to expose reader-side events
383 /// (`HlsSource`, `FileSource`). The audio pipeline takes the hook
384 /// at decoder creation/recreation time and threads it into the
385 /// `HookedDecoder` wrapper. Default `None` keeps mock and test
386 /// sources unhooked.
387 ///
388 /// `take_*` is a misnomer: each call must return a **fresh** hook
389 /// instance, because decoder recreation (ABR / format change)
390 /// rebuilds the wrapper and the new hook needs a clean state
391 /// cursor.
392 fn take_reader_hooks(&mut self) -> Option<crate::SharedHooks> {
393 None
394 }
395
396 /// Get shared playback timeline.
397 ///
398 /// Timeline is the single source of truth for playback state across all
399 /// stream types (segmented and non-segmented). Sources own their
400 /// Timeline and hand out cheap Arc clones to downstream consumers
401 /// (reader, audio FSM, Downloader peers).
402 fn timeline(&self) -> Timeline;
403
404 /// Wait for data in range to be available.
405 ///
406 /// `timeout` is the maximum wait time before returning an
407 /// implementation-defined non-ready outcome (typically a typed
408 /// "budget exceeded" error). Pass `None` to wait until the range
409 /// is ready or the source's internal cancel signal fires — used
410 /// for [`Stream::seek`](crate::Stream::seek), where giving up on
411 /// a timer would silently drop the seek under slow connections.
412 /// `Some(WAIT_RANGE_TIMEOUT)` is the cooperative-yield path used
413 /// by the audio worker's read loop.
414 ///
415 /// # Errors
416 ///
417 /// Returns an error if the wait is cancelled or the underlying storage fails.
418 fn wait_range(
419 &mut self,
420 range: Range<u64>,
421 timeout: Option<Duration>,
422 ) -> StreamResult<WaitOutcome>;
423}
424
425/// Segment-table view exposed by segmented sources (HLS, fragmented
426/// file-mp4).
427///
428/// Carries the segment metadata that segment-aware decoders need to
429/// route reads — `init_segment_range` (ftyp+moov / `EXT-X-MAP`),
430/// `segment_at_time`, `segment_after_byte`, `segment_count`, and total
431/// `len`. Has no I/O surface: the byte cursor is the decoder's
432/// `Read + Seek` handle, queried independently. Sources that aren't
433/// segment-aware return `None` from [`Source::as_segment_layout`].
434pub trait SegmentLayout: Send + Sync + 'static {
435 /// Init segment range (e.g. ftyp+moov from `EXT-X-MAP`) for the
436 /// current layout variant. Returns an **empty** range (`0..0`) when
437 /// the layout has no init segment (raw TS/AAC/MPEG-ES) or when the
438 /// active variant has not yet announced one. Callers that require an
439 /// init must check `Range::is_empty()` — distinguishing "no init"
440 /// from "init at offset 0..0" is unsupported because every init we
441 /// emit is non-empty by construction.
442 fn init_segment_range(&self) -> Range<u64>;
443
444 /// Whether the layout currently reports zero bytes. `len()` is `Option`
445 /// because some segmented sources do not know their total upfront, so
446 /// emptiness defaults to "len is `None` or `Some(0)`".
447 fn is_empty(&self) -> bool {
448 self.len().is_none_or(|n| n == 0)
449 }
450
451 /// Total byte length across all segments. Used to compute total
452 /// duration when the source can't provide a direct value.
453 fn len(&self) -> Option<u64>;
454
455 /// Next segment whose byte range starts at or after `byte_offset`.
456 /// Used for sequential play after the current segment is consumed.
457 fn segment_after_byte(&self, byte_offset: u64) -> Option<SegmentDescriptor>;
458
459 /// Segment whose `byte_range` covers `byte_offset`. Default `None`
460 /// keeps non-segmented sources transparent.
461 fn segment_at_byte(&self, _byte_offset: u64) -> Option<SegmentDescriptor> {
462 None
463 }
464
465 /// Descriptor for the segment at `segment_index` in the current
466 /// layout variant. Used by demuxers to re-resolve a cursor's
467 /// `byte_range` against the live layout — without this, a DRM
468 /// post-decrypt size shrink (PKCS7 padding stripped) between cursor
469 /// setup and the actual read leaves `state.range` pointing past
470 /// the segment's real end and `HlsSource::read_at` splices bytes
471 /// from the next segment onto the buffer's tail. Returns `None`
472 /// for non-segmented sources or for indices outside the current
473 /// layout's range.
474 fn segment_at_index(&self, _segment_index: u32) -> Option<SegmentDescriptor> {
475 None
476 }
477
478 /// Locate the segment whose `[decode_time, decode_time + duration)`
479 /// covers `t`. Resolves against the source's *current layout
480 /// variant* — same variant `init_segment_range` describes.
481 fn segment_at_time(&self, t: Duration) -> Option<SegmentDescriptor>;
482
483 /// Total number of segments in the current layout variant.
484 fn segment_count(&self) -> Option<u32>;
485}
486
#[cfg(test)]
mod tests {
    use kithara_test_utils::kithara;

    use super::*;

    /// Compile-time check only: the nested helper is never called and
    /// merely has to type-check with a `Source` bound.
    ///
    /// NOTE(review): despite the name, this uses a generic bound and does
    /// not exercise `dyn Source`.
    #[kithara::test]
    fn test_source_trait_object_safety() {
        fn _requires_source<T: Source>(_source: T) {}
    }

    /// `SourcePhase::default()` must be `Waiting`.
    #[kithara::test]
    fn source_phase_defaults_to_waiting() {
        let initial = SourcePhase::default();
        assert_eq!(initial, SourcePhase::Waiting);
    }

    /// The provided `Source::phase` must forward to `phase_at`. The stub
    /// answers `Ready` for every range, so `phase()` has to report `Ready`.
    #[kithara::test]
    fn phase_default_delegates_to_phase_at() {
        use std::sync::atomic::{AtomicU64, Ordering};

        // Minimal source that implements only the required methods.
        struct AlwaysReady {
            timeline: Timeline,
            cursor: Arc<AtomicU64>,
        }

        impl Source for AlwaysReady {
            fn position(&self) -> u64 {
                self.cursor.load(Ordering::Acquire)
            }

            fn set_position(&self, pos: u64) {
                self.cursor.store(pos, Ordering::Release);
            }

            fn advance(&self, n: u64) {
                self.cursor.fetch_add(n, Ordering::AcqRel);
            }

            fn len(&self) -> Option<u64> {
                Some(100)
            }

            fn phase_at(&self, _range: Range<u64>) -> SourcePhase {
                SourcePhase::Ready
            }

            fn read_at(&mut self, _offset: u64, _buf: &mut [u8]) -> StreamResult<ReadOutcome> {
                Ok(ReadOutcome::Eof)
            }

            fn wait_range(
                &mut self,
                _range: Range<u64>,
                _timeout: Option<Duration>,
            ) -> StreamResult<WaitOutcome> {
                Ok(WaitOutcome::Ready)
            }

            fn timeline(&self) -> Timeline {
                self.timeline.clone()
            }
        }

        let cursor = Arc::new(AtomicU64::new(0));
        let stub = AlwaysReady {
            timeline: Timeline::new(),
            cursor,
        };

        let observed = stub.phase();
        assert_eq!(observed, SourcePhase::Ready);
    }
}
547}