1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
pub mod audio;
pub mod rf;
use crate::bufferpool::*;
use crate::flow::*;
use crate::impl_block_trait;
use crate::signal::*;
use num::Zero;
use tokio::select;
use tokio::sync::watch;
use tokio::task::spawn;
/// Source block that continuously emits chunks of zero-valued samples.
///
/// The handle keeps the sending halves of two `watch` channels through which
/// the chunk size and sample rate used by the background task (spawned in
/// `Silence::new`) can be read and changed at runtime.
pub struct Silence<T> {
/// Connector half returned by `new_sender`; the matching sender is owned by
/// the background task.
sender_connector: SenderConnector<Signal<T>>,
/// Holds the currently requested number of samples per emitted chunk.
chunk_size: watch::Sender<usize>,
/// Holds the currently requested sample rate attached to emitted chunks.
sample_rate: watch::Sender<f64>,
}
// Implements `Producer<Signal<T>>` for `Silence<T>` via the crate macro
// (presumably by delegating to `sender_connector` — confirm against the
// definition of `impl_block_trait!`).
impl_block_trait! { <T> Producer<Signal<T>> for Silence<T> }
impl<T> Silence<T>
where
    T: Clone + Send + Sync + Zero + 'static,
{
    /// Creates a block that endlessly sends chunks filled with `T::zero()`.
    ///
    /// `chunk_size` is the number of samples per chunk and `sample_rate` is
    /// the rate reported alongside every chunk. Both values can be changed
    /// later through [`set_chunk_size`](Self::set_chunk_size) and
    /// [`set_sample_rate`](Self::set_sample_rate).
    ///
    /// A background task is spawned which loops over: pick up pending
    /// parameter changes, then send one `Signal::Samples`. The task ends when
    /// it observes that one of the watch channels has been closed (their
    /// senders are owned by the returned value) or when sending fails.
    ///
    /// # Panics
    ///
    /// `tokio::task::spawn` panics when called outside of a Tokio runtime.
    pub fn new(chunk_size: usize, sample_rate: f64) -> Self {
        let (sender, sender_connector) = new_sender::<Signal<T>>();
        let (chunk_size_send, mut chunk_size_recv) = watch::channel(chunk_size);
        let (sample_rate_send, mut sample_rate_recv) = watch::channel(sample_rate);
        spawn(async move {
            // Task-local copy of the sample rate; refreshed whenever the
            // watch channel reports a new value.
            let mut sample_rate = sample_rate;
            // The zero-filled chunk is built once and cloned for every send;
            // it is rebuilt only when the requested length actually differs.
            let mut chunk = Chunk::from(vec![T::zero(); chunk_size]);
            loop {
                match chunk_size_recv.has_changed() {
                    Ok(false) => (),
                    Ok(true) => {
                        let chunk_size = *chunk_size_recv.borrow_and_update();
                        if chunk.len() != chunk_size {
                            chunk = Chunk::from(vec![T::zero(); chunk_size]);
                        }
                    }
                    // Watch channel closed: stop producing.
                    Err(_) => return,
                }
                match sample_rate_recv.has_changed() {
                    Ok(false) => (),
                    Ok(true) => sample_rate = *sample_rate_recv.borrow_and_update(),
                    // Watch channel closed: stop producing.
                    Err(_) => return,
                }
                let Ok(()) = sender
                    .send(Signal::Samples {
                        sample_rate,
                        chunk: chunk.clone(),
                    })
                    .await
                else {
                    return;
                };
            }
        });
        Self {
            sender_connector,
            chunk_size: chunk_size_send,
            sample_rate: sample_rate_send,
        }
    }

    /// Returns the most recently requested chunk size (samples per chunk).
    pub fn chunk_size(&self) -> usize {
        *self.chunk_size.borrow()
    }

    /// Requests a new chunk size.
    ///
    /// The background task checks for the change at the start of each loop
    /// iteration, i.e. before it sends its next chunk.
    pub fn set_chunk_size(&self, chunk_size: usize) {
        self.chunk_size.send_replace(chunk_size);
    }

    /// Returns the most recently requested sample rate.
    pub fn sample_rate(&self) -> f64 {
        *self.sample_rate.borrow()
    }

    /// Requests a new sample rate.
    ///
    /// The background task checks for the change at the start of each loop
    /// iteration, i.e. before it sends its next chunk.
    pub fn set_sample_rate(&self, sample_rate: f64) {
        self.sample_rate.send_replace(sample_rate);
    }
}
/// Sink block that receives signals and discards their sample data.
///
/// Events arriving on the input are passed to `event_handlers`; sample chunks
/// are dropped without further processing (see the task spawned in
/// `Blackhole::new`).
pub struct Blackhole<T> {
/// Connector half returned by `new_receiver`; the matching receiver is owned
/// by the background task.
receiver_connector: ReceiverConnector<Signal<T>>,
/// Handlers invoked by the background task for every received
/// `Signal::Event`.
event_handlers: EventHandlers,
/// Never written to; it exists so that dropping this struct closes the watch
/// channel, which the background task observes as its signal to exit.
_drop_watch: watch::Sender<()>,
}
// Implement the `Consumer<Signal<T>>` and `EventHandling` block traits via the
// crate macro (presumably by delegating to `receiver_connector` and
// `event_handlers` respectively — confirm against `impl_block_trait!`).
impl_block_trait! { <T> Consumer<Signal<T>> for Blackhole<T> }
impl_block_trait! { <T> EventHandling for Blackhole<T> }
impl<T> Blackhole<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Creates a sink that swallows every sample chunk it receives.
    ///
    /// A background task is spawned which waits on two things at once:
    ///
    /// * the drop-notification watch channel — once its sender (stored in
    ///   the returned value) is gone, the task exits;
    /// * the input receiver — `Signal::Samples` are ignored,
    ///   `Signal::Event`s are handed to the event handlers, and a receive
    ///   error ends the task.
    pub fn new() -> Self {
        let (mut input, receiver_connector) = new_receiver::<Signal<T>>();
        let (drop_guard, mut drop_signal) = watch::channel(());
        let event_handlers = EventHandlers::new();
        // The task needs its own handle to the handlers, since the original
        // is moved into the returned struct.
        let task_handlers = event_handlers.clone();
        spawn(async move {
            loop {
                select! {
                    dropped = drop_signal.changed() => {
                        // Nothing is ever sent on this channel, so only the
                        // "sender dropped" error is of interest here.
                        if dropped.is_err() {
                            return;
                        }
                    }
                    incoming = input.recv() => {
                        let Ok(signal) = incoming else { return };
                        match signal {
                            Signal::Event(event) => task_handlers.invoke(&event),
                            // Sample data is intentionally discarded.
                            Signal::Samples { .. } => (),
                        }
                    }
                }
            }
        });
        Self {
            receiver_connector,
            event_handlers,
            _drop_watch: drop_guard,
        }
    }
}