1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
//! External sources and sinks (plus [`Silence`] and [`Blackhole`])
//!
//! The [`audio`] and [`rf`] modules contain blocks that allow accessing
//! hardware audio or radio interfaces.
//!
//! **Note:** Blocks in this module will stop working when dropped.

pub mod audio;
pub mod rf;

use crate::bufferpool::*;
use crate::flow::*;
use crate::impl_block_trait;
use crate::signal::*;

use num::Zero;
use tokio::select;
use tokio::sync::watch;
use tokio::task::spawn;

/// [`Producer`] which sends silence with adjustable chunk size and sample rate
pub struct Silence<T> {
    sender_connector: SenderConnector<Signal<T>>,
    chunk_size: watch::Sender<usize>,
    sample_rate: watch::Sender<f64>,
}

impl_block_trait! { <T> Producer<Signal<T>> for Silence<T> }

impl<T> Silence<T>
where
    T: Clone + Send + Sync + Zero + 'static,
{
    /// Creates a block which sends silence with the given `chunk_size` and
    /// `sample_rate`
    pub fn new(mut chunk_size: usize, mut sample_rate: f64) -> Self {
        let (sender, sender_connector) = new_sender::<Signal<T>>();
        let (chunk_size_send, mut chunk_size_recv) = watch::channel(chunk_size);
        let (sample_rate_send, mut sample_rate_recv) = watch::channel(sample_rate);
        spawn(async move {
            let mut chunk = Chunk::from(vec![T::zero(); chunk_size]);
            loop {
                match chunk_size_recv.has_changed() {
                    Ok(false) => (),
                    Ok(true) => {
                        chunk_size = chunk_size_recv.borrow_and_update().clone();
                        if chunk.len() != chunk_size {
                            chunk = Chunk::from(vec![T::zero(); chunk_size]);
                        }
                    }
                    Err(_) => return,
                }
                match sample_rate_recv.has_changed() {
                    Ok(false) => (),
                    Ok(true) => sample_rate = sample_rate_recv.borrow_and_update().clone(),
                    Err(_) => return,
                }
                let Ok(()) = sender.send(Signal::Samples {
                    sample_rate,
                    chunk: chunk.clone()
                }).await
                else { return; };
            }
        });
        Self {
            sender_connector,
            chunk_size: chunk_size_send,
            sample_rate: sample_rate_send,
        }
    }
    /// Get chunk size
    pub fn chunk_size(&self) -> usize {
        self.chunk_size.borrow().clone()
    }
    /// Set chunk size
    pub fn set_chunk_size(&self, chunk_size: usize) {
        self.chunk_size.send_replace(chunk_size);
    }
    /// Get sample rate in samples per second
    pub fn sample_rate(&self) -> f64 {
        self.sample_rate.borrow().clone()
    }
    /// Set sample rate in samples per second
    pub fn set_sample_rate(&self, sample_rate: f64) {
        self.sample_rate.send_replace(sample_rate);
    }
}

/// [`Consumer`] which ignores all received [`Signal::Samples`] (but allows
/// [`EventHandling`])
pub struct Blackhole<T> {
    receiver_connector: ReceiverConnector<Signal<T>>,
    event_handlers: EventHandlers,
    _drop_watch: watch::Sender<()>,
}

impl_block_trait! { <T> Consumer<Signal<T>> for Blackhole<T> }
impl_block_trait! { <T> EventHandling for Blackhole<T> }

impl<T> Blackhole<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Create new `Blackhole`
    pub fn new() -> Self {
        let (mut receiver, receiver_connector) = new_receiver::<Signal<T>>();
        let (drop_watch_send, mut drop_watch_recv) = watch::channel(());
        let event_handlers = EventHandlers::new();
        let evhdl_clone = event_handlers.clone();
        spawn(async move {
            loop {
                select! {
                    result = drop_watch_recv.changed() => match result {
                        Err(_) => return,
                        _ => (),
                    },
                    result = receiver.recv() => match result {
                        Ok(Signal::Samples { .. }) => (),
                        Ok(Signal::Event(event)) => evhdl_clone.invoke(&event),
                        Err(_) => return,
                    },
                }
            }
        });
        Self {
            receiver_connector,
            event_handlers,
            _drop_watch: drop_watch_send,
        }
    }
}