spectrusty_audio/
carousel.rs

1/*
2    Copyright (C) 2020-2022  Rafal Michalski
3
4    This file is part of SPECTRUSTY, a Rust library for building emulators.
5
6    For the full copyright notice, see the lib.rs file.
7*/
8/*! Tools for assisting audio rendering via audio frameworks that run on separate threads.
9
10# The Carousel
11
12Some audio frameworks require sample generators to be run on a separate thread while the sound
13is being played to provide data to fill the audio buffer just in time when it needs to be refilled.
14
15When emulating a computer we have to synchronize emulation, render video frames, read user input,
16and at the same time to render audio samples asynchronously exactly when some external thread
17tells us to. Having a concurrent thread for rendering audio frames makes it somewhat difficult.
18
19This module exists solely for the purpose to ease this task. "The Carousel" consists of an 
20[audio producer] and an [audio consumer]. The audio producer lives in the same thread where
21the emulation is run and where the sound is being produced. The audio consumer is delegated 
22to the audio thread and its role is to relay audio samples to the audio framework.
23
24```text
25                                 (new sample data)
26                    /----> AudioBuffer ----> AudioBuffer ---->\
27+----------------------+                                  +----------------------+
28|  AudioFrameProducer  |                                  |  AudioFrameConsumer  | -> 🔊
29+----------------------+                                  +----------------------+
30                    \<---- AudioBuffer <---- AudioBuffer -----/
31                                 (recycled buffers)
32```
33The produced [audio buffer]s, ready to be played, are being sent via [mpsc::channel] from
34the [audio producer] to the [audio consumer]. The consumer fills the audio buffers provided by
35the audio framework with samples from the received [audio buffer] frames and sends the used up
36frame buffers back via another channel to the [audio producer] to be filled again with new
37sample data.
38
39The size of each [audio buffer] is determined only by the emulated frame duration and is
40unrelated to the audio framework output buffer size.
41
42The number of buffers in circulation determines the audio latency. The larger the latency the
43more stable the playback is at the cost of the delay of the sound. Knowing the output buffer
44size the minimum latency should be calculated from the number of samples in the output buffer
45divided by the number of samples in the single audio frame plus one.
46
47[audio producer]: AudioFrameProducer
48[audio consumer]: AudioFrameConsumer
49[audio buffer]: AudioBuffer
50[mpsc::channel]: std::sync::mpsc::channel
51*/
52use std::error;
53use core::fmt;
54
55use core::mem::{swap, replace};
56use core::ops::{Deref, DerefMut};
57use std::sync::mpsc::{channel, Sender, Receiver, SendError, RecvError,
58                        TryRecvError, RecvTimeoutError, TrySendError};
59
60pub use spectrusty_core::audio::AudioSample;
61
62pub type AudioFrameResult<T> = Result<T, AudioFrameError>;
63
64#[derive(Debug, Clone)]
65pub struct AudioFrameError;
66
67/// The audio buffer is a carrier of audio samples generated for every emulated frame.
68///
69/// The format and number of channels depend on the audio framework requirements.
70#[derive(Clone, Debug)]
71pub struct AudioBuffer<T>(pub Vec<T>);
72
73/// Relays [AudioBuffer] samples to the audio framework output buffers.
74#[derive(Debug)]
75pub struct AudioFrameConsumer<T> {
76    buffer: AudioBuffer<T>,
77    cursor: usize,
78    producer_tx: Sender<AudioBuffer<T>>,
79    rx: Receiver<AudioBuffer<T>>,
80}
81
82/// Allows relaying rendered [AudioBuffer] to the [AudioFrameConsumer].
83#[derive(Debug)]
84pub struct AudioFrameProducer<T> {
85    /// The next audio buffer frame to render samples to.
86    pub buffer: AudioBuffer<T>,
87    rx: Receiver<AudioBuffer<T>>,
88    consumer_tx: Sender<AudioBuffer<T>>,
89}
90
91/// Creates an inter-connected pair or [AudioFrameProducer] and [AudioFrameConsumer].
92///
93/// The `latency` + 1 specifies how many buffers will be circulating in the carousel.
94/// The good indicator of how many are needed depends on the size of the target audio
95/// buffers provided by the framework. The size of the target audio buffer divided by
96/// the size of the produced frame buffers is a good approximation.
97///
98/// Basically, the larger the `latency` is the more stable the output sound stream will
99/// be, but at the cost of more delayed playback. Implementations should set a good
100/// default based on experiments but may allow users to adjust this value eventually.
101///
102/// `sample_frames` and `channels` determine the size of the allocated buffers.
103pub fn create_carousel<T>(latency: usize, sample_frames: usize, channels: u8) ->
104                                                (AudioFrameProducer<T>, AudioFrameConsumer<T>)
105where T: 'static + AudioSample + Send
106{
107    // let sample_frames = (sample_rate as f64 * frame_duration).ceil() as usize;
108    let buffer = AudioBuffer::<T>::new(sample_frames, channels);
109    let (producer_tx, producer_rx) = channel::<AudioBuffer<T>>();
110    let (consumer_tx, consumer_rx) = channel::<AudioBuffer<T>>();
111    // if latency > 0 {
112        // Add some frame buffers into circulation
113        // for _ in 0..latency {
114            producer_tx.send(buffer.clone()).unwrap(); // infallible
115        // }
116        for _ in 0..latency {
117            consumer_tx.send(buffer.clone()).unwrap(); // infallible
118        }
119        // }
120    // }
121    let producer = AudioFrameProducer::new(buffer.clone(), consumer_tx, producer_rx);
122    let consumer = AudioFrameConsumer::new(buffer, producer_tx, consumer_rx);
123    (producer, consumer)
124}
125
126impl fmt::Display for AudioFrameError {
127    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
128        write!(f, "the remote thread has been terminated")
129    }
130}
131
132impl error::Error for AudioFrameError {
133    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
134        None
135    }
136}
137
138impl<T> From<TrySendError<T>> for AudioFrameError {
139    fn from(_error: TrySendError<T>) -> Self {
140        AudioFrameError
141    }
142}
143
144impl<T> From<SendError<T>> for AudioFrameError {
145    fn from(_error: SendError<T>) -> Self {
146        AudioFrameError
147    }
148}
149
150impl From<TryRecvError> for AudioFrameError {
151    fn from(_error: TryRecvError) -> Self {
152        AudioFrameError
153    }
154}
155
156impl From<RecvError> for AudioFrameError {
157    fn from(_error: RecvError) -> Self {
158        AudioFrameError
159    }
160}
161
162impl From<RecvTimeoutError> for AudioFrameError {
163    fn from(_error: RecvTimeoutError) -> Self {
164        AudioFrameError
165    }
166}
167
168impl<T> Deref for AudioBuffer<T> {
169    type Target = Vec<T>;
170    fn deref(&self) -> &Self::Target {
171        &self.0
172    }
173}
174
175impl<T> DerefMut for AudioBuffer<T> {
176    fn deref_mut(&mut self) -> &mut Self::Target {
177        &mut self.0
178    }
179}
180
181impl<T: AudioSample> AudioBuffer<T> {
182    fn new(sample_frames: usize, channels: u8) -> Self {
183        let size = sample_frames * channels as usize;
184        AudioBuffer(vec![T::silence();size])
185    }
186}
187
188impl<T> AudioBuffer<T> {
189    #[inline(always)]
190    fn sampled_size(&self) -> usize {
191        self.0.len()
192    }
193}
194
195impl<T: Copy> AudioBuffer<T> {
196    #[inline]
197    fn copy_to(&self, target: &mut [T], src_offset: usize) -> usize {
198        let end_offset = self.sampled_size().min(src_offset + target.len());
199        let source = &self.0[src_offset..end_offset];
200        // eprintln!("cur: {} out: {} of {} src.len: {}", cursor, target_offset, target_buffer.len(), source.len());
201        let copied_size = source.len();
202        target[..copied_size].copy_from_slice(source);
203        copied_size
204    }
205}
206
207impl<T> AudioFrameConsumer<T> {
208    /// Creates a new instance of `AudioFrameConsumer`.
209    ///
210    /// Prefer to use [create_carousel] instead.
211    pub fn new(buffer: AudioBuffer<T>,
212               producer_tx: Sender<AudioBuffer<T>>,
213               consumer_rx: Receiver<AudioBuffer<T>>) -> Self {
214        AudioFrameConsumer {
215            buffer,
216            cursor: 0,
217            producer_tx,
218            rx: consumer_rx
219        }
220    }
221    /// Resets the audio buffer sample cursor.
222    pub fn reset_cursor(&mut self) {
223        self.cursor = 0;
224    }
225}
226
227impl<T: 'static + Copy + Send> AudioFrameConsumer<T> {
228    /// Attempts to receive the next audio frame from the [AudioFrameProducer].
229    ///
230    /// When `Ok(true)` is returned replaces the current frame buffer with the one received
231    /// and sends back the current one.
232    ///
233    /// If there is no new buffer waiting in the message queue returns `Ok(false)`.
234    ///
235    /// Returns `Err(AudioFrameError)` only when sending or receiving buffers failed,
236    /// which is possible only when the remote end has disconnected.
237    #[inline]
238    pub fn next_frame(&mut self) -> AudioFrameResult<bool> {
239        match self.rx.try_recv() {
240        // match self.rx.recv_timeout(Duration::from_millis(wait_max_ms as u64)) {
241            Ok(mut buffer) => {
242                // print!("{:?} ", buffer.as_ptr());
243                swap(&mut self.buffer, &mut buffer);
244                self.producer_tx.send(buffer)?;
245                // let mut buffer = Some(buffer);
246                // loop {
247                //     match self.producer_tx.send(buffer.take().unwrap()) {
248                //         Err(TrySendError::Full(buf)) => {
249                //             println!("cons couldn't send");
250                //             buffer = Some(buf)
251                //         }
252                //         Ok(()) => break,
253                //         Err(e) => Err(e)?,
254                //     };
255                // }
256                Ok(true)
257            }
258            Err(TryRecvError::Empty) => {
259                Ok(false)
260            },
261            Err(TryRecvError::Disconnected) => Err(AudioFrameError)
262            // Err(RecvTimeoutError::Timeout) => Ok(false),
263            // Err(RecvTimeoutError::Disconnected) => Err(AudioFrameError),
264        }
265    }
266    /// Exposes the last received frame buffer as a slice of samples.
267    #[inline]
268    pub fn current_frame(&self) -> &[T] {
269        &self.buffer
270    }
271    /// Fills the `target_buffer` with the received audio frame samples.
272    ///
273    /// Attempts to receive new frame buffers when necessary, repeating the process until 
274    /// the whole buffer is filled or when there are no more buffers waiting in the incoming
275    /// queue.
276    ///
277    /// Returns the unfilled part of the target buffer in case there were no more frames to receive
278    /// and `ignore_missing` was `false`.
279    ///
280    /// Returns an empty slice if the whole buffer has been filled.
281    ///
282    /// In case `ignore_missing` is `true` the last audio frame will be rendered again if there are
283    /// no more new buffers in the queue.
284    ///
285    /// Returns `Err(AudioFrameError)` only when sending or receiving buffers failed,
286    /// which is possible only when the remote end has disconnected.
287    pub fn fill_buffer<'a>(
288                &mut self,
289                mut target_buffer: &'a mut[T],
290                ignore_missing: bool
291            ) -> AudioFrameResult<&'a mut[T]>
292    {
293        let mut cursor = self.cursor;
294        while !target_buffer.is_empty() {
295            if cursor >= self.buffer.sampled_size() {
296                if !(self.next_frame()? || ignore_missing) {
297                    break
298                }
299                cursor = 0;
300            }
301            // print!("{:?} ", self.buffer.as_ptr());
302            let copied_size = self.buffer.copy_to(target_buffer, cursor);
303            cursor += copied_size;
304            target_buffer = &mut target_buffer[copied_size..];
305        }
306        self.cursor = cursor;
307        Ok(target_buffer)
308    }
309}
310
311impl<T> AudioFrameProducer<T> {
312    /// Creates a new instance of `AudioFrameProducer`.
313    ///
314    /// Prefer to use [create_carousel] instead.
315    pub fn new(buffer: AudioBuffer<T>,
316               consumer_tx: Sender<AudioBuffer<T>>,
317               producer_rx: Receiver<AudioBuffer<T>>) -> Self {
318        AudioFrameProducer { buffer, rx: producer_rx, consumer_tx }
319    }
320    /// Provides the current frame buffer as `Vec` of samples for rendering via a closure.
321    ///
322    /// The closure should ensure the size of the `Vec` is resized to the number of actually
323    /// rendered samples.
324    pub fn render_frame<F: FnOnce(&mut Vec<T>)>(&mut self, render: F) {
325        render(&mut self.buffer);
326        // eprintln!("smpl: {}", self.buffer.sampled_size);
327    }
328}
329
330impl<T: 'static + Send> AudioFrameProducer<T> {
331    /// Sends the audio frame buffer to the [AudioFrameConsumer] and replaces it with a recycled
332    /// buffer received back from [AudioFrameConsumer].
333    ///
334    /// This method will block if the recycled buffer queue is empty.
335    ///
336    /// Returns `Err(AudioFrameError)` only when sending or receiving buffers failed,
337    /// which is possible only when the remote end has disconnected.
338    pub fn send_frame(&mut self) -> AudioFrameResult<()> {
339        // eprintln!("waiting for buffer");
340        // let buffer = loop {
341        //     match self.rx.try_recv() {
342        //         Ok(buf) => break buf,
343        //         Err(TryRecvError::Empty) => {
344        //             let now = std::time::Instant::now();
345        //             let buf = self.rx.recv()?;
346        //             println!("prod couldn't recv, {:?}", now.elapsed());
347        //             break buf;
348        //         }
349        //         Err(e) => Err(e)?
350        //     }
351        // };
352        // let mut buffer = Some(replace(&mut self.buffer, buffer));
353        // let buffer = replace(&mut self.buffer, buffer);
354        // eprintln!("got buffer");
355        // loop {
356        //     match self.consumer_tx.try_send(buffer.take().unwrap()) {
357        //         Err(TrySendError::Full(buf)) => {
358        //             println!("prod couldn't send");
359        //             buffer = Some(buf)
360        //         }
361        //         Ok(()) => return Ok(()),
362        //         Err(e) => Err(e)?
363        //     }
364        // }
365        let buffer = replace(&mut self.buffer, self.rx.recv()?);
366        self.consumer_tx.send(buffer).map_err(From::from)
367        // eprintln!("sent buffer");
368    }
369}
370
371#[cfg(test)]
372mod tests {
373    use super::*;
374    use std::thread;
375    use std::f32::consts::PI;
376
377    #[test]
378    fn carousel_works() -> Result<(), Box<dyn error::Error>> {
379        // eprintln!("AudioBuffer<f32>: {:?}", core::mem::size_of::<AudioBuffer<f32>>());
380        // eprintln!("AudioBuffer<u16>: {:?}", core::mem::size_of::<AudioBuffer<u16>>());
381        // eprintln!("Sender<AudioBuffer<f32>>: {:?}", core::mem::size_of::<Sender<AudioBuffer<f32>>>());
382        // eprintln!("Sender<AudioBuffer<u16>>: {:?}", core::mem::size_of::<Sender<AudioBuffer<u16>>>());
383        const TEST_SAMPLES_COUNT: usize = 20000;
384        const LATENCY: usize = 5;
385        const BUFSIZE: usize = 256;
386        const ZEROLEN: usize = BUFSIZE + LATENCY*BUFSIZE;
387        fn sinusoid(n: u16) -> f32 {
388            (PI*(n as f32)/BUFSIZE as f32).sin()
389        }
390
391        let (mut producer, mut consumer) = create_carousel::<f32>(LATENCY, BUFSIZE, 1);
392        let join = thread::spawn(move || {
393            let mut target = vec![0.0;800];
394            let mut unfilled = &mut target[..];
395            loop {
396                thread::sleep(std::time::Duration::from_millis(1));
397                unfilled = consumer.fill_buffer(unfilled, false).unwrap();
398                if unfilled.len() == 0 {
399                    break;
400                }
401            }
402            target.resize(TEST_SAMPLES_COUNT, 0.0);
403            let mut unfilled = &mut target[800..];
404            loop {
405                thread::sleep(std::time::Duration::from_millis(1));
406                unfilled = consumer.fill_buffer(unfilled, false).unwrap();
407                if unfilled.len() == 0 {
408                    break;
409                }
410            }
411            target
412        });
413
414        loop {
415            producer.render_frame(|vec| {
416                vec.clear();
417                vec.extend((0..BUFSIZE as u16).map(sinusoid));
418            });
419            if let Err(_e) = producer.send_frame() {
420                break
421            }
422        }
423        let target = join.join().unwrap();
424        assert_eq!(vec![0.0;ZEROLEN][..], target[..ZEROLEN]);
425        let mut template = Vec::new();
426        template.extend((0..BUFSIZE as u16).map(sinusoid).cycle().take(TEST_SAMPLES_COUNT-ZEROLEN));
427        assert_eq!(TEST_SAMPLES_COUNT-ZEROLEN, template.len());
428        assert_eq!(template[..], target[ZEROLEN..]);
429        Ok(())
430    }
431}