use std::num::NonZeroU32;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Once};
use std::time::Duration;
use audio_samples::AudioSamples;
use audio_samples::simd_conversions;
use cpal::SampleFormat;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use non_empty_slice::NonEmptySlice;
use crate::error::StreamingError;
use crate::notify::AudioNotifier;
use crate::traits::AudioSink;
pub struct DevicePlayback {
_stream: cpal::Stream,
producer: rtrb::Producer<f32>,
channels: u16,
sample_rate: u32,
interleaved: Vec<f32>,
notifier: Arc<AudioNotifier>,
xruns: Arc<AtomicU32>,
}
impl DevicePlayback {
pub fn from_default_output(target_latency: Option<Duration>) -> Result<Self, StreamingError> {
let host = cpal::default_host();
let device = host
.default_output_device()
.ok_or_else(|| StreamingError::Parameter("no default output device".into()))?;
Self::from_device(&device, target_latency)
}
pub fn from_device(
device: &cpal::Device,
target_latency: Option<Duration>,
) -> Result<Self, StreamingError> {
let supported = device
.default_output_config()
.map_err(|e| StreamingError::Parameter(e.to_string()))?;
let channels = supported.channels();
let sample_rate = supported.sample_rate();
let period_frames: Option<u32> =
target_latency.map(|d| (d.as_secs_f64() * sample_rate as f64).round() as u32);
let ring_capacity = 8 * period_frames.unwrap_or(1024) as usize * channels as usize;
let (producer, consumer) = rtrb::RingBuffer::new(ring_capacity);
let config = cpal::StreamConfig {
channels,
sample_rate: supported.sample_rate(),
buffer_size: match period_frames {
Some(n) => cpal::BufferSize::Fixed(n),
None => cpal::BufferSize::Default,
},
};
let notifier = Arc::new(AudioNotifier::new());
let xruns = Arc::new(AtomicU32::new(0));
let stream = match supported.sample_format() {
SampleFormat::F32 => build_f32_output(
device,
&config,
consumer,
Arc::clone(¬ifier),
Arc::clone(&xruns),
)?,
SampleFormat::I16 => build_i16_output(
device,
&config,
consumer,
Arc::clone(¬ifier),
Arc::clone(&xruns),
)?,
SampleFormat::U8 => build_u8_output(
device,
&config,
consumer,
Arc::clone(¬ifier),
Arc::clone(&xruns),
)?,
fmt => {
return Err(StreamingError::Unsupported(format!(
"sample format {fmt:?}"
)));
}
};
stream
.play()
.map_err(|e| StreamingError::Parameter(e.to_string()))?;
let interleaved_cap = period_frames.unwrap_or(1024) as usize * channels as usize;
let interleaved = Vec::with_capacity(interleaved_cap);
Ok(Self {
_stream: stream,
producer,
channels,
sample_rate,
interleaved,
notifier,
xruns,
})
}
pub fn sample_rate(&self) -> u32 {
self.sample_rate
}
pub fn channels(&self) -> u16 {
self.channels
}
pub fn xruns(&self) -> u32 {
self.xruns.load(Ordering::Relaxed)
}
}
impl AudioSink for DevicePlayback {
type Sample = f32;
type Error = StreamingError;
fn write_chunk(&mut self, chunk: &AudioSamples<'static, f32>) -> Result<(), StreamingError> {
let channels = self.channels as usize;
let frames = chunk.samples_per_channel().get();
let samples = frames * channels;
self.interleaved.clear();
self.interleaved.resize(samples, 0.0);
if chunk.is_mono() {
let mono = chunk.as_mono().expect("is_mono but as_mono returned None");
for (frame, &s) in mono.iter().take(frames).enumerate() {
for ch in 0..channels {
self.interleaved[frame * channels + ch] = s;
}
}
} else {
let flat = chunk
.as_multi_channel()
.expect("multi-channel but as_multi_channel returned None")
.as_slice()
.ok_or_else(|| StreamingError::Parameter("source buffer not contiguous".into()))?;
let flat_nes = unsafe { NonEmptySlice::new_unchecked(flat) };
let out_nes = unsafe { NonEmptySlice::new_unchecked_mut(&mut self.interleaved) };
let channels_nz = unsafe { NonZeroU32::new_unchecked(self.channels as u32) };
simd_conversions::interleave_multi(flat_nes, out_nes, channels_nz)
.map_err(StreamingError::AudioSamples)?;
}
loop {
if self.producer.slots() >= samples {
break;
}
self.notifier.wait();
}
self.producer
.write_chunk_uninit(samples)
.expect("slots verified above")
.fill_from_iter(self.interleaved.iter().copied());
Ok(())
}
fn flush(&mut self) -> Result<(), StreamingError> {
let capacity = self.producer.buffer().capacity();
loop {
if self.producer.slots() >= capacity {
break;
}
self.notifier.wait();
}
Ok(())
}
fn finalize(mut self) -> Result<(), StreamingError> {
self.flush()
}
}
#[cfg(target_os = "linux")]
fn try_set_rt_priority(once: &Once) {
once.call_once(|| unsafe {
let param = libc::sched_param { sched_priority: 80 };
libc::pthread_setschedparam(libc::pthread_self(), libc::SCHED_FIFO, ¶m);
});
}
#[cfg(not(target_os = "linux"))]
fn try_set_rt_priority(_once: &Once) {}
fn build_f32_output(
device: &cpal::Device,
config: &cpal::StreamConfig,
mut consumer: rtrb::Consumer<f32>,
notifier: Arc<AudioNotifier>,
xruns: Arc<AtomicU32>,
) -> Result<cpal::Stream, StreamingError> {
let rt_once = Arc::new(Once::new());
device
.build_output_stream(
config,
move |data: &mut [f32], _| {
try_set_rt_priority(&rt_once);
match consumer.read_chunk(data.len()) {
Ok(chunk) => {
let (s1, s2) = chunk.as_slices();
let mid = s1.len();
data[..mid].copy_from_slice(s1);
data[mid..].copy_from_slice(s2);
chunk.commit_all();
}
Err(_) => {
xruns.fetch_add(1, Ordering::Relaxed);
data.fill(0.0);
}
}
notifier.notify();
},
|e| eprintln!("playback error: {e}"),
None,
)
.map_err(|e| StreamingError::Parameter(e.to_string()))
}
fn build_i16_output(
device: &cpal::Device,
config: &cpal::StreamConfig,
mut consumer: rtrb::Consumer<f32>,
notifier: Arc<AudioNotifier>,
xruns: Arc<AtomicU32>,
) -> Result<cpal::Stream, StreamingError> {
use cpal::Sample;
let rt_once = Arc::new(Once::new());
device
.build_output_stream(
config,
move |data: &mut [i16], _| {
try_set_rt_priority(&rt_once);
match consumer.read_chunk(data.len()) {
Ok(chunk) => {
let (s1, s2) = chunk.as_slices();
for (dst, &src) in data.iter_mut().zip(s1.iter().chain(s2.iter())) {
*dst = src.to_sample::<i16>();
}
chunk.commit_all();
}
Err(_) => {
xruns.fetch_add(1, Ordering::Relaxed);
data.fill(0i16);
}
}
notifier.notify();
},
|e| eprintln!("playback error: {e}"),
None,
)
.map_err(|e| StreamingError::Parameter(e.to_string()))
}
fn build_u8_output(
device: &cpal::Device,
config: &cpal::StreamConfig,
mut consumer: rtrb::Consumer<f32>,
notifier: Arc<AudioNotifier>,
xruns: Arc<AtomicU32>,
) -> Result<cpal::Stream, StreamingError> {
use cpal::Sample;
let rt_once = Arc::new(Once::new());
device
.build_output_stream(
config,
move |data: &mut [u8], _| {
try_set_rt_priority(&rt_once);
match consumer.read_chunk(data.len()) {
Ok(chunk) => {
let (s1, s2) = chunk.as_slices();
for (dst, &src) in data.iter_mut().zip(s1.iter().chain(s2.iter())) {
*dst = src.to_sample::<u8>();
}
chunk.commit_all();
}
Err(_) => {
xruns.fetch_add(1, Ordering::Relaxed);
data.fill(128u8);
}
}
notifier.notify();
},
|e| eprintln!("playback error: {e}"),
None,
)
.map_err(|e| StreamingError::Parameter(e.to_string()))
}