use cpal::traits::{DeviceTrait, StreamTrait};
use cpal::{SampleFormat, SupportedStreamConfig};
use dasp_graph::{Buffer, Input};
use rtrb::{Consumer, Producer, RingBuffer};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use crate::node::{AudioNode, ProcessContext};
pub struct CpalSink {
buffer: Producer<f32>,
channels: usize,
samples_consumed: Arc<AtomicUsize>,
had_underrun: Arc<AtomicBool>,
}
impl CpalSink {
pub fn new(device: &cpal::Device, config: &SupportedStreamConfig) -> Self {
let channels = config.channels() as usize;
let sample_format = config.sample_format();
let stream_config = config.config();
let sample_rate = stream_config.sample_rate.0;
let buffer_samples = ((sample_rate as f32 * 0.1) as usize) * channels;
let buffer_size = buffer_samples.next_power_of_two().max(8192);
let (producer, consumer) = RingBuffer::<f32>::new(buffer_size);
let samples_consumed = Arc::new(AtomicUsize::new(0));
let samples_consumed_clone = samples_consumed.clone();
let had_underrun = Arc::new(AtomicBool::new(false));
let had_underrun_clone = had_underrun.clone();
let device = device.clone();
std::thread::spawn(move || {
let stream = build_stream(
&device,
sample_format,
&stream_config,
consumer,
samples_consumed_clone,
had_underrun_clone,
)
.expect("Failed to build output stream");
stream.play().expect("Failed to start audio stream");
loop {
std::thread::park();
}
});
Self {
buffer: producer,
channels,
samples_consumed,
had_underrun,
}
}
#[inline]
pub fn samples_consumed(&self) -> usize {
self.samples_consumed.load(Ordering::Relaxed)
}
#[inline]
pub fn buffer_available(&self) -> usize {
self.buffer.slots()
}
pub fn check_underrun(&self) -> bool {
self.had_underrun.swap(false, Ordering::Relaxed)
}
}
#[inline]
fn f32_to_bits(f: f32) -> u32 {
f.to_bits()
}
#[inline]
fn bits_to_f32(bits: u32) -> f32 {
f32::from_bits(bits)
}
fn build_stream(
device: &cpal::Device,
sample_format: SampleFormat,
stream_config: &cpal::StreamConfig,
mut consumer: Consumer<f32>,
samples_consumed: Arc<AtomicUsize>,
had_underrun: Arc<AtomicBool>,
) -> Result<cpal::Stream, cpal::BuildStreamError> {
let last_sample = Arc::new(AtomicU32::new(f32_to_bits(0.0)));
match sample_format {
SampleFormat::F32 => {
let last_sample = last_sample.clone();
device.build_output_stream(
stream_config,
move |data: &mut [f32], _| {
let mut underrun = false;
for sample in data.iter_mut() {
let s = match consumer.pop() {
Ok(v) => {
last_sample.store(f32_to_bits(v), Ordering::Relaxed);
v
}
Err(_) => {
underrun = true;
bits_to_f32(last_sample.load(Ordering::Relaxed))
}
};
*sample = s.clamp(-1.0, 1.0);
}
if underrun {
had_underrun.store(true, Ordering::Relaxed);
}
samples_consumed.fetch_add(data.len(), Ordering::Relaxed);
},
|err| eprintln!("CPAL stream error: {:?}", err),
None,
)
}
SampleFormat::I16 => {
let last_sample = last_sample.clone();
device.build_output_stream(
stream_config,
move |data: &mut [i16], _| {
let mut underrun = false;
for sample in data.iter_mut() {
let s = match consumer.pop() {
Ok(v) => {
last_sample.store(f32_to_bits(v), Ordering::Relaxed);
v
}
Err(_) => {
underrun = true;
bits_to_f32(last_sample.load(Ordering::Relaxed))
}
};
*sample = (s.clamp(-1.0, 1.0) * i16::MAX as f32) as i16;
}
if underrun {
had_underrun.store(true, Ordering::Relaxed);
}
samples_consumed.fetch_add(data.len(), Ordering::Relaxed);
},
|err| eprintln!("CPAL stream error: {:?}", err),
None,
)
}
SampleFormat::U16 => {
let last_sample = last_sample.clone();
device.build_output_stream(
stream_config,
move |data: &mut [u16], _| {
let mut underrun = false;
for sample in data.iter_mut() {
let s = match consumer.pop() {
Ok(v) => {
last_sample.store(f32_to_bits(v), Ordering::Relaxed);
v
}
Err(_) => {
underrun = true;
bits_to_f32(last_sample.load(Ordering::Relaxed))
}
};
*sample = ((s.clamp(-1.0, 1.0) + 1.0) * 0.5 * u16::MAX as f32) as u16;
}
if underrun {
had_underrun.store(true, Ordering::Relaxed);
}
samples_consumed.fetch_add(data.len(), Ordering::Relaxed);
},
|err| eprintln!("CPAL stream error: {:?}", err),
None,
)
}
_ => panic!("Unsupported sample format: {:?}", sample_format),
}
}
impl AudioNode for CpalSink {
type Message = ();
fn process(
&mut self,
_ctx: &ProcessContext,
_messages: impl Iterator<Item = ()>,
inputs: &[Input],
_outputs: &mut [Buffer],
) {
if inputs.is_empty() {
return;
}
let input = &inputs[0];
let buffers = input.buffers();
if buffers.is_empty() {
return;
}
let buffer_len = buffers[0].len();
let samples_needed = buffer_len * self.channels;
if self.buffer.slots() < samples_needed {
return;
}
for i in 0..buffer_len {
for ch in 0..self.channels {
let src_ch = ch.min(buffers.len() - 1);
let _ = self.buffer.push(buffers[src_ch][i]);
}
}
}
#[inline]
fn num_inputs(&self) -> usize { 1 }
#[inline]
fn num_outputs(&self) -> usize { 0 }
}