1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::sync::Mutex;
use play_raw;
use queue;
use source;
use Endpoint;
use Source;
use Sample;
/// Handle for queueing sounds on an endpoint and controlling their playback.
///
/// Sources passed to `append` are pushed onto an internal queue. The pause flag, the stop
/// flag and the volume are shared with every appended source through `Arc`s, so changing
/// them on the sink affects the sources that were already queued.
pub struct Sink {
// Input side of the source queue created in `new`.
queue_tx: Arc<queue::SourcesQueueInput<f32>>,
// Receiver returned by the most recent `append_with_signal` call; taken by `sleep_until_end`.
sleep_until_end: Mutex<Option<Receiver<()>>>,
// `true` while playback is paused; cloned into each appended source's `Pausable` wrapper.
pause: Arc<AtomicBool>,
// Current volume (initialised to 1.0); cloned into each appended source's `VolumeFilter` wrapper.
volume: Arc<Mutex<f32>>,
// Raised when a non-detached sink is dropped; cloned into each source's `Stoppable` wrapper.
stopped: Arc<AtomicBool>,
// Set by `detach`; when `true`, dropping the sink does not raise `stopped`.
detached: bool,
}
impl Sink {
    /// Builds a new `Sink` that plays on `endpoint`.
    ///
    /// A fresh source queue is created and its output side is handed to `play_raw`; the
    /// sink keeps the input side so that sources can be appended later. The sink starts
    /// unpaused, at volume `1.0`, not stopped and not detached.
    #[inline]
    pub fn new(endpoint: &Endpoint) -> Sink {
        // NOTE(review): the `true` argument presumably means "keep the queue alive while
        // it is empty" (the `Drop` impl later calls `set_keep_alive_if_empty(false)`);
        // confirm against `queue::queue`.
        let (queue_tx, queue_rx) = queue::queue(true);
        play_raw(endpoint, queue_rx);

        Sink {
            queue_tx: queue_tx,
            sleep_until_end: Mutex::new(None),
            pause: Arc::new(AtomicBool::new(false)),
            volume: Arc::new(Mutex::new(1.0)),
            stopped: Arc::new(AtomicBool::new(false)),
            detached: false,
        }
    }

    /// Appends `source` to the end of this sink's queue.
    ///
    /// The source is wrapped so that it observes the sink's shared pause flag, stop flag
    /// and volume, and is then converted with `SamplesConverter` before being queued.
    /// The completion receiver returned by the queue replaces any receiver stored by a
    /// previous call, so `sleep_until_end` only waits on the most recently appended source.
    ///
    /// # Panics
    ///
    /// Panics if the internal `sleep_until_end` mutex is poisoned.
    #[inline]
    pub fn append<S>(&self, source: S)
        where S: Source + Send + 'static,
              S::Item: Sample,
              S::Item: Send
    {
        // NOTE(review): the trailing `5` passed to each wrapper is presumably how often
        // the shared flag/value is re-read — confirm against the `source` module.
        let source = source::Pausable::new(source, self.pause.clone(), 5);
        let source = source::Stoppable::new(source, self.stopped.clone(), 5);
        let source = source::VolumeFilter::new(source, self.volume.clone(), 5);
        let source = source::SamplesConverter::new(source);

        let signal = self.queue_tx.append_with_signal(source);
        *self.sleep_until_end.lock().unwrap() = Some(signal);
    }

    /// Returns the volume currently stored in the sink.
    ///
    /// # Panics
    ///
    /// Panics if the volume mutex is poisoned.
    #[inline]
    pub fn volume(&self) -> f32 {
        *self.volume.lock().unwrap()
    }

    /// Stores `value` as the sink's volume.
    ///
    /// The value lives behind a shared mutex that every appended source holds a clone of,
    /// so only a shared reference to the sink is required — the same as the other
    /// playback controls.
    ///
    /// # Panics
    ///
    /// Panics if the volume mutex is poisoned.
    #[inline]
    pub fn set_volume(&self, value: f32) {
        *self.volume.lock().unwrap() = value;
    }

    /// Clears the shared pause flag.
    #[inline]
    pub fn play(&self) {
        self.pause.store(false, Ordering::SeqCst);
    }

    /// Sets the shared pause flag.
    #[inline]
    pub fn pause(&self) {
        self.pause.store(true, Ordering::SeqCst);
    }

    /// Returns `true` if the shared pause flag is currently set.
    #[inline]
    pub fn is_paused(&self) -> bool {
        self.pause.load(Ordering::SeqCst)
    }

    /// Consumes the sink without raising the stop flag.
    ///
    /// The `detached` marker is set before `self` goes out of scope at the end of this
    /// function, so the `Drop` implementation skips storing `true` into `stopped`.
    #[inline]
    pub fn detach(mut self) {
        self.detached = true;
    }

    /// Blocks the calling thread until the most recently appended source signals.
    ///
    /// The stored receiver is taken out, so a second call without an intervening `append`
    /// returns immediately. A receive error (the sending side was dropped) is ignored.
    ///
    /// # Panics
    ///
    /// Panics if the internal `sleep_until_end` mutex is poisoned.
    #[inline]
    pub fn sleep_until_end(&self) {
        // Take the receiver in its own statement so the mutex guard is released before
        // blocking. Keeping the lock in the `if let` scrutinee would hold it for the whole
        // wait and stall any other thread calling `append` or `sleep_until_end`.
        let receiver = self.sleep_until_end.lock().unwrap().take();

        if let Some(receiver) = receiver {
            let _ = receiver.recv();
        }
    }
}
/// Cleanup performed when a `Sink` goes out of scope.
impl Drop for Sink {
    /// Turns off the queue's keep-alive-if-empty setting and, unless the sink was
    /// detached, raises the shared stop flag.
    #[inline]
    fn drop(&mut self) {
        self.queue_tx.set_keep_alive_if_empty(false);

        // A detached sink leaves the stop flag untouched.
        if self.detached {
            return;
        }

        self.stopped.store(true, Ordering::Relaxed);
    }
}