use std::{
collections::VecDeque,
sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicU8, AtomicU32, Ordering},
},
thread,
time::{Duration, Instant},
};
use cpal::{
Stream, StreamError,
traits::{DeviceTrait, HostTrait, StreamTrait},
};
use super::{
config::{PlaybackConfig, SampleFormat},
output_stream::AudioOutputStream,
};
use smol_str::format_smolstr;
use crate::error::{
AllocFailurePayload, ArithmeticOverflowPayload, CapExceededPayload, Error, ExternalOpPayload,
InvariantViolationPayload, OutOfRangePayload, Result,
};
/// State code for "not playing": the initial value in `SharedState::new`, and the value
/// stored by `AudioPlayer::stop` and by `Drop`.
pub const STATE_STOPPED: u8 = 0;
/// State code stored by `AudioPlayer::start`; the output callback only drains the queue
/// while the state equals this value.
pub const STATE_RUNNING: u8 = 1;
/// State code stored by `AudioPlayer::pause`.
pub const STATE_PAUSED: u8 = 2;
/// How long `AudioPlayer::flush` sleeps between queue-depth polls.
const FLUSH_POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Upper bound on how long `AudioPlayer::flush` waits for the queue to empty before erroring.
const FLUSH_TIMEOUT: Duration = Duration::from_secs(30);
/// Maximum number of samples `AudioPlayer::write_samples` appends per queue-lock acquisition.
pub const WRITE_CHUNK_MAX: usize = 4096;
/// Normalises a requested gain into the closed range `[0.0, 1.0]`.
///
/// Finite inputs are clamped to the range; any non-finite input (NaN, `+inf`, `-inf`)
/// yields `0.0`.
#[must_use]
pub fn sanitize_volume(vol: f32) -> f32 {
    // Guard first: NaN and the infinities are treated as "mute".
    if !vol.is_finite() {
        return 0.0;
    }
    vol.clamp(0.0, 1.0)
}
/// State shared (via `Arc`) between the `AudioPlayer` handle and the cpal stream callbacks.
struct SharedState {
/// Pending samples: appended by `AudioPlayer::write_samples`, popped by the output callback.
queue: Mutex<VecDeque<f32>>,
/// Upper bound on `queue` length, in samples; also the amount reserved up front in `new`.
queue_capacity_samples: usize,
/// Current player state: one of `STATE_STOPPED`, `STATE_RUNNING`, `STATE_PAUSED`.
state: AtomicU8,
/// Set to `true` by `AudioPlayer::stop`; nothing in this file ever resets it.
terminated: AtomicBool,
/// Output gain stored as the bit pattern of an `f32` (see `load_volume`).
volume_bits: AtomicU32,
/// First stream error reported by the cpal error callback, until taken or cleared.
callback_error: Mutex<Option<StreamError>>,
}
impl SharedState {
fn new(queue_capacity_samples: usize) -> Result<Self> {
let mut queue = VecDeque::new();
queue
.try_reserve_exact(queue_capacity_samples)
.map_err(|e| {
Error::AllocFailure(AllocFailurePayload::new(
"AudioPlayer::with_device: pre-allocate queue capacity",
"samples",
queue_capacity_samples as u64,
e,
))
})?;
Ok(Self {
queue: Mutex::new(queue),
queue_capacity_samples,
state: AtomicU8::new(STATE_STOPPED),
terminated: AtomicBool::new(false),
volume_bits: AtomicU32::new(1.0_f32.to_bits()),
callback_error: Mutex::new(None),
})
}
#[inline(always)]
fn load_volume(&self) -> f32 {
f32::from_bits(self.volume_bits.load(Ordering::Relaxed))
}
fn stop_cleanup(&self) {
{
let mut q = match self.queue.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
q.clear();
}
{
let mut e = match self.callback_error.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
*e = None;
}
}
}
/// Audio output player: owns a cpal output stream plus the state shared with its callbacks.
pub struct AudioPlayer {
/// The cpal output stream; taken (left as `None`) when the player is dropped.
stream: Option<Stream>,
/// State shared with the stream's data and error callbacks.
shared: Arc<SharedState>,
/// The configuration this player was built with; returned by `config()`.
config: PlaybackConfig,
}
impl AudioPlayer {
pub fn new(config: PlaybackConfig) -> Result<Self> {
let host = cpal::default_host();
let device = host.default_output_device().ok_or_else(|| {
Error::ExternalOp(ExternalOpPayload::new(
"AudioPlayer: no default cpal output device available",
"cpal default_output_device",
std::io::Error::other("host returned None"),
))
})?;
Self::with_device(&device, config)
}
pub fn with_device(device: &cpal::Device, config: PlaybackConfig) -> Result<Self> {
if !matches!(config.sample_format(), SampleFormat::F32) {
return Err(Error::OutOfRange(OutOfRangePayload::new(
"AudioPlayer: sample_format (non-F32 device negotiation reserved for a follow-up)",
"must be SampleFormat::F32",
format_smolstr!("{:?}", config.sample_format()),
)));
}
let stream_config = config.cpal_config()?;
let queue_frames = config.queue_capacity_frames();
let channels = usize::from(config.channels().count());
let queue_capacity_samples = queue_frames.checked_mul(channels).ok_or_else(|| {
Error::ArithmeticOverflow(ArithmeticOverflowPayload::with_operands(
"AudioPlayer: queue_capacity_frames * channels",
"usize",
[
("queue_capacity_frames", queue_frames as u64),
("channels", channels as u64),
],
))
})?;
let shared = Arc::new(SharedState::new(queue_capacity_samples)?);
let cb_shared = Arc::clone(&shared);
let data_callback = move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
let state = cb_shared.state.load(Ordering::Acquire);
if state != STATE_RUNNING {
for s in out.iter_mut() {
*s = 0.0;
}
return;
}
let volume = cb_shared.load_volume();
let mut q = match cb_shared.queue.try_lock() {
Ok(g) => g,
Err(std::sync::TryLockError::WouldBlock) => {
for s in out.iter_mut() {
*s = 0.0;
}
return;
}
Err(std::sync::TryLockError::Poisoned(poisoned)) => poisoned.into_inner(),
};
let drain_n = out.len().min(q.len());
for slot in out.iter_mut().take(drain_n) {
let sample = q.pop_front().unwrap_or(0.0);
*slot = sample * volume;
}
drop(q);
for slot in out.iter_mut().skip(drain_n) {
*slot = 0.0;
}
};
let err_shared = Arc::clone(&shared);
let err_callback = move |err: StreamError| {
let mut slot = match err_shared.callback_error.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
if slot.is_none() {
*slot = Some(err);
}
};
let stream = device
.build_output_stream(&stream_config, data_callback, err_callback, None)
.map_err(|e| {
Error::ExternalOp(ExternalOpPayload::new(
"AudioPlayer: cpal build_output_stream failed",
"cpal stream config",
e,
))
})?;
Ok(Self {
stream: Some(stream),
shared,
config,
})
}
/// Returns a copy of the `PlaybackConfig` this player was constructed with.
#[inline(always)]
#[must_use]
pub fn config(&self) -> PlaybackConfig {
self.config
}
/// Returns the number of samples currently waiting in the queue.
///
/// Takes the queue lock; a poisoned lock is recovered rather than propagated.
#[inline(always)]
#[must_use]
pub fn buffer_depth(&self) -> usize {
    self.shared
        .queue
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .len()
}
/// Returns `true` while the shared state equals `STATE_RUNNING`.
#[inline(always)]
#[must_use]
pub fn is_running(&self) -> bool {
    let current = self.shared.state.load(Ordering::Acquire);
    current == STATE_RUNNING
}
/// Returns `true` while the shared state equals `STATE_PAUSED`.
#[inline(always)]
#[must_use]
pub fn is_paused(&self) -> bool {
    let current = self.shared.state.load(Ordering::Acquire);
    current == STATE_PAUSED
}
#[inline(always)]
#[must_use]
pub fn volume(&self) -> f32 {
self.shared.load_volume()
}
/// Sets the output gain.
///
/// `vol` is passed through `sanitize_volume` first, so the stored value is always in
/// `[0.0, 1.0]` (non-finite requests become `0.0`).
pub fn store_volume(&self, vol: f32) {
    let bits = sanitize_volume(vol).to_bits();
    self.shared.volume_bits.store(bits, Ordering::Release);
}
pub fn start(&mut self) -> Result<()> {
if self.shared.terminated.load(Ordering::Acquire) {
return Err(Error::InvariantViolation(InvariantViolationPayload::new(
"AudioPlayer::start",
"called on terminated player — construct a new AudioPlayer",
)));
}
self.take_callback_error()?;
let stream = self.stream.as_ref().ok_or_else(|| {
Error::InvariantViolation(InvariantViolationPayload::new(
"AudioPlayer::start",
"stream has been dropped (post-stop)",
))
})?;
stream.play().map_err(|e| {
Error::ExternalOp(ExternalOpPayload::new(
"AudioPlayer::start: cpal play() failed",
"cpal stream",
e,
))
})?;
self.shared.state.store(STATE_RUNNING, Ordering::Release);
Ok(())
}
pub fn pause(&mut self) -> Result<()> {
if self.shared.terminated.load(Ordering::Acquire) {
return Err(Error::InvariantViolation(InvariantViolationPayload::new(
"AudioPlayer::pause",
"called on terminated player — construct a new AudioPlayer",
)));
}
self.take_callback_error()?;
let stream = self.stream.as_ref().ok_or_else(|| {
Error::InvariantViolation(InvariantViolationPayload::new(
"AudioPlayer::pause",
"stream has been dropped (post-stop)",
))
})?;
stream.pause().map_err(|e| {
Error::ExternalOp(ExternalOpPayload::new(
"AudioPlayer::pause: cpal pause() failed",
"cpal stream",
e,
))
})?;
self.shared.state.store(STATE_PAUSED, Ordering::Release);
Ok(())
}
pub fn resume(&mut self) -> Result<()> {
if self.shared.terminated.load(Ordering::Acquire) {
return Err(Error::InvariantViolation(InvariantViolationPayload::new(
"AudioPlayer::resume",
"called on terminated player — construct a new AudioPlayer",
)));
}
self.start()
}
/// Terminates the player: sets the `terminated` latch, marks the state `STATE_STOPPED`,
/// pauses the cpal stream if one is present, then empties the queue and discards any
/// captured callback error.
///
/// Cleanup runs even when the cpal pause fails; the pause outcome is returned afterwards.
///
/// # Errors
/// `Error::ExternalOp` when cpal `pause()` fails.
pub fn stop(&mut self) -> Result<()> {
    // Order matters: latch first, then state, then the stream, then cleanup.
    self.shared.terminated.store(true, Ordering::Release);
    self.shared.state.store(STATE_STOPPED, Ordering::Release);

    let pause_outcome = match self.stream.as_ref() {
        None => Ok(()),
        Some(stream) => stream.pause().map_err(|pause_err| {
            Error::ExternalOp(ExternalOpPayload::new(
                "AudioPlayer::stop: cpal pause() failed",
                "cpal stream",
                pause_err,
            ))
        }),
    };

    self.shared.stop_cleanup();
    pause_outcome
}
pub fn write_samples(&mut self, samples: &[f32]) -> Result<usize> {
if self.shared.terminated.load(Ordering::Acquire) {
return Err(Error::InvariantViolation(InvariantViolationPayload::new(
"AudioPlayer::write_samples",
"called after stop() — player is terminated",
)));
}
self.take_callback_error()?;
let state = self.shared.state.load(Ordering::Acquire);
if state == STATE_STOPPED {
return Err(Error::InvariantViolation(InvariantViolationPayload::new(
"AudioPlayer::write_samples",
"called on stopped player (STATE_STOPPED); call start() before writing samples",
)));
}
{
let q = match self.shared.queue.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
let q_len = q.len();
let s_len = samples.len();
let projected_len = q_len.checked_add(s_len).ok_or_else(|| {
Error::ArithmeticOverflow(ArithmeticOverflowPayload::with_operands(
"AudioPlayer::write_samples: queue length + new samples",
"usize",
[("queue_len", q_len as u64), ("samples_len", s_len as u64)],
))
})?;
if projected_len > self.shared.queue_capacity_samples {
return Err(Error::CapExceeded(CapExceededPayload::new(
"AudioPlayer::write_samples: queue overflow",
"queue_capacity_samples",
self.shared.queue_capacity_samples as u64,
projected_len as u64,
)));
}
}
for chunk in samples.chunks(WRITE_CHUNK_MAX) {
let mut q = match self.shared.queue.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
q.extend(chunk.iter().copied());
}
Ok(samples.len())
}
/// Blocks until the queue is empty, polling every `FLUSH_POLL_INTERVAL`.
///
/// Returns immediately with `Ok(())` when the queue is already empty.
///
/// # Errors
/// * `Error::ExternalOp` when a stream error was captured by the error callback
///   (checked on entry and after every poll sleep).
/// * `Error::OutOfRange` when samples are queued but the state is not `STATE_RUNNING`.
/// * `Error::OutOfRange` when the queue is still non-empty after `FLUSH_TIMEOUT`.
pub fn flush(&mut self) -> Result<()> {
    self.take_callback_error()?;
    let began = Instant::now();
    let mut depth = self.buffer_depth();
    while depth != 0 {
        let state = self.shared.state.load(Ordering::Acquire);
        if state != STATE_RUNNING {
            return Err(Error::OutOfRange(OutOfRangePayload::new(
                "AudioPlayer::flush: player state with samples queued (call start() before flush())",
                "must be STATE_RUNNING when queue is non-empty",
                format_smolstr!("state={state}, depth={depth}"),
            )));
        }
        if began.elapsed() > FLUSH_TIMEOUT {
            return Err(Error::OutOfRange(OutOfRangePayload::new(
                "AudioPlayer::flush: drain timeout (samples still queued past FLUSH_TIMEOUT)",
                "FLUSH_TIMEOUT must elapse with queue depth reaching 0",
                format_smolstr!("FLUSH_TIMEOUT={FLUSH_TIMEOUT:?}, depth={depth}"),
            )));
        }
        thread::sleep(FLUSH_POLL_INTERVAL);
        self.take_callback_error()?;
        depth = self.buffer_depth();
    }
    Ok(())
}
fn take_callback_error(&self) -> Result<()> {
let mut slot = match self.shared.callback_error.lock() {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
if let Some(err) = slot.take() {
return Err(Error::ExternalOp(ExternalOpPayload::new(
"AudioPlayer: cpal stream callback (async) error",
"cpal stream",
err,
)));
}
Ok(())
}
}
// Trait adapter: every method forwards to the inherent `AudioPlayer` method of the same name.
// The calls are written as `AudioPlayer::method(self, …)` rather than `self.method(…)`;
// NOTE(review): presumably to make it explicit that the inherent method is the target and
// the trait method is not calling itself — keep this form when editing.
impl AudioOutputStream for AudioPlayer {
// Forwards to `AudioPlayer::write_samples`.
fn write_samples(&mut self, samples: &[f32]) -> Result<usize> {
AudioPlayer::write_samples(self, samples)
}
// Forwards to `AudioPlayer::flush`.
fn flush(&mut self) -> Result<()> {
AudioPlayer::flush(self)
}
// Forwards to `AudioPlayer::stop`.
fn stop(&mut self) -> Result<()> {
AudioPlayer::stop(self)
}
// Forwards to `AudioPlayer::is_running`.
fn is_running(&self) -> bool {
AudioPlayer::is_running(self)
}
}
/// Best-effort teardown: marks the state `STATE_STOPPED`, then pauses and releases the
/// cpal stream if it is still held. A failing `pause()` is ignored.
impl Drop for AudioPlayer {
    fn drop(&mut self) {
        self.shared.state.store(STATE_STOPPED, Ordering::Release);
        let Some(stream) = self.stream.take() else {
            return;
        };
        // Errors cannot be reported from `drop`, so the pause result is discarded.
        let _ = stream.pause();
        // `stream` goes out of scope here and is released after the pause attempt.
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::audio::playback::config::{ChannelLayout, PlaybackConfig, SampleFormat};
/// `stop_cleanup` must empty the queue and clear the captured error, while leaving the
/// `terminated` latch and `STATE_STOPPED` state written by the caller untouched.
#[test]
fn shared_state_stop_cleanup_drains_queue_and_clears_error_unconditionally() {
    let shared = SharedState::new(4096).expect("pre-allocate test queue");
    shared
        .queue
        .lock()
        .unwrap()
        .extend([0.1_f32, 0.2, 0.3, 0.4]);
    *shared.callback_error.lock().unwrap() = Some(StreamError::DeviceNotAvailable);

    // Mirror the write order used by `AudioPlayer::stop` before cleanup runs.
    shared.terminated.store(true, Ordering::Release);
    shared.state.store(STATE_STOPPED, Ordering::Release);
    shared.stop_cleanup();

    let terminated = shared.terminated.load(Ordering::Acquire);
    assert!(
        terminated,
        "terminated latch must be set (stop ordering: latch -> state -> cleanup)"
    );
    let state = shared.state.load(Ordering::Acquire);
    assert_eq!(state, STATE_STOPPED, "state must be STOPPED post-stop");
    let remaining = shared.queue.lock().unwrap().len();
    assert_eq!(
        remaining, 0,
        "stop_cleanup must drain the queue unconditionally (this branch \
         runs even when cpal pause errs — an early `?` on pause would \
         skip this and leave samples lingering until Drop)"
    );
    let error_cleared = shared.callback_error.lock().unwrap().is_none();
    assert!(
        error_cleared,
        "stop_cleanup must clear captured callback_error unconditionally"
    );
}
/// `stop_cleanup` must neither panic on poisoned locks nor skip its work because of them.
///
/// Both mutexes are poisoned by threads that panic while holding the guard. Each thread
/// first writes data through the guard, so the test can verify that cleanup really ran
/// on the recovered guards instead of only verifying the absence of a panic.
#[test]
fn shared_state_stop_cleanup_recovers_from_poisoned_locks() {
    use std::{panic, sync::Arc};
    let shared = Arc::new(SharedState::new(64).expect("pre-allocate test queue"));

    let queue_poisoner = Arc::clone(&shared);
    let _ = std::thread::spawn(move || {
        let mut g = queue_poisoner.queue.lock().unwrap();
        // Leave samples behind so a skipped `clear()` is observable afterwards.
        g.extend([0.1_f32, 0.2, 0.3]);
        panic!("simulated callback panic poisoning queue lock");
    })
    .join();

    let err_poisoner = Arc::clone(&shared);
    let _ = std::thread::spawn(move || {
        let mut g = err_poisoner.callback_error.lock().unwrap();
        // Leave an error behind so a skipped reset is observable afterwards.
        *g = Some(StreamError::DeviceNotAvailable);
        panic!("simulated callback panic poisoning callback_error lock");
    })
    .join();

    assert!(
        shared.queue.is_poisoned(),
        "test setup: queue lock should be poisoned"
    );
    assert!(
        shared.callback_error.is_poisoned(),
        "test setup: callback_error lock should be poisoned"
    );

    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        shared.stop_cleanup();
    }));
    assert!(
        result.is_ok(),
        "stop_cleanup must not panic on poisoned locks (poison-recover via into_inner)"
    );

    // The poison flag stays set, so read back through `into_inner` as well.
    let remaining = shared
        .queue
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .len();
    assert_eq!(
        remaining, 0,
        "stop_cleanup must still drain the queue when its lock is poisoned"
    );
    let error_cleared = shared
        .callback_error
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .is_none();
    assert!(
        error_cleared,
        "stop_cleanup must still clear callback_error when its lock is poisoned"
    );
}
/// A freshly built player has an empty queue whose backing storage already covers the
/// configured capacity. Needs a real output device, hence ignored by default.
#[cfg(target_os = "macos")]
#[test]
#[ignore = "requires real default audio output device"]
fn audio_player_pre_allocates_queue_capacity_at_construction() {
    let cfg = PlaybackConfig::new(16_000, ChannelLayout::Mono, SampleFormat::F32)
        .with_queue_capacity_frames(4096);
    let player = AudioPlayer::new(cfg).unwrap();
    assert_eq!(player.buffer_depth(), 0);

    let cap_samples = player.shared.queue_capacity_samples;
    assert_eq!(
        cap_samples, 4096,
        "queue cap = frames * channels = 4096 * 1"
    );

    let underlying = player
        .shared
        .queue
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .capacity();
    assert!(
        underlying >= cap_samples,
        "VecDeque underlying capacity ({underlying}) must be >= bounded cap ({cap_samples}) per \
         try_reserve_exact contract"
    );
}
/// Writing samples into a paused player must not change the queue's allocated capacity.
/// Needs a real output device, hence ignored by default.
#[cfg(target_os = "macos")]
#[test]
#[ignore = "requires real default audio output device"]
fn audio_player_write_samples_does_not_grow_queue_capacity_during_playback() {
    /// Reads the queue's allocated capacity, recovering from a poisoned lock.
    fn queue_capacity(player: &AudioPlayer) -> usize {
        player
            .shared
            .queue
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .capacity()
    }

    let cfg = PlaybackConfig::new(16_000, ChannelLayout::Mono, SampleFormat::F32)
        .with_queue_capacity_frames(4096);
    let mut player = AudioPlayer::new(cfg).unwrap();
    // Start then pause: writes are accepted but the callback does not drain.
    player.start().unwrap();
    player.pause().unwrap();

    let cap_before = queue_capacity(&player);
    let cap_samples = player.shared.queue_capacity_samples;
    player.write_samples(&[0.25_f32; 1024]).unwrap();
    let cap_after = queue_capacity(&player);

    assert_eq!(
        cap_after, cap_before,
        "queue capacity grew during write_samples (before={cap_before}, after={cap_after}) — \
         producer-loop `extend` must not realloc because the queue is pre-allocated to \
         queue_capacity_samples ({cap_samples}) at construction"
    );
    let _ = player.stop();
}
/// Requesting `usize::MAX` samples must fail the up-front reservation and surface as
/// `Error::AllocFailure` carrying the expected context, item and count.
#[test]
fn shared_state_new_allocfailure_on_overflowing_capacity() {
    let err = SharedState::new(usize::MAX)
        .err()
        .expect("usize::MAX sample capacity must overflow try_reserve_exact -> AllocFailure");
    let p = match err {
        Error::AllocFailure(p) => p,
        other => panic!("expected Error::AllocFailure, got {other:?}"),
    };

    assert_eq!(
        p.context(),
        "AudioPlayer::with_device: pre-allocate queue capacity",
        "AllocFailure context must name the queue-preallocation site"
    );
    assert_eq!(p.item(), "samples", "AllocFailure item must be \"samples\"");
    assert_eq!(
        p.count(),
        usize::MAX as u64,
        "AllocFailure count must echo the overflowing capacity argument"
    );
}
/// A successfully built `SharedState` stores the capacity verbatim and starts stopped,
/// not terminated, at unity gain, with no captured error and an empty queue.
#[test]
fn shared_state_new_success_initializes_default_fields() {
    let shared = SharedState::new(2048).expect("small bounded capacity reserves fine");

    // Snapshot every field first, then assert in the original order.
    let capacity = shared.queue_capacity_samples;
    let state = shared.state.load(Ordering::Acquire);
    let terminated = shared.terminated.load(Ordering::Acquire);
    let volume = f32::from_bits(shared.volume_bits.load(Ordering::Relaxed));
    let error_is_none = shared.callback_error.lock().unwrap().is_none();
    let queued = shared.queue.lock().unwrap().len();

    assert_eq!(
        capacity, 2048,
        "queue_capacity_samples must be stored verbatim from the constructor argument"
    );
    assert_eq!(
        state, STATE_STOPPED,
        "freshly-built SharedState must start STATE_STOPPED (built-but-not-playing)"
    );
    assert!(
        !terminated,
        "terminated latch must start cleared (not yet stopped)"
    );
    assert_eq!(
        volume, 1.0_f32,
        "volume_bits default must decode to unity gain (1.0)"
    );
    assert!(
        error_is_none,
        "callback_error must start empty (no captured device error)"
    );
    assert_eq!(
        queued, 0,
        "queue must start empty regardless of reserved capacity"
    );
}
/// `load_volume` must return exactly the `f32` whose bits are held in `volume_bits`:
/// first the unity-gain default, then each value stored afterwards.
#[test]
fn shared_state_load_volume_decodes_stored_bits() {
    let shared = SharedState::new(16).expect("tiny capacity reserves fine");
    assert_eq!(
        shared.load_volume(),
        1.0_f32,
        "load_volume must decode the unity-gain default"
    );

    let stored_cases = [
        (
            0.375_f32,
            "load_volume must bit-exactly decode a stored f32 (0.375 is exactly representable)",
        ),
        (
            0.0_f32,
            "load_volume must decode a stored 0.0 (silence gain)",
        ),
    ];
    for (gain, why) in stored_cases {
        shared.volume_bits.store(gain.to_bits(), Ordering::Relaxed);
        assert_eq!(shared.load_volume(), gain, "{why}");
    }
}
/// Writing to a player that was never started must be rejected with an
/// `InvariantViolation` that names `STATE_STOPPED`. Needs a real output device.
#[cfg(target_os = "macos")]
#[test]
#[ignore = "requires real default audio output device"]
fn audio_player_write_before_start_rejected_state_stopped() {
    let cfg = PlaybackConfig::new(16_000, ChannelLayout::Mono, SampleFormat::F32)
        .with_queue_capacity_frames(1024);
    let mut player = AudioPlayer::new(cfg).unwrap();

    let err = player.write_samples(&[0.5_f32; 16]).unwrap_err();
    let p = match err {
        Error::InvariantViolation(p) => p,
        other => panic!("expected InvariantViolation (STATE_STOPPED), got {other:?}"),
    };
    assert_eq!(p.context(), "AudioPlayer::write_samples");
    assert!(
        p.requirement().contains("STATE_STOPPED"),
        "pre-start write rejection must name STATE_STOPPED, got: {}",
        p.requirement()
    );

    let _ = player.stop();
}
/// Flushing a paused player that still holds samples must fail with an `OutOfRange`
/// error whose requirement names `STATE_RUNNING`. Needs a real output device.
#[cfg(target_os = "macos")]
#[test]
#[ignore = "requires real default audio output device"]
fn audio_player_flush_on_paused_with_queued_samples_errs() {
    let cfg = PlaybackConfig::new(16_000, ChannelLayout::Mono, SampleFormat::F32)
        .with_queue_capacity_frames(1024);
    let mut player = AudioPlayer::new(cfg).unwrap();
    player.start().unwrap();
    player.pause().unwrap();
    player.write_samples(&[0.25_f32; 64]).unwrap();

    let depth = player.buffer_depth();
    assert_eq!(
        depth, 64,
        "paused player must retain the 64 pushed samples (callback drains only when RUNNING)"
    );

    let err = player.flush().unwrap_err();
    let p = match err {
        Error::OutOfRange(p) => p,
        other => panic!("expected OutOfRange (flush on paused with queue), got {other:?}"),
    };
    assert!(
        p.requirement().contains("STATE_RUNNING"),
        "flush-on-non-running requirement must name STATE_RUNNING, got: {}",
        p.requirement()
    );

    let _ = player.stop();
}
}