1use std::{
2 sync::{
3 atomic::{AtomicBool, Ordering},
4 Arc,
5 },
6 time::{Duration, Instant},
7};
8
9use cpal::{
10 traits::{DeviceTrait, StreamTrait},
11 Device, SampleFormat, Stream, StreamConfig,
12};
13use dasp_sample::Sample;
14use rtrb::{Consumer, RingBuffer};
15
16use crate::{StreamSink, StreamSource};
17
18#[derive(Debug)]
19pub struct CpalSource {
20 pub interleaved_in: Consumer<f32>,
21 channels: usize,
22}
23
24impl CpalSource {
25 pub fn channels(&self) -> usize {
26 self.channels
27 }
28}
29
30pub fn start_cpal_source(
31 device: &Device,
32 config: &StreamConfig,
33 sample_format: SampleFormat,
34 ring_size: usize,
35) -> Result<(Stream, CpalSource), cpal::BuildStreamError> {
36 let channels = config.channels as usize;
37 let ring_buffer_size = ring_size * channels;
38
39 let (producer, consumer) = RingBuffer::new(ring_buffer_size);
40
41 let mut manager = StreamSource::with_defaults(producer, channels);
42 let callback_start = Instant::now();
43
44 let cfg: StreamConfig = config.clone();
45
46 let stream = match sample_format {
47 cpal::SampleFormat::I8 => device.build_input_stream(
48 &cfg,
49 move |data, _: &_| input_callback::<i8>(data, &mut manager, callback_start),
50 |_| {},
51 None,
52 )?,
53 cpal::SampleFormat::I16 => device.build_input_stream(
54 &cfg,
55 move |data, _: &_| input_callback::<i16>(data, &mut manager, callback_start),
56 |_| {},
57 None,
58 )?,
59 cpal::SampleFormat::I32 => device.build_input_stream(
60 &cfg,
61 move |data, _: &_| input_callback::<i32>(data, &mut manager, callback_start),
62 |_| {},
63 None,
64 )?,
65 cpal::SampleFormat::I64 => device.build_input_stream(
66 &cfg,
67 move |data, _: &_| input_callback::<i64>(data, &mut manager, callback_start),
68 |_| {},
69 None,
70 )?,
71 cpal::SampleFormat::U8 => device.build_input_stream(
72 &cfg,
73 move |data, _: &_| input_callback::<u8>(data, &mut manager, callback_start),
74 |_| {},
75 None,
76 )?,
77 cpal::SampleFormat::U16 => device.build_input_stream(
78 &cfg,
79 move |data, _: &_| input_callback::<u16>(data, &mut manager, callback_start),
80 |_| {},
81 None,
82 )?,
83 cpal::SampleFormat::U32 => device.build_input_stream(
84 &cfg,
85 move |data, _: &_| input_callback::<u32>(data, &mut manager, callback_start),
86 |_| {},
87 None,
88 )?,
89 cpal::SampleFormat::U64 => device.build_input_stream(
90 &cfg,
91 move |data, _: &_| input_callback::<u64>(data, &mut manager, callback_start),
92 |_| {},
93 None,
94 )?,
95 cpal::SampleFormat::F32 => device.build_input_stream(
96 &cfg,
97 move |data, _: &_| input_callback::<f32>(data, &mut manager, callback_start),
98 |_| {},
99 None,
100 )?,
101 cpal::SampleFormat::F64 => device.build_input_stream(
102 &cfg,
103 move |data, _: &_| input_callback::<f64>(data, &mut manager, callback_start),
104 |_| {},
105 None,
106 )?,
107 _ => {
108 unreachable!("this program has crashed due to a `TooManyObfuscatingAbstractions` error")
109 }
110 };
111
112 Ok((
114 stream,
115 CpalSource {
116 interleaved_in: consumer,
117 channels,
118 },
119 ))
120}
121
122fn input_callback<T>(input: &[T], manager: &mut StreamSource, callback_start: Instant)
123where
124 T: cpal::Sample + dasp_sample::ToSample<f32>,
125{
126 let callback = Instant::now() - callback_start;
127
128 manager.input_samples(
129 input.iter().map(|x| x.to_sample::<f32>()),
130 input.len(),
131 callback > Duration::from_secs(1),
132 );
133}
134
135#[derive(Debug)]
136pub struct CpalSink {
137 pub interleaved_out: rtrb::Producer<f32>,
138 pub measure_xruns: Arc<AtomicBool>,
139 channels: usize,
140}
141
142impl CpalSink {
143 pub fn channels(&self) -> usize {
144 self.channels
145 }
146}
147
148pub fn start_cpal_sink(
149 device: &Device,
150 config: &StreamConfig,
151 sample_format: SampleFormat,
152 ring_size: usize,
153) -> Result<(Stream, CpalSink), cpal::BuildStreamError> {
154 let channels = config.channels;
155 let ring_buffer_size = ring_size * channels as usize;
156
157 let (producer, consumer) = RingBuffer::new(ring_buffer_size);
158
159 let mut manager = StreamSink::with_defaults(consumer, channels as usize);
160 let mut scratch = Vec::with_capacity(ring_buffer_size);
162
163 let cfg: StreamConfig = config.clone();
164
165 let measure_xruns = Arc::new(AtomicBool::new(false));
166 let measure_xruns_clone = measure_xruns.clone();
167
168 let stream = match sample_format {
169 cpal::SampleFormat::I8 => device.build_output_stream(
170 &cfg,
171 move |data, _: &_| output_callback::<i8>(data, &mut manager, &mut scratch, &measure_xruns),
172 |_| {},
173 None,
174 )?,
175 cpal::SampleFormat::I16 => device.build_output_stream(
176 &cfg,
177 move |data, _: &_| output_callback::<i16>(data, &mut manager, &mut scratch, &measure_xruns),
178 |_| {},
179 None,
180 )?,
181 cpal::SampleFormat::I32 => device.build_output_stream(
182 &cfg,
183 move |data, _: &_| output_callback::<i32>(data, &mut manager, &mut scratch, &measure_xruns),
184 |_| {},
185 None,
186 )?,
187 cpal::SampleFormat::I64 => device.build_output_stream(
188 &cfg,
189 move |data, _: &_| output_callback::<i64>(data, &mut manager, &mut scratch, &measure_xruns),
190 |_| {},
191 None,
192 )?,
193 cpal::SampleFormat::U8 => device.build_output_stream(
194 &cfg,
195 move |data, _: &_| output_callback::<u8>(data, &mut manager, &mut scratch, &measure_xruns),
196 |_| {},
197 None,
198 )?,
199 cpal::SampleFormat::U16 => device.build_output_stream(
200 &cfg,
201 move |data, _: &_| output_callback::<u16>(data, &mut manager, &mut scratch, &measure_xruns),
202 |_| {},
203 None,
204 )?,
205 cpal::SampleFormat::U32 => device.build_output_stream(
206 &cfg,
207 move |data, _: &_| output_callback::<u32>(data, &mut manager, &mut scratch, &measure_xruns),
208 |_| {},
209 None,
210 )?,
211 cpal::SampleFormat::U64 => device.build_output_stream(
212 &cfg,
213 move |data, _: &_| output_callback::<u64>(data, &mut manager, &mut scratch, &measure_xruns),
214 |_| {},
215 None,
216 )?,
217 cpal::SampleFormat::F32 => device.build_output_stream(
218 &cfg,
219 move |data, _: &_| output_callback::<f32>(data, &mut manager, &mut scratch, &measure_xruns),
220 |_| {},
221 None,
222 )?,
223 cpal::SampleFormat::F64 => device.build_output_stream(
224 &cfg,
225 move |data, _: &_| output_callback::<f64>(data, &mut manager, &mut scratch, &measure_xruns),
226 |_| {},
227 None,
228 )?,
229 _ => {
230 unreachable!("this program has crashed due to a `TooManyObfuscatingAbstractions` error")
231 }
232 };
233 stream.play().unwrap();
234
235 Ok((
237 stream,
238 CpalSink {
239 interleaved_out: producer,
240 channels: channels as usize,
241 measure_xruns: measure_xruns_clone,
242 },
243 ))
244}
245
246fn output_callback<T>(output: &mut [T], manager: &mut StreamSink, scratch: &mut Vec<f32>, measure_xruns: &AtomicBool)
247where
248 T: cpal::Sample + dasp_sample::ToSample<T> + cpal::FromSample<f32>,
249{
250 scratch.resize(output.len(), 0.0);
251 manager.output_samples(scratch, measure_xruns.load(Ordering::Relaxed));
252
253 for (sample, sample_out) in scratch.iter().zip(output.iter_mut()) {
254 *sample_out = sample.to_sample::<T>();
255 }
256}