Skip to main content

kithara_audio/
audio.rs

1use std::{
2    io::{Error as IoError, Read, Seek, SeekFrom},
3    marker::PhantomData,
4    num::{NonZeroU32, NonZeroUsize},
5    sync::{
6        Arc,
7        atomic::{AtomicU32, AtomicU64, Ordering},
8    },
9    time::Duration,
10};
11
12use fast_interleave::deinterleave_variable;
13use kithara_bufpool::{PcmBuf, PcmPool};
14use kithara_decode::{DecoderFactory, PcmChunk, PcmMeta, PcmSpec, TrackMetadata};
15use kithara_events::{AudioEvent, EventBus, SeekLifecycleStage, SegmentLocation};
16#[cfg(target_arch = "wasm32")]
17use kithara_platform::thread::{is_worker_thread, sleep as thread_sleep};
18use kithara_platform::{
19    thread::park_timeout,
20    tokio::{sync::Notify, task::spawn_blocking},
21};
22use kithara_stream::{MediaInfo, Stream, StreamType, Timeline};
23use kithara_test_utils::kithara;
24use portable_atomic::AtomicF32;
25use tokio_util::sync::CancellationToken;
26use tracing::{debug, info, trace, warn};
27
28use crate::{
29    pipeline::{
30        config::{AudioConfig, create_effects, expected_output_spec},
31        fetch::{EpochValidator, Fetch, FetchKind},
32        source::{OffsetReader, SharedStream, StreamAudioSource},
33        track_fsm::ConsumerPhase,
34    },
35    runtime::AtomicServiceClass,
36    traits::{ChunkOutcome, DecodeError, PcmReader, PendingReason, ReadOutcome, SeekOutcome},
37    worker::{
38        handle::{AudioWorkerHandle, TrackRegistration},
39        thread_wake::ThreadWake,
40        types::{ServiceClass, TrackId},
41    },
42};
43
/// Saturating-clamp `u128` milliseconds into `u64`.
///
/// Telemetry events cannot surface a wider integer to subscribers, so values
/// above `u64::MAX` are reported as `u64::MAX` instead of failing. Production
/// durations are well under `u64::MAX` ms (~584 million years).
///
/// Uses the standard `TryFrom` narrowing conversion, the same idiom the rest
/// of this file uses for nanosecond timestamps.
fn clamp_u128_to_u64_millis(ms: u128) -> u64 {
    u64::try_from(ms).unwrap_or(u64::MAX)
}
51
52/// Multiply `frames * channels` and convert to `usize` for buffer indexing.
53/// Errors if the product does not fit in `usize` on the host (32-bit targets).
54fn frames_to_samples(frames: u64, channels: u64) -> Result<usize, DecodeError> {
55    let samples = frames.saturating_mul(channels);
56    usize::try_from(samples).map_err(|err| {
57        DecodeError::Io(IoError::other(format!(
58            "frames*channels overflow: {samples} does not fit usize: {err}"
59        )))
60    })
61}
62
/// Result of inspecting one received fetch in `process_fetch`.
enum FetchOutcome {
    /// The fetch was rejected by the epoch validator and its chunk discarded;
    /// the caller should keep receiving.
    Continue,
    /// Stop receiving and hand this value back: `Some(chunk)` for a data
    /// fetch, `None` after a natural-EOF or failure marker.
    Return(Option<PcmChunk>),
}
67
/// Result of one receive attempt on the PCM channel.
enum RecvOutcome {
    /// The blocking receive observed a cancelled cancellation token.
    Closed,
    /// The non-blocking receive found nothing queued.
    Empty,
    /// A fetch was popped from the channel.
    Item(Fetch<PcmChunk>),
}
73
/// Generic audio pipeline running in a separate thread.
///
/// Provides a simple interface for reading decoded PCM audio,
/// compatible with cpal and rodio audio backends.
///
/// # Example
///
/// ```ignore
/// use kithara_audio::{Audio, AudioConfig};
/// use kithara_hls::{Hls, HlsConfig};
/// use kithara_stream::Stream;
///
/// let config = AudioConfig::<Hls>::new(hls_config)
///     .hint("mp3");
/// let mut audio = Audio::<Stream<Hls>>::new(config).await?;
///
/// // Get audio format
/// let spec = audio.spec();
/// println!("{}Hz, {} channels", spec.sample_rate, spec.channels);
///
/// // Read PCM samples
/// let mut buf = [0.0f32; 1024];
/// loop {
///     match audio.read(&mut buf)? {
///         ReadOutcome::Frames { count, .. } => play_samples(&buf[..count.get()]),
///         // Alive but nothing to hand out this tick (buffering / seek in flight).
///         ReadOutcome::Pending { .. } => continue,
///         ReadOutcome::Eof { .. } => break,
///     }
/// }
/// ```
pub struct Audio<S> {
    /// Notify for async preload (first chunk available).
    pub(crate) preload_notify: Arc<Notify>,

    /// Consumer-side phase — replaces the old `eof: bool` flag. `read`
    /// consults it to decide between `Eof`, `Pending` and an error.
    pub(crate) consumer_phase: ConsumerPhase,

    /// Epoch validator for filtering stale chunks. Its `epoch` is bumped by
    /// `seek`.
    pub(crate) validator: EpochValidator,

    /// Current chunk being read (auto-recycles to pool on drop).
    pub(crate) current_chunk: Option<PcmChunk>,

    /// Current audio specification (updated from chunks in `fill_buffer`).
    pub(crate) spec: PcmSpec,

    /// Shared stream timeline for committed playback position.
    pub(crate) timeline: Timeline,

    /// How many frames of `current_chunk` have been served to the
    /// caller. Local consumer cursor — reset to 0 on every new chunk
    /// (`fill_buffer`) and after seek (the next `fill_buffer` after
    /// `commit_seek_landed` issues a fresh chunk). Avoids reloading
    /// `committed_frame_end` from the timeline inside `Audio::read`'s
    /// per-slice loop, which used to issue an atomic load + 3 atomic
    /// stores on every slice and serialised the audio hot path on
    /// the timeline counters.
    pub(crate) current_chunk_consumed_frames: u64,

    /// Shared epoch counter with worker (kept alive for `Arc` shared ownership).
    _epoch: Arc<AtomicU64>,

    /// Target sample rate of the audio host (shared for dynamic updates).
    /// 0 means "not set".
    host_sample_rate: Arc<AtomicU32>,

    /// Shared playback rate for timeline scaling (1.0 = normal speed).
    playback_rate: Arc<AtomicF32>,

    /// Wake handle for blocking PCM reads.
    reader_wake: Arc<ThreadWake>,

    /// Unified event bus.
    bus: EventBus,

    /// PCM chunk receiver.
    pcm_rx: crate::runtime::Inlet<Fetch<PcmChunk>>,

    /// Spent-chunk return ring. Every `PcmChunk` this real-time consumer
    /// finishes with is pushed here instead of being dropped, so the pooled
    /// buffer is recycled on the worker thread (`DecoderNode::drain_trash`)
    /// and never freed on the audio thread. Sized to outlive a full
    /// ring-drain on seek, so the lock-free push never fails on the hot path.
    trash_tx: crate::runtime::Outlet<PcmChunk>,

    /// Runtime ABR handle snapshot taken at construction — cloned from the
    /// underlying stream's source. `None` for non-adaptive sources.
    abr_handle: Option<kithara_abr::AbrHandle>,

    /// Cancellation token for graceful shutdown. The blocking receive path
    /// polls it and reports the channel as closed once it is cancelled.
    cancel: Option<CancellationToken>,

    /// Assigned track ID in the shared worker (used for unregister on drop).
    track_id: Option<TrackId>,

    /// Worker handle for unregistration and optional shutdown.
    worker: Option<AudioWorkerHandle>,

    /// Shared priority hint for this track's worker node. Written wait-free
    /// from the real-time audio thread (`set_service_class` during fade
    /// transitions) and read by the worker scheduler each pass, so a priority
    /// change needs no allocating command-channel send on the audio thread.
    service_class: Arc<AtomicServiceClass>,

    /// Shared PCM pool: source for the decode worker's per-packet buffers and
    /// for the held `read_planar` interleaved scratch below.
    pcm_pool: PcmPool,

    /// Interleaved scratch for `read_planar`, drawn from `pcm_pool` once and
    /// pre-sized off the audio thread in `new` so the real-time path never
    /// reallocates. `Option` only so it can be detached during the inner
    /// `read` call, then restored.
    interleaved: Option<PcmBuf>,

    /// Marker for source type.
    _marker: PhantomData<S>,

    /// Track metadata (title, artist, album, artwork).
    metadata: TrackMetadata,

    /// Whether the worker was auto-created for this track (standalone mode).
    /// Standalone workers are shut down when the track is dropped.
    is_standalone_worker: bool,

    /// Whether `preload()` has been called (enables non-blocking mode).
    preloaded: bool,
}
200
201impl<S> Audio<S> {
    /// Probe buffer size in bytes for initial stream detection.
    const PROBE_BUFFER_SIZE: usize = 1024;

    /// Per-buffer frame capacity used to pre-warm the PCM pool for the decode
    /// worker's per-packet buffers. Covers the largest decoder packet across
    /// supported codecs (FLAC's 4608-frame block; AAC/MP3/ALAC are smaller and
    /// reuse these buffers without a realloc). The `read_planar` interleaved
    /// scratch is sized separately and held per-`Audio` (see `interleaved`).
    const WARM_DECODE_FRAMES: usize = 4608;

    /// Backoff duration between receive attempts: the timeout handed to
    /// `park_timeout` / `thread_sleep` by `wait_for_fetch`.
    const RECV_BACKOFF: Duration = Duration::from_micros(100);
214
215    /// Pre-warm the shared PCM pool so the decode hot path (`pool.get()`
216    /// per packet) and the first `read_planar` calls reuse pre-allocated
217    /// buffers instead of paying a cold-start allocation on the audio
218    /// thread. Warms only a cold pool (`allocated_bytes == 0`), so the
219    /// process-global default singleton is warmed once on the first track
220    /// while a freshly-built custom pool still gets warmed when it's first
221    /// resolved here.
222    fn warm_pcm_pool(pool: &PcmPool, channels: usize, chunks: usize) {
223        if pool.allocated_bytes() != 0 {
224            return;
225        }
226        let capacity = Self::WARM_DECODE_FRAMES * channels.max(1);
227        let count = chunks.saturating_mul(2).max(1);
228        pool.pre_warm(count, |buf| {
229            buf.clear();
230            buf.resize(capacity, 0.0);
231        });
232    }
233
234    /// Acquire and pre-size the held interleaved scratch for `read_planar`.
235    ///
236    /// Sized to one second of interleaved output at `spec` — the consumer
237    /// reads at most a few hundred ms per call, so this covers the request
238    /// with margin for host-rate / playback-rate changes. Called off the audio
239    /// thread in `new`, so `read_planar` reuses this buffer without ever
240    /// reallocating on the real-time path.
241    fn alloc_interleaved_scratch(pool: &PcmPool, spec: PcmSpec) -> PcmBuf {
242        let channels = usize::from(spec.channels).max(2);
243        let sample_rate = usize::try_from(spec.sample_rate).unwrap_or(usize::MAX);
244        let capacity = sample_rate.saturating_mul(channels);
245        pool.get_with(|buf| {
246            buf.clear();
247            let cap = buf.capacity();
248            if cap < capacity {
249                buf.reserve(capacity - cap);
250            }
251        })
252    }
253
    /// Runtime ABR handle (cloned from the stream's source at
    /// construction). `Some` for adaptive sources (HLS), `None` for
    /// file/non-adaptive sources.
    ///
    /// Each call returns a fresh clone of the stored handle.
    #[must_use]
    pub fn abr_handle(&self) -> Option<kithara_abr::AbrHandle> {
        self.abr_handle.clone()
    }
261
    /// Record that the PCM channel is gone and yield "no chunk".
    ///
    /// Despite the name, the consumer phase is set to `Failed` (not `AtEof`),
    /// so a later `read()` surfaces an error rather than a clean end of
    /// stream. Always returns `None`.
    fn close_channel_and_mark_eof(&mut self) -> Option<PcmChunk> {
        self.consumer_phase = ConsumerPhase::Failed;
        None
    }
266
267    /// Current variant's metadata. Pulled live from the ABR peer on
268    /// every call — no caching — so the UI never sees a stale label
269    /// after an ABR switch. `None` for non-adaptive sources or peers
270    /// that have not yet registered variants.
271    #[must_use]
272    pub fn current_variant(&self) -> Option<kithara_events::VariantInfo> {
273        self.abr_handle.as_ref()?.current_variant()
274    }
275
    /// Get total duration of the audio stream.
    ///
    /// Forwards the shared timeline's total duration. Returns `None` for
    /// streaming sources where duration is unknown.
    #[must_use]
    pub fn duration(&self) -> Option<Duration> {
        self.timeline.total_duration()
    }
283
    /// Publish `event` on this track's event bus.
    fn emit_audio_event(&self, event: AudioEvent) {
        self.bus.publish(event);
    }
287
288    fn emit_playback_progress(&self) {
289        let position_ms = clamp_u128_to_u64_millis(self.position().as_millis());
290        let total_ms = self
291            .timeline
292            .total_duration()
293            .map(|duration| clamp_u128_to_u64_millis(duration.as_millis()));
294
295        self.emit_audio_event(AudioEvent::PlaybackProgress {
296            position_ms,
297            total_ms,
298            seek_epoch: self.validator.epoch,
299        });
300    }
301
302    fn emit_post_seek_output_commit(&mut self, meta: Option<PcmMeta>) {
303        let Some(seek_epoch) = self.timeline.pending_seek_epoch() else {
304            return;
305        };
306        if seek_epoch != self.validator.epoch {
307            return;
308        }
309
310        let variant = meta.as_ref().and_then(|m| m.variant_index);
311        let segment_index = meta.as_ref().and_then(|m| m.segment_index);
312
313        self.emit_audio_event(AudioEvent::SeekLifecycle {
314            seek_epoch,
315            stage: SeekLifecycleStage::OutputCommitted,
316            location: SegmentLocation::new(variant, segment_index, None, None),
317        });
318
319        self.emit_audio_event(AudioEvent::SeekComplete {
320            seek_epoch,
321            position: (*self).position(),
322        });
323        let _ = self.timeline.did_clear_pending_seek_epoch(seek_epoch);
324    }
325
326    /// Receive next chunk and store it as `current_chunk`.
327    ///
328    /// Returns `true` if a chunk was received, `false` on EOF or no data.
329    pub(crate) fn fill_buffer(&mut self) -> bool {
330        let Some(chunk) = self.recv_valid_chunk() else {
331            return false;
332        };
333        self.spec = chunk.spec();
334        self.current_chunk = Some(chunk);
335        self.current_chunk_consumed_frames = 0;
336
337        if matches!(
338            self.consumer_phase,
339            ConsumerPhase::Buffering | ConsumerPhase::SeekPending { .. }
340        ) {
341            self.consumer_phase = ConsumerPhase::Playing;
342        }
343        true
344    }
345
    /// Whether `preload()` has been called, i.e. whether the non-blocking
    /// receive mode was requested.
    ///
    /// NOTE(review): nothing in the visible `impl<S>` block clears this flag
    /// again — in particular `seek()` leaves it set. Confirm whether a reset
    /// after seek is intended elsewhere.
    #[must_use]
    pub fn is_preloaded(&self) -> bool {
        self.preloaded
    }
353
    /// Get track metadata (title, artist, album, artwork).
    ///
    /// Borrows the metadata stored on this `Audio`; nothing is copied.
    #[must_use]
    pub fn metadata(&self) -> &TrackMetadata {
        &self.metadata
    }
359
    /// Get current playback position.
    ///
    /// Returns the shared timeline's committed position, which `read()`
    /// advances as frames are handed to the caller.
    #[must_use]
    pub fn position(&self) -> Duration {
        self.timeline.committed_position()
    }
367
368    /// Enable non-blocking mode for `read()` and prime the first chunk.
369    ///
370    /// After calling this, `read()` returns immediately from buffered
371    /// data without blocking. Must be called after construction so
372    /// that `fill_buffer()` calls from JS (via `requestAnimationFrame`)
373    /// don't hang.
374    ///
375    /// Returns `Err(DecodeError)` if the producer channel closed
376    /// during the initial `fill_buffer` (e.g. upstream decoder
377    /// reported `TrackStep::Failed` before any data). Natural EOF
378    /// encountered during preload is **not** surfaced here — the
379    /// subsequent `read()` / `next_chunk()` call will report
380    /// `ReadOutcome::Eof`.
381    ///
382    /// # Errors
383    /// Returns `DecodeError::Io` if the producer channel closed during preload.
384    pub fn preload(&mut self) -> Result<(), DecodeError> {
385        self.preloaded = true;
386        if self.current_chunk.is_none() && self.consumer_phase != ConsumerPhase::AtEof {
387            self.fill_buffer();
388            if self.consumer_phase == ConsumerPhase::Failed {
389                return Err(DecodeError::Io(IoError::other(
390                    "pcm channel closed during preload",
391                )));
392            }
393        }
394        Ok(())
395    }
396
397    fn process_fetch(&mut self, fetch: Fetch<PcmChunk>) -> FetchOutcome {
398        if !self.validator.is_valid(&fetch) {
399            self.discard_chunk(fetch.into_inner());
400            return FetchOutcome::Continue;
401        }
402
403        match fetch.kind {
404            FetchKind::NaturalEof => {
405                self.consumer_phase = ConsumerPhase::AtEof;
406                self.discard_chunk(fetch.into_inner());
407                FetchOutcome::Return(None)
408            }
409            FetchKind::Failure => {
410                self.consumer_phase = ConsumerPhase::Failed;
411                self.discard_chunk(fetch.into_inner());
412                FetchOutcome::Return(None)
413            }
414            FetchKind::Data => FetchOutcome::Return(Some(fetch.into_inner())),
415        }
416    }
417
418    /// Read decoded PCM samples into buffer.
419    ///
420    /// Samples are interleaved f32 (e.g., LRLRLR for stereo).
421    ///
422    /// Returns [`ReadOutcome::Frames`] with a non-zero count when the
423    /// reader produced data, [`ReadOutcome::Pending`] with a typed
424    /// [`PendingReason`] when the reader is alive but produced no
425    /// frames this tick (buffering, seek-in-progress), or
426    /// [`ReadOutcome::Eof`] on natural end-of-stream. Decoder /
427    /// channel failures surface as [`DecodeError`] via the `Err` arm.
428    ///
429    /// # Errors
430    /// Returns `DecodeError::Io` when the producer channel closed /
431    /// reported a failure (`ConsumerPhase::Failed`) before any frames
432    /// could be flushed.
433    #[cfg_attr(feature = "perf", hotpath::measure)]
434    #[kithara::hang_watchdog]
435    pub fn read(&mut self, buf: &mut [f32]) -> Result<ReadOutcome, DecodeError> {
436        if buf.is_empty() {
437            return Ok(ReadOutcome::Pending {
438                reason: PendingReason::Buffering,
439                position: self.position(),
440            });
441        }
442        match self.consumer_phase {
443            ConsumerPhase::AtEof if self.current_chunk.is_none() => {
444                return Ok(ReadOutcome::Eof {
445                    position: self.position(),
446                });
447            }
448            ConsumerPhase::Failed => {
449                return Err(DecodeError::Io(IoError::other(
450                    "pcm channel closed / producer failed",
451                )));
452            }
453            _ => {}
454        }
455
456        let mut written = 0;
457        let mut last_output_meta: Option<PcmMeta> = None;
458
459        while written < buf.len() {
460            hang_tick!();
461
462            if let Some(chunk) = self.current_chunk.as_ref() {
463                let channels = u64::from(chunk.meta.spec.channels.max(1));
464                let chunk_total_frames = u64::from(chunk.meta.frames);
465                let consumed_frames_in_chunk = self.current_chunk_consumed_frames;
466                if consumed_frames_in_chunk >= chunk_total_frames {
467                    self.recycle_current_chunk();
468                    if !self.fill_buffer() {
469                        break;
470                    }
471                    continue;
472                }
473                let remaining_frames = chunk_total_frames - consumed_frames_in_chunk;
474                let space_frames = ((buf.len() - written) as u64) / channels.max(1);
475                let take_frames = remaining_frames.min(space_frames);
476                if take_frames == 0 {
477                    break;
478                }
479
480                hang_reset!();
481                let start_sample = frames_to_samples(consumed_frames_in_chunk, channels)?;
482                let take_samples = frames_to_samples(take_frames, channels)?;
483                buf[written..written + take_samples]
484                    .copy_from_slice(&chunk.pcm[start_sample..start_sample + take_samples]);
485                last_output_meta = Some(chunk.meta);
486                written += take_samples;
487
488                let final_segment = take_frames == remaining_frames;
489                let consumed_total = consumed_frames_in_chunk + take_frames;
490                self.current_chunk_consumed_frames = consumed_total;
491
492                if final_segment {
493                    self.timeline
494                        .advance_committed_chunk(&kithara_stream::ChunkPosition::from(&chunk.meta));
495                    self.recycle_current_chunk();
496                } else {
497                    let total_frames = chunk_total_frames.max(1);
498                    let start_ns =
499                        u64::try_from(chunk.meta.timestamp.as_nanos()).unwrap_or(u64::MAX);
500                    let end_ns =
501                        u64::try_from(chunk.meta.end_timestamp.as_nanos()).unwrap_or(u64::MAX);
502                    let span_ns = u128::from(end_ns.saturating_sub(start_ns));
503                    let consumed_ns_offset =
504                        span_ns * u128::from(consumed_total) / u128::from(total_frames);
505                    let interpolated = u128::from(start_ns).saturating_add(consumed_ns_offset);
506                    let interpolated_ns = u64::try_from(interpolated).unwrap_or(u64::MAX);
507                    self.timeline
508                        .set_committed_position(Duration::from_nanos(interpolated_ns));
509                }
510            }
511
512            if written >= buf.len() {
513                break;
514            }
515
516            if !self.fill_buffer() {
517                break;
518            }
519        }
520
521        if let Some(count) = NonZeroUsize::new(written) {
522            debug_assert!(
523                count.get() <= buf.len(),
524                "Audio::read Frames contract violated: count={c} > buf.len()={b}",
525                c = count.get(),
526                b = buf.len(),
527            );
528            self.emit_post_seek_output_commit(last_output_meta);
529            self.emit_playback_progress();
530            let position = self.position();
531            debug_assert!(
532                self.timeline
533                    .total_duration()
534                    .is_none_or(|dur| position <= dur),
535                "Audio::read Frames contract: position={position:?} > duration={:?}",
536                self.timeline.total_duration(),
537            );
538            return Ok(ReadOutcome::Frames { count, position });
539        }
540
541        let position = self.position();
542        match self.consumer_phase {
543            ConsumerPhase::AtEof => Ok(ReadOutcome::Eof { position }),
544            ConsumerPhase::Failed => Err(DecodeError::Io(IoError::other(
545                "pcm channel closed / producer failed",
546            ))),
547            ConsumerPhase::SeekPending { .. } => Ok(ReadOutcome::Pending {
548                position,
549                reason: PendingReason::SeekInProgress,
550            }),
551            _ => Ok(ReadOutcome::Pending {
552                position,
553                reason: PendingReason::Buffering,
554            }),
555        }
556    }
557
558    fn recv_outcome(&mut self) -> RecvOutcome {
559        if self.use_nonblocking_recv() {
560            if let Some(fetch) = self.pcm_rx.try_pop() {
561                self.wake_worker();
562                return RecvOutcome::Item(fetch);
563            }
564            return RecvOutcome::Empty;
565        }
566
567        self.recv_outcome_blocking()
568    }
569
    /// Wait until a fetch can be popped or the cancellation token fires.
    ///
    /// Each pass of the loop: try to pop; check for cancellation; wake the
    /// worker and register the current thread on `reader_wake`; try to pop
    /// and check cancellation a second time; then back off via
    /// `wait_for_fetch`. Every successful pop wakes the worker before
    /// returning `Item`.
    ///
    /// Returns `Closed` only when a cancellation token is present and
    /// cancelled; without a token the loop ends only with an `Item`.
    #[kithara::hang_watchdog]
    fn recv_outcome_blocking(&mut self) -> RecvOutcome {
        loop {
            if let Some(fetch) = self.pcm_rx.try_pop() {
                hang_reset!();
                self.wake_worker();
                return RecvOutcome::Item(fetch);
            }
            if self
                .cancel
                .as_ref()
                .is_some_and(CancellationToken::is_cancelled)
            {
                hang_reset!();
                return RecvOutcome::Closed;
            }
            self.wake_worker();
            self.reader_wake.register_current();
            // NOTE(review): this second pop/cancel check presumably covers a
            // fetch or cancellation that arrived between the first check and
            // `register_current` — confirm against `ThreadWake`.
            if let Some(fetch) = self.pcm_rx.try_pop() {
                hang_reset!();
                self.wake_worker();
                return RecvOutcome::Item(fetch);
            }
            if self
                .cancel
                .as_ref()
                .is_some_and(CancellationToken::is_cancelled)
            {
                hang_reset!();
                return RecvOutcome::Closed;
            }
            hang_tick!();
            Self::wait_for_fetch();
        }
    }
605
606    #[kithara::hang_watchdog]
607    fn recv_valid_chunk(&mut self) -> Option<PcmChunk> {
608        if self.consumer_phase.is_terminal() {
609            return None;
610        }
611
612        loop {
613            match self.recv_outcome() {
614                RecvOutcome::Item(fetch) => match self.process_fetch(fetch) {
615                    FetchOutcome::Continue => {
616                        hang_tick!();
617                        continue;
618                    }
619                    FetchOutcome::Return(chunk) => {
620                        hang_reset!();
621                        return chunk;
622                    }
623                },
624                RecvOutcome::Empty => return None,
625                RecvOutcome::Closed => {
626                    hang_reset!();
627                    return self.close_channel_and_mark_eof();
628                }
629            }
630        }
631    }
632
633    /// Seek to position in the audio stream.
634    ///
635    /// This method never blocks. Seek coordination flows entirely through
636    /// Timeline atomics (`FLUSH_START`/`FLUSH_STOP` pattern). The worker thread reads
637    /// the seek target and epoch from Timeline and applies the seek.
638    ///
639    /// Returns [`SeekOutcome::Landed`] when the reader is now parked
640    /// at `position`; [`SeekOutcome::PastEof`] when the target is
641    /// beyond a known `duration()` (the subsequent read returns
642    /// `ReadOutcome::Eof`).
643    ///
644    /// # Errors
645    /// Propagated from the underlying stream (currently infallible at
646    /// this layer — the worker thread surfaces errors lazily via
647    /// `FetchKind::Failure`, which becomes `Err` from a subsequent
648    /// `read()` / `next_chunk()`).
649    #[kithara::hang_watchdog]
650    pub fn seek(&mut self, position: Duration) -> Result<SeekOutcome, DecodeError> {
651        let epoch = self.timeline.initiate_seek(position);
652        self.timeline.mark_pending_seek_epoch(epoch);
653        self.validator.epoch = epoch;
654        self.recycle_current_chunk();
655        self.current_chunk_consumed_frames = 0;
656        self.consumer_phase = ConsumerPhase::SeekPending { epoch };
657
658        while let Some(fetch) = self.pcm_rx.try_pop() {
659            self.discard_chunk(fetch.into_inner());
660            hang_tick!();
661        }
662
663        if let Some(ref worker) = self.worker {
664            worker.wake();
665        }
666
667        trace!(?position, epoch, "seek initiated via Timeline");
668        match self.timeline.total_duration() {
669            Some(duration) if position >= duration => {
670                debug_assert!(
671                    position >= duration,
672                    "Audio::seek PastEof contract: target={position:?} < duration={duration:?}",
673                );
674                Ok(SeekOutcome::PastEof {
675                    duration,
676                    target: position,
677                })
678            }
679            _ => {
680                debug_assert!(
681                    self.timeline
682                        .total_duration()
683                        .is_none_or(|dur| position <= dur),
684                    "Audio::seek Landed contract: landed_at={position:?} > duration={:?}",
685                    self.timeline.total_duration(),
686                );
687                Ok(SeekOutcome::Landed {
688                    target: position,
689                    landed_at: position,
690                })
691            }
692        }
693    }
694
    /// Get current audio specification.
    ///
    /// Returns sample rate and channel count for audio output setup. The
    /// value is refreshed from each chunk installed by `fill_buffer`.
    #[must_use]
    pub fn spec(&self) -> PcmSpec {
        self.spec
    }
704
705    fn use_nonblocking_recv(&self) -> bool {
706        #[cfg(target_arch = "wasm32")]
707        {
708            true
709        }
710        #[cfg(not(target_arch = "wasm32"))]
711        {
712            self.is_preloaded()
713        }
714    }
715
716    fn wait_for_fetch() {
717        #[cfg(not(target_arch = "wasm32"))]
718        {
719            park_timeout(Self::RECV_BACKOFF);
720        }
721
722        #[cfg(target_arch = "wasm32")]
723        {
724            if is_worker_thread() {
725                park_timeout(Self::RECV_BACKOFF);
726            } else {
727                thread_sleep(Self::RECV_BACKOFF);
728            }
729        }
730    }
731
732    /// Receive next valid chunk from channel, filtering stale chunks.
733    ///
734    /// After `preload()`, non-blocking. Before `preload()`, blocks on first call.
735    /// Returns `None` on EOF or channel close.
736    /// Wake the shared worker so it can fill the freed ringbuf slot.
737    fn wake_worker(&self) {
738        if let Some(ref worker) = self.worker {
739            worker.wake();
740        }
741    }
742
743    /// Hand a spent chunk to the worker's return ring instead of dropping
744    /// it here. The pooled buffer is then recycled on the worker thread,
745    /// keeping `free`/`Pool::put` off the real-time audio thread. The ring
746    /// is sized so this lock-free push never fails on the hot path; the
747    /// `debug_assert` guards the sizing invariant, and the last-resort drop
748    /// only runs if that invariant is ever broken.
749    fn discard_chunk(&mut self, chunk: PcmChunk) {
750        if let Err(_overflow) = self.trash_tx.try_push(chunk) {
751            debug_assert!(
752                false,
753                "PCM trash ring overflow — spent buffer freed on the audio thread"
754            );
755        }
756    }
757
758    /// Return the current chunk to the worker for off-thread recycling.
759    fn recycle_current_chunk(&mut self) {
760        if let Some(chunk) = self.current_chunk.take() {
761            self.discard_chunk(chunk);
762        }
763    }
764}
765
766/// Specialized impl for Stream-based audio pipelines.
767///
768/// Provides async constructor that creates Stream internally.
769/// Uses `StreamAudioSource` for automatic format change detection on ABR switch.
770impl<T> Audio<Stream<T>>
771where
772    T: StreamType<Events = EventBus>,
773{
774    /// Create audio pipeline from `AudioConfig`.
775    ///
776    /// This is the target API for Stream sources.
777    /// Uses `StreamAudioSource` for automatic decoder recreation on format change.
778    ///
779    /// # Errors
780    ///
781    /// Returns [`DecodeError`] if the stream cannot be created, the initial probe
782    /// fails, or the decoder cannot be initialized.
783    ///
784    /// # Example
785    ///
786    /// ```ignore
787    /// let config = AudioConfig::<Hls>::new(hls_config);
788    /// let audio = Audio::new(config).await?;
789    /// sink.append(audio);
790    /// ```
791    pub async fn new(config: AudioConfig<T>) -> Result<Self, DecodeError> {
792        let AudioConfig {
793            byte_pool,
794            hint,
795            host_sample_rate: config_host_sr,
796            media_info: user_media_info,
797            pcm_buffer_chunks,
798            pcm_pool: mut pool,
799            playback_rate: config_playback_rate,
800            decoder_backend,
801            preload_chunks,
802            resampler_quality,
803            stream: stream_config,
804            bus: config_bus,
805            effects: custom_effects,
806            worker: config_worker,
807            gapless_mode: config_gapless_mode,
808            cancel: config_cancel,
809        } = config;
810        let cancel = config_cancel.unwrap_or_default();
811
812        let bus = Self::resolve_event_bus(&stream_config, config_bus);
813        let byte_pool = byte_pool.unwrap_or_else(|| kithara_bufpool::BytePool::default().clone());
814        let stream = Self::create_stream_with_probe(stream_config, byte_pool.clone()).await?;
815
816        let initial_byte_len = stream.len().unwrap_or(0);
817        let timeline = stream.timeline();
818        let initial_media_info =
819            merge_user_and_stream_media_info(user_media_info, stream.media_info());
820        debug!(?initial_media_info, "Initial MediaInfo from stream");
821
822        let shared_stream = SharedStream::new(stream);
823        let byte_len_handle = Arc::new(AtomicU64::new(initial_byte_len));
824
825        let pool = pool.get_or_insert_with(|| PcmPool::default().clone());
826        let warm_channels = initial_media_info
827            .as_ref()
828            .and_then(|info| info.channels)
829            .map_or(2, usize::from);
830        Self::warm_pcm_pool(pool, warm_channels, pcm_buffer_chunks);
831        let decoder = Self::create_initial_decoder(
832            shared_stream.clone(),
833            initial_media_info.clone(),
834            hint.clone(),
835            pool.clone(),
836            byte_pool.clone(),
837            decoder_backend,
838        )
839        .await?;
840
841        let initial_spec = decoder.spec();
842        let total_duration = decoder.duration().or_else(|| timeline.total_duration());
843        timeline.set_total_duration(total_duration);
844        let metadata = decoder.metadata();
845
846        let epoch = Arc::new(AtomicU64::new(0));
847        let host_sample_rate = Arc::new(AtomicU32::new(config_host_sr.map_or(0, NonZeroU32::get)));
848        let playback_rate = config_playback_rate.unwrap_or_else(|| Arc::new(AtomicF32::new(1.0)));
849
850        let output_spec = expected_output_spec(initial_spec, &host_sample_rate);
851        let effects = create_effects(
852            initial_spec,
853            &host_sample_rate,
854            &playback_rate,
855            resampler_quality,
856            Some(pool.clone()),
857            custom_effects,
858        );
859
860        Self::log_pipeline_ready(initial_spec, output_spec, &host_sample_rate);
861
862        let interleaved = Self::alloc_interleaved_scratch(pool, output_spec);
863
864        let emit = Self::create_emit(&bus);
865        let decoder_factory = Self::create_decoder_factory(
866            decoder_backend,
867            &epoch,
868            &byte_len_handle,
869            pool,
870            &byte_pool,
871        );
872        let initial_variant = initial_media_info.as_ref().and_then(|i| i.variant_index);
873        let abr_handle = shared_stream.abr_handle();
874        let audio_source = StreamAudioSource::new(
875            shared_stream,
876            decoder,
877            decoder_factory,
878            initial_media_info,
879            Arc::clone(&epoch),
880            effects,
881            config_gapless_mode,
882        )
883        .with_emit(emit);
884
885        bus.publish(AudioEvent::DecoderReady {
886            base_offset: 0,
887            variant: initial_variant,
888        });
889
890        let preload_notify = Arc::new(Notify::new());
891        let reader_wake = Arc::new(ThreadWake::default());
892        let (data_tx, data_rx) = Self::create_channels(pcm_buffer_chunks, Arc::clone(&reader_wake));
893        let (trash_tx, trash_inlet) = Self::create_trash_channel(pcm_buffer_chunks);
894
895        let (worker, is_standalone) =
896            config_worker.map_or_else(|| (AudioWorkerHandle::new(), true), |w| (w, false));
897
898        let service_class = Arc::new(AtomicServiceClass::new(ServiceClass::default()));
899
900        let track_id = worker.register_track(TrackRegistration {
901            source: Box::new(audio_source),
902            outlet: data_tx,
903            trash_inlet,
904            preload_notify: preload_notify.clone(),
905            preload_chunks: preload_chunks.get(),
906            service_class: Arc::clone(&service_class),
907        });
908
909        Ok(Self {
910            timeline,
911            metadata,
912            bus,
913            host_sample_rate,
914            playback_rate,
915            preload_notify,
916            reader_wake,
917            abr_handle,
918            pcm_rx: data_rx,
919            trash_tx,
920            _epoch: epoch,
921            validator: EpochValidator::default(),
922            spec: output_spec,
923            current_chunk: None,
924            current_chunk_consumed_frames: 0,
925            consumer_phase: ConsumerPhase::Buffering,
926            cancel: Some(cancel),
927            interleaved: Some(interleaved),
928            pcm_pool: pool.clone(),
929            preloaded: false,
930            track_id: Some(track_id),
931            worker: Some(worker),
932            service_class,
933            is_standalone_worker: is_standalone,
934            _marker: PhantomData,
935        })
936    }
937
938    fn create_channels(
939        pcm_buffer_chunks: usize,
940        wake: Arc<ThreadWake>,
941    ) -> (
942        crate::runtime::Outlet<Fetch<PcmChunk>>,
943        crate::runtime::Inlet<Fetch<PcmChunk>>,
944    ) {
945        crate::runtime::connect::<Fetch<PcmChunk>>(pcm_buffer_chunks.max(1), Some(wake))
946    }
947
948    /// Build the spent-chunk return ring. Capacity covers every chunk the
949    /// consumer can hold at once — the whole forward ring plus the current
950    /// chunk — so a seek that drains the forward ring back into here never
951    /// overflows and the real-time push stays infallible. No wake handle:
952    /// the worker is already woken on every `recv_outcome`, and the drain is
953    /// not latency-sensitive.
954    fn create_trash_channel(
955        pcm_buffer_chunks: usize,
956    ) -> (
957        crate::runtime::Outlet<PcmChunk>,
958        crate::runtime::Inlet<PcmChunk>,
959    ) {
960        crate::runtime::connect::<PcmChunk>(pcm_buffer_chunks.max(1) + 2, None)
961    }
962
963    fn create_decoder_factory(
964        decoder_backend: kithara_decode::DecoderBackend,
965        epoch: &Arc<AtomicU64>,
966        byte_len_handle: &Arc<AtomicU64>,
967        pool: &PcmPool,
968        byte_pool: &kithara_bufpool::BytePool,
969    ) -> crate::pipeline::source::DecoderFactory<T> {
970        let factory_epoch = Arc::clone(epoch);
971        let factory_byte_len = Arc::clone(byte_len_handle);
972        let factory_pool = pool.clone();
973        let factory_byte_pool = byte_pool.clone();
974        Box::new(move |stream, info, base_offset| {
975            let byte_len = stream
976                .len()
977                .map_or(0, |len| len.saturating_sub(base_offset));
978            factory_byte_len.store(byte_len, Ordering::Release);
979            let config = kithara_decode::DecoderConfig::builder()
980                .backend(decoder_backend)
981                .byte_len_handle(Arc::clone(&factory_byte_len))
982                .pcm_pool(factory_pool.clone())
983                .byte_pool(factory_byte_pool.clone())
984                .epoch(factory_epoch.load(Ordering::Acquire))
985                .maybe_segment_layout(stream.as_segment_layout())
986                .maybe_hooks(stream.take_reader_hooks())
987                .build();
988            let source = OffsetReader::new(stream.clone(), base_offset);
989            match DecoderFactory::create_from_media_info(source, info, &config) {
990                Ok(d) => {
991                    d.update_byte_len(byte_len);
992                    Ok(d)
993                }
994                Err(e) => {
995                    warn!(?e, "failed to recreate decoder");
996                    Err(e)
997                }
998            }
999        })
1000    }
1001
1002    fn create_emit(bus: &EventBus) -> Box<dyn Fn(AudioEvent) + Send> {
1003        let emit_bus = bus.clone();
1004        Box::new(move |event: AudioEvent| {
1005            emit_bus.publish(event);
1006        })
1007    }
1008
1009    async fn create_initial_decoder(
1010        shared_stream: SharedStream<T>,
1011        initial_media_info: Option<MediaInfo>,
1012        hint: Option<String>,
1013        pcm_pool: PcmPool,
1014        byte_pool: kithara_bufpool::BytePool,
1015        decoder_backend: kithara_decode::DecoderBackend,
1016    ) -> Result<Box<dyn kithara_decode::Decoder>, DecodeError> {
1017        debug!("Audio::new — spawning decoder creation...");
1018        let byte_len_handle = Arc::new(AtomicU64::new(shared_stream.len().unwrap_or(0)));
1019        let decoder_config = kithara_decode::DecoderConfig::builder()
1020            .backend(decoder_backend)
1021            .byte_len_handle(byte_len_handle)
1022            .pcm_pool(pcm_pool)
1023            .byte_pool(byte_pool)
1024            .maybe_segment_layout(shared_stream.as_segment_layout())
1025            .maybe_hooks(shared_stream.take_reader_hooks())
1026            .maybe_hint(hint.clone())
1027            .build();
1028        let hint_for_decoder = hint;
1029        let initial_media_info_for_decoder = initial_media_info;
1030        let decoder = spawn_blocking(move || {
1031            if let Some(ref info) = initial_media_info_for_decoder {
1032                DecoderFactory::create_from_media_info(shared_stream, info, &decoder_config)
1033            } else {
1034                DecoderFactory::create_with_probe(
1035                    shared_stream,
1036                    hint_for_decoder.as_deref(),
1037                    &decoder_config,
1038                )
1039            }
1040        })
1041        .await
1042        .map_err(|e| DecodeError::Io(IoError::other(format!("decoder task panicked: {e}"))))??;
1043        debug!("Audio::new — decoder created");
1044        Ok(decoder)
1045    }
1046
1047    async fn create_stream_with_probe(
1048        stream_config: T::Config,
1049        byte_pool: kithara_bufpool::BytePool,
1050    ) -> Result<Stream<T>, DecodeError> {
1051        let stream = Self::open_stream(stream_config).await?;
1052        Self::spawn_probe(stream, byte_pool).await
1053    }
1054
1055    /// Get a reference to the underlying `EventBus`.
1056    ///
1057    /// Useful for passing to downstream components that also publish events.
1058    #[must_use]
1059    pub fn event_bus(&self) -> &EventBus {
1060        &self.bus
1061    }
1062
1063    /// Subscribe to unified events via the `EventBus`.
1064    ///
1065    /// Returns a receiver for all events published to the bus.
1066    #[must_use]
1067    pub fn events(&self) -> kithara_events::EventReceiver {
1068        self.bus.subscribe()
1069    }
1070
1071    fn log_pipeline_ready(
1072        initial_spec: PcmSpec,
1073        output_spec: PcmSpec,
1074        host_sample_rate: &Arc<AtomicU32>,
1075    ) {
1076        info!(
1077            ?initial_spec,
1078            ?output_spec,
1079            host_sr = host_sample_rate.load(Ordering::Relaxed),
1080            "Audio pipeline created"
1081        );
1082    }
1083
1084    async fn open_stream(stream_config: T::Config) -> Result<Stream<T>, DecodeError> {
1085        debug!("Audio::new — creating Stream...");
1086        let stream = Stream::<T>::new(stream_config)
1087            .await
1088            .map_err(|e| DecodeError::Io(IoError::other(e.to_string())))?;
1089        debug!("Audio::new — Stream created");
1090        Ok(stream)
1091    }
1092
1093    fn probe_stream_blocking(
1094        mut stream: Stream<T>,
1095        byte_pool: &kithara_bufpool::BytePool,
1096    ) -> Result<Stream<T>, DecodeError> {
1097        let mut probe_buf = byte_pool.get_with(|b| b.resize(Self::PROBE_BUFFER_SIZE, 0));
1098        if let Err(e) = stream.read(&mut probe_buf) {
1099            tracing::debug!(?e, "probe_stream_blocking: probe read failed");
1100        }
1101        stream.seek(SeekFrom::Start(0)).map_err(DecodeError::Io)?;
1102        Ok(stream)
1103    }
1104
1105    fn resolve_event_bus(stream_config: &T::Config, config_bus: Option<EventBus>) -> EventBus {
1106        T::event_bus(stream_config)
1107            .or(config_bus)
1108            .unwrap_or_default()
1109    }
1110
1111    #[cfg(not(target_arch = "wasm32"))]
1112    async fn spawn_probe(
1113        stream: Stream<T>,
1114        byte_pool: kithara_bufpool::BytePool,
1115    ) -> Result<Stream<T>, DecodeError> {
1116        debug!("Audio::new — spawning probe task...");
1117        let result = spawn_blocking(move || Self::probe_stream_blocking(stream, &byte_pool))
1118            .await
1119            .map_err(|e| DecodeError::Io(IoError::other(format!("probe task panicked: {e}"))))??;
1120        debug!("Audio::new — probe task done");
1121        Ok(result)
1122    }
1123
1124    /// Wasm probe path: the browser tokio runtime is single-threaded
1125    /// and `spawn_blocking` requires `Send` — but `Stream<T>` is
1126    /// `!Send` because it holds JS-backed network streams. Probe runs
1127    /// inline on the calling task.
1128    #[cfg(target_arch = "wasm32")]
1129    async fn spawn_probe(
1130        stream: Stream<T>,
1131        byte_pool: kithara_bufpool::BytePool,
1132    ) -> Result<Stream<T>, DecodeError> {
1133        debug!("Audio::new — running probe inline (wasm)...");
1134        let result = Self::probe_stream_blocking(stream, &byte_pool)?;
1135        debug!("Audio::new — probe done");
1136        Ok(result)
1137    }
1138}
1139
1140/// Merge user-supplied `MediaInfo` over the stream's declarative info.
1141///
1142/// Keeps user's specific fields and fills `None` fields from the stream.
1143/// The result is the single source of truth for what kind of decoder is
1144/// being run: the initial-decoder factory probes with it AND the FSM's
1145/// `session.media_info` is seeded with it. Without the merge, user's
1146/// container override (e.g. Wav) would be silently dropped at session
1147/// seeding, and `detect_format_change` would later treat the stream's
1148/// declarative container (e.g. Fmp4 inferred from EXT-X-MAP URL
1149/// extension) as authoritative.
1150fn merge_user_and_stream_media_info(
1151    user: Option<MediaInfo>,
1152    stream: Option<MediaInfo>,
1153) -> Option<MediaInfo> {
1154    match (user, stream) {
1155        (Some(user), Some(stream)) => {
1156            let mut merged = user;
1157            if merged.codec.is_none() {
1158                merged.codec = stream.codec;
1159            }
1160            if merged.container.is_none() {
1161                merged.container = stream.container;
1162            }
1163            if merged.channels.is_none() {
1164                merged.channels = stream.channels;
1165            }
1166            if merged.sample_rate.is_none() {
1167                merged.sample_rate = stream.sample_rate;
1168            }
1169            if merged.variant_index.is_none() {
1170                merged.variant_index = stream.variant_index;
1171            }
1172            Some(merged)
1173        }
1174        (Some(user), None) => Some(user),
1175        (None, stream) => stream,
1176    }
1177}
1178
1179impl<S> Drop for Audio<S> {
1180    fn drop(&mut self) {
1181        if let Some(ref cancel) = self.cancel {
1182            cancel.cancel();
1183        }
1184
1185        if let (Some(worker), Some(track_id)) = (&self.worker, self.track_id.take()) {
1186            worker.unregister_track(track_id);
1187
1188            if self.is_standalone_worker {
1189                worker.shutdown();
1190            }
1191        }
1192    }
1193}
1194
1195impl<S: kithara_platform::MaybeSend> PcmReader for Audio<S> {
1196    fn abr_handle(&self) -> Option<kithara_abr::AbrHandle> {
1197        self.abr_handle.clone()
1198    }
1199
1200    fn duration(&self) -> Option<Duration> {
1201        Self::duration(self)
1202    }
1203
1204    fn event_bus(&self) -> &EventBus {
1205        &self.bus
1206    }
1207
1208    fn metadata(&self) -> &TrackMetadata {
1209        Self::metadata(self)
1210    }
1211
1212    fn next_chunk(&mut self) -> Result<ChunkOutcome, DecodeError> {
1213        self.preloaded = true;
1214        let chunk = if let Some(c) = self.current_chunk.take() {
1215            c
1216        } else if let Some(c) = self.recv_valid_chunk() {
1217            c
1218        } else {
1219            let position = self.position();
1220            return match self.consumer_phase {
1221                ConsumerPhase::AtEof => Ok(ChunkOutcome::Eof { position }),
1222                ConsumerPhase::Failed => Err(DecodeError::Io(IoError::other(
1223                    "pcm channel closed / producer failed",
1224                ))),
1225                ConsumerPhase::SeekPending { .. } => Ok(ChunkOutcome::Pending {
1226                    position,
1227                    reason: PendingReason::SeekInProgress,
1228                }),
1229                _ => Ok(ChunkOutcome::Pending {
1230                    position,
1231                    reason: PendingReason::Buffering,
1232                }),
1233            };
1234        };
1235        self.spec = chunk.spec();
1236
1237        if matches!(
1238            self.consumer_phase,
1239            ConsumerPhase::Buffering | ConsumerPhase::SeekPending { .. }
1240        ) {
1241            self.consumer_phase = ConsumerPhase::Playing;
1242        }
1243
1244        self.timeline
1245            .advance_committed_chunk(&kithara_stream::ChunkPosition::from(&chunk.meta));
1246        Ok(ChunkOutcome::Chunk(chunk))
1247    }
1248
1249    fn position(&self) -> Duration {
1250        Self::position(self)
1251    }
1252
1253    fn preload(&mut self) -> Result<(), DecodeError> {
1254        Self::preload(self)
1255    }
1256
1257    fn preload_notify(&self) -> Option<Arc<Notify>> {
1258        Some(self.preload_notify.clone())
1259    }
1260
1261    fn read(&mut self, buf: &mut [f32]) -> Result<ReadOutcome, DecodeError> {
1262        Self::read(self, buf)
1263    }
1264
1265    #[cfg_attr(feature = "perf", hotpath::measure)]
1266    fn read_planar<'a>(
1267        &mut self,
1268        output: &'a mut [&'a mut [f32]],
1269    ) -> Result<ReadOutcome, DecodeError> {
1270        let channels = output.len();
1271        if channels == 0 {
1272            return Ok(ReadOutcome::Pending {
1273                reason: PendingReason::Buffering,
1274                position: self.position(),
1275            });
1276        }
1277        let frames = output[0].len();
1278        let total_samples = frames * channels;
1279
1280        // Detach the held, pre-sized scratch so we can pass it to `self.read`
1281        // (which needs `&mut self`); restore it before returning. Pre-sized in
1282        // `new`, so this `resize` stays within capacity and never reallocates
1283        // on the audio thread.
1284        let mut interleaved = self
1285            .interleaved
1286            .take()
1287            .unwrap_or_else(|| self.pcm_pool.get());
1288        interleaved.clear();
1289        interleaved.resize(total_samples, 0.0);
1290        debug_assert!(
1291            interleaved.capacity() >= total_samples,
1292            "Audio::read_planar scratch undersized: capacity={} < total_samples={total_samples}",
1293            interleaved.capacity(),
1294        );
1295
1296        let result = match self.read(&mut interleaved[..]) {
1297            Ok(ReadOutcome::Eof { position }) => Ok(ReadOutcome::Eof { position }),
1298            Ok(ReadOutcome::Pending { reason, position }) => {
1299                Ok(ReadOutcome::Pending { reason, position })
1300            }
1301            Ok(ReadOutcome::Frames { count, position }) => {
1302                let actual_frames = count.get() / channels;
1303                debug_assert!(
1304                    actual_frames <= frames,
1305                    "Audio::read_planar Frames contract: actual_frames={actual_frames} \
1306                     > per-channel buf frames={frames}",
1307                );
1308                let num_channels =
1309                    NonZeroUsize::new(channels).expect("channels checked non-zero above");
1310                deinterleave_variable(&interleaved[..], num_channels, output, 0..actual_frames);
1311                NonZeroUsize::new(actual_frames).map_or(
1312                    Ok(ReadOutcome::Pending {
1313                        position,
1314                        reason: PendingReason::Buffering,
1315                    }),
1316                    |actual| {
1317                        Ok(ReadOutcome::Frames {
1318                            position,
1319                            count: actual,
1320                        })
1321                    },
1322                )
1323            }
1324            Err(err) => Err(err),
1325        };
1326
1327        self.interleaved = Some(interleaved);
1328        result
1329    }
1330
1331    fn seek(&mut self, position: Duration) -> Result<SeekOutcome, DecodeError> {
1332        Self::seek(self, position)
1333    }
1334
1335    fn set_host_sample_rate(&self, sample_rate: NonZeroU32) {
1336        self.host_sample_rate
1337            .store(sample_rate.get(), Ordering::Relaxed);
1338    }
1339    fn set_playback_rate(&self, rate: f32) {
1340        self.playback_rate.store(rate, Ordering::Relaxed);
1341    }
1342
1343    fn set_service_class(&self, class: ServiceClass) {
1344        self.service_class.store(class);
1345        if let Some(worker) = &self.worker {
1346            worker.wake();
1347        }
1348    }
1349
1350    fn spec(&self) -> PcmSpec {
1351        Self::spec(self)
1352    }
1353}
1354
1355#[cfg(test)]
1356mod tests {
1357    use std::{
1358        marker::PhantomData,
1359        sync::{
1360            Arc,
1361            atomic::{AtomicU32, AtomicU64},
1362        },
1363    };
1364
1365    use kithara_test_utils::kithara;
1366
1367    use super::*;
1368
1369    fn empty_audio() -> Audio<()> {
1370        let (_data_tx, pcm_rx) = crate::runtime::connect::<Fetch<PcmChunk>>(1, None);
1371        let (trash_tx, _trash_rx) = crate::runtime::connect::<PcmChunk>(8, None);
1372
1373        Audio {
1374            pcm_rx,
1375            trash_tx,
1376            _epoch: Arc::new(AtomicU64::new(0)),
1377            validator: EpochValidator::default(),
1378            spec: PcmSpec::default(),
1379            current_chunk: None,
1380            current_chunk_consumed_frames: 0,
1381            consumer_phase: ConsumerPhase::Buffering,
1382            timeline: Timeline::new(),
1383            metadata: TrackMetadata::default(),
1384            bus: EventBus::default(),
1385            cancel: None,
1386            interleaved: None,
1387            pcm_pool: PcmPool::default().clone(),
1388            host_sample_rate: Arc::new(AtomicU32::new(0)),
1389            playback_rate: Arc::new(AtomicF32::new(1.0)),
1390            preload_notify: Arc::new(Notify::new()),
1391            preloaded: false,
1392            track_id: None,
1393            worker: None,
1394            service_class: Arc::new(AtomicServiceClass::new(ServiceClass::default())),
1395            reader_wake: Arc::new(ThreadWake::default()),
1396            is_standalone_worker: false,
1397            abr_handle: None,
1398            _marker: PhantomData,
1399        }
1400    }
1401
1402    #[cfg(not(target_arch = "wasm32"))]
1403    #[kithara::test(env(KITHARA_HANG_TIMEOUT_SECS = "1"))]
1404    #[should_panic(expected = "recv_outcome_blocking")]
1405    fn blocking_recv_without_preload_panics_when_no_chunk_arrives() {
1406        let mut audio = empty_audio();
1407        let _ = audio.recv_valid_chunk();
1408    }
1409
1410    #[cfg(not(target_arch = "wasm32"))]
1411    #[kithara::test]
1412    fn blocking_recv_returns_closed_after_cancel() {
1413        let mut audio = empty_audio();
1414        let cancel = CancellationToken::new();
1415        cancel.cancel();
1416        audio.cancel = Some(cancel);
1417
1418        assert!(matches!(audio.recv_outcome(), RecvOutcome::Closed));
1419    }
1420
1421    #[kithara::test]
1422    fn preloaded_recv_is_nonblocking() {
1423        let mut audio = empty_audio();
1424        audio.preload().expect("preload");
1425
1426        assert!(matches!(audio.recv_outcome(), RecvOutcome::Empty));
1427    }
1428
1429    fn audio_with_channel() -> (Audio<()>, crate::runtime::Outlet<Fetch<PcmChunk>>) {
1430        let (data_tx, pcm_rx) = crate::runtime::connect::<Fetch<PcmChunk>>(4, None);
1431        let (trash_tx, _trash_rx) = crate::runtime::connect::<PcmChunk>(8, None);
1432
1433        let audio = Audio {
1434            pcm_rx,
1435            trash_tx,
1436            _epoch: Arc::new(AtomicU64::new(0)),
1437            validator: EpochValidator::default(),
1438            spec: PcmSpec::default(),
1439            current_chunk: None,
1440            current_chunk_consumed_frames: 0,
1441            consumer_phase: ConsumerPhase::Buffering,
1442            timeline: Timeline::new(),
1443            metadata: TrackMetadata::default(),
1444            bus: EventBus::default(),
1445            cancel: None,
1446            interleaved: None,
1447            pcm_pool: PcmPool::default().clone(),
1448            host_sample_rate: Arc::new(AtomicU32::new(0)),
1449            playback_rate: Arc::new(AtomicF32::new(1.0)),
1450            preload_notify: Arc::new(Notify::new()),
1451            preloaded: true,
1452            track_id: None,
1453            worker: None,
1454            service_class: Arc::new(AtomicServiceClass::new(ServiceClass::default())),
1455            reader_wake: Arc::new(ThreadWake::default()),
1456            is_standalone_worker: false,
1457            abr_handle: None,
1458            _marker: PhantomData,
1459        };
1460        (audio, data_tx)
1461    }
1462
1463    fn make_chunk(samples: &[f32]) -> PcmChunk {
1464        let mut chunk = PcmChunk::default();
1465        chunk.pcm.clear();
1466        chunk.pcm.extend_from_slice(samples);
1467        chunk
1468    }
1469
1470    #[kithara::test]
1471    fn consumer_phase_starts_buffering() {
1472        let audio = empty_audio();
1473        assert_eq!(audio.consumer_phase, ConsumerPhase::Buffering);
1474    }
1475
1476    #[kithara::test]
1477    fn consumer_phase_transitions_to_playing_on_first_chunk() {
1478        let (mut audio, mut tx) = audio_with_channel();
1479        let chunk = make_chunk(&[0.1, 0.2]);
1480        let fetch = Fetch::new(chunk, false, 0);
1481        tx.try_push(fetch).ok();
1482
1483        assert!(audio.fill_buffer());
1484        assert_eq!(audio.consumer_phase, ConsumerPhase::Playing);
1485    }
1486
1487    #[kithara::test]
1488    fn consumer_phase_transitions_to_seek_pending() {
1489        let (mut audio, _tx) = audio_with_channel();
1490        audio.seek(Duration::from_secs(5)).ok();
1491        assert!(matches!(
1492            audio.consumer_phase,
1493            ConsumerPhase::SeekPending { .. }
1494        ));
1495    }
1496
1497    #[kithara::test]
1498    fn consumer_phase_seek_pending_to_playing_on_chunk() {
1499        let (mut audio, mut tx) = audio_with_channel();
1500
1501        audio.seek(Duration::from_secs(5)).ok();
1502        let epoch = audio.validator.epoch;
1503
1504        let chunk = make_chunk(&[0.1, 0.2]);
1505        let fetch = Fetch::new(chunk, false, epoch);
1506        tx.try_push(fetch).ok();
1507
1508        assert!(audio.fill_buffer());
1509        assert_eq!(audio.consumer_phase, ConsumerPhase::Playing);
1510    }
1511
1512    #[kithara::test]
1513    fn consumer_phase_eof_terminates() {
1514        let (mut audio, mut tx) = audio_with_channel();
1515
1516        let fetch = Fetch::new(PcmChunk::default(), true, 0);
1517        tx.try_push(fetch).ok();
1518
1519        let result = audio.recv_valid_chunk();
1520        assert!(result.is_none());
1521        assert_eq!(audio.consumer_phase, ConsumerPhase::AtEof);
1522        let mut buf = [0.0f32; 16];
1523        assert!(matches!(audio.read(&mut buf), Ok(ReadOutcome::Eof { .. })));
1524    }
1525
1526    #[kithara::test]
1527    fn consumer_phase_failed_on_channel_close() {
1528        let (mut audio, _tx) = audio_with_channel();
1529        let cancel = CancellationToken::new();
1530        cancel.cancel();
1531        audio.cancel = Some(cancel);
1532        audio.preloaded = false;
1533
1534        let result = audio.recv_valid_chunk();
1535        assert!(result.is_none());
1536        assert_eq!(audio.consumer_phase, ConsumerPhase::Failed);
1537        let mut buf = [0.0f32; 16];
1538        assert!(matches!(audio.read(&mut buf), Err(DecodeError::Io(_))));
1539    }
1540
1541    #[kithara::test]
1542    fn consumer_does_not_park_in_terminal_phase() {
1543        let (mut audio, _tx) = audio_with_channel();
1544        audio.consumer_phase = ConsumerPhase::AtEof;
1545
1546        let mut buf = [0.0f32; 16];
1547        assert!(matches!(audio.read(&mut buf), Ok(ReadOutcome::Eof { .. })));
1548    }
1549
1550    #[kithara::test]
1551    fn process_fetch_must_distinguish_failure_from_natural_eof() {
1552        let (mut audio_eof, mut tx_eof) = audio_with_channel();
1553        tx_eof
1554            .try_push(Fetch::new(PcmChunk::default(), true, 0))
1555            .expect("push natural-eof marker");
1556        let _ = audio_eof.recv_valid_chunk();
1557        assert_eq!(audio_eof.consumer_phase, ConsumerPhase::AtEof);
1558
1559        let (mut audio_failure, mut tx_failure) = audio_with_channel();
1560        tx_failure
1561            .try_push(Fetch::failure(PcmChunk::default(), 0))
1562            .expect("push failure marker");
1563        let _ = audio_failure.recv_valid_chunk();
1564
1565        assert_ne!(
1566            audio_failure.consumer_phase,
1567            ConsumerPhase::AtEof,
1568            "process_fetch must not collapse FetchKind::Failure into \
1569             ConsumerPhase::AtEof — AtEof means 'clip finished' and is \
1570             used by PlayerTrack to finalize; a transient failure must \
1571             land in a distinct non-natural-eof state so the pipeline \
1572             can recover instead of removing the track from the arena"
1573        );
1574        assert_eq!(
1575            audio_failure.consumer_phase,
1576            ConsumerPhase::Failed,
1577            "failure marker must route to ConsumerPhase::Failed"
1578        );
1579    }
1580}