use std::error;
use core::fmt;
use core::mem::{swap, replace};
use core::ops::{Deref, DerefMut};
use std::sync::mpsc::{channel, Sender, Receiver, SendError, RecvError,
TryRecvError, RecvTimeoutError, TrySendError};
pub use spectrusty_core::audio::AudioSample;
/// Result type returned by the carousel's send/receive operations.
pub type AudioFrameResult<T> = Result<T, AudioFrameError>;
/// Opaque error: the thread on the other side of the carousel has disconnected.
#[derive(Debug, Clone)]
pub struct AudioFrameError;
/// One frame's worth of audio samples, passed back and forth between threads.
#[derive(Clone, Debug)]
pub struct AudioBuffer<T>(pub Vec<T>);
/// Receiving end of the carousel: reads rendered frames sample by sample.
#[derive(Debug)]
pub struct AudioFrameConsumer<T> {
// Frame currently being read from.
buffer: AudioBuffer<T>,
// Read position (in samples) within `buffer`.
cursor: usize,
// Returns spent buffers to the producer for reuse.
producer_tx: Sender<AudioBuffer<T>>,
// Receives freshly rendered frames from the producer.
rx: Receiver<AudioBuffer<T>>,
}
/// Sending end of the carousel: renders frames and ships them to the consumer.
#[derive(Debug)]
pub struct AudioFrameProducer<T> {
// Buffer being rendered into; swapped out by `send_frame`.
pub buffer: AudioBuffer<T>,
// Receives recycled buffers back from the consumer.
rx: Receiver<AudioBuffer<T>>,
// Delivers rendered frames to the consumer.
consumer_tx: Sender<AudioBuffer<T>>,
}
/// Creates a connected producer/consumer pair sharing a pool of audio buffers.
///
/// Each buffer holds `sample_frames * channels` samples of silence initially.
/// `latency` silent buffers are pre-queued for the consumer, and one spare
/// buffer is pre-queued for the producer, so both sides can start immediately.
pub fn create_carousel<T>(latency: usize, sample_frames: usize, channels: u8) ->
    (AudioFrameProducer<T>, AudioFrameConsumer<T>)
    where T: 'static + AudioSample + Send
{
    let template = AudioBuffer::<T>::new(sample_frames, channels);
    let (producer_tx, producer_rx) = channel::<AudioBuffer<T>>();
    let (consumer_tx, consumer_rx) = channel::<AudioBuffer<T>>();
    // One spare buffer waits in the producer's inbox for the first swap.
    producer_tx.send(template.clone()).unwrap();
    // `latency` silent frames are queued so playback can begin before the
    // first real frame has been rendered.
    for _ in 0..latency {
        consumer_tx.send(template.clone()).unwrap();
    }
    let producer = AudioFrameProducer::new(template.clone(), consumer_tx, producer_rx);
    let consumer = AudioFrameConsumer::new(template, producer_tx, consumer_rx);
    (producer, consumer)
}
impl fmt::Display for AudioFrameError {
    /// Human-readable description: the peer end of the carousel is gone.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("the remote thread has been terminated")
    }
}
impl error::Error for AudioFrameError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
None
}
}
impl<T> From<TrySendError<T>> for AudioFrameError {
fn from(_error: TrySendError<T>) -> Self {
AudioFrameError
}
}
impl<T> From<SendError<T>> for AudioFrameError {
fn from(_error: SendError<T>) -> Self {
AudioFrameError
}
}
impl From<TryRecvError> for AudioFrameError {
fn from(_error: TryRecvError) -> Self {
AudioFrameError
}
}
impl From<RecvError> for AudioFrameError {
fn from(_error: RecvError) -> Self {
AudioFrameError
}
}
impl From<RecvTimeoutError> for AudioFrameError {
fn from(_error: RecvTimeoutError) -> Self {
AudioFrameError
}
}
/// `AudioBuffer` transparently dereferences to its inner sample `Vec`.
impl<T> Deref for AudioBuffer<T> {
type Target = Vec<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for AudioBuffer<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: AudioSample> AudioBuffer<T> {
    /// Allocates a buffer of `sample_frames * channels` samples,
    /// pre-filled with silence.
    fn new(sample_frames: usize, channels: u8) -> Self {
        let size = sample_frames * usize::from(channels);
        AudioBuffer(vec![T::silence(); size])
    }
}
impl<T> AudioBuffer<T> {
#[inline(always)]
fn sampled_size(&self) -> usize {
self.0.len()
}
}
impl<T: Copy> AudioBuffer<T> {
    /// Copies as many samples as fit from `self` (starting at `src_offset`)
    /// into the beginning of `target`; returns the number of samples copied.
    #[inline]
    fn copy_to(&self, target: &mut [T], src_offset: usize) -> usize {
        // Stop at whichever ends first: the source buffer or the target slice.
        let end = self.sampled_size().min(src_offset + target.len());
        let src = &self.0[src_offset..end];
        target[..src.len()].copy_from_slice(src);
        src.len()
    }
}
impl<T> AudioFrameConsumer<T> {
    /// Builds a consumer from an initial buffer and its channel endpoints.
    /// The read cursor starts at the beginning of `buffer`.
    pub fn new(buffer: AudioBuffer<T>,
               producer_tx: Sender<AudioBuffer<T>>,
               consumer_rx: Receiver<AudioBuffer<T>>) -> Self {
        AudioFrameConsumer {
            rx: consumer_rx,
            producer_tx,
            cursor: 0,
            buffer,
        }
    }
    /// Rewinds reading back to the start of the current frame buffer.
    pub fn reset_cursor(&mut self) {
        self.cursor = 0;
    }
}
impl<T: 'static + Copy + Send> AudioFrameConsumer<T> {
/// Tries to fetch the next rendered frame without blocking.
///
/// On success the received buffer becomes the current one and the spent
/// buffer is recycled back to the producer; returns `Ok(true)`.
/// Returns `Ok(false)` when no new frame is ready yet, and `Err` when the
/// producer side has disconnected.
#[inline]
pub fn next_frame(&mut self) -> AudioFrameResult<bool> {
match self.rx.try_recv() {
Ok(mut buffer) => {
// Take ownership of the fresh frame; hand the old one back so the
// producer can reuse its allocation.
swap(&mut self.buffer, &mut buffer);
self.producer_tx.send(buffer)?;
Ok(true)
}
Err(TryRecvError::Empty) => {
// Producer hasn't delivered yet; the caller decides what to do.
Ok(false)
},
Err(TryRecvError::Disconnected) => Err(AudioFrameError)
}
}
/// Returns the samples of the frame currently being consumed.
#[inline]
pub fn current_frame(&self) -> &[T] {
&self.buffer
}
/// Copies samples into `target_buffer`, pulling new frames as needed, and
/// returns the still-unfilled tail of `target_buffer` (empty when it was
/// filled completely).
///
/// When `ignore_missing` is `true` and no new frame is available, the
/// current (stale) buffer is replayed from its start; when `false`, filling
/// stops early at the frame boundary instead.
///
/// NOTE(review): if the frame buffer were zero-sized and `ignore_missing`
/// were true, this loop would never make progress — presumably buffers are
/// always non-empty; confirm against `create_carousel` usage.
pub fn fill_buffer<'a>(
&mut self,
mut target_buffer: &'a mut[T],
ignore_missing: bool
) -> AudioFrameResult<&'a mut[T]>
{
let mut cursor = self.cursor;
while !target_buffer.is_empty() {
// Current frame exhausted: try to advance to the next one.
if cursor >= self.buffer.sampled_size() {
if !(self.next_frame()? || ignore_missing) {
break
}
// Either a new frame arrived, or we replay the stale one from the top.
cursor = 0;
}
let copied_size = self.buffer.copy_to(target_buffer, cursor);
cursor += copied_size;
target_buffer = &mut target_buffer[copied_size..];
}
// Persist progress so the next call resumes mid-frame.
self.cursor = cursor;
Ok(target_buffer)
}
}
impl<T> AudioFrameProducer<T> {
    /// Builds a producer from an initial buffer and its channel endpoints.
    pub fn new(buffer: AudioBuffer<T>,
               consumer_tx: Sender<AudioBuffer<T>>,
               producer_rx: Receiver<AudioBuffer<T>>) -> Self {
        AudioFrameProducer {
            consumer_tx,
            rx: producer_rx,
            buffer,
        }
    }
    /// Invokes `render` on the producer's current buffer to fill it with samples.
    pub fn render_frame<F: FnOnce(&mut Vec<T>)>(&mut self, render: F) {
        render(&mut self.buffer.0);
    }
}
impl<T: 'static + Send> AudioFrameProducer<T> {
    /// Ships the rendered frame to the consumer and swaps in a recycled
    /// buffer received back from it.
    ///
    /// Blocks until a recycled buffer is available; returns `Err` when the
    /// consumer side has disconnected.
    pub fn send_frame(&mut self) -> AudioFrameResult<()> {
        // Wait for a recycled buffer first, then swap it in for the full one.
        let recycled = self.rx.recv()?;
        let rendered = replace(&mut self.buffer, recycled);
        self.consumer_tx.send(rendered)?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::f32::consts::PI;

    /// End-to-end check: frames rendered by the producer arrive at the
    /// consumer in order, preceded by the expected run of silence
    /// (the consumer's initial buffer plus `LATENCY` pre-queued ones).
    #[test]
    fn carousel_works() -> Result<(), Box<dyn error::Error>> {
        const TEST_SAMPLES_COUNT: usize = 20000;
        const LATENCY: usize = 5;
        const BUFSIZE: usize = 256;
        // Silent prefix length: initial buffer + LATENCY queued silent buffers.
        const ZEROLEN: usize = BUFSIZE + LATENCY*BUFSIZE;
        fn sinusoid(n: u16) -> f32 {
            (PI*(n as f32)/BUFSIZE as f32).sin()
        }
        // Polls the consumer until `unfilled` is completely filled, sleeping
        // between polls to give the producer time to deliver frames.
        // (Extracted from two previously duplicated loops; also replaces the
        // non-idiomatic `len() == 0` with `is_empty()`.)
        fn drain(consumer: &mut AudioFrameConsumer<f32>, mut unfilled: &mut [f32]) {
            while !unfilled.is_empty() {
                thread::sleep(std::time::Duration::from_millis(1));
                unfilled = consumer.fill_buffer(unfilled, false).unwrap();
            }
        }
        let (mut producer, mut consumer) = create_carousel::<f32>(LATENCY, BUFSIZE, 1);
        let join = thread::spawn(move || {
            let mut target = vec![0.0; 800];
            drain(&mut consumer, &mut target);
            target.resize(TEST_SAMPLES_COUNT, 0.0);
            drain(&mut consumer, &mut target[800..]);
            target
        });
        // Keep producing sine frames until the consumer hangs up.
        loop {
            producer.render_frame(|vec| {
                vec.clear();
                vec.extend((0..BUFSIZE as u16).map(sinusoid));
            });
            if producer.send_frame().is_err() {
                break
            }
        }
        let target = join.join().unwrap();
        assert_eq!(vec![0.0; ZEROLEN][..], target[..ZEROLEN]);
        let mut template = Vec::new();
        template.extend((0..BUFSIZE as u16).map(sinusoid).cycle().take(TEST_SAMPLES_COUNT-ZEROLEN));
        assert_eq!(TEST_SAMPLES_COUNT-ZEROLEN, template.len());
        assert_eq!(template[..], target[ZEROLEN..]);
        Ok(())
    }
}