pub mod audio;
pub mod rf;
use crate::bufferpool::*;
use crate::flow::*;
use crate::impl_block_trait;
use crate::signal::*;
use num::Zero;
use tokio::select;
use tokio::sync::watch;
use tokio::task::spawn;
pub struct Silence<T> {
sender_connector: SenderConnector<Signal<T>>,
chunk_size: watch::Sender<usize>,
sample_rate: watch::Sender<f64>,
}
impl_block_trait! { <T> Producer<Signal<T>> for Silence<T> }
impl<T> Silence<T>
where
T: Clone + Send + Sync + Zero + 'static,
{
pub fn new(mut chunk_size: usize, mut sample_rate: f64) -> Self {
let (sender, sender_connector) = new_sender::<Signal<T>>();
let (chunk_size_send, mut chunk_size_recv) = watch::channel(chunk_size);
let (sample_rate_send, mut sample_rate_recv) = watch::channel(sample_rate);
spawn(async move {
let mut chunk = Chunk::from(vec![T::zero(); chunk_size]);
loop {
match chunk_size_recv.has_changed() {
Ok(false) => (),
Ok(true) => {
chunk_size = chunk_size_recv.borrow_and_update().clone();
if chunk.len() != chunk_size {
chunk = Chunk::from(vec![T::zero(); chunk_size]);
}
}
Err(_) => return,
}
match sample_rate_recv.has_changed() {
Ok(false) => (),
Ok(true) => sample_rate = sample_rate_recv.borrow_and_update().clone(),
Err(_) => return,
}
let Ok(()) = sender.send(Signal::Samples {
sample_rate,
chunk: chunk.clone()
}).await
else { return; };
}
});
Self {
sender_connector,
chunk_size: chunk_size_send,
sample_rate: sample_rate_send,
}
}
pub fn chunk_size(&self) -> usize {
self.chunk_size.borrow().clone()
}
pub fn set_chunk_size(&self, chunk_size: usize) {
self.chunk_size.send_replace(chunk_size);
}
pub fn sample_rate(&self) -> f64 {
self.sample_rate.borrow().clone()
}
pub fn set_sample_rate(&self, sample_rate: f64) {
self.sample_rate.send_replace(sample_rate);
}
}
pub struct Blackhole<T> {
receiver_connector: ReceiverConnector<Signal<T>>,
event_handlers: EventHandlers,
_drop_watch: watch::Sender<()>,
}
impl_block_trait! { <T> Consumer<Signal<T>> for Blackhole<T> }
impl_block_trait! { <T> EventHandling for Blackhole<T> }
impl<T> Blackhole<T>
where
T: Clone + Send + Sync + 'static,
{
pub fn new() -> Self {
let (mut receiver, receiver_connector) = new_receiver::<Signal<T>>();
let (drop_watch_send, mut drop_watch_recv) = watch::channel(());
let event_handlers = EventHandlers::new();
let evhdl_clone = event_handlers.clone();
spawn(async move {
loop {
select! {
result = drop_watch_recv.changed() => match result {
Err(_) => return,
_ => (),
},
result = receiver.recv() => match result {
Ok(Signal::Samples { .. }) => (),
Ok(Signal::Event(event)) => evhdl_clone.invoke(&event),
Err(_) => return,
},
}
}
});
Self {
receiver_connector,
event_handlers,
_drop_watch: drop_watch_send,
}
}
}