use std::{
io::{Error as IoError, Read, Seek, SeekFrom},
marker::PhantomData,
num::{NonZeroU32, NonZeroUsize},
sync::{
Arc,
atomic::{AtomicU32, AtomicU64, Ordering},
},
time::Duration,
};
use fast_interleave::deinterleave_variable;
use kithara_bufpool::{PcmBuf, PcmPool};
use kithara_decode::{DecoderFactory, PcmChunk, PcmMeta, PcmSpec, TrackMetadata};
use kithara_events::{AudioEvent, EventBus, SeekLifecycleStage, SegmentLocation};
#[cfg(target_arch = "wasm32")]
use kithara_platform::thread::{is_worker_thread, sleep as thread_sleep};
use kithara_platform::{
thread::park_timeout,
tokio::{sync::Notify, task::spawn_blocking},
};
use kithara_stream::{MediaInfo, Stream, StreamType, Timeline};
use kithara_test_utils::kithara;
use portable_atomic::AtomicF32;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use crate::{
pipeline::{
config::{AudioConfig, create_effects, expected_output_spec},
fetch::{EpochValidator, Fetch, FetchKind},
source::{OffsetReader, SharedStream, StreamAudioSource},
track_fsm::ConsumerPhase,
},
runtime::AtomicServiceClass,
traits::{ChunkOutcome, DecodeError, PcmReader, PendingReason, ReadOutcome, SeekOutcome},
worker::{
handle::{AudioWorkerHandle, TrackRegistration},
thread_wake::ThreadWake,
types::{ServiceClass, TrackId},
},
};
/// Narrows a millisecond count to `u64`, saturating at `u64::MAX` when the
/// value does not fit.
fn clamp_u128_to_u64_millis(ms: u128) -> u64 {
    // Same std conversion the timestamp interpolation in `Audio::read` uses.
    u64::try_from(ms).unwrap_or(u64::MAX)
}
/// Converts a frame count into an interleaved sample count (`frames * channels`).
///
/// # Errors
/// Returns `DecodeError::Io` when the product overflows `u64` or does not fit
/// in `usize`.
fn frames_to_samples(frames: u64, channels: u64) -> Result<usize, DecodeError> {
    // A saturating multiply would clamp an overflow to `u64::MAX`, which still
    // converts to `usize` on 64-bit targets and would be handed back as a
    // (bogus) valid sample count. Detect the overflow explicitly instead.
    let samples = frames.checked_mul(channels).ok_or_else(|| {
        DecodeError::Io(IoError::other(format!(
            "frames*channels overflow: {frames} * {channels} does not fit u64"
        )))
    })?;
    usize::try_from(samples).map_err(|err| {
        DecodeError::Io(IoError::other(format!(
            "frames*channels overflow: {samples} does not fit usize: {err}"
        )))
    })
}
/// Result of inspecting one received fetch in `Audio::process_fetch`.
enum FetchOutcome {
/// The fetch was rejected by the epoch validator; the caller keeps receiving.
Continue,
/// Receiving is done for now: `Some(chunk)` for data, `None` after an EOF or
/// failure marker.
Return(Option<PcmChunk>),
}
/// Result of one attempt to pull an item from the PCM channel.
enum RecvOutcome {
/// The blocking receive observed a cancelled cancellation token.
Closed,
/// The non-blocking receive found nothing queued.
Empty,
/// A fetch was popped from the channel.
Item(Fetch<PcmChunk>),
}
/// Consumer side of the audio pipeline: pulls decoded PCM chunks from a worker
/// over a channel and hands them out via `read` / `next_chunk`.
pub struct Audio<S> {
/// Notifier handed to the worker at track registration and exposed through
/// `PcmReader::preload_notify`.
pub(crate) preload_notify: Arc<Notify>,
/// Current consumer state (buffering, playing, seek pending, EOF, failed).
pub(crate) consumer_phase: ConsumerPhase,
/// Filters received fetches; its `epoch` is replaced on every seek.
pub(crate) validator: EpochValidator,
/// Chunk currently being drained by `read`, if any.
pub(crate) current_chunk: Option<PcmChunk>,
/// Spec of the most recently accepted chunk (initially the expected output spec).
pub(crate) spec: PcmSpec,
/// Shared timeline used for position, duration and seek bookkeeping.
pub(crate) timeline: Timeline,
/// Frames of `current_chunk` already copied out by `read`.
pub(crate) current_chunk_consumed_frames: u64,
// Epoch counter shared with the audio source and decoder factory; not read here.
_epoch: Arc<AtomicU64>,
// Host sample rate, written by `set_host_sample_rate`.
host_sample_rate: Arc<AtomicU32>,
// Playback rate, written by `set_playback_rate`.
playback_rate: Arc<AtomicF32>,
// Wake handle the blocking receive registers the current thread with.
reader_wake: Arc<ThreadWake>,
// Event bus used to publish `AudioEvent`s.
bus: EventBus,
// Receiving end of the PCM channel fed by the worker.
pcm_rx: crate::runtime::Inlet<Fetch<PcmChunk>>,
// Sending end of the channel that returns spent chunks.
trash_tx: crate::runtime::Outlet<PcmChunk>,
abr_handle: Option<kithara_abr::AbrHandle>,
// Cancelled on drop; polled by the blocking receive.
cancel: Option<CancellationToken>,
// Registration id used to unregister from the worker on drop.
track_id: Option<TrackId>,
worker: Option<AudioWorkerHandle>,
service_class: Arc<AtomicServiceClass>,
// Fallback source for the `read_planar` scratch buffer.
pcm_pool: PcmPool,
// Interleaved scratch buffer reused across `read_planar` calls.
interleaved: Option<PcmBuf>,
_marker: PhantomData<S>,
metadata: TrackMetadata,
// When true, the worker is shut down when this `Audio` is dropped.
is_standalone_worker: bool,
// Set by `preload` / `next_chunk`; selects the non-blocking receive path.
preloaded: bool,
}
impl<S> Audio<S> {
/// Byte length of the buffer used for the initial probe read.
const PROBE_BUFFER_SIZE: usize = 1024;
/// Frames per pre-warmed pool buffer (multiplied by the channel count).
const WARM_DECODE_FRAMES: usize = 4608;
/// Park/sleep interval between polls in the blocking receive loop.
const RECV_BACKOFF: Duration = Duration::from_micros(100);
/// Pre-warms `pool` with zero-filled buffers sized for `WARM_DECODE_FRAMES`
/// frames of `channels` channels. Does nothing when the pool already reports
/// allocated bytes.
fn warm_pcm_pool(pool: &PcmPool, channels: usize, chunks: usize) {
    let already_populated = pool.allocated_bytes() != 0;
    if already_populated {
        return;
    }
    let samples_per_buffer = Self::WARM_DECODE_FRAMES * channels.max(1);
    // Twice the channel depth, and never fewer than one buffer.
    let buffer_count = chunks.saturating_mul(2).max(1);
    pool.pre_warm(buffer_count, |scratch| {
        scratch.clear();
        scratch.resize(samples_per_buffer, 0.0);
    });
}
/// Takes a buffer from `pool` and makes sure it can hold
/// `sample_rate * max(channels, 2)` samples without reallocating.
///
/// The returned buffer is empty (`len == 0`); only its capacity is prepared.
fn alloc_interleaved_scratch(pool: &PcmPool, spec: PcmSpec) -> PcmBuf {
    let channels = usize::from(spec.channels).max(2);
    let sample_rate = usize::try_from(spec.sample_rate).unwrap_or(usize::MAX);
    let capacity = sample_rate.saturating_mul(channels);
    pool.get_with(|buf| {
        buf.clear();
        // `reserve` counts additional elements from `len` (0 after `clear`),
        // not from the current capacity, so the full target must be requested.
        // It is a no-op when the buffer is already large enough.
        // NOTE(review): assumes the pooled buffer has `Vec::reserve` semantics.
        buf.reserve(capacity);
    })
}
/// Returns a clone of the stored ABR handle, if any.
#[must_use]
pub fn abr_handle(&self) -> Option<kithara_abr::AbrHandle> {
self.abr_handle.clone()
}
/// Marks the consumer as `ConsumerPhase::Failed` and returns `None`.
///
/// NOTE(review): despite the name, this sets `Failed`, not `AtEof`.
fn close_channel_and_mark_eof(&mut self) -> Option<PcmChunk> {
self.consumer_phase = ConsumerPhase::Failed;
None
}
/// Returns the variant reported by the ABR handle, or `None` when there is no
/// handle or the handle reports none.
#[must_use]
pub fn current_variant(&self) -> Option<kithara_events::VariantInfo> {
self.abr_handle.as_ref()?.current_variant()
}
/// Returns the total duration recorded on the timeline, if known.
#[must_use]
pub fn duration(&self) -> Option<Duration> {
self.timeline.total_duration()
}
/// Publishes `event` on this track's event bus.
fn emit_audio_event(&self, event: AudioEvent) {
self.bus.publish(event);
}
fn emit_playback_progress(&self) {
let position_ms = clamp_u128_to_u64_millis(self.position().as_millis());
let total_ms = self
.timeline
.total_duration()
.map(|duration| clamp_u128_to_u64_millis(duration.as_millis()));
self.emit_audio_event(AudioEvent::PlaybackProgress {
position_ms,
total_ms,
seek_epoch: self.validator.epoch,
});
}
/// Announces the first output after a seek.
///
/// When the timeline has a pending seek epoch equal to the validator's epoch,
/// publishes `SeekLifecycle { OutputCommitted }` followed by `SeekComplete`,
/// then clears the pending epoch. Otherwise does nothing.
fn emit_post_seek_output_commit(&mut self, meta: Option<PcmMeta>) {
    let seek_epoch = match self.timeline.pending_seek_epoch() {
        Some(pending) if pending == self.validator.epoch => pending,
        _ => return,
    };
    let (variant, segment_index) = meta
        .as_ref()
        .map_or((None, None), |m| (m.variant_index, m.segment_index));
    let committed = AudioEvent::SeekLifecycle {
        seek_epoch,
        stage: SeekLifecycleStage::OutputCommitted,
        location: SegmentLocation::new(variant, segment_index, None, None),
    };
    self.emit_audio_event(committed);
    let completed = AudioEvent::SeekComplete {
        seek_epoch,
        position: self.position(),
    };
    self.emit_audio_event(completed);
    let _ = self.timeline.did_clear_pending_seek_epoch(seek_epoch);
}
/// Tries to install the next valid chunk as `current_chunk`.
///
/// Returns `true` when a chunk was installed. On success the spec is updated,
/// the consumed-frame counter is reset, and a `Buffering` / `SeekPending`
/// phase moves to `Playing`.
pub(crate) fn fill_buffer(&mut self) -> bool {
    match self.recv_valid_chunk() {
        None => false,
        Some(next) => {
            self.spec = next.spec();
            self.current_chunk = Some(next);
            self.current_chunk_consumed_frames = 0;
            let awaiting_audio = matches!(
                self.consumer_phase,
                ConsumerPhase::Buffering | ConsumerPhase::SeekPending { .. }
            );
            if awaiting_audio {
                self.consumer_phase = ConsumerPhase::Playing;
            }
            true
        }
    }
}
/// Returns whether `preload` or `next_chunk` has been called on this reader.
#[must_use]
pub fn is_preloaded(&self) -> bool {
self.preloaded
}
/// Returns the track metadata captured from the initial decoder.
#[must_use]
pub fn metadata(&self) -> &TrackMetadata {
&self.metadata
}
/// Returns the timeline's committed playback position.
#[must_use]
pub fn position(&self) -> Duration {
self.timeline.committed_position()
}
/// Marks the reader as preloaded and, if no chunk is buffered and the stream
/// is not at EOF, attempts to buffer one.
///
/// # Errors
/// Returns `DecodeError::Io` when the attempt leaves the consumer in the
/// `Failed` phase.
pub fn preload(&mut self) -> Result<(), DecodeError> {
    self.preloaded = true;
    let needs_fill =
        self.current_chunk.is_none() && self.consumer_phase != ConsumerPhase::AtEof;
    if !needs_fill {
        return Ok(());
    }
    self.fill_buffer();
    if self.consumer_phase == ConsumerPhase::Failed {
        return Err(DecodeError::Io(IoError::other(
            "pcm channel closed during preload",
        )));
    }
    Ok(())
}
/// Classifies one received fetch.
///
/// Fetches rejected by the validator are discarded and yield `Continue`.
/// Data fetches yield their chunk. EOF and failure markers set the matching
/// terminal phase, discard their payload and yield `Return(None)`.
fn process_fetch(&mut self, fetch: Fetch<PcmChunk>) -> FetchOutcome {
    if !self.validator.is_valid(&fetch) {
        self.discard_chunk(fetch.into_inner());
        return FetchOutcome::Continue;
    }
    let terminal_phase = match fetch.kind {
        FetchKind::Data => return FetchOutcome::Return(Some(fetch.into_inner())),
        FetchKind::NaturalEof => ConsumerPhase::AtEof,
        FetchKind::Failure => ConsumerPhase::Failed,
    };
    self.consumer_phase = terminal_phase;
    self.discard_chunk(fetch.into_inner());
    FetchOutcome::Return(None)
}
/// Copies interleaved samples from buffered chunks into `buf`.
///
/// Returns `ReadOutcome::Frames` with the number of samples written and the
/// committed position when anything was copied. When nothing was copied the
/// result reflects the consumer phase: `Eof`, `Pending` (seek in progress or
/// buffering), or an error for `Failed`. An empty `buf` yields
/// `Pending { Buffering }`.
///
/// # Errors
/// Returns `DecodeError::Io` when the consumer is in the `Failed` phase or a
/// frame-to-sample conversion fails.
#[cfg_attr(feature = "perf", hotpath::measure)]
#[kithara::hang_watchdog]
pub fn read(&mut self, buf: &mut [f32]) -> Result<ReadOutcome, DecodeError> {
if buf.is_empty() {
return Ok(ReadOutcome::Pending {
reason: PendingReason::Buffering,
position: self.position(),
});
}
// Terminal phases short-circuit; AtEof only once the buffered chunk is drained.
match self.consumer_phase {
ConsumerPhase::AtEof if self.current_chunk.is_none() => {
return Ok(ReadOutcome::Eof {
position: self.position(),
});
}
ConsumerPhase::Failed => {
return Err(DecodeError::Io(IoError::other(
"pcm channel closed / producer failed",
)));
}
_ => {}
}
let mut written = 0;
// Metadata of the last chunk that contributed samples, used for seek events.
let mut last_output_meta: Option<PcmMeta> = None;
while written < buf.len() {
hang_tick!();
if let Some(chunk) = self.current_chunk.as_ref() {
let channels = u64::from(chunk.meta.spec.channels.max(1));
let chunk_total_frames = u64::from(chunk.meta.frames);
let consumed_frames_in_chunk = self.current_chunk_consumed_frames;
// Chunk already fully consumed: recycle it and try to fetch the next one.
if consumed_frames_in_chunk >= chunk_total_frames {
self.recycle_current_chunk();
if !self.fill_buffer() {
break;
}
continue;
}
let remaining_frames = chunk_total_frames - consumed_frames_in_chunk;
// Only whole frames are copied; leftover space smaller than one frame is unused.
let space_frames = ((buf.len() - written) as u64) / channels.max(1);
let take_frames = remaining_frames.min(space_frames);
if take_frames == 0 {
break;
}
hang_reset!();
let start_sample = frames_to_samples(consumed_frames_in_chunk, channels)?;
let take_samples = frames_to_samples(take_frames, channels)?;
buf[written..written + take_samples]
.copy_from_slice(&chunk.pcm[start_sample..start_sample + take_samples]);
last_output_meta = Some(chunk.meta);
written += take_samples;
let final_segment = take_frames == remaining_frames;
let consumed_total = consumed_frames_in_chunk + take_frames;
self.current_chunk_consumed_frames = consumed_total;
if final_segment {
// Whole chunk delivered: advance the timeline by the chunk and recycle it.
self.timeline
.advance_committed_chunk(&kithara_stream::ChunkPosition::from(&chunk.meta));
self.recycle_current_chunk();
} else {
// Partial chunk: interpolate the position linearly between the chunk's
// start and end timestamps by the fraction of frames consumed.
let total_frames = chunk_total_frames.max(1);
let start_ns =
u64::try_from(chunk.meta.timestamp.as_nanos()).unwrap_or(u64::MAX);
let end_ns =
u64::try_from(chunk.meta.end_timestamp.as_nanos()).unwrap_or(u64::MAX);
let span_ns = u128::from(end_ns.saturating_sub(start_ns));
let consumed_ns_offset =
span_ns * u128::from(consumed_total) / u128::from(total_frames);
let interpolated = u128::from(start_ns).saturating_add(consumed_ns_offset);
let interpolated_ns = u64::try_from(interpolated).unwrap_or(u64::MAX);
self.timeline
.set_committed_position(Duration::from_nanos(interpolated_ns));
}
}
if written >= buf.len() {
break;
}
if !self.fill_buffer() {
break;
}
}
if let Some(count) = NonZeroUsize::new(written) {
debug_assert!(
count.get() <= buf.len(),
"Audio::read Frames contract violated: count={c} > buf.len()={b}",
c = count.get(),
b = buf.len(),
);
self.emit_post_seek_output_commit(last_output_meta);
self.emit_playback_progress();
let position = self.position();
debug_assert!(
self.timeline
.total_duration()
.is_none_or(|dur| position <= dur),
"Audio::read Frames contract: position={position:?} > duration={:?}",
self.timeline.total_duration(),
);
return Ok(ReadOutcome::Frames { count, position });
}
// Nothing was written: report why, based on the current phase.
let position = self.position();
match self.consumer_phase {
ConsumerPhase::AtEof => Ok(ReadOutcome::Eof { position }),
ConsumerPhase::Failed => Err(DecodeError::Io(IoError::other(
"pcm channel closed / producer failed",
))),
ConsumerPhase::SeekPending { .. } => Ok(ReadOutcome::Pending {
position,
reason: PendingReason::SeekInProgress,
}),
_ => Ok(ReadOutcome::Pending {
position,
reason: PendingReason::Buffering,
}),
}
}
/// Receives one fetch, choosing between the non-blocking and blocking paths.
///
/// On the non-blocking path a successful pop also wakes the worker; an empty
/// channel yields `RecvOutcome::Empty` immediately.
fn recv_outcome(&mut self) -> RecvOutcome {
    if !self.use_nonblocking_recv() {
        return self.recv_outcome_blocking();
    }
    match self.pcm_rx.try_pop() {
        Some(fetch) => {
            self.wake_worker();
            RecvOutcome::Item(fetch)
        }
        None => RecvOutcome::Empty,
    }
}
/// Polls the PCM channel until an item arrives or the cancellation token is
/// cancelled, parking for `RECV_BACKOFF` between attempts.
///
/// Returns `Item` on a successful pop (after waking the worker) and `Closed`
/// when cancellation is observed. Never returns `Empty`.
#[kithara::hang_watchdog]
fn recv_outcome_blocking(&mut self) -> RecvOutcome {
loop {
if let Some(fetch) = self.pcm_rx.try_pop() {
hang_reset!();
self.wake_worker();
return RecvOutcome::Item(fetch);
}
if self
.cancel
.as_ref()
.is_some_and(CancellationToken::is_cancelled)
{
hang_reset!();
return RecvOutcome::Closed;
}
// Nudge the producer, register this thread for wake-ups, then re-check the
// channel and the token before waiting — presumably so an item pushed while
// registering is not missed.
self.wake_worker();
self.reader_wake.register_current();
if let Some(fetch) = self.pcm_rx.try_pop() {
hang_reset!();
self.wake_worker();
return RecvOutcome::Item(fetch);
}
if self
.cancel
.as_ref()
.is_some_and(CancellationToken::is_cancelled)
{
hang_reset!();
return RecvOutcome::Closed;
}
hang_tick!();
Self::wait_for_fetch();
}
}
/// Receives fetches until one produces a result.
///
/// Returns `None` immediately when the consumer phase is terminal or the
/// channel is empty (non-blocking path). Fetches rejected by the validator
/// are skipped. A `Closed` outcome marks the consumer as failed.
#[kithara::hang_watchdog]
fn recv_valid_chunk(&mut self) -> Option<PcmChunk> {
if self.consumer_phase.is_terminal() {
return None;
}
loop {
match self.recv_outcome() {
RecvOutcome::Item(fetch) => match self.process_fetch(fetch) {
// Stale fetch was discarded: keep receiving.
FetchOutcome::Continue => {
hang_tick!();
continue;
}
FetchOutcome::Return(chunk) => {
hang_reset!();
return chunk;
}
},
RecvOutcome::Empty => return None,
RecvOutcome::Closed => {
hang_reset!();
return self.close_channel_and_mark_eof();
}
}
}
}
/// Starts a seek to `position`.
///
/// Initiates the seek on the timeline, adopts the new epoch in the validator,
/// drops the buffered chunk and everything queued in the channel, enters
/// `SeekPending`, and wakes the worker. Returns `PastEof` when a total
/// duration is known and `position` is at or beyond it; otherwise `Landed`
/// with `landed_at == position`.
///
/// # Errors
/// This implementation always returns `Ok`.
#[kithara::hang_watchdog]
pub fn seek(&mut self, position: Duration) -> Result<SeekOutcome, DecodeError> {
let epoch = self.timeline.initiate_seek(position);
self.timeline.mark_pending_seek_epoch(epoch);
self.validator.epoch = epoch;
self.recycle_current_chunk();
self.current_chunk_consumed_frames = 0;
self.consumer_phase = ConsumerPhase::SeekPending { epoch };
// Drain chunks already queued in the channel; they are handed to the trash ring.
while let Some(fetch) = self.pcm_rx.try_pop() {
self.discard_chunk(fetch.into_inner());
hang_tick!();
}
if let Some(ref worker) = self.worker {
worker.wake();
}
trace!(?position, epoch, "seek initiated via Timeline");
match self.timeline.total_duration() {
Some(duration) if position >= duration => {
debug_assert!(
position >= duration,
"Audio::seek PastEof contract: target={position:?} < duration={duration:?}",
);
Ok(SeekOutcome::PastEof {
duration,
target: position,
})
}
_ => {
debug_assert!(
self.timeline
.total_duration()
.is_none_or(|dur| position <= dur),
"Audio::seek Landed contract: landed_at={position:?} > duration={:?}",
self.timeline.total_duration(),
);
Ok(SeekOutcome::Landed {
target: position,
landed_at: position,
})
}
}
}
/// Returns the current PCM spec (updated whenever a chunk is accepted).
#[must_use]
pub fn spec(&self) -> PcmSpec {
self.spec
}
/// Decides whether receives must not block: always on `wasm32`, and on other
/// targets once the reader has been preloaded.
fn use_nonblocking_recv(&self) -> bool {
    cfg!(target_arch = "wasm32") || self.is_preloaded()
}
/// Waits up to `RECV_BACKOFF` before the next poll of the PCM channel.
///
/// Non-wasm targets park the thread. On `wasm32`, worker threads park and any
/// other thread sleeps instead.
fn wait_for_fetch() {
#[cfg(not(target_arch = "wasm32"))]
{
park_timeout(Self::RECV_BACKOFF);
}
#[cfg(target_arch = "wasm32")]
{
if is_worker_thread() {
park_timeout(Self::RECV_BACKOFF);
} else {
thread_sleep(Self::RECV_BACKOFF);
}
}
}
fn wake_worker(&self) {
if let Some(ref worker) = self.worker {
worker.wake();
}
}
/// Hands a spent chunk to the trash channel. If the channel rejects it, debug
/// builds assert; release builds let the chunk drop here.
fn discard_chunk(&mut self, chunk: PcmChunk) {
    let Err(_overflow) = self.trash_tx.try_push(chunk) else {
        return;
    };
    debug_assert!(
        false,
        "PCM trash ring overflow — spent buffer freed on the audio thread"
    );
}
/// Takes the buffered chunk, if any, and sends it to the trash channel.
fn recycle_current_chunk(&mut self) {
    let Some(spent) = self.current_chunk.take() else {
        return;
    };
    self.discard_chunk(spent);
}
}
impl<T> Audio<Stream<T>>
where
T: StreamType<Events = EventBus>,
{
/// Builds the full audio pipeline from `config`.
///
/// Opens and probes the stream, creates the initial decoder, sets up effects,
/// channels and the worker registration, and returns the consumer handle in
/// the `Buffering` phase.
///
/// # Errors
/// Returns `DecodeError` when the stream cannot be opened or probed, or the
/// initial decoder cannot be created.
pub async fn new(config: AudioConfig<T>) -> Result<Self, DecodeError> {
let AudioConfig {
byte_pool,
hint,
host_sample_rate: config_host_sr,
media_info: user_media_info,
pcm_buffer_chunks,
pcm_pool: mut pool,
playback_rate: config_playback_rate,
decoder_backend,
preload_chunks,
resampler_quality,
stream: stream_config,
bus: config_bus,
effects: custom_effects,
worker: config_worker,
gapless_mode: config_gapless_mode,
cancel: config_cancel,
} = config;
let cancel = config_cancel.unwrap_or_default();
let bus = Self::resolve_event_bus(&stream_config, config_bus);
let byte_pool = byte_pool.unwrap_or_else(|| kithara_bufpool::BytePool::default().clone());
let stream = Self::create_stream_with_probe(stream_config, byte_pool.clone()).await?;
let initial_byte_len = stream.len().unwrap_or(0);
let timeline = stream.timeline();
// User-supplied media info wins; the stream only fills in missing fields.
let initial_media_info =
merge_user_and_stream_media_info(user_media_info, stream.media_info());
debug!(?initial_media_info, "Initial MediaInfo from stream");
let shared_stream = SharedStream::new(stream);
let byte_len_handle = Arc::new(AtomicU64::new(initial_byte_len));
let pool = pool.get_or_insert_with(|| PcmPool::default().clone());
// Channel count defaults to 2 when the media info does not state one.
let warm_channels = initial_media_info
.as_ref()
.and_then(|info| info.channels)
.map_or(2, usize::from);
Self::warm_pcm_pool(pool, warm_channels, pcm_buffer_chunks);
let decoder = Self::create_initial_decoder(
shared_stream.clone(),
initial_media_info.clone(),
hint.clone(),
pool.clone(),
byte_pool.clone(),
decoder_backend,
)
.await?;
let initial_spec = decoder.spec();
// Prefer the decoder's duration, falling back to what the timeline already has.
let total_duration = decoder.duration().or_else(|| timeline.total_duration());
timeline.set_total_duration(total_duration);
let metadata = decoder.metadata();
let epoch = Arc::new(AtomicU64::new(0));
// 0 stands for "no host sample rate configured".
let host_sample_rate = Arc::new(AtomicU32::new(config_host_sr.map_or(0, NonZeroU32::get)));
let playback_rate = config_playback_rate.unwrap_or_else(|| Arc::new(AtomicF32::new(1.0)));
let output_spec = expected_output_spec(initial_spec, &host_sample_rate);
let effects = create_effects(
initial_spec,
&host_sample_rate,
&playback_rate,
resampler_quality,
Some(pool.clone()),
custom_effects,
);
Self::log_pipeline_ready(initial_spec, output_spec, &host_sample_rate);
let interleaved = Self::alloc_interleaved_scratch(pool, output_spec);
let emit = Self::create_emit(&bus);
let decoder_factory = Self::create_decoder_factory(
decoder_backend,
&epoch,
&byte_len_handle,
pool,
&byte_pool,
);
let initial_variant = initial_media_info.as_ref().and_then(|i| i.variant_index);
let abr_handle = shared_stream.abr_handle();
let audio_source = StreamAudioSource::new(
shared_stream,
decoder,
decoder_factory,
initial_media_info,
Arc::clone(&epoch),
effects,
config_gapless_mode,
)
.with_emit(emit);
bus.publish(AudioEvent::DecoderReady {
base_offset: 0,
variant: initial_variant,
});
let preload_notify = Arc::new(Notify::new());
let reader_wake = Arc::new(ThreadWake::default());
let (data_tx, data_rx) = Self::create_channels(pcm_buffer_chunks, Arc::clone(&reader_wake));
let (trash_tx, trash_inlet) = Self::create_trash_channel(pcm_buffer_chunks);
// Without a caller-provided worker a dedicated one is created and owned here.
let (worker, is_standalone) =
config_worker.map_or_else(|| (AudioWorkerHandle::new(), true), |w| (w, false));
let service_class = Arc::new(AtomicServiceClass::new(ServiceClass::default()));
let track_id = worker.register_track(TrackRegistration {
source: Box::new(audio_source),
outlet: data_tx,
trash_inlet,
preload_notify: preload_notify.clone(),
preload_chunks: preload_chunks.get(),
service_class: Arc::clone(&service_class),
});
Ok(Self {
timeline,
metadata,
bus,
host_sample_rate,
playback_rate,
preload_notify,
reader_wake,
abr_handle,
pcm_rx: data_rx,
trash_tx,
_epoch: epoch,
validator: EpochValidator::default(),
spec: output_spec,
current_chunk: None,
current_chunk_consumed_frames: 0,
consumer_phase: ConsumerPhase::Buffering,
cancel: Some(cancel),
interleaved: Some(interleaved),
pcm_pool: pool.clone(),
preloaded: false,
track_id: Some(track_id),
worker: Some(worker),
service_class,
is_standalone_worker: is_standalone,
_marker: PhantomData,
})
}
/// Creates the PCM data channel with at least one slot, attaching `wake` to it.
fn create_channels(
pcm_buffer_chunks: usize,
wake: Arc<ThreadWake>,
) -> (
crate::runtime::Outlet<Fetch<PcmChunk>>,
crate::runtime::Inlet<Fetch<PcmChunk>>,
) {
crate::runtime::connect::<Fetch<PcmChunk>>(pcm_buffer_chunks.max(1), Some(wake))
}
/// Creates the channel that returns spent chunks, sized two slots larger than
/// the data channel and with no wake handle.
fn create_trash_channel(
pcm_buffer_chunks: usize,
) -> (
crate::runtime::Outlet<PcmChunk>,
crate::runtime::Inlet<PcmChunk>,
) {
crate::runtime::connect::<PcmChunk>(pcm_buffer_chunks.max(1) + 2, None)
}
/// Builds the closure used to recreate a decoder for a stream at a given
/// `base_offset`.
///
/// Each invocation publishes the remaining byte length (stream length minus
/// `base_offset`, or 0 when unknown) to the shared handle, builds a decoder
/// config from the captured pools, backend and current epoch, and creates the
/// decoder from the supplied media info. Creation failures are logged and
/// returned.
fn create_decoder_factory(
decoder_backend: kithara_decode::DecoderBackend,
epoch: &Arc<AtomicU64>,
byte_len_handle: &Arc<AtomicU64>,
pool: &PcmPool,
byte_pool: &kithara_bufpool::BytePool,
) -> crate::pipeline::source::DecoderFactory<T> {
let factory_epoch = Arc::clone(epoch);
let factory_byte_len = Arc::clone(byte_len_handle);
let factory_pool = pool.clone();
let factory_byte_pool = byte_pool.clone();
Box::new(move |stream, info, base_offset| {
let byte_len = stream
.len()
.map_or(0, |len| len.saturating_sub(base_offset));
factory_byte_len.store(byte_len, Ordering::Release);
let config = kithara_decode::DecoderConfig::builder()
.backend(decoder_backend)
.byte_len_handle(Arc::clone(&factory_byte_len))
.pcm_pool(factory_pool.clone())
.byte_pool(factory_byte_pool.clone())
.epoch(factory_epoch.load(Ordering::Acquire))
.maybe_segment_layout(stream.as_segment_layout())
.maybe_hooks(stream.take_reader_hooks())
.build();
// The decoder reads through a view of the stream shifted by `base_offset`.
let source = OffsetReader::new(stream.clone(), base_offset);
match DecoderFactory::create_from_media_info(source, info, &config) {
Ok(d) => {
d.update_byte_len(byte_len);
Ok(d)
}
Err(e) => {
warn!(?e, "failed to recreate decoder");
Err(e)
}
}
})
}
/// Returns a boxed callback that publishes every event it receives on a clone
/// of `bus`.
fn create_emit(bus: &EventBus) -> Box<dyn Fn(AudioEvent) + Send> {
    let publisher = bus.clone();
    let emit = move |event: AudioEvent| {
        publisher.publish(event);
    };
    Box::new(emit)
}
/// Creates the first decoder on a blocking task.
///
/// Uses the supplied media info when present; otherwise probes the stream,
/// passing `hint` along.
///
/// # Errors
/// Returns `DecodeError` when the blocking task fails to join or decoder
/// creation fails.
async fn create_initial_decoder(
    shared_stream: SharedStream<T>,
    initial_media_info: Option<MediaInfo>,
    hint: Option<String>,
    pcm_pool: PcmPool,
    byte_pool: kithara_bufpool::BytePool,
    decoder_backend: kithara_decode::DecoderBackend,
) -> Result<Box<dyn kithara_decode::Decoder>, DecodeError> {
    debug!("Audio::new — spawning decoder creation...");
    let known_len = shared_stream.len().unwrap_or(0);
    let decoder_config = kithara_decode::DecoderConfig::builder()
        .backend(decoder_backend)
        .byte_len_handle(Arc::new(AtomicU64::new(known_len)))
        .pcm_pool(pcm_pool)
        .byte_pool(byte_pool)
        .maybe_segment_layout(shared_stream.as_segment_layout())
        .maybe_hooks(shared_stream.take_reader_hooks())
        .maybe_hint(hint.clone())
        .build();
    let build_decoder = move || match &initial_media_info {
        Some(info) => {
            DecoderFactory::create_from_media_info(shared_stream, info, &decoder_config)
        }
        None => {
            DecoderFactory::create_with_probe(shared_stream, hint.as_deref(), &decoder_config)
        }
    };
    let joined = spawn_blocking(build_decoder).await;
    let decoder = joined
        .map_err(|e| DecodeError::Io(IoError::other(format!("decoder task panicked: {e}"))))??;
    debug!("Audio::new — decoder created");
    Ok(decoder)
}
/// Opens the stream described by `stream_config`, then runs the probe read on it.
///
/// # Errors
/// Propagates failures from opening or probing the stream.
async fn create_stream_with_probe(
stream_config: T::Config,
byte_pool: kithara_bufpool::BytePool,
) -> Result<Stream<T>, DecodeError> {
let stream = Self::open_stream(stream_config).await?;
Self::spawn_probe(stream, byte_pool).await
}
/// Returns the event bus this reader publishes on.
#[must_use]
pub fn event_bus(&self) -> &EventBus {
&self.bus
}
/// Subscribes to the event bus and returns the new receiver.
#[must_use]
pub fn events(&self) -> kithara_events::EventReceiver {
self.bus.subscribe()
}
fn log_pipeline_ready(
initial_spec: PcmSpec,
output_spec: PcmSpec,
host_sample_rate: &Arc<AtomicU32>,
) {
info!(
?initial_spec,
?output_spec,
host_sr = host_sample_rate.load(Ordering::Relaxed),
"Audio pipeline created"
);
}
/// Creates the underlying `Stream`, logging before and after.
///
/// # Errors
/// Wraps the stream's creation error, rendered as text, in `DecodeError::Io`.
async fn open_stream(stream_config: T::Config) -> Result<Stream<T>, DecodeError> {
    debug!("Audio::new — creating Stream...");
    match Stream::<T>::new(stream_config).await {
        Ok(stream) => {
            debug!("Audio::new — Stream created");
            Ok(stream)
        }
        Err(e) => Err(DecodeError::Io(IoError::other(e.to_string()))),
    }
}
fn probe_stream_blocking(
mut stream: Stream<T>,
byte_pool: &kithara_bufpool::BytePool,
) -> Result<Stream<T>, DecodeError> {
let mut probe_buf = byte_pool.get_with(|b| b.resize(Self::PROBE_BUFFER_SIZE, 0));
if let Err(e) = stream.read(&mut probe_buf) {
tracing::debug!(?e, "probe_stream_blocking: probe read failed");
}
stream.seek(SeekFrom::Start(0)).map_err(DecodeError::Io)?;
Ok(stream)
}
/// Picks the event bus: the stream config's own bus first, then the one from
/// the audio config, otherwise a default bus.
fn resolve_event_bus(stream_config: &T::Config, config_bus: Option<EventBus>) -> EventBus {
T::event_bus(stream_config)
.or(config_bus)
.unwrap_or_default()
}
/// Runs the stream probe on a blocking task (non-wasm targets).
///
/// # Errors
/// Returns `DecodeError` when the task fails to join or the probe fails.
#[cfg(not(target_arch = "wasm32"))]
async fn spawn_probe(
    stream: Stream<T>,
    byte_pool: kithara_bufpool::BytePool,
) -> Result<Stream<T>, DecodeError> {
    debug!("Audio::new — spawning probe task...");
    let joined = spawn_blocking(move || Self::probe_stream_blocking(stream, &byte_pool)).await;
    let probed = joined
        .map_err(|e| DecodeError::Io(IoError::other(format!("probe task panicked: {e}"))))??;
    debug!("Audio::new — probe task done");
    Ok(probed)
}
/// Runs the stream probe inline on `wasm32`, without a blocking task.
///
/// # Errors
/// Propagates the probe's error.
#[cfg(target_arch = "wasm32")]
async fn spawn_probe(
stream: Stream<T>,
byte_pool: kithara_bufpool::BytePool,
) -> Result<Stream<T>, DecodeError> {
debug!("Audio::new — running probe inline (wasm)...");
let result = Self::probe_stream_blocking(stream, &byte_pool)?;
debug!("Audio::new — probe done");
Ok(result)
}
}
/// Combines user-supplied and stream-reported media info.
///
/// User values take precedence. The stream only fills in `codec`,
/// `container`, `channels`, `sample_rate` and `variant_index` when the user
/// left them unset. With only one side present, that side is returned as-is.
fn merge_user_and_stream_media_info(
    user: Option<MediaInfo>,
    stream: Option<MediaInfo>,
) -> Option<MediaInfo> {
    let Some(mut merged) = user else {
        return stream;
    };
    let Some(stream) = stream else {
        return Some(merged);
    };
    merged.codec = merged.codec.take().or(stream.codec);
    merged.container = merged.container.take().or(stream.container);
    merged.channels = merged.channels.take().or(stream.channels);
    merged.sample_rate = merged.sample_rate.take().or(stream.sample_rate);
    merged.variant_index = merged.variant_index.take().or(stream.variant_index);
    Some(merged)
}
impl<S> Drop for Audio<S> {
fn drop(&mut self) {
if let Some(ref cancel) = self.cancel {
cancel.cancel();
}
if let (Some(worker), Some(track_id)) = (&self.worker, self.track_id.take()) {
worker.unregister_track(track_id);
if self.is_standalone_worker {
worker.shutdown();
}
}
}
}
impl<S: kithara_platform::MaybeSend> PcmReader for Audio<S> {
/// Returns a clone of the stored ABR handle, if any.
fn abr_handle(&self) -> Option<kithara_abr::AbrHandle> {
self.abr_handle.clone()
}
/// Delegates to the inherent `Audio::duration`.
fn duration(&self) -> Option<Duration> {
Self::duration(self)
}
/// Returns the event bus this reader publishes on.
fn event_bus(&self) -> &EventBus {
&self.bus
}
/// Delegates to the inherent `Audio::metadata`.
fn metadata(&self) -> &TrackMetadata {
Self::metadata(self)
}
/// Returns the next whole chunk, preferring one that is already buffered.
///
/// Marks the reader as preloaded. When no chunk is available the result
/// reflects the consumer phase (`Eof`, `Pending`, or an error for `Failed`).
/// On success the spec is updated, a `Buffering` / `SeekPending` phase moves
/// to `Playing`, and the timeline is advanced by the chunk.
fn next_chunk(&mut self) -> Result<ChunkOutcome, DecodeError> {
    self.preloaded = true;
    let buffered = self.current_chunk.take();
    let Some(chunk) = buffered.or_else(|| self.recv_valid_chunk()) else {
        let position = self.position();
        return match self.consumer_phase {
            ConsumerPhase::AtEof => Ok(ChunkOutcome::Eof { position }),
            ConsumerPhase::Failed => Err(DecodeError::Io(IoError::other(
                "pcm channel closed / producer failed",
            ))),
            ConsumerPhase::SeekPending { .. } => Ok(ChunkOutcome::Pending {
                position,
                reason: PendingReason::SeekInProgress,
            }),
            _ => Ok(ChunkOutcome::Pending {
                position,
                reason: PendingReason::Buffering,
            }),
        };
    };
    self.spec = chunk.spec();
    let awaiting_audio = matches!(
        self.consumer_phase,
        ConsumerPhase::Buffering | ConsumerPhase::SeekPending { .. }
    );
    if awaiting_audio {
        self.consumer_phase = ConsumerPhase::Playing;
    }
    let chunk_position = kithara_stream::ChunkPosition::from(&chunk.meta);
    self.timeline.advance_committed_chunk(&chunk_position);
    Ok(ChunkOutcome::Chunk(chunk))
}
/// Delegates to the inherent `Audio::position`.
fn position(&self) -> Duration {
Self::position(self)
}
/// Delegates to the inherent `Audio::preload`.
fn preload(&mut self) -> Result<(), DecodeError> {
Self::preload(self)
}
/// Returns a clone of the preload notifier; always `Some` for this reader.
fn preload_notify(&self) -> Option<Arc<Notify>> {
Some(self.preload_notify.clone())
}
/// Delegates to the inherent `Audio::read`.
fn read(&mut self, buf: &mut [f32]) -> Result<ReadOutcome, DecodeError> {
Self::read(self, buf)
}
/// Reads into per-channel (planar) buffers.
///
/// Samples are first read interleaved into a reusable scratch buffer and then
/// de-interleaved into `output`. The number of frames requested is bounded by
/// the shortest channel buffer. With no channel buffers at all the call
/// returns `Pending { Buffering }`.
///
/// # Errors
/// Propagates errors from the interleaved `read`.
#[cfg_attr(feature = "perf", hotpath::measure)]
fn read_planar<'a>(
    &mut self,
    output: &'a mut [&'a mut [f32]],
) -> Result<ReadOutcome, DecodeError> {
    let Some(num_channels) = NonZeroUsize::new(output.len()) else {
        return Ok(ReadOutcome::Pending {
            reason: PendingReason::Buffering,
            position: self.position(),
        });
    };
    let channels = num_channels.get();
    // Bound by the shortest channel so de-interleaving can never index past
    // the end of a channel buffer shorter than the first one.
    let frames = output
        .iter()
        .map(|channel| channel.len())
        .min()
        .unwrap_or(0);
    let total_samples = frames * channels;
    // Reuse the scratch buffer across calls; fall back to the pool if absent.
    let mut interleaved = self
        .interleaved
        .take()
        .unwrap_or_else(|| self.pcm_pool.get());
    interleaved.clear();
    interleaved.resize(total_samples, 0.0);
    debug_assert!(
        interleaved.capacity() >= total_samples,
        "Audio::read_planar scratch undersized: capacity={} < total_samples={total_samples}",
        interleaved.capacity(),
    );
    let result = match self.read(&mut interleaved[..]) {
        Ok(ReadOutcome::Eof { position }) => Ok(ReadOutcome::Eof { position }),
        Ok(ReadOutcome::Pending { reason, position }) => {
            Ok(ReadOutcome::Pending { reason, position })
        }
        Ok(ReadOutcome::Frames { count, position }) => {
            let actual_frames = count.get() / channels;
            debug_assert!(
                actual_frames <= frames,
                "Audio::read_planar Frames contract: actual_frames={actual_frames} \
> per-channel buf frames={frames}",
            );
            deinterleave_variable(&interleaved[..], num_channels, output, 0..actual_frames);
            // Fewer samples than one full frame round down to zero frames.
            NonZeroUsize::new(actual_frames).map_or(
                Ok(ReadOutcome::Pending {
                    position,
                    reason: PendingReason::Buffering,
                }),
                |actual| {
                    Ok(ReadOutcome::Frames {
                        position,
                        count: actual,
                    })
                },
            )
        }
        Err(err) => Err(err),
    };
    // Always return the scratch buffer, including on error.
    self.interleaved = Some(interleaved);
    result
}
/// Delegates to the inherent `Audio::seek`.
fn seek(&mut self, position: Duration) -> Result<SeekOutcome, DecodeError> {
Self::seek(self, position)
}
/// Stores the new host sample rate in the shared atomic (relaxed ordering).
fn set_host_sample_rate(&self, sample_rate: NonZeroU32) {
self.host_sample_rate
.store(sample_rate.get(), Ordering::Relaxed);
}
/// Stores the new playback rate in the shared atomic (relaxed ordering).
fn set_playback_rate(&self, rate: f32) {
self.playback_rate.store(rate, Ordering::Relaxed);
}
/// Stores the new service class and wakes the worker, if one is attached.
fn set_service_class(&self, class: ServiceClass) {
    self.service_class.store(class);
    self.wake_worker();
}
/// Delegates to the inherent `Audio::spec`.
fn spec(&self) -> PcmSpec {
Self::spec(self)
}
}
#[cfg(test)]
mod tests {
    use std::{
        marker::PhantomData,
        sync::{
            Arc,
            atomic::{AtomicU32, AtomicU64},
        },
    };
    use kithara_test_utils::kithara;
    use super::*;

    /// Builds a worker-less `Audio` wired to a fresh PCM channel with
    /// `capacity` slots, returning it with the channel's sending end.
    fn build_audio(
        capacity: usize,
        preloaded: bool,
    ) -> (Audio<()>, crate::runtime::Outlet<Fetch<PcmChunk>>) {
        let (data_tx, pcm_rx) = crate::runtime::connect::<Fetch<PcmChunk>>(capacity, None);
        let (trash_tx, _trash_rx) = crate::runtime::connect::<PcmChunk>(8, None);
        let audio = Audio {
            pcm_rx,
            trash_tx,
            _epoch: Arc::new(AtomicU64::new(0)),
            validator: EpochValidator::default(),
            spec: PcmSpec::default(),
            current_chunk: None,
            current_chunk_consumed_frames: 0,
            consumer_phase: ConsumerPhase::Buffering,
            timeline: Timeline::new(),
            metadata: TrackMetadata::default(),
            bus: EventBus::default(),
            cancel: None,
            interleaved: None,
            pcm_pool: PcmPool::default().clone(),
            host_sample_rate: Arc::new(AtomicU32::new(0)),
            playback_rate: Arc::new(AtomicF32::new(1.0)),
            preload_notify: Arc::new(Notify::new()),
            preloaded,
            track_id: None,
            worker: None,
            service_class: Arc::new(AtomicServiceClass::new(ServiceClass::default())),
            reader_wake: Arc::new(ThreadWake::default()),
            is_standalone_worker: false,
            abr_handle: None,
            _marker: PhantomData,
        };
        (audio, data_tx)
    }

    /// Not-yet-preloaded reader whose producer side has been dropped.
    fn empty_audio() -> Audio<()> {
        build_audio(1, false).0
    }

    /// Preloaded (non-blocking) reader plus the producer side of its channel.
    fn audio_with_channel() -> (Audio<()>, crate::runtime::Outlet<Fetch<PcmChunk>>) {
        build_audio(4, true)
    }

    /// Default chunk whose PCM payload is replaced with `samples`.
    fn make_chunk(samples: &[f32]) -> PcmChunk {
        let mut chunk = PcmChunk::default();
        chunk.pcm.clear();
        chunk.pcm.extend_from_slice(samples);
        chunk
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[kithara::test(env(KITHARA_HANG_TIMEOUT_SECS = "1"))]
    #[should_panic(expected = "recv_outcome_blocking")]
    fn blocking_recv_without_preload_panics_when_no_chunk_arrives() {
        let mut audio = empty_audio();
        let _ = audio.recv_valid_chunk();
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[kithara::test]
    fn blocking_recv_returns_closed_after_cancel() {
        let mut audio = empty_audio();
        let cancel = CancellationToken::new();
        cancel.cancel();
        audio.cancel = Some(cancel);
        assert!(matches!(audio.recv_outcome(), RecvOutcome::Closed));
    }

    #[kithara::test]
    fn preloaded_recv_is_nonblocking() {
        let mut audio = empty_audio();
        audio.preload().expect("preload");
        assert!(matches!(audio.recv_outcome(), RecvOutcome::Empty));
    }

    #[kithara::test]
    fn consumer_phase_starts_buffering() {
        let audio = empty_audio();
        assert_eq!(audio.consumer_phase, ConsumerPhase::Buffering);
    }

    #[kithara::test]
    fn consumer_phase_transitions_to_playing_on_first_chunk() {
        let (mut audio, mut tx) = audio_with_channel();
        let chunk = make_chunk(&[0.1, 0.2]);
        let fetch = Fetch::new(chunk, false, 0);
        // A failed push would otherwise surface as a confusing assertion below.
        tx.try_push(fetch).expect("push data chunk");
        assert!(audio.fill_buffer());
        assert_eq!(audio.consumer_phase, ConsumerPhase::Playing);
    }

    #[kithara::test]
    fn consumer_phase_transitions_to_seek_pending() {
        let (mut audio, _tx) = audio_with_channel();
        audio.seek(Duration::from_secs(5)).ok();
        assert!(matches!(
            audio.consumer_phase,
            ConsumerPhase::SeekPending { .. }
        ));
    }

    #[kithara::test]
    fn consumer_phase_seek_pending_to_playing_on_chunk() {
        let (mut audio, mut tx) = audio_with_channel();
        audio.seek(Duration::from_secs(5)).ok();
        let epoch = audio.validator.epoch;
        let chunk = make_chunk(&[0.1, 0.2]);
        let fetch = Fetch::new(chunk, false, epoch);
        tx.try_push(fetch).expect("push post-seek chunk");
        assert!(audio.fill_buffer());
        assert_eq!(audio.consumer_phase, ConsumerPhase::Playing);
    }

    #[kithara::test]
    fn consumer_phase_eof_terminates() {
        let (mut audio, mut tx) = audio_with_channel();
        let fetch = Fetch::new(PcmChunk::default(), true, 0);
        tx.try_push(fetch).expect("push natural-eof marker");
        let result = audio.recv_valid_chunk();
        assert!(result.is_none());
        assert_eq!(audio.consumer_phase, ConsumerPhase::AtEof);
        let mut buf = [0.0f32; 16];
        assert!(matches!(audio.read(&mut buf), Ok(ReadOutcome::Eof { .. })));
    }

    #[kithara::test]
    fn consumer_phase_failed_on_channel_close() {
        let (mut audio, _tx) = audio_with_channel();
        let cancel = CancellationToken::new();
        cancel.cancel();
        audio.cancel = Some(cancel);
        // Force the blocking path, which is the one that observes cancellation.
        audio.preloaded = false;
        let result = audio.recv_valid_chunk();
        assert!(result.is_none());
        assert_eq!(audio.consumer_phase, ConsumerPhase::Failed);
        let mut buf = [0.0f32; 16];
        assert!(matches!(audio.read(&mut buf), Err(DecodeError::Io(_))));
    }

    #[kithara::test]
    fn consumer_does_not_park_in_terminal_phase() {
        let (mut audio, _tx) = audio_with_channel();
        audio.consumer_phase = ConsumerPhase::AtEof;
        let mut buf = [0.0f32; 16];
        assert!(matches!(audio.read(&mut buf), Ok(ReadOutcome::Eof { .. })));
    }

    #[kithara::test]
    fn process_fetch_must_distinguish_failure_from_natural_eof() {
        let (mut audio_eof, mut tx_eof) = audio_with_channel();
        tx_eof
            .try_push(Fetch::new(PcmChunk::default(), true, 0))
            .expect("push natural-eof marker");
        let _ = audio_eof.recv_valid_chunk();
        assert_eq!(audio_eof.consumer_phase, ConsumerPhase::AtEof);
        let (mut audio_failure, mut tx_failure) = audio_with_channel();
        tx_failure
            .try_push(Fetch::failure(PcmChunk::default(), 0))
            .expect("push failure marker");
        let _ = audio_failure.recv_valid_chunk();
        assert_ne!(
            audio_failure.consumer_phase,
            ConsumerPhase::AtEof,
            "process_fetch must not collapse FetchKind::Failure into \
ConsumerPhase::AtEof — AtEof means 'clip finished' and is \
used by PlayerTrack to finalize; a transient failure must \
land in a distinct non-natural-eof state so the pipeline \
can recover instead of removing the track from the arena"
        );
        assert_eq!(
            audio_failure.consumer_phase,
            ConsumerPhase::Failed,
            "failure marker must route to ConsumerPhase::Failed"
        );
    }
}