use crate::stream::{OutputStreamHandle, PlayError};
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use crate::queue;
use crate::source::Done;
use crate::Sample;
use crate::Source;
pub struct Sink {
queue_tx: Arc<queue::SourcesQueueInput<f32>>,
sleep_until_end: Mutex<Option<Receiver<()>>>,
controls: Arc<Controls>,
sound_count: Arc<AtomicUsize>,
detached: bool,
}
struct Controls {
pause: AtomicBool,
volume: Mutex<f32>,
stopped: AtomicBool,
}
impl Sink {
#[inline]
pub fn try_new(stream: &OutputStreamHandle) -> Result<Sink, PlayError> {
let (sink, queue_rx) = Sink::new_idle();
stream.play_raw(queue_rx)?;
Ok(sink)
}
#[inline]
pub fn new_idle() -> (Sink, queue::SourcesQueueOutput<f32>) {
let (queue_tx, queue_rx) = queue::queue(true);
let sink = Sink {
queue_tx,
sleep_until_end: Mutex::new(None),
controls: Arc::new(Controls {
pause: AtomicBool::new(false),
volume: Mutex::new(1.0),
stopped: AtomicBool::new(false),
}),
sound_count: Arc::new(AtomicUsize::new(0)),
detached: false,
};
(sink, queue_rx)
}
#[inline]
pub fn append<S>(&self, source: S)
where
S: Source + Send + 'static,
S::Item: Sample,
S::Item: Send,
{
let controls = self.controls.clone();
let source = source
.pausable(false)
.amplify(1.0)
.stoppable()
.periodic_access(Duration::from_millis(5), move |src| {
if controls.stopped.load(Ordering::SeqCst) {
src.stop();
} else {
src.inner_mut().set_factor(*controls.volume.lock().unwrap());
src.inner_mut()
.inner_mut()
.set_paused(controls.pause.load(Ordering::SeqCst));
}
})
.convert_samples();
self.sound_count.fetch_add(1, Ordering::Relaxed);
let source = Done::new(source, self.sound_count.clone());
*self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source));
}
#[inline]
pub fn volume(&self) -> f32 {
*self.controls.volume.lock().unwrap()
}
#[inline]
pub fn set_volume(&self, value: f32) {
*self.controls.volume.lock().unwrap() = value;
}
#[inline]
pub fn play(&self) {
self.controls.pause.store(false, Ordering::SeqCst);
}
pub fn pause(&self) {
self.controls.pause.store(true, Ordering::SeqCst);
}
pub fn is_paused(&self) -> bool {
self.controls.pause.load(Ordering::SeqCst)
}
#[inline]
pub fn stop(&self) {
self.controls.stopped.store(true, Ordering::SeqCst);
}
#[inline]
pub fn detach(mut self) {
self.detached = true;
}
#[inline]
pub fn sleep_until_end(&self) {
if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
let _ = sleep_until_end.recv();
}
}
#[inline]
pub fn empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn len(&self) -> usize {
self.sound_count.load(Ordering::Relaxed)
}
}
impl Drop for Sink {
#[inline]
fn drop(&mut self) {
self.queue_tx.set_keep_alive_if_empty(false);
if !self.detached {
self.controls.stopped.store(true, Ordering::Relaxed);
}
}
}
#[cfg(test)]
mod tests {
use crate::buffer::SamplesBuffer;
use crate::sink::Sink;
use crate::source::Source;
#[test]
fn test_pause_and_stop() {
let (sink, mut queue_rx) = Sink::new_idle();
let v = vec![10i16, -10, 20, -20, 30, -30];
sink.append(SamplesBuffer::new(1, 1, v.clone()));
let mut src = SamplesBuffer::new(1, 1, v.clone()).convert_samples();
assert_eq!(queue_rx.next(), src.next());
assert_eq!(queue_rx.next(), src.next());
sink.pause();
assert_eq!(queue_rx.next(), Some(0.0));
sink.play();
assert_eq!(queue_rx.next(), src.next());
assert_eq!(queue_rx.next(), src.next());
sink.stop();
assert_eq!(queue_rx.next(), Some(0.0));
assert_eq!(sink.empty(), true);
}
#[test]
fn test_volume() {
let (sink, mut queue_rx) = Sink::new_idle();
let v = vec![10i16, -10, 20, -20, 30, -30];
sink.append(SamplesBuffer::new(2, 44100, v.clone()));
let src = SamplesBuffer::new(2, 44100, v.clone()).convert_samples();
let mut src = src.amplify(0.5);
sink.set_volume(0.5);
for _ in 0..v.len() {
assert_eq!(queue_rx.next(), src.next());
}
}
}