1use std::{
2 io::{Error as IoError, Read, Seek, SeekFrom},
3 marker::PhantomData,
4 num::{NonZeroU32, NonZeroUsize},
5 sync::{
6 Arc,
7 atomic::{AtomicU32, AtomicU64, Ordering},
8 },
9 time::Duration,
10};
11
12use fast_interleave::deinterleave_variable;
13use kithara_bufpool::{PcmBuf, PcmPool};
14use kithara_decode::{DecoderFactory, PcmChunk, PcmMeta, PcmSpec, TrackMetadata};
15use kithara_events::{AudioEvent, EventBus, SeekLifecycleStage, SegmentLocation};
16#[cfg(target_arch = "wasm32")]
17use kithara_platform::thread::{is_worker_thread, sleep as thread_sleep};
18use kithara_platform::{
19 thread::park_timeout,
20 tokio::{sync::Notify, task::spawn_blocking},
21};
22use kithara_stream::{MediaInfo, Stream, StreamType, Timeline};
23use kithara_test_utils::kithara;
24use portable_atomic::AtomicF32;
25use tokio_util::sync::CancellationToken;
26use tracing::{debug, info, trace, warn};
27
28use crate::{
29 pipeline::{
30 config::{AudioConfig, create_effects, expected_output_spec},
31 fetch::{EpochValidator, Fetch, FetchKind},
32 source::{OffsetReader, SharedStream, StreamAudioSource},
33 track_fsm::ConsumerPhase,
34 },
35 runtime::AtomicServiceClass,
36 traits::{ChunkOutcome, DecodeError, PcmReader, PendingReason, ReadOutcome, SeekOutcome},
37 worker::{
38 handle::{AudioWorkerHandle, TrackRegistration},
39 thread_wake::ThreadWake,
40 types::{ServiceClass, TrackId},
41 },
42};
43
44fn clamp_u128_to_u64_millis(ms: u128) -> u64 {
49 num_traits::cast::ToPrimitive::to_u64(&ms).unwrap_or(u64::MAX)
50}
51
52fn frames_to_samples(frames: u64, channels: u64) -> Result<usize, DecodeError> {
55 let samples = frames.saturating_mul(channels);
56 usize::try_from(samples).map_err(|err| {
57 DecodeError::Io(IoError::other(format!(
58 "frames*channels overflow: {samples} does not fit usize: {err}"
59 )))
60 })
61}
62
63enum FetchOutcome {
64 Continue,
65 Return(Option<PcmChunk>),
66}
67
68enum RecvOutcome {
69 Closed,
70 Empty,
71 Item(Fetch<PcmChunk>),
72}
73
74pub struct Audio<S> {
104 pub(crate) preload_notify: Arc<Notify>,
106
107 pub(crate) consumer_phase: ConsumerPhase,
109
110 pub(crate) validator: EpochValidator,
112
113 pub(crate) current_chunk: Option<PcmChunk>,
115
116 pub(crate) spec: PcmSpec,
118
119 pub(crate) timeline: Timeline,
121
122 pub(crate) current_chunk_consumed_frames: u64,
131
132 _epoch: Arc<AtomicU64>,
134
135 host_sample_rate: Arc<AtomicU32>,
138
139 playback_rate: Arc<AtomicF32>,
141
142 reader_wake: Arc<ThreadWake>,
144
145 bus: EventBus,
147
148 pcm_rx: crate::runtime::Inlet<Fetch<PcmChunk>>,
150
151 trash_tx: crate::runtime::Outlet<PcmChunk>,
157
158 abr_handle: Option<kithara_abr::AbrHandle>,
161
162 cancel: Option<CancellationToken>,
164
165 track_id: Option<TrackId>,
167
168 worker: Option<AudioWorkerHandle>,
170
171 service_class: Arc<AtomicServiceClass>,
176
177 pcm_pool: PcmPool,
180
181 interleaved: Option<PcmBuf>,
186
187 _marker: PhantomData<S>,
189
190 metadata: TrackMetadata,
192
193 is_standalone_worker: bool,
196
197 preloaded: bool,
199}
200
201impl<S> Audio<S> {
202 const PROBE_BUFFER_SIZE: usize = 1024;
204
205 const WARM_DECODE_FRAMES: usize = 4608;
211
212 const RECV_BACKOFF: Duration = Duration::from_micros(100);
214
215 fn warm_pcm_pool(pool: &PcmPool, channels: usize, chunks: usize) {
223 if pool.allocated_bytes() != 0 {
224 return;
225 }
226 let capacity = Self::WARM_DECODE_FRAMES * channels.max(1);
227 let count = chunks.saturating_mul(2).max(1);
228 pool.pre_warm(count, |buf| {
229 buf.clear();
230 buf.resize(capacity, 0.0);
231 });
232 }
233
234 fn alloc_interleaved_scratch(pool: &PcmPool, spec: PcmSpec) -> PcmBuf {
242 let channels = usize::from(spec.channels).max(2);
243 let sample_rate = usize::try_from(spec.sample_rate).unwrap_or(usize::MAX);
244 let capacity = sample_rate.saturating_mul(channels);
245 pool.get_with(|buf| {
246 buf.clear();
247 let cap = buf.capacity();
248 if cap < capacity {
249 buf.reserve(capacity - cap);
250 }
251 })
252 }
253
254 #[must_use]
258 pub fn abr_handle(&self) -> Option<kithara_abr::AbrHandle> {
259 self.abr_handle.clone()
260 }
261
262 fn close_channel_and_mark_eof(&mut self) -> Option<PcmChunk> {
263 self.consumer_phase = ConsumerPhase::Failed;
264 None
265 }
266
267 #[must_use]
272 pub fn current_variant(&self) -> Option<kithara_events::VariantInfo> {
273 self.abr_handle.as_ref()?.current_variant()
274 }
275
276 #[must_use]
280 pub fn duration(&self) -> Option<Duration> {
281 self.timeline.total_duration()
282 }
283
284 fn emit_audio_event(&self, event: AudioEvent) {
285 self.bus.publish(event);
286 }
287
288 fn emit_playback_progress(&self) {
289 let position_ms = clamp_u128_to_u64_millis(self.position().as_millis());
290 let total_ms = self
291 .timeline
292 .total_duration()
293 .map(|duration| clamp_u128_to_u64_millis(duration.as_millis()));
294
295 self.emit_audio_event(AudioEvent::PlaybackProgress {
296 position_ms,
297 total_ms,
298 seek_epoch: self.validator.epoch,
299 });
300 }
301
302 fn emit_post_seek_output_commit(&mut self, meta: Option<PcmMeta>) {
303 let Some(seek_epoch) = self.timeline.pending_seek_epoch() else {
304 return;
305 };
306 if seek_epoch != self.validator.epoch {
307 return;
308 }
309
310 let variant = meta.as_ref().and_then(|m| m.variant_index);
311 let segment_index = meta.as_ref().and_then(|m| m.segment_index);
312
313 self.emit_audio_event(AudioEvent::SeekLifecycle {
314 seek_epoch,
315 stage: SeekLifecycleStage::OutputCommitted,
316 location: SegmentLocation::new(variant, segment_index, None, None),
317 });
318
319 self.emit_audio_event(AudioEvent::SeekComplete {
320 seek_epoch,
321 position: (*self).position(),
322 });
323 let _ = self.timeline.did_clear_pending_seek_epoch(seek_epoch);
324 }
325
326 pub(crate) fn fill_buffer(&mut self) -> bool {
330 let Some(chunk) = self.recv_valid_chunk() else {
331 return false;
332 };
333 self.spec = chunk.spec();
334 self.current_chunk = Some(chunk);
335 self.current_chunk_consumed_frames = 0;
336
337 if matches!(
338 self.consumer_phase,
339 ConsumerPhase::Buffering | ConsumerPhase::SeekPending { .. }
340 ) {
341 self.consumer_phase = ConsumerPhase::Playing;
342 }
343 true
344 }
345
346 #[must_use]
350 pub fn is_preloaded(&self) -> bool {
351 self.preloaded
352 }
353
354 #[must_use]
356 pub fn metadata(&self) -> &TrackMetadata {
357 &self.metadata
358 }
359
360 #[must_use]
364 pub fn position(&self) -> Duration {
365 self.timeline.committed_position()
366 }
367
368 pub fn preload(&mut self) -> Result<(), DecodeError> {
385 self.preloaded = true;
386 if self.current_chunk.is_none() && self.consumer_phase != ConsumerPhase::AtEof {
387 self.fill_buffer();
388 if self.consumer_phase == ConsumerPhase::Failed {
389 return Err(DecodeError::Io(IoError::other(
390 "pcm channel closed during preload",
391 )));
392 }
393 }
394 Ok(())
395 }
396
397 fn process_fetch(&mut self, fetch: Fetch<PcmChunk>) -> FetchOutcome {
398 if !self.validator.is_valid(&fetch) {
399 self.discard_chunk(fetch.into_inner());
400 return FetchOutcome::Continue;
401 }
402
403 match fetch.kind {
404 FetchKind::NaturalEof => {
405 self.consumer_phase = ConsumerPhase::AtEof;
406 self.discard_chunk(fetch.into_inner());
407 FetchOutcome::Return(None)
408 }
409 FetchKind::Failure => {
410 self.consumer_phase = ConsumerPhase::Failed;
411 self.discard_chunk(fetch.into_inner());
412 FetchOutcome::Return(None)
413 }
414 FetchKind::Data => FetchOutcome::Return(Some(fetch.into_inner())),
415 }
416 }
417
418 #[cfg_attr(feature = "perf", hotpath::measure)]
434 #[kithara::hang_watchdog]
435 pub fn read(&mut self, buf: &mut [f32]) -> Result<ReadOutcome, DecodeError> {
436 if buf.is_empty() {
437 return Ok(ReadOutcome::Pending {
438 reason: PendingReason::Buffering,
439 position: self.position(),
440 });
441 }
442 match self.consumer_phase {
443 ConsumerPhase::AtEof if self.current_chunk.is_none() => {
444 return Ok(ReadOutcome::Eof {
445 position: self.position(),
446 });
447 }
448 ConsumerPhase::Failed => {
449 return Err(DecodeError::Io(IoError::other(
450 "pcm channel closed / producer failed",
451 )));
452 }
453 _ => {}
454 }
455
456 let mut written = 0;
457 let mut last_output_meta: Option<PcmMeta> = None;
458
459 while written < buf.len() {
460 hang_tick!();
461
462 if let Some(chunk) = self.current_chunk.as_ref() {
463 let channels = u64::from(chunk.meta.spec.channels.max(1));
464 let chunk_total_frames = u64::from(chunk.meta.frames);
465 let consumed_frames_in_chunk = self.current_chunk_consumed_frames;
466 if consumed_frames_in_chunk >= chunk_total_frames {
467 self.recycle_current_chunk();
468 if !self.fill_buffer() {
469 break;
470 }
471 continue;
472 }
473 let remaining_frames = chunk_total_frames - consumed_frames_in_chunk;
474 let space_frames = ((buf.len() - written) as u64) / channels.max(1);
475 let take_frames = remaining_frames.min(space_frames);
476 if take_frames == 0 {
477 break;
478 }
479
480 hang_reset!();
481 let start_sample = frames_to_samples(consumed_frames_in_chunk, channels)?;
482 let take_samples = frames_to_samples(take_frames, channels)?;
483 buf[written..written + take_samples]
484 .copy_from_slice(&chunk.pcm[start_sample..start_sample + take_samples]);
485 last_output_meta = Some(chunk.meta);
486 written += take_samples;
487
488 let final_segment = take_frames == remaining_frames;
489 let consumed_total = consumed_frames_in_chunk + take_frames;
490 self.current_chunk_consumed_frames = consumed_total;
491
492 if final_segment {
493 self.timeline
494 .advance_committed_chunk(&kithara_stream::ChunkPosition::from(&chunk.meta));
495 self.recycle_current_chunk();
496 } else {
497 let total_frames = chunk_total_frames.max(1);
498 let start_ns =
499 u64::try_from(chunk.meta.timestamp.as_nanos()).unwrap_or(u64::MAX);
500 let end_ns =
501 u64::try_from(chunk.meta.end_timestamp.as_nanos()).unwrap_or(u64::MAX);
502 let span_ns = u128::from(end_ns.saturating_sub(start_ns));
503 let consumed_ns_offset =
504 span_ns * u128::from(consumed_total) / u128::from(total_frames);
505 let interpolated = u128::from(start_ns).saturating_add(consumed_ns_offset);
506 let interpolated_ns = u64::try_from(interpolated).unwrap_or(u64::MAX);
507 self.timeline
508 .set_committed_position(Duration::from_nanos(interpolated_ns));
509 }
510 }
511
512 if written >= buf.len() {
513 break;
514 }
515
516 if !self.fill_buffer() {
517 break;
518 }
519 }
520
521 if let Some(count) = NonZeroUsize::new(written) {
522 debug_assert!(
523 count.get() <= buf.len(),
524 "Audio::read Frames contract violated: count={c} > buf.len()={b}",
525 c = count.get(),
526 b = buf.len(),
527 );
528 self.emit_post_seek_output_commit(last_output_meta);
529 self.emit_playback_progress();
530 let position = self.position();
531 debug_assert!(
532 self.timeline
533 .total_duration()
534 .is_none_or(|dur| position <= dur),
535 "Audio::read Frames contract: position={position:?} > duration={:?}",
536 self.timeline.total_duration(),
537 );
538 return Ok(ReadOutcome::Frames { count, position });
539 }
540
541 let position = self.position();
542 match self.consumer_phase {
543 ConsumerPhase::AtEof => Ok(ReadOutcome::Eof { position }),
544 ConsumerPhase::Failed => Err(DecodeError::Io(IoError::other(
545 "pcm channel closed / producer failed",
546 ))),
547 ConsumerPhase::SeekPending { .. } => Ok(ReadOutcome::Pending {
548 position,
549 reason: PendingReason::SeekInProgress,
550 }),
551 _ => Ok(ReadOutcome::Pending {
552 position,
553 reason: PendingReason::Buffering,
554 }),
555 }
556 }
557
558 fn recv_outcome(&mut self) -> RecvOutcome {
559 if self.use_nonblocking_recv() {
560 if let Some(fetch) = self.pcm_rx.try_pop() {
561 self.wake_worker();
562 return RecvOutcome::Item(fetch);
563 }
564 return RecvOutcome::Empty;
565 }
566
567 self.recv_outcome_blocking()
568 }
569
570 #[kithara::hang_watchdog]
571 fn recv_outcome_blocking(&mut self) -> RecvOutcome {
572 loop {
573 if let Some(fetch) = self.pcm_rx.try_pop() {
574 hang_reset!();
575 self.wake_worker();
576 return RecvOutcome::Item(fetch);
577 }
578 if self
579 .cancel
580 .as_ref()
581 .is_some_and(CancellationToken::is_cancelled)
582 {
583 hang_reset!();
584 return RecvOutcome::Closed;
585 }
586 self.wake_worker();
587 self.reader_wake.register_current();
588 if let Some(fetch) = self.pcm_rx.try_pop() {
589 hang_reset!();
590 self.wake_worker();
591 return RecvOutcome::Item(fetch);
592 }
593 if self
594 .cancel
595 .as_ref()
596 .is_some_and(CancellationToken::is_cancelled)
597 {
598 hang_reset!();
599 return RecvOutcome::Closed;
600 }
601 hang_tick!();
602 Self::wait_for_fetch();
603 }
604 }
605
606 #[kithara::hang_watchdog]
607 fn recv_valid_chunk(&mut self) -> Option<PcmChunk> {
608 if self.consumer_phase.is_terminal() {
609 return None;
610 }
611
612 loop {
613 match self.recv_outcome() {
614 RecvOutcome::Item(fetch) => match self.process_fetch(fetch) {
615 FetchOutcome::Continue => {
616 hang_tick!();
617 continue;
618 }
619 FetchOutcome::Return(chunk) => {
620 hang_reset!();
621 return chunk;
622 }
623 },
624 RecvOutcome::Empty => return None,
625 RecvOutcome::Closed => {
626 hang_reset!();
627 return self.close_channel_and_mark_eof();
628 }
629 }
630 }
631 }
632
633 #[kithara::hang_watchdog]
650 pub fn seek(&mut self, position: Duration) -> Result<SeekOutcome, DecodeError> {
651 let epoch = self.timeline.initiate_seek(position);
652 self.timeline.mark_pending_seek_epoch(epoch);
653 self.validator.epoch = epoch;
654 self.recycle_current_chunk();
655 self.current_chunk_consumed_frames = 0;
656 self.consumer_phase = ConsumerPhase::SeekPending { epoch };
657
658 while let Some(fetch) = self.pcm_rx.try_pop() {
659 self.discard_chunk(fetch.into_inner());
660 hang_tick!();
661 }
662
663 if let Some(ref worker) = self.worker {
664 worker.wake();
665 }
666
667 trace!(?position, epoch, "seek initiated via Timeline");
668 match self.timeline.total_duration() {
669 Some(duration) if position >= duration => {
670 debug_assert!(
671 position >= duration,
672 "Audio::seek PastEof contract: target={position:?} < duration={duration:?}",
673 );
674 Ok(SeekOutcome::PastEof {
675 duration,
676 target: position,
677 })
678 }
679 _ => {
680 debug_assert!(
681 self.timeline
682 .total_duration()
683 .is_none_or(|dur| position <= dur),
684 "Audio::seek Landed contract: landed_at={position:?} > duration={:?}",
685 self.timeline.total_duration(),
686 );
687 Ok(SeekOutcome::Landed {
688 target: position,
689 landed_at: position,
690 })
691 }
692 }
693 }
694
695 #[must_use]
701 pub fn spec(&self) -> PcmSpec {
702 self.spec
703 }
704
705 fn use_nonblocking_recv(&self) -> bool {
706 #[cfg(target_arch = "wasm32")]
707 {
708 true
709 }
710 #[cfg(not(target_arch = "wasm32"))]
711 {
712 self.is_preloaded()
713 }
714 }
715
716 fn wait_for_fetch() {
717 #[cfg(not(target_arch = "wasm32"))]
718 {
719 park_timeout(Self::RECV_BACKOFF);
720 }
721
722 #[cfg(target_arch = "wasm32")]
723 {
724 if is_worker_thread() {
725 park_timeout(Self::RECV_BACKOFF);
726 } else {
727 thread_sleep(Self::RECV_BACKOFF);
728 }
729 }
730 }
731
732 fn wake_worker(&self) {
738 if let Some(ref worker) = self.worker {
739 worker.wake();
740 }
741 }
742
743 fn discard_chunk(&mut self, chunk: PcmChunk) {
750 if let Err(_overflow) = self.trash_tx.try_push(chunk) {
751 debug_assert!(
752 false,
753 "PCM trash ring overflow — spent buffer freed on the audio thread"
754 );
755 }
756 }
757
758 fn recycle_current_chunk(&mut self) {
760 if let Some(chunk) = self.current_chunk.take() {
761 self.discard_chunk(chunk);
762 }
763 }
764}
765
766impl<T> Audio<Stream<T>>
771where
772 T: StreamType<Events = EventBus>,
773{
774 pub async fn new(config: AudioConfig<T>) -> Result<Self, DecodeError> {
792 let AudioConfig {
793 byte_pool,
794 hint,
795 host_sample_rate: config_host_sr,
796 media_info: user_media_info,
797 pcm_buffer_chunks,
798 pcm_pool: mut pool,
799 playback_rate: config_playback_rate,
800 decoder_backend,
801 preload_chunks,
802 resampler_quality,
803 stream: stream_config,
804 bus: config_bus,
805 effects: custom_effects,
806 worker: config_worker,
807 gapless_mode: config_gapless_mode,
808 cancel: config_cancel,
809 } = config;
810 let cancel = config_cancel.unwrap_or_default();
811
812 let bus = Self::resolve_event_bus(&stream_config, config_bus);
813 let byte_pool = byte_pool.unwrap_or_else(|| kithara_bufpool::BytePool::default().clone());
814 let stream = Self::create_stream_with_probe(stream_config, byte_pool.clone()).await?;
815
816 let initial_byte_len = stream.len().unwrap_or(0);
817 let timeline = stream.timeline();
818 let initial_media_info =
819 merge_user_and_stream_media_info(user_media_info, stream.media_info());
820 debug!(?initial_media_info, "Initial MediaInfo from stream");
821
822 let shared_stream = SharedStream::new(stream);
823 let byte_len_handle = Arc::new(AtomicU64::new(initial_byte_len));
824
825 let pool = pool.get_or_insert_with(|| PcmPool::default().clone());
826 let warm_channels = initial_media_info
827 .as_ref()
828 .and_then(|info| info.channels)
829 .map_or(2, usize::from);
830 Self::warm_pcm_pool(pool, warm_channels, pcm_buffer_chunks);
831 let decoder = Self::create_initial_decoder(
832 shared_stream.clone(),
833 initial_media_info.clone(),
834 hint.clone(),
835 pool.clone(),
836 byte_pool.clone(),
837 decoder_backend,
838 )
839 .await?;
840
841 let initial_spec = decoder.spec();
842 let total_duration = decoder.duration().or_else(|| timeline.total_duration());
843 timeline.set_total_duration(total_duration);
844 let metadata = decoder.metadata();
845
846 let epoch = Arc::new(AtomicU64::new(0));
847 let host_sample_rate = Arc::new(AtomicU32::new(config_host_sr.map_or(0, NonZeroU32::get)));
848 let playback_rate = config_playback_rate.unwrap_or_else(|| Arc::new(AtomicF32::new(1.0)));
849
850 let output_spec = expected_output_spec(initial_spec, &host_sample_rate);
851 let effects = create_effects(
852 initial_spec,
853 &host_sample_rate,
854 &playback_rate,
855 resampler_quality,
856 Some(pool.clone()),
857 custom_effects,
858 );
859
860 Self::log_pipeline_ready(initial_spec, output_spec, &host_sample_rate);
861
862 let interleaved = Self::alloc_interleaved_scratch(pool, output_spec);
863
864 let emit = Self::create_emit(&bus);
865 let decoder_factory = Self::create_decoder_factory(
866 decoder_backend,
867 &epoch,
868 &byte_len_handle,
869 pool,
870 &byte_pool,
871 );
872 let initial_variant = initial_media_info.as_ref().and_then(|i| i.variant_index);
873 let abr_handle = shared_stream.abr_handle();
874 let audio_source = StreamAudioSource::new(
875 shared_stream,
876 decoder,
877 decoder_factory,
878 initial_media_info,
879 Arc::clone(&epoch),
880 effects,
881 config_gapless_mode,
882 )
883 .with_emit(emit);
884
885 bus.publish(AudioEvent::DecoderReady {
886 base_offset: 0,
887 variant: initial_variant,
888 });
889
890 let preload_notify = Arc::new(Notify::new());
891 let reader_wake = Arc::new(ThreadWake::default());
892 let (data_tx, data_rx) = Self::create_channels(pcm_buffer_chunks, Arc::clone(&reader_wake));
893 let (trash_tx, trash_inlet) = Self::create_trash_channel(pcm_buffer_chunks);
894
895 let (worker, is_standalone) =
896 config_worker.map_or_else(|| (AudioWorkerHandle::new(), true), |w| (w, false));
897
898 let service_class = Arc::new(AtomicServiceClass::new(ServiceClass::default()));
899
900 let track_id = worker.register_track(TrackRegistration {
901 source: Box::new(audio_source),
902 outlet: data_tx,
903 trash_inlet,
904 preload_notify: preload_notify.clone(),
905 preload_chunks: preload_chunks.get(),
906 service_class: Arc::clone(&service_class),
907 });
908
909 Ok(Self {
910 timeline,
911 metadata,
912 bus,
913 host_sample_rate,
914 playback_rate,
915 preload_notify,
916 reader_wake,
917 abr_handle,
918 pcm_rx: data_rx,
919 trash_tx,
920 _epoch: epoch,
921 validator: EpochValidator::default(),
922 spec: output_spec,
923 current_chunk: None,
924 current_chunk_consumed_frames: 0,
925 consumer_phase: ConsumerPhase::Buffering,
926 cancel: Some(cancel),
927 interleaved: Some(interleaved),
928 pcm_pool: pool.clone(),
929 preloaded: false,
930 track_id: Some(track_id),
931 worker: Some(worker),
932 service_class,
933 is_standalone_worker: is_standalone,
934 _marker: PhantomData,
935 })
936 }
937
938 fn create_channels(
939 pcm_buffer_chunks: usize,
940 wake: Arc<ThreadWake>,
941 ) -> (
942 crate::runtime::Outlet<Fetch<PcmChunk>>,
943 crate::runtime::Inlet<Fetch<PcmChunk>>,
944 ) {
945 crate::runtime::connect::<Fetch<PcmChunk>>(pcm_buffer_chunks.max(1), Some(wake))
946 }
947
948 fn create_trash_channel(
955 pcm_buffer_chunks: usize,
956 ) -> (
957 crate::runtime::Outlet<PcmChunk>,
958 crate::runtime::Inlet<PcmChunk>,
959 ) {
960 crate::runtime::connect::<PcmChunk>(pcm_buffer_chunks.max(1) + 2, None)
961 }
962
963 fn create_decoder_factory(
964 decoder_backend: kithara_decode::DecoderBackend,
965 epoch: &Arc<AtomicU64>,
966 byte_len_handle: &Arc<AtomicU64>,
967 pool: &PcmPool,
968 byte_pool: &kithara_bufpool::BytePool,
969 ) -> crate::pipeline::source::DecoderFactory<T> {
970 let factory_epoch = Arc::clone(epoch);
971 let factory_byte_len = Arc::clone(byte_len_handle);
972 let factory_pool = pool.clone();
973 let factory_byte_pool = byte_pool.clone();
974 Box::new(move |stream, info, base_offset| {
975 let byte_len = stream
976 .len()
977 .map_or(0, |len| len.saturating_sub(base_offset));
978 factory_byte_len.store(byte_len, Ordering::Release);
979 let config = kithara_decode::DecoderConfig::builder()
980 .backend(decoder_backend)
981 .byte_len_handle(Arc::clone(&factory_byte_len))
982 .pcm_pool(factory_pool.clone())
983 .byte_pool(factory_byte_pool.clone())
984 .epoch(factory_epoch.load(Ordering::Acquire))
985 .maybe_segment_layout(stream.as_segment_layout())
986 .maybe_hooks(stream.take_reader_hooks())
987 .build();
988 let source = OffsetReader::new(stream.clone(), base_offset);
989 match DecoderFactory::create_from_media_info(source, info, &config) {
990 Ok(d) => {
991 d.update_byte_len(byte_len);
992 Ok(d)
993 }
994 Err(e) => {
995 warn!(?e, "failed to recreate decoder");
996 Err(e)
997 }
998 }
999 })
1000 }
1001
1002 fn create_emit(bus: &EventBus) -> Box<dyn Fn(AudioEvent) + Send> {
1003 let emit_bus = bus.clone();
1004 Box::new(move |event: AudioEvent| {
1005 emit_bus.publish(event);
1006 })
1007 }
1008
1009 async fn create_initial_decoder(
1010 shared_stream: SharedStream<T>,
1011 initial_media_info: Option<MediaInfo>,
1012 hint: Option<String>,
1013 pcm_pool: PcmPool,
1014 byte_pool: kithara_bufpool::BytePool,
1015 decoder_backend: kithara_decode::DecoderBackend,
1016 ) -> Result<Box<dyn kithara_decode::Decoder>, DecodeError> {
1017 debug!("Audio::new — spawning decoder creation...");
1018 let byte_len_handle = Arc::new(AtomicU64::new(shared_stream.len().unwrap_or(0)));
1019 let decoder_config = kithara_decode::DecoderConfig::builder()
1020 .backend(decoder_backend)
1021 .byte_len_handle(byte_len_handle)
1022 .pcm_pool(pcm_pool)
1023 .byte_pool(byte_pool)
1024 .maybe_segment_layout(shared_stream.as_segment_layout())
1025 .maybe_hooks(shared_stream.take_reader_hooks())
1026 .maybe_hint(hint.clone())
1027 .build();
1028 let hint_for_decoder = hint;
1029 let initial_media_info_for_decoder = initial_media_info;
1030 let decoder = spawn_blocking(move || {
1031 if let Some(ref info) = initial_media_info_for_decoder {
1032 DecoderFactory::create_from_media_info(shared_stream, info, &decoder_config)
1033 } else {
1034 DecoderFactory::create_with_probe(
1035 shared_stream,
1036 hint_for_decoder.as_deref(),
1037 &decoder_config,
1038 )
1039 }
1040 })
1041 .await
1042 .map_err(|e| DecodeError::Io(IoError::other(format!("decoder task panicked: {e}"))))??;
1043 debug!("Audio::new — decoder created");
1044 Ok(decoder)
1045 }
1046
1047 async fn create_stream_with_probe(
1048 stream_config: T::Config,
1049 byte_pool: kithara_bufpool::BytePool,
1050 ) -> Result<Stream<T>, DecodeError> {
1051 let stream = Self::open_stream(stream_config).await?;
1052 Self::spawn_probe(stream, byte_pool).await
1053 }
1054
1055 #[must_use]
1059 pub fn event_bus(&self) -> &EventBus {
1060 &self.bus
1061 }
1062
1063 #[must_use]
1067 pub fn events(&self) -> kithara_events::EventReceiver {
1068 self.bus.subscribe()
1069 }
1070
1071 fn log_pipeline_ready(
1072 initial_spec: PcmSpec,
1073 output_spec: PcmSpec,
1074 host_sample_rate: &Arc<AtomicU32>,
1075 ) {
1076 info!(
1077 ?initial_spec,
1078 ?output_spec,
1079 host_sr = host_sample_rate.load(Ordering::Relaxed),
1080 "Audio pipeline created"
1081 );
1082 }
1083
1084 async fn open_stream(stream_config: T::Config) -> Result<Stream<T>, DecodeError> {
1085 debug!("Audio::new — creating Stream...");
1086 let stream = Stream::<T>::new(stream_config)
1087 .await
1088 .map_err(|e| DecodeError::Io(IoError::other(e.to_string())))?;
1089 debug!("Audio::new — Stream created");
1090 Ok(stream)
1091 }
1092
1093 fn probe_stream_blocking(
1094 mut stream: Stream<T>,
1095 byte_pool: &kithara_bufpool::BytePool,
1096 ) -> Result<Stream<T>, DecodeError> {
1097 let mut probe_buf = byte_pool.get_with(|b| b.resize(Self::PROBE_BUFFER_SIZE, 0));
1098 if let Err(e) = stream.read(&mut probe_buf) {
1099 tracing::debug!(?e, "probe_stream_blocking: probe read failed");
1100 }
1101 stream.seek(SeekFrom::Start(0)).map_err(DecodeError::Io)?;
1102 Ok(stream)
1103 }
1104
1105 fn resolve_event_bus(stream_config: &T::Config, config_bus: Option<EventBus>) -> EventBus {
1106 T::event_bus(stream_config)
1107 .or(config_bus)
1108 .unwrap_or_default()
1109 }
1110
1111 #[cfg(not(target_arch = "wasm32"))]
1112 async fn spawn_probe(
1113 stream: Stream<T>,
1114 byte_pool: kithara_bufpool::BytePool,
1115 ) -> Result<Stream<T>, DecodeError> {
1116 debug!("Audio::new — spawning probe task...");
1117 let result = spawn_blocking(move || Self::probe_stream_blocking(stream, &byte_pool))
1118 .await
1119 .map_err(|e| DecodeError::Io(IoError::other(format!("probe task panicked: {e}"))))??;
1120 debug!("Audio::new — probe task done");
1121 Ok(result)
1122 }
1123
1124 #[cfg(target_arch = "wasm32")]
1129 async fn spawn_probe(
1130 stream: Stream<T>,
1131 byte_pool: kithara_bufpool::BytePool,
1132 ) -> Result<Stream<T>, DecodeError> {
1133 debug!("Audio::new — running probe inline (wasm)...");
1134 let result = Self::probe_stream_blocking(stream, &byte_pool)?;
1135 debug!("Audio::new — probe done");
1136 Ok(result)
1137 }
1138}
1139
1140fn merge_user_and_stream_media_info(
1151 user: Option<MediaInfo>,
1152 stream: Option<MediaInfo>,
1153) -> Option<MediaInfo> {
1154 match (user, stream) {
1155 (Some(user), Some(stream)) => {
1156 let mut merged = user;
1157 if merged.codec.is_none() {
1158 merged.codec = stream.codec;
1159 }
1160 if merged.container.is_none() {
1161 merged.container = stream.container;
1162 }
1163 if merged.channels.is_none() {
1164 merged.channels = stream.channels;
1165 }
1166 if merged.sample_rate.is_none() {
1167 merged.sample_rate = stream.sample_rate;
1168 }
1169 if merged.variant_index.is_none() {
1170 merged.variant_index = stream.variant_index;
1171 }
1172 Some(merged)
1173 }
1174 (Some(user), None) => Some(user),
1175 (None, stream) => stream,
1176 }
1177}
1178
1179impl<S> Drop for Audio<S> {
1180 fn drop(&mut self) {
1181 if let Some(ref cancel) = self.cancel {
1182 cancel.cancel();
1183 }
1184
1185 if let (Some(worker), Some(track_id)) = (&self.worker, self.track_id.take()) {
1186 worker.unregister_track(track_id);
1187
1188 if self.is_standalone_worker {
1189 worker.shutdown();
1190 }
1191 }
1192 }
1193}
1194
1195impl<S: kithara_platform::MaybeSend> PcmReader for Audio<S> {
1196 fn abr_handle(&self) -> Option<kithara_abr::AbrHandle> {
1197 self.abr_handle.clone()
1198 }
1199
1200 fn duration(&self) -> Option<Duration> {
1201 Self::duration(self)
1202 }
1203
1204 fn event_bus(&self) -> &EventBus {
1205 &self.bus
1206 }
1207
1208 fn metadata(&self) -> &TrackMetadata {
1209 Self::metadata(self)
1210 }
1211
1212 fn next_chunk(&mut self) -> Result<ChunkOutcome, DecodeError> {
1213 self.preloaded = true;
1214 let chunk = if let Some(c) = self.current_chunk.take() {
1215 c
1216 } else if let Some(c) = self.recv_valid_chunk() {
1217 c
1218 } else {
1219 let position = self.position();
1220 return match self.consumer_phase {
1221 ConsumerPhase::AtEof => Ok(ChunkOutcome::Eof { position }),
1222 ConsumerPhase::Failed => Err(DecodeError::Io(IoError::other(
1223 "pcm channel closed / producer failed",
1224 ))),
1225 ConsumerPhase::SeekPending { .. } => Ok(ChunkOutcome::Pending {
1226 position,
1227 reason: PendingReason::SeekInProgress,
1228 }),
1229 _ => Ok(ChunkOutcome::Pending {
1230 position,
1231 reason: PendingReason::Buffering,
1232 }),
1233 };
1234 };
1235 self.spec = chunk.spec();
1236
1237 if matches!(
1238 self.consumer_phase,
1239 ConsumerPhase::Buffering | ConsumerPhase::SeekPending { .. }
1240 ) {
1241 self.consumer_phase = ConsumerPhase::Playing;
1242 }
1243
1244 self.timeline
1245 .advance_committed_chunk(&kithara_stream::ChunkPosition::from(&chunk.meta));
1246 Ok(ChunkOutcome::Chunk(chunk))
1247 }
1248
1249 fn position(&self) -> Duration {
1250 Self::position(self)
1251 }
1252
1253 fn preload(&mut self) -> Result<(), DecodeError> {
1254 Self::preload(self)
1255 }
1256
1257 fn preload_notify(&self) -> Option<Arc<Notify>> {
1258 Some(self.preload_notify.clone())
1259 }
1260
1261 fn read(&mut self, buf: &mut [f32]) -> Result<ReadOutcome, DecodeError> {
1262 Self::read(self, buf)
1263 }
1264
1265 #[cfg_attr(feature = "perf", hotpath::measure)]
1266 fn read_planar<'a>(
1267 &mut self,
1268 output: &'a mut [&'a mut [f32]],
1269 ) -> Result<ReadOutcome, DecodeError> {
1270 let channels = output.len();
1271 if channels == 0 {
1272 return Ok(ReadOutcome::Pending {
1273 reason: PendingReason::Buffering,
1274 position: self.position(),
1275 });
1276 }
1277 let frames = output[0].len();
1278 let total_samples = frames * channels;
1279
1280 let mut interleaved = self
1285 .interleaved
1286 .take()
1287 .unwrap_or_else(|| self.pcm_pool.get());
1288 interleaved.clear();
1289 interleaved.resize(total_samples, 0.0);
1290 debug_assert!(
1291 interleaved.capacity() >= total_samples,
1292 "Audio::read_planar scratch undersized: capacity={} < total_samples={total_samples}",
1293 interleaved.capacity(),
1294 );
1295
1296 let result = match self.read(&mut interleaved[..]) {
1297 Ok(ReadOutcome::Eof { position }) => Ok(ReadOutcome::Eof { position }),
1298 Ok(ReadOutcome::Pending { reason, position }) => {
1299 Ok(ReadOutcome::Pending { reason, position })
1300 }
1301 Ok(ReadOutcome::Frames { count, position }) => {
1302 let actual_frames = count.get() / channels;
1303 debug_assert!(
1304 actual_frames <= frames,
1305 "Audio::read_planar Frames contract: actual_frames={actual_frames} \
1306 > per-channel buf frames={frames}",
1307 );
1308 let num_channels =
1309 NonZeroUsize::new(channels).expect("channels checked non-zero above");
1310 deinterleave_variable(&interleaved[..], num_channels, output, 0..actual_frames);
1311 NonZeroUsize::new(actual_frames).map_or(
1312 Ok(ReadOutcome::Pending {
1313 position,
1314 reason: PendingReason::Buffering,
1315 }),
1316 |actual| {
1317 Ok(ReadOutcome::Frames {
1318 position,
1319 count: actual,
1320 })
1321 },
1322 )
1323 }
1324 Err(err) => Err(err),
1325 };
1326
1327 self.interleaved = Some(interleaved);
1328 result
1329 }
1330
1331 fn seek(&mut self, position: Duration) -> Result<SeekOutcome, DecodeError> {
1332 Self::seek(self, position)
1333 }
1334
1335 fn set_host_sample_rate(&self, sample_rate: NonZeroU32) {
1336 self.host_sample_rate
1337 .store(sample_rate.get(), Ordering::Relaxed);
1338 }
1339 fn set_playback_rate(&self, rate: f32) {
1340 self.playback_rate.store(rate, Ordering::Relaxed);
1341 }
1342
1343 fn set_service_class(&self, class: ServiceClass) {
1344 self.service_class.store(class);
1345 if let Some(worker) = &self.worker {
1346 worker.wake();
1347 }
1348 }
1349
1350 fn spec(&self) -> PcmSpec {
1351 Self::spec(self)
1352 }
1353}
1354
1355#[cfg(test)]
1356mod tests {
1357 use std::{
1358 marker::PhantomData,
1359 sync::{
1360 Arc,
1361 atomic::{AtomicU32, AtomicU64},
1362 },
1363 };
1364
1365 use kithara_test_utils::kithara;
1366
1367 use super::*;
1368
1369 fn empty_audio() -> Audio<()> {
1370 let (_data_tx, pcm_rx) = crate::runtime::connect::<Fetch<PcmChunk>>(1, None);
1371 let (trash_tx, _trash_rx) = crate::runtime::connect::<PcmChunk>(8, None);
1372
1373 Audio {
1374 pcm_rx,
1375 trash_tx,
1376 _epoch: Arc::new(AtomicU64::new(0)),
1377 validator: EpochValidator::default(),
1378 spec: PcmSpec::default(),
1379 current_chunk: None,
1380 current_chunk_consumed_frames: 0,
1381 consumer_phase: ConsumerPhase::Buffering,
1382 timeline: Timeline::new(),
1383 metadata: TrackMetadata::default(),
1384 bus: EventBus::default(),
1385 cancel: None,
1386 interleaved: None,
1387 pcm_pool: PcmPool::default().clone(),
1388 host_sample_rate: Arc::new(AtomicU32::new(0)),
1389 playback_rate: Arc::new(AtomicF32::new(1.0)),
1390 preload_notify: Arc::new(Notify::new()),
1391 preloaded: false,
1392 track_id: None,
1393 worker: None,
1394 service_class: Arc::new(AtomicServiceClass::new(ServiceClass::default())),
1395 reader_wake: Arc::new(ThreadWake::default()),
1396 is_standalone_worker: false,
1397 abr_handle: None,
1398 _marker: PhantomData,
1399 }
1400 }
1401
1402 #[cfg(not(target_arch = "wasm32"))]
1403 #[kithara::test(env(KITHARA_HANG_TIMEOUT_SECS = "1"))]
1404 #[should_panic(expected = "recv_outcome_blocking")]
1405 fn blocking_recv_without_preload_panics_when_no_chunk_arrives() {
1406 let mut audio = empty_audio();
1407 let _ = audio.recv_valid_chunk();
1408 }
1409
1410 #[cfg(not(target_arch = "wasm32"))]
1411 #[kithara::test]
1412 fn blocking_recv_returns_closed_after_cancel() {
1413 let mut audio = empty_audio();
1414 let cancel = CancellationToken::new();
1415 cancel.cancel();
1416 audio.cancel = Some(cancel);
1417
1418 assert!(matches!(audio.recv_outcome(), RecvOutcome::Closed));
1419 }
1420
1421 #[kithara::test]
1422 fn preloaded_recv_is_nonblocking() {
1423 let mut audio = empty_audio();
1424 audio.preload().expect("preload");
1425
1426 assert!(matches!(audio.recv_outcome(), RecvOutcome::Empty));
1427 }
1428
1429 fn audio_with_channel() -> (Audio<()>, crate::runtime::Outlet<Fetch<PcmChunk>>) {
1430 let (data_tx, pcm_rx) = crate::runtime::connect::<Fetch<PcmChunk>>(4, None);
1431 let (trash_tx, _trash_rx) = crate::runtime::connect::<PcmChunk>(8, None);
1432
1433 let audio = Audio {
1434 pcm_rx,
1435 trash_tx,
1436 _epoch: Arc::new(AtomicU64::new(0)),
1437 validator: EpochValidator::default(),
1438 spec: PcmSpec::default(),
1439 current_chunk: None,
1440 current_chunk_consumed_frames: 0,
1441 consumer_phase: ConsumerPhase::Buffering,
1442 timeline: Timeline::new(),
1443 metadata: TrackMetadata::default(),
1444 bus: EventBus::default(),
1445 cancel: None,
1446 interleaved: None,
1447 pcm_pool: PcmPool::default().clone(),
1448 host_sample_rate: Arc::new(AtomicU32::new(0)),
1449 playback_rate: Arc::new(AtomicF32::new(1.0)),
1450 preload_notify: Arc::new(Notify::new()),
1451 preloaded: true,
1452 track_id: None,
1453 worker: None,
1454 service_class: Arc::new(AtomicServiceClass::new(ServiceClass::default())),
1455 reader_wake: Arc::new(ThreadWake::default()),
1456 is_standalone_worker: false,
1457 abr_handle: None,
1458 _marker: PhantomData,
1459 };
1460 (audio, data_tx)
1461 }
1462
1463 fn make_chunk(samples: &[f32]) -> PcmChunk {
1464 let mut chunk = PcmChunk::default();
1465 chunk.pcm.clear();
1466 chunk.pcm.extend_from_slice(samples);
1467 chunk
1468 }
1469
1470 #[kithara::test]
1471 fn consumer_phase_starts_buffering() {
1472 let audio = empty_audio();
1473 assert_eq!(audio.consumer_phase, ConsumerPhase::Buffering);
1474 }
1475
1476 #[kithara::test]
1477 fn consumer_phase_transitions_to_playing_on_first_chunk() {
1478 let (mut audio, mut tx) = audio_with_channel();
1479 let chunk = make_chunk(&[0.1, 0.2]);
1480 let fetch = Fetch::new(chunk, false, 0);
1481 tx.try_push(fetch).ok();
1482
1483 assert!(audio.fill_buffer());
1484 assert_eq!(audio.consumer_phase, ConsumerPhase::Playing);
1485 }
1486
1487 #[kithara::test]
1488 fn consumer_phase_transitions_to_seek_pending() {
1489 let (mut audio, _tx) = audio_with_channel();
1490 audio.seek(Duration::from_secs(5)).ok();
1491 assert!(matches!(
1492 audio.consumer_phase,
1493 ConsumerPhase::SeekPending { .. }
1494 ));
1495 }
1496
1497 #[kithara::test]
1498 fn consumer_phase_seek_pending_to_playing_on_chunk() {
1499 let (mut audio, mut tx) = audio_with_channel();
1500
1501 audio.seek(Duration::from_secs(5)).ok();
1502 let epoch = audio.validator.epoch;
1503
1504 let chunk = make_chunk(&[0.1, 0.2]);
1505 let fetch = Fetch::new(chunk, false, epoch);
1506 tx.try_push(fetch).ok();
1507
1508 assert!(audio.fill_buffer());
1509 assert_eq!(audio.consumer_phase, ConsumerPhase::Playing);
1510 }
1511
1512 #[kithara::test]
1513 fn consumer_phase_eof_terminates() {
1514 let (mut audio, mut tx) = audio_with_channel();
1515
1516 let fetch = Fetch::new(PcmChunk::default(), true, 0);
1517 tx.try_push(fetch).ok();
1518
1519 let result = audio.recv_valid_chunk();
1520 assert!(result.is_none());
1521 assert_eq!(audio.consumer_phase, ConsumerPhase::AtEof);
1522 let mut buf = [0.0f32; 16];
1523 assert!(matches!(audio.read(&mut buf), Ok(ReadOutcome::Eof { .. })));
1524 }
1525
1526 #[kithara::test]
1527 fn consumer_phase_failed_on_channel_close() {
1528 let (mut audio, _tx) = audio_with_channel();
1529 let cancel = CancellationToken::new();
1530 cancel.cancel();
1531 audio.cancel = Some(cancel);
1532 audio.preloaded = false;
1533
1534 let result = audio.recv_valid_chunk();
1535 assert!(result.is_none());
1536 assert_eq!(audio.consumer_phase, ConsumerPhase::Failed);
1537 let mut buf = [0.0f32; 16];
1538 assert!(matches!(audio.read(&mut buf), Err(DecodeError::Io(_))));
1539 }
1540
1541 #[kithara::test]
1542 fn consumer_does_not_park_in_terminal_phase() {
1543 let (mut audio, _tx) = audio_with_channel();
1544 audio.consumer_phase = ConsumerPhase::AtEof;
1545
1546 let mut buf = [0.0f32; 16];
1547 assert!(matches!(audio.read(&mut buf), Ok(ReadOutcome::Eof { .. })));
1548 }
1549
1550 #[kithara::test]
1551 fn process_fetch_must_distinguish_failure_from_natural_eof() {
1552 let (mut audio_eof, mut tx_eof) = audio_with_channel();
1553 tx_eof
1554 .try_push(Fetch::new(PcmChunk::default(), true, 0))
1555 .expect("push natural-eof marker");
1556 let _ = audio_eof.recv_valid_chunk();
1557 assert_eq!(audio_eof.consumer_phase, ConsumerPhase::AtEof);
1558
1559 let (mut audio_failure, mut tx_failure) = audio_with_channel();
1560 tx_failure
1561 .try_push(Fetch::failure(PcmChunk::default(), 0))
1562 .expect("push failure marker");
1563 let _ = audio_failure.recv_valid_chunk();
1564
1565 assert_ne!(
1566 audio_failure.consumer_phase,
1567 ConsumerPhase::AtEof,
1568 "process_fetch must not collapse FetchKind::Failure into \
1569 ConsumerPhase::AtEof — AtEof means 'clip finished' and is \
1570 used by PlayerTrack to finalize; a transient failure must \
1571 land in a distinct non-natural-eof state so the pipeline \
1572 can recover instead of removing the track from the arena"
1573 );
1574 assert_eq!(
1575 audio_failure.consumer_phase,
1576 ConsumerPhase::Failed,
1577 "failure marker must route to ConsumerPhase::Failed"
1578 );
1579 }
1580}