spectrusty_audio/carousel.rs
1/*
2 Copyright (C) 2020-2022 Rafal Michalski
3
4 This file is part of SPECTRUSTY, a Rust library for building emulators.
5
6 For the full copyright notice, see the lib.rs file.
7*/
8/*! Tools for assisting audio rendering via audio frameworks that run on separate threads.
9
10# The Carousel
11
12Some audio frameworks require sample generators to be run on a separate thread while the sound
13is being played to provide data to fill the audio buffer just in time when it needs to be refilled.
14
15When emulating a computer we have to synchronize emulation, render video frames, read user input,
16and at the same time to render audio samples asynchronously exactly when some external thread
17tells us to. Having a concurrent thread for rendering audio frames makes it somewhat difficult.
18
19This module exists solely for the purpose to ease this task. "The Carousel" consists of an
20[audio producer] and an [audio consumer]. The audio producer lives in the same thread where
21the emulation is run and where the sound is being produced. The audio consumer is delegated
22to the audio thread and its role is to relay audio samples to the audio framework.
23
24```text
25 (new sample data)
26 /----> AudioBuffer ----> AudioBuffer ---->\
27+----------------------+ +----------------------+
28| AudioFrameProducer | | AudioFrameConsumer | -> 🔊
29+----------------------+ +----------------------+
30 \<---- AudioBuffer <---- AudioBuffer -----/
31 (recycled buffers)
32```
33The produced [audio buffer]s, ready to be played, are being sent via [mpsc::channel] from
34the [audio producer] to the [audio consumer]. The consumer fills the audio buffers provided by
35the audio framework with samples from the received [audio buffer] frames and sends the used up
36frame buffers back via another channel to the [audio producer] to be filled again with new
37sample data.
38
39The size of each [audio buffer] is determined only by the emulated frame duration and is
40unrelated to the audio framework output buffer size.
41
42The number of buffers in circulation determines the audio latency. The larger the latency the
43more stable the playback is at the cost of the delay of the sound. Knowing the output buffer
44size the minimum latency should be calculated from the number of samples in the output buffer
45divided by the number of samples in the single audio frame plus one.
46
47[audio producer]: AudioFrameProducer
48[audio consumer]: AudioFrameConsumer
49[audio buffer]: AudioBuffer
50[mpsc::channel]: std::sync::mpsc::channel
51*/
52use std::error;
53use core::fmt;
54
55use core::mem::{swap, replace};
56use core::ops::{Deref, DerefMut};
57use std::sync::mpsc::{channel, Sender, Receiver, SendError, RecvError,
58 TryRecvError, RecvTimeoutError, TrySendError};
59
60pub use spectrusty_core::audio::AudioSample;
61
62pub type AudioFrameResult<T> = Result<T, AudioFrameError>;
63
64#[derive(Debug, Clone)]
65pub struct AudioFrameError;
66
67/// The audio buffer is a carrier of audio samples generated for every emulated frame.
68///
69/// The format and number of channels depend on the audio framework requirements.
70#[derive(Clone, Debug)]
71pub struct AudioBuffer<T>(pub Vec<T>);
72
73/// Relays [AudioBuffer] samples to the audio framework output buffers.
74#[derive(Debug)]
75pub struct AudioFrameConsumer<T> {
76 buffer: AudioBuffer<T>,
77 cursor: usize,
78 producer_tx: Sender<AudioBuffer<T>>,
79 rx: Receiver<AudioBuffer<T>>,
80}
81
82/// Allows relaying rendered [AudioBuffer] to the [AudioFrameConsumer].
83#[derive(Debug)]
84pub struct AudioFrameProducer<T> {
85 /// The next audio buffer frame to render samples to.
86 pub buffer: AudioBuffer<T>,
87 rx: Receiver<AudioBuffer<T>>,
88 consumer_tx: Sender<AudioBuffer<T>>,
89}
90
91/// Creates an inter-connected pair or [AudioFrameProducer] and [AudioFrameConsumer].
92///
93/// The `latency` + 1 specifies how many buffers will be circulating in the carousel.
94/// The good indicator of how many are needed depends on the size of the target audio
95/// buffers provided by the framework. The size of the target audio buffer divided by
96/// the size of the produced frame buffers is a good approximation.
97///
98/// Basically, the larger the `latency` is the more stable the output sound stream will
99/// be, but at the cost of more delayed playback. Implementations should set a good
100/// default based on experiments but may allow users to adjust this value eventually.
101///
102/// `sample_frames` and `channels` determine the size of the allocated buffers.
103pub fn create_carousel<T>(latency: usize, sample_frames: usize, channels: u8) ->
104 (AudioFrameProducer<T>, AudioFrameConsumer<T>)
105where T: 'static + AudioSample + Send
106{
107 // let sample_frames = (sample_rate as f64 * frame_duration).ceil() as usize;
108 let buffer = AudioBuffer::<T>::new(sample_frames, channels);
109 let (producer_tx, producer_rx) = channel::<AudioBuffer<T>>();
110 let (consumer_tx, consumer_rx) = channel::<AudioBuffer<T>>();
111 // if latency > 0 {
112 // Add some frame buffers into circulation
113 // for _ in 0..latency {
114 producer_tx.send(buffer.clone()).unwrap(); // infallible
115 // }
116 for _ in 0..latency {
117 consumer_tx.send(buffer.clone()).unwrap(); // infallible
118 }
119 // }
120 // }
121 let producer = AudioFrameProducer::new(buffer.clone(), consumer_tx, producer_rx);
122 let consumer = AudioFrameConsumer::new(buffer, producer_tx, consumer_rx);
123 (producer, consumer)
124}
125
126impl fmt::Display for AudioFrameError {
127 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
128 write!(f, "the remote thread has been terminated")
129 }
130}
131
132impl error::Error for AudioFrameError {
133 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
134 None
135 }
136}
137
138impl<T> From<TrySendError<T>> for AudioFrameError {
139 fn from(_error: TrySendError<T>) -> Self {
140 AudioFrameError
141 }
142}
143
144impl<T> From<SendError<T>> for AudioFrameError {
145 fn from(_error: SendError<T>) -> Self {
146 AudioFrameError
147 }
148}
149
150impl From<TryRecvError> for AudioFrameError {
151 fn from(_error: TryRecvError) -> Self {
152 AudioFrameError
153 }
154}
155
156impl From<RecvError> for AudioFrameError {
157 fn from(_error: RecvError) -> Self {
158 AudioFrameError
159 }
160}
161
162impl From<RecvTimeoutError> for AudioFrameError {
163 fn from(_error: RecvTimeoutError) -> Self {
164 AudioFrameError
165 }
166}
167
168impl<T> Deref for AudioBuffer<T> {
169 type Target = Vec<T>;
170 fn deref(&self) -> &Self::Target {
171 &self.0
172 }
173}
174
175impl<T> DerefMut for AudioBuffer<T> {
176 fn deref_mut(&mut self) -> &mut Self::Target {
177 &mut self.0
178 }
179}
180
181impl<T: AudioSample> AudioBuffer<T> {
182 fn new(sample_frames: usize, channels: u8) -> Self {
183 let size = sample_frames * channels as usize;
184 AudioBuffer(vec![T::silence();size])
185 }
186}
187
188impl<T> AudioBuffer<T> {
189 #[inline(always)]
190 fn sampled_size(&self) -> usize {
191 self.0.len()
192 }
193}
194
195impl<T: Copy> AudioBuffer<T> {
196 #[inline]
197 fn copy_to(&self, target: &mut [T], src_offset: usize) -> usize {
198 let end_offset = self.sampled_size().min(src_offset + target.len());
199 let source = &self.0[src_offset..end_offset];
200 // eprintln!("cur: {} out: {} of {} src.len: {}", cursor, target_offset, target_buffer.len(), source.len());
201 let copied_size = source.len();
202 target[..copied_size].copy_from_slice(source);
203 copied_size
204 }
205}
206
207impl<T> AudioFrameConsumer<T> {
208 /// Creates a new instance of `AudioFrameConsumer`.
209 ///
210 /// Prefer to use [create_carousel] instead.
211 pub fn new(buffer: AudioBuffer<T>,
212 producer_tx: Sender<AudioBuffer<T>>,
213 consumer_rx: Receiver<AudioBuffer<T>>) -> Self {
214 AudioFrameConsumer {
215 buffer,
216 cursor: 0,
217 producer_tx,
218 rx: consumer_rx
219 }
220 }
221 /// Resets the audio buffer sample cursor.
222 pub fn reset_cursor(&mut self) {
223 self.cursor = 0;
224 }
225}
226
227impl<T: 'static + Copy + Send> AudioFrameConsumer<T> {
228 /// Attempts to receive the next audio frame from the [AudioFrameProducer].
229 ///
230 /// When `Ok(true)` is returned replaces the current frame buffer with the one received
231 /// and sends back the current one.
232 ///
233 /// If there is no new buffer waiting in the message queue returns `Ok(false)`.
234 ///
235 /// Returns `Err(AudioFrameError)` only when sending or receiving buffers failed,
236 /// which is possible only when the remote end has disconnected.
237 #[inline]
238 pub fn next_frame(&mut self) -> AudioFrameResult<bool> {
239 match self.rx.try_recv() {
240 // match self.rx.recv_timeout(Duration::from_millis(wait_max_ms as u64)) {
241 Ok(mut buffer) => {
242 // print!("{:?} ", buffer.as_ptr());
243 swap(&mut self.buffer, &mut buffer);
244 self.producer_tx.send(buffer)?;
245 // let mut buffer = Some(buffer);
246 // loop {
247 // match self.producer_tx.send(buffer.take().unwrap()) {
248 // Err(TrySendError::Full(buf)) => {
249 // println!("cons couldn't send");
250 // buffer = Some(buf)
251 // }
252 // Ok(()) => break,
253 // Err(e) => Err(e)?,
254 // };
255 // }
256 Ok(true)
257 }
258 Err(TryRecvError::Empty) => {
259 Ok(false)
260 },
261 Err(TryRecvError::Disconnected) => Err(AudioFrameError)
262 // Err(RecvTimeoutError::Timeout) => Ok(false),
263 // Err(RecvTimeoutError::Disconnected) => Err(AudioFrameError),
264 }
265 }
266 /// Exposes the last received frame buffer as a slice of samples.
267 #[inline]
268 pub fn current_frame(&self) -> &[T] {
269 &self.buffer
270 }
271 /// Fills the `target_buffer` with the received audio frame samples.
272 ///
273 /// Attempts to receive new frame buffers when necessary, repeating the process until
274 /// the whole buffer is filled or when there are no more buffers waiting in the incoming
275 /// queue.
276 ///
277 /// Returns the unfilled part of the target buffer in case there were no more frames to receive
278 /// and `ignore_missing` was `false`.
279 ///
280 /// Returns an empty slice if the whole buffer has been filled.
281 ///
282 /// In case `ignore_missing` is `true` the last audio frame will be rendered again if there are
283 /// no more new buffers in the queue.
284 ///
285 /// Returns `Err(AudioFrameError)` only when sending or receiving buffers failed,
286 /// which is possible only when the remote end has disconnected.
287 pub fn fill_buffer<'a>(
288 &mut self,
289 mut target_buffer: &'a mut[T],
290 ignore_missing: bool
291 ) -> AudioFrameResult<&'a mut[T]>
292 {
293 let mut cursor = self.cursor;
294 while !target_buffer.is_empty() {
295 if cursor >= self.buffer.sampled_size() {
296 if !(self.next_frame()? || ignore_missing) {
297 break
298 }
299 cursor = 0;
300 }
301 // print!("{:?} ", self.buffer.as_ptr());
302 let copied_size = self.buffer.copy_to(target_buffer, cursor);
303 cursor += copied_size;
304 target_buffer = &mut target_buffer[copied_size..];
305 }
306 self.cursor = cursor;
307 Ok(target_buffer)
308 }
309}
310
311impl<T> AudioFrameProducer<T> {
312 /// Creates a new instance of `AudioFrameProducer`.
313 ///
314 /// Prefer to use [create_carousel] instead.
315 pub fn new(buffer: AudioBuffer<T>,
316 consumer_tx: Sender<AudioBuffer<T>>,
317 producer_rx: Receiver<AudioBuffer<T>>) -> Self {
318 AudioFrameProducer { buffer, rx: producer_rx, consumer_tx }
319 }
320 /// Provides the current frame buffer as `Vec` of samples for rendering via a closure.
321 ///
322 /// The closure should ensure the size of the `Vec` is resized to the number of actually
323 /// rendered samples.
324 pub fn render_frame<F: FnOnce(&mut Vec<T>)>(&mut self, render: F) {
325 render(&mut self.buffer);
326 // eprintln!("smpl: {}", self.buffer.sampled_size);
327 }
328}
329
330impl<T: 'static + Send> AudioFrameProducer<T> {
331 /// Sends the audio frame buffer to the [AudioFrameConsumer] and replaces it with a recycled
332 /// buffer received back from [AudioFrameConsumer].
333 ///
334 /// This method will block if the recycled buffer queue is empty.
335 ///
336 /// Returns `Err(AudioFrameError)` only when sending or receiving buffers failed,
337 /// which is possible only when the remote end has disconnected.
338 pub fn send_frame(&mut self) -> AudioFrameResult<()> {
339 // eprintln!("waiting for buffer");
340 // let buffer = loop {
341 // match self.rx.try_recv() {
342 // Ok(buf) => break buf,
343 // Err(TryRecvError::Empty) => {
344 // let now = std::time::Instant::now();
345 // let buf = self.rx.recv()?;
346 // println!("prod couldn't recv, {:?}", now.elapsed());
347 // break buf;
348 // }
349 // Err(e) => Err(e)?
350 // }
351 // };
352 // let mut buffer = Some(replace(&mut self.buffer, buffer));
353 // let buffer = replace(&mut self.buffer, buffer);
354 // eprintln!("got buffer");
355 // loop {
356 // match self.consumer_tx.try_send(buffer.take().unwrap()) {
357 // Err(TrySendError::Full(buf)) => {
358 // println!("prod couldn't send");
359 // buffer = Some(buf)
360 // }
361 // Ok(()) => return Ok(()),
362 // Err(e) => Err(e)?
363 // }
364 // }
365 let buffer = replace(&mut self.buffer, self.rx.recv()?);
366 self.consumer_tx.send(buffer).map_err(From::from)
367 // eprintln!("sent buffer");
368 }
369}
370
371#[cfg(test)]
372mod tests {
373 use super::*;
374 use std::thread;
375 use std::f32::consts::PI;
376
377 #[test]
378 fn carousel_works() -> Result<(), Box<dyn error::Error>> {
379 // eprintln!("AudioBuffer<f32>: {:?}", core::mem::size_of::<AudioBuffer<f32>>());
380 // eprintln!("AudioBuffer<u16>: {:?}", core::mem::size_of::<AudioBuffer<u16>>());
381 // eprintln!("Sender<AudioBuffer<f32>>: {:?}", core::mem::size_of::<Sender<AudioBuffer<f32>>>());
382 // eprintln!("Sender<AudioBuffer<u16>>: {:?}", core::mem::size_of::<Sender<AudioBuffer<u16>>>());
383 const TEST_SAMPLES_COUNT: usize = 20000;
384 const LATENCY: usize = 5;
385 const BUFSIZE: usize = 256;
386 const ZEROLEN: usize = BUFSIZE + LATENCY*BUFSIZE;
387 fn sinusoid(n: u16) -> f32 {
388 (PI*(n as f32)/BUFSIZE as f32).sin()
389 }
390
391 let (mut producer, mut consumer) = create_carousel::<f32>(LATENCY, BUFSIZE, 1);
392 let join = thread::spawn(move || {
393 let mut target = vec![0.0;800];
394 let mut unfilled = &mut target[..];
395 loop {
396 thread::sleep(std::time::Duration::from_millis(1));
397 unfilled = consumer.fill_buffer(unfilled, false).unwrap();
398 if unfilled.len() == 0 {
399 break;
400 }
401 }
402 target.resize(TEST_SAMPLES_COUNT, 0.0);
403 let mut unfilled = &mut target[800..];
404 loop {
405 thread::sleep(std::time::Duration::from_millis(1));
406 unfilled = consumer.fill_buffer(unfilled, false).unwrap();
407 if unfilled.len() == 0 {
408 break;
409 }
410 }
411 target
412 });
413
414 loop {
415 producer.render_frame(|vec| {
416 vec.clear();
417 vec.extend((0..BUFSIZE as u16).map(sinusoid));
418 });
419 if let Err(_e) = producer.send_frame() {
420 break
421 }
422 }
423 let target = join.join().unwrap();
424 assert_eq!(vec![0.0;ZEROLEN][..], target[..ZEROLEN]);
425 let mut template = Vec::new();
426 template.extend((0..BUFSIZE as u16).map(sinusoid).cycle().take(TEST_SAMPLES_COUNT-ZEROLEN));
427 assert_eq!(TEST_SAMPLES_COUNT-ZEROLEN, template.len());
428 assert_eq!(template[..], target[ZEROLEN..]);
429 Ok(())
430 }
431}