use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use crate::player::PlayerMsg;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::Sender;
use super::{queue, source::Done, Sample, Source};
use super::{OutputStreamHandle, PlayError};
pub struct Sink {
queue_tx: Arc<queue::SourcesQueueInput<f32>>,
sleep_until_end: Mutex<Option<Receiver<()>>>,
controls: Arc<Controls>,
sound_count: Arc<AtomicUsize>,
detached: bool,
elapsed: Arc<RwLock<Duration>>,
message_tx: Sender<PlayerMsg>,
}
struct Controls {
pause: AtomicBool,
volume: Mutex<f32>,
seek: Mutex<Option<Duration>>,
stopped: AtomicBool,
speed: Mutex<f32>,
do_skip: AtomicBool,
}
#[allow(unused)]
impl Sink {
#[inline]
pub fn try_new(
stream: &OutputStreamHandle,
gapless_playback: bool,
tx: Sender<PlayerMsg>,
) -> Result<Self, PlayError> {
let (sink, queue_rx) = Self::new_idle(gapless_playback, tx);
stream.play_raw(queue_rx)?;
Ok(sink)
}
#[inline]
pub fn new_idle(
gapless_playback: bool,
tx: Sender<PlayerMsg>,
) -> (Self, queue::SourcesQueueOutput<f32>) {
let (queue_tx, queue_rx) = queue::queue(true, gapless_playback);
let sink = Self {
queue_tx,
sleep_until_end: Mutex::new(None),
controls: Arc::new(Controls {
pause: AtomicBool::new(false),
volume: Mutex::new(1.0),
stopped: AtomicBool::new(false),
seek: Mutex::new(None),
speed: Mutex::new(1.0),
do_skip: AtomicBool::new(false),
}),
sound_count: Arc::new(AtomicUsize::new(0)),
detached: false,
elapsed: Arc::new(RwLock::new(Duration::from_secs(0))),
message_tx: tx,
};
(sink, queue_rx)
}
#[inline]
#[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
pub fn append<S>(&self, source: S)
where
S: Source + Send + 'static,
S::Item: Sample + Send,
{
let controls = self.controls.clone();
let tx = self.message_tx.clone();
let elapsed = self.elapsed.clone();
let source = source
.speed(1.0)
.pausable(false)
.amplify(1.0)
.skippable()
.stoppable()
.periodic_access(Duration::from_secs(1), move |src| {
let position = src.elapsed().as_secs() as i64;
let duration = src
.total_duration()
.map_or(99.0, |duration| duration.as_secs_f64() - 0.29)
as i64;
tx.send(PlayerMsg::Progress(position, duration)).ok();
})
.periodic_access(Duration::from_millis(50), move |src| {
let mut src = src.inner_mut();
if controls.stopped.load(Ordering::SeqCst) {
src.stop();
} else if controls.do_skip.load(Ordering::SeqCst) {
src.inner_mut().skip();
controls.do_skip.store(false, Ordering::SeqCst);
} else {
if let Some(seek_time) = controls.seek.lock().unwrap().take() {
src.seek(seek_time).unwrap();
}
*elapsed.write().unwrap() = src.elapsed();
let mut new_factor = *controls.volume.lock().unwrap();
if new_factor < 0.0001 {
new_factor = 0.0001;
}
src.inner_mut().inner_mut().set_factor(new_factor);
src.inner_mut()
.inner_mut()
.inner_mut()
.set_paused(controls.pause.load(Ordering::SeqCst));
src.inner_mut()
.inner_mut()
.inner_mut()
.inner_mut()
.set_factor(*controls.speed.lock().unwrap());
}
})
.convert_samples();
self.sound_count.fetch_add(1, Ordering::Relaxed);
let source = Done::new(source, self.sound_count.clone());
*self.sleep_until_end.lock().unwrap() = Some(self.queue_tx.append_with_signal(source));
}
#[inline]
pub fn volume(&self) -> f32 {
*self.controls.volume.lock().unwrap()
}
#[inline]
pub fn set_volume(&self, value: f32) {
*self.controls.volume.lock().unwrap() = value;
}
#[inline]
pub fn play(&self) {
self.controls.pause.store(false, Ordering::SeqCst);
}
pub fn pause(&self) {
self.controls.pause.store(true, Ordering::SeqCst);
}
pub fn toggle_playback(&self) {
if self.is_paused() {
self.play();
} else {
self.pause();
}
}
pub fn seek(&self, seek_time: Duration) {
if self.is_paused() {
self.play();
}
*self.controls.seek.lock().unwrap() = Some(seek_time);
}
pub fn is_paused(&self) -> bool {
self.controls.pause.load(Ordering::SeqCst)
}
#[inline]
pub fn detach(mut self) {
self.detached = true;
}
#[inline]
pub fn sleep_until_end(&self) {
if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
let _ = sleep_until_end.recv();
}
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn len(&self) -> usize {
self.sound_count.load(Ordering::Relaxed)
}
#[inline]
pub fn elapsed(&self) -> Duration {
*self.elapsed.read().unwrap()
}
#[inline]
pub fn speed(&self) -> f32 {
*self.controls.speed.lock().unwrap()
}
#[inline]
pub fn set_speed(&self, value: f32) {
*self.controls.speed.lock().unwrap() = value;
}
pub fn clear(&self) {
let len = self.queue_tx.clear();
self.sound_count.fetch_sub(len, Ordering::SeqCst);
self.pause();
}
pub fn skip_one(&self) {
self.controls.do_skip.store(true, Ordering::SeqCst);
}
pub fn message_on_end(&self) {
let tx1 = self.message_tx.clone();
if let Some(sleep_until_end) = self.sleep_until_end.lock().unwrap().take() {
std::thread::spawn(move || {
let _ = sleep_until_end.recv();
tx1.send(PlayerMsg::Eos).ok();
});
}
}
}
impl Drop for Sink {
#[inline]
fn drop(&mut self) {
self.queue_tx.set_keep_alive_if_empty(false);
if !self.detached {
self.controls.stopped.store(true, Ordering::Relaxed);
}
}
}