spectrusty-audio 0.4.0

Components of the SPECTRUSTY library for synthesizing audio samples and native audio playback.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
/*
    Copyright (C) 2020-2022  Rafal Michalski

    This file is part of SPECTRUSTY, a Rust library for building emulators.

    For the full copyright notice, see the lib.rs file.
*/
/*! Tools for assisting audio rendering via audio frameworks that run on separate threads.

# The Carousel

Some audio frameworks require sample generators to be run on a separate thread while the sound
is being played to provide data to fill the audio buffer just in time when it needs to be refilled.

When emulating a computer we have to synchronize emulation, render video frames, read user input,
and at the same time to render audio samples asynchronously exactly when some external thread
tells us to. Having a concurrent thread for rendering audio frames makes it somewhat difficult.

This module exists solely to ease this task. "The Carousel" consists of an
[audio producer] and an [audio consumer]. The audio producer lives in the same thread where
the emulation is run and where the sound is being produced. The audio consumer is delegated 
to the audio thread and its role is to relay audio samples to the audio framework.

```text
                                 (new sample data)
                    /----> AudioBuffer ----> AudioBuffer ---->\
+----------------------+                                  +----------------------+
|  AudioFrameProducer  |                                  |  AudioFrameConsumer  | -> 🔊
+----------------------+                                  +----------------------+
                    \<---- AudioBuffer <---- AudioBuffer -----/
                                 (recycled buffers)
```
The produced [audio buffer]s, ready to be played, are being sent via [mpsc::channel] from
the [audio producer] to the [audio consumer]. The consumer fills the audio buffers provided by
the audio framework with samples from the received [audio buffer] frames and sends the used up
frame buffers back via another channel to the [audio producer] to be filled again with new
sample data.

The size of each [audio buffer] is determined only by the emulated frame duration and is
unrelated to the audio framework output buffer size.

The number of buffers in circulation determines the audio latency. The larger the latency the
more stable the playback is at the cost of the delay of the sound. Knowing the output buffer
size the minimum latency should be calculated from the number of samples in the output buffer
divided by the number of samples in the single audio frame plus one.

[audio producer]: AudioFrameProducer
[audio consumer]: AudioFrameConsumer
[audio buffer]: AudioBuffer
[mpsc::channel]: std::sync::mpsc::channel
*/
use std::error;
use core::fmt;

use core::mem::{swap, replace};
use core::ops::{Deref, DerefMut};
use std::sync::mpsc::{channel, Sender, Receiver, SendError, RecvError,
                        TryRecvError, RecvTimeoutError, TrySendError};

pub use spectrusty_core::audio::AudioSample;

/// The result type returned by the carousel's send and receive operations.
pub type AudioFrameResult<T> = Result<T, AudioFrameError>;

/// The error returned when sending or receiving carousel buffers fails,
/// which happens only when the remote thread has been terminated
/// (i.e. the other end of the channel has disconnected).
#[derive(Debug, Clone)]
pub struct AudioFrameError;

/// The audio buffer is a carrier of audio samples generated for every emulated frame.
///
/// The format and number of channels depend on the audio framework requirements.
// NOTE(review): holds `sample_frames * channels` samples (see `AudioBuffer::new`);
// the channel layout (presumably interleaved) is determined by the audio framework.
#[derive(Clone, Debug)]
pub struct AudioBuffer<T>(pub Vec<T>);

/// Relays [AudioBuffer] samples to the audio framework output buffers.
#[derive(Debug)]
pub struct AudioFrameConsumer<T> {
    // The most recently received frame buffer, currently being drained.
    buffer: AudioBuffer<T>,
    // Index (in samples) of the next sample to copy out of `buffer`.
    cursor: usize,
    // Sends used-up buffers back to the producer for recycling.
    producer_tx: Sender<AudioBuffer<T>>,
    // Receives freshly rendered buffers from the producer.
    rx: Receiver<AudioBuffer<T>>,
}

/// Allows relaying rendered [AudioBuffer] to the [AudioFrameConsumer].
#[derive(Debug)]
pub struct AudioFrameProducer<T> {
    /// The next audio buffer frame to render samples to.
    pub buffer: AudioBuffer<T>,
    // Receives recycled (used-up) buffers back from the consumer.
    rx: Receiver<AudioBuffer<T>>,
    // Sends rendered buffers to the consumer for playback.
    consumer_tx: Sender<AudioBuffer<T>>,
}

/// Creates an inter-connected pair of [AudioFrameProducer] and [AudioFrameConsumer].
///
/// The `latency` + 1 specifies how many buffers will be circulating in the carousel.
/// The good indicator of how many are needed depends on the size of the target audio
/// buffers provided by the framework. The size of the target audio buffer divided by
/// the size of the produced frame buffers is a good approximation.
///
/// Basically, the larger the `latency` is the more stable the output sound stream will
/// be, but at the cost of more delayed playback. Implementations should set a good
/// default based on experiments but may allow users to adjust this value eventually.
///
/// `sample_frames` and `channels` determine the size of the allocated buffers.
pub fn create_carousel<T>(latency: usize, sample_frames: usize, channels: u8) ->
                                                (AudioFrameProducer<T>, AudioFrameConsumer<T>)
where T: 'static + AudioSample + Send
{
    let buffer = AudioBuffer::<T>::new(sample_frames, channels);
    let (producer_tx, producer_rx) = channel::<AudioBuffer<T>>();
    let (consumer_tx, consumer_rx) = channel::<AudioBuffer<T>>();
    // Pre-fill the queues: one spare buffer for the producer to recycle...
    producer_tx.send(buffer.clone()).unwrap(); // infallible: the receiver is still alive
    // ...and `latency` silent buffers queued up for the consumer, which is what
    // delays (and stabilizes) the playback.
    for _ in 0..latency {
        consumer_tx.send(buffer.clone()).unwrap(); // infallible: the receiver is still alive
    }
    let producer = AudioFrameProducer::new(buffer.clone(), consumer_tx, producer_rx);
    let consumer = AudioFrameConsumer::new(buffer, producer_tx, consumer_rx);
    (producer, consumer)
}

impl fmt::Display for AudioFrameError {
    /// Formats the error as a human-readable message.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("the remote thread has been terminated")
    }
}

impl error::Error for AudioFrameError {
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        None
    }
}

impl<T> From<TrySendError<T>> for AudioFrameError {
    fn from(_error: TrySendError<T>) -> Self {
        AudioFrameError
    }
}

impl<T> From<SendError<T>> for AudioFrameError {
    fn from(_error: SendError<T>) -> Self {
        AudioFrameError
    }
}

impl From<TryRecvError> for AudioFrameError {
    fn from(_error: TryRecvError) -> Self {
        AudioFrameError
    }
}

impl From<RecvError> for AudioFrameError {
    fn from(_error: RecvError) -> Self {
        AudioFrameError
    }
}

impl From<RecvTimeoutError> for AudioFrameError {
    fn from(_error: RecvTimeoutError) -> Self {
        AudioFrameError
    }
}

// Lets `AudioBuffer` transparently expose the inner `Vec<T>` API
// (`len`, `clear`, `extend`, slicing, etc.) to its users.
impl<T> Deref for AudioBuffer<T> {
    type Target = Vec<T>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> DerefMut for AudioBuffer<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<T: AudioSample> AudioBuffer<T> {
    /// Allocates a buffer of `sample_frames * channels` samples,
    /// each initialized to the silence level of the sample type.
    fn new(sample_frames: usize, channels: u8) -> Self {
        let total = usize::from(channels) * sample_frames;
        AudioBuffer(vec![T::silence(); total])
    }
}

impl<T> AudioBuffer<T> {
    #[inline(always)]
    fn sampled_size(&self) -> usize {
        self.0.len()
    }
}

impl<T: Copy> AudioBuffer<T> {
    /// Copies samples starting at `src_offset` into the beginning of `target`.
    ///
    /// Copies as many samples as possible: the smaller of `target.len()` and
    /// the number of samples remaining after `src_offset`. Returns the number
    /// of samples actually copied.
    #[inline]
    fn copy_to(&self, target: &mut [T], src_offset: usize) -> usize {
        let remaining = &self.0[src_offset..];
        let count = remaining.len().min(target.len());
        target[..count].copy_from_slice(&remaining[..count]);
        count
    }
}

impl<T> AudioFrameConsumer<T> {
    /// Creates a new instance of `AudioFrameConsumer` from its constituent parts.
    ///
    /// Prefer to use [create_carousel] instead.
    pub fn new(buffer: AudioBuffer<T>,
               producer_tx: Sender<AudioBuffer<T>>,
               consumer_rx: Receiver<AudioBuffer<T>>) -> Self {
        AudioFrameConsumer {
            rx: consumer_rx,
            producer_tx,
            cursor: 0,
            buffer,
        }
    }
    /// Rewinds the sample cursor to the beginning of the current audio buffer.
    pub fn reset_cursor(&mut self) {
        self.cursor = 0;
    }
}

impl<T: 'static + Copy + Send> AudioFrameConsumer<T> {
    /// Attempts to receive the next audio frame from the [AudioFrameProducer].
    ///
    /// When `Ok(true)` is returned replaces the current frame buffer with the one received
    /// and sends back the current one.
    ///
    /// If there is no new buffer waiting in the message queue returns `Ok(false)`.
    ///
    /// Returns `Err(AudioFrameError)` only when sending or receiving buffers failed,
    /// which is possible only when the remote end has disconnected.
    #[inline]
    pub fn next_frame(&mut self) -> AudioFrameResult<bool> {
        match self.rx.try_recv() {
            Ok(mut buffer) => {
                // Adopt the freshly rendered frame and recycle the used-up
                // one back to the producer.
                swap(&mut self.buffer, &mut buffer);
                self.producer_tx.send(buffer)?;
                Ok(true)
            }
            Err(TryRecvError::Empty) => Ok(false),
            Err(TryRecvError::Disconnected) => Err(AudioFrameError)
        }
    }
    /// Exposes the last received frame buffer as a slice of samples.
    #[inline]
    pub fn current_frame(&self) -> &[T] {
        &self.buffer
    }
    /// Fills the `target_buffer` with the received audio frame samples.
    ///
    /// Attempts to receive new frame buffers when necessary, repeating the process until
    /// the whole buffer is filled or when there are no more buffers waiting in the incoming
    /// queue.
    ///
    /// Returns the unfilled part of the target buffer in case there were no more frames to receive
    /// and `ignore_missing` was `false`.
    ///
    /// Returns an empty slice if the whole buffer has been filled.
    ///
    /// In case `ignore_missing` is `true` the last audio frame will be rendered again if there are
    /// no more new buffers in the queue.
    ///
    /// Returns `Err(AudioFrameError)` only when sending or receiving buffers failed,
    /// which is possible only when the remote end has disconnected.
    pub fn fill_buffer<'a>(
                &mut self,
                mut target_buffer: &'a mut[T],
                ignore_missing: bool
            ) -> AudioFrameResult<&'a mut[T]>
    {
        let mut cursor = self.cursor;
        while !target_buffer.is_empty() {
            if cursor >= self.buffer.sampled_size() {
                // Current frame exhausted: fetch the next one, or — with
                // `ignore_missing` — replay the current frame from its start.
                if !(self.next_frame()? || ignore_missing) {
                    break
                }
                cursor = 0;
            }
            let copied_size = self.buffer.copy_to(target_buffer, cursor);
            cursor += copied_size;
            target_buffer = &mut target_buffer[copied_size..];
        }
        self.cursor = cursor;
        Ok(target_buffer)
    }
}

impl<T> AudioFrameProducer<T> {
    /// Creates a new instance of `AudioFrameProducer` from its constituent parts.
    ///
    /// Prefer to use [create_carousel] instead.
    pub fn new(buffer: AudioBuffer<T>,
               consumer_tx: Sender<AudioBuffer<T>>,
               producer_rx: Receiver<AudioBuffer<T>>) -> Self {
        AudioFrameProducer {
            buffer,
            consumer_tx,
            rx: producer_rx,
        }
    }
    /// Provides the current frame buffer as `Vec` of samples for rendering via a closure.
    ///
    /// The closure should ensure the size of the `Vec` is resized to the number of actually
    /// rendered samples.
    pub fn render_frame<F: FnOnce(&mut Vec<T>)>(&mut self, render: F) {
        render(&mut self.buffer)
    }
}

impl<T: 'static + Send> AudioFrameProducer<T> {
    /// Sends the audio frame buffer to the [AudioFrameConsumer] and replaces it with a recycled
    /// buffer received back from [AudioFrameConsumer].
    ///
    /// This method will block if the recycled buffer queue is empty.
    ///
    /// Returns `Err(AudioFrameError)` only when sending or receiving buffers failed,
    /// which is possible only when the remote end has disconnected.
    pub fn send_frame(&mut self) -> AudioFrameResult<()> {
        // Block until a recycled buffer arrives from the consumer, swap it in
        // for the just-rendered one, then ship the rendered frame out.
        let rendered = replace(&mut self.buffer, self.rx.recv()?);
        self.consumer_tx.send(rendered).map_err(From::from)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::f32::consts::PI;

    /// End-to-end check: samples rendered by the producer arrive at the consumer
    /// in order, preceded by the silent buffers initially put into circulation.
    #[test]
    fn carousel_works() -> Result<(), Box<dyn error::Error>> {
        const TEST_SAMPLES_COUNT: usize = 20000;
        const LATENCY: usize = 5;
        const BUFSIZE: usize = 256;
        // The consumer starts with one silent buffer plus LATENCY silent buffers
        // pre-queued by create_carousel, so this many leading samples are zero.
        const ZEROLEN: usize = BUFSIZE + LATENCY*BUFSIZE;
        fn sinusoid(n: u16) -> f32 {
            (PI*(n as f32)/BUFSIZE as f32).sin()
        }

        let (mut producer, mut consumer) = create_carousel::<f32>(LATENCY, BUFSIZE, 1);
        let join = thread::spawn(move || {
            // First drain into a small target, then grow it and keep draining
            // until TEST_SAMPLES_COUNT samples have been collected.
            let mut target = vec![0.0;800];
            let mut unfilled = &mut target[..];
            loop {
                thread::sleep(std::time::Duration::from_millis(1));
                unfilled = consumer.fill_buffer(unfilled, false).unwrap();
                if unfilled.len() == 0 {
                    break;
                }
            }
            target.resize(TEST_SAMPLES_COUNT, 0.0);
            let mut unfilled = &mut target[800..];
            loop {
                thread::sleep(std::time::Duration::from_millis(1));
                unfilled = consumer.fill_buffer(unfilled, false).unwrap();
                if unfilled.len() == 0 {
                    break;
                }
            }
            target
        });

        // Keep producing sinusoid frames until the consumer thread finishes
        // and drops its end of the channel, which makes send_frame fail.
        loop {
            producer.render_frame(|vec| {
                vec.clear();
                vec.extend((0..BUFSIZE as u16).map(sinusoid));
            });
            if let Err(_e) = producer.send_frame() {
                break
            }
        }
        let target = join.join().unwrap();
        assert_eq!(vec![0.0;ZEROLEN][..], target[..ZEROLEN]);
        let mut template = Vec::new();
        template.extend((0..BUFSIZE as u16).map(sinusoid).cycle().take(TEST_SAMPLES_COUNT-ZEROLEN));
        assert_eq!(TEST_SAMPLES_COUNT-ZEROLEN, template.len());
        assert_eq!(template[..], target[ZEROLEN..]);
        Ok(())
    }
}