dasp_signal/
bus.rs

1//! An extension to the **Signal** trait that enables multiple signal outputs.
2//!
3//! ### Required Features
4//!
5//! - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
6//! - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
7
8use crate::{Rc, Signal};
9
10#[cfg(not(feature = "std"))]
11type BTreeMap<K, V> = alloc::collections::btree_map::BTreeMap<K, V>;
12#[cfg(feature = "std")]
13type BTreeMap<K, V> = std::collections::BTreeMap<K, V>;
14
15#[cfg(not(feature = "std"))]
16type VecDeque<T> = alloc::collections::vec_deque::VecDeque<T>;
17#[cfg(feature = "std")]
18type VecDeque<T> = std::collections::vec_deque::VecDeque<T>;
19
20/// An extension to the **Signal** trait that enables multiple signal outputs.
21///
22/// ### Required Features
23///
24/// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
25/// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
26pub trait SignalBus: Signal {
27    /// Moves the `Signal` into a `Bus` from which its output may be divided into multiple other
28    /// `Signal`s in the form of `Output`s.
29    ///
30    /// This method allows to create more complex directed acyclic graph structures that
31    /// incorporate concepts like sends, side-chaining, etc, rather than being restricted to tree
32    /// structures where signals can only ever be joined but never divided.
33    ///
34    /// Note: When using multiple `Output`s in this fashion, you will need to be sure to pull the
35    /// frames from each `Output` in sync (whether per frame or per buffer). This is because when
36    /// output A requests `Frame`s before output B, those frames must remain available for output
37    /// B and in turn must be stored in an intermediary ring buffer.
38    ///
39    /// # Example
40    ///
41    /// ```rust
42    /// use dasp_signal::{self as signal, Signal};
43    /// use dasp_signal::bus::SignalBus;
44    ///
45    /// fn main() {
46    ///     let frames = [[0.1], [0.2], [0.3], [0.4], [0.5], [0.6]];
47    ///     let signal = signal::from_iter(frames.iter().cloned());
48    ///     let bus = signal.bus();
49    ///     let mut a = bus.send();
50    ///     let mut b = bus.send();
51    ///     assert_eq!(a.by_ref().take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
52    ///     assert_eq!(b.by_ref().take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
53    ///
54    ///     let c = bus.send();
55    ///     assert_eq!(c.take(3).collect::<Vec<_>>(), vec![[0.4], [0.5], [0.6]]);
56    ///     assert_eq!(b.take(3).collect::<Vec<_>>(), vec![[0.4], [0.5], [0.6]]);
57    ///     assert_eq!(a.take(3).collect::<Vec<_>>(), vec![[0.4], [0.5], [0.6]]);
58    /// }
59    /// ```
60    ///
61    /// ### Required Features
62    ///
63    /// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
64    /// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
65    fn bus(self) -> Bus<Self>
66    where
67        Self: Sized,
68    {
69        Bus::new(self, BTreeMap::new())
70    }
71}
72
73/// The data shared between each `Output`.
74struct SharedNode<S>
75where
76    S: Signal,
77{
78    signal: S,
79    // The buffer of frames that have not yet been consumed by all outputs.
80    buffer: VecDeque<S::Frame>,
81    // The number of frames in `buffer` that have already been read for each output.
82    frames_read: BTreeMap<usize, usize>,
83    // The next output key.
84    next_key: usize,
85}
86
87/// A type which allows for `send`ing a single `Signal` to multiple outputs.
88///
89/// ### Required Features
90///
91/// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
92/// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
93pub struct Bus<S>
94where
95    S: Signal,
96{
97    node: Rc<core::cell::RefCell<SharedNode<S>>>,
98}
99
100/// An output node to which some signal `S` is `Output`ing its frames.
101///
102/// It may be more accurate to say that the `Output` "pull"s frames from the signal.
103///
104/// ### Required Features
105///
106/// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
107/// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
108pub struct Output<S>
109where
110    S: Signal,
111{
112    key: usize,
113    node: Rc<core::cell::RefCell<SharedNode<S>>>,
114}
115
116impl<S> Bus<S>
117where
118    S: Signal,
119{
120    fn new(signal: S, frames_read: BTreeMap<usize, usize>) -> Self {
121        Bus {
122            node: Rc::new(core::cell::RefCell::new(SharedNode {
123                signal: signal,
124                buffer: VecDeque::new(),
125                frames_read: frames_read,
126                next_key: 0,
127            })),
128        }
129    }
130
131    /// Produce a new Output node to which the signal `S` will output its frames.
132    ///
133    /// ### Required Features
134    ///
135    /// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
136    /// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
137    #[inline]
138    pub fn send(&self) -> Output<S> {
139        let mut node = self.node.borrow_mut();
140
141        // Get the key and increment for the next output.
142        let key = node.next_key;
143        node.next_key = node.next_key.wrapping_add(1);
144
145        // Insert the number of frames read by the new output.
146        let num_frames = node.buffer.len();
147        node.frames_read.insert(key, num_frames);
148
149        Output {
150            key: key,
151            node: self.node.clone(),
152        }
153    }
154}
155
156impl<S> SharedNode<S>
157where
158    S: Signal,
159{
160    // Requests the next frame for the `Output` at the given key.
161    //
162    // If there are no frames pending for the output, a new frame will be requested from the
163    // signal and appended to the ring buffer to be received by the other outputs.
164    fn next_frame(&mut self, key: usize) -> S::Frame {
165        let num_frames = self.buffer.len();
166        let frames_read = self
167            .frames_read
168            .remove(&key)
169            .expect("no frames_read for Output");
170
171        let frame = if frames_read < num_frames {
172            self.buffer[frames_read]
173        } else {
174            let frame = self.signal.next();
175            self.buffer.push_back(frame);
176            frame
177        };
178
179        // If the number of frames read by this output is the lowest, then we can pop the frame
180        // from the front.
181        let least_frames_read = !self
182            .frames_read
183            .values()
184            .any(|&other_frames_read| other_frames_read <= frames_read);
185
186        // If this output had read the least number of frames, pop the front frame and decrement
187        // the frames read counters for each of the other outputs.
188        let new_frames_read = if least_frames_read {
189            self.buffer.pop_front();
190            for other_frames_read in self.frames_read.values_mut() {
191                *other_frames_read -= 1;
192            }
193            frames_read
194        } else {
195            frames_read + 1
196        };
197
198        self.frames_read.insert(key, new_frames_read);
199
200        frame
201    }
202
203    #[inline]
204    fn pending_frames(&self, key: usize) -> usize {
205        self.buffer.len() - self.frames_read[&key]
206    }
207
208    // Drop the given output from the `Bus`.
209    //
210    // Called by the `Output::drop` implementation.
211    fn drop_output(&mut self, key: usize) {
212        self.frames_read.remove(&key);
213        let least_frames_read = self
214            .frames_read
215            .values()
216            .fold(self.buffer.len(), |a, &b| core::cmp::min(a, b));
217        if least_frames_read > 0 {
218            for frames_read in self.frames_read.values_mut() {
219                *frames_read -= least_frames_read;
220            }
221            for _ in 0..least_frames_read {
222                self.buffer.pop_front();
223            }
224        }
225    }
226}
227
228impl<S> Output<S>
229where
230    S: Signal,
231{
232    /// The number of frames that have been requested from the `Signal` `S` by some other `Output`
233    /// that have not yet been requested by this `Output`.
234    ///
235    /// This is useful when using an `Output` to "monitor" some signal, allowing the user to drain
236    /// only frames that have already been requested by some other `Output`.
237    ///
238    /// # Example
239    ///
240    /// ```
241    /// use dasp_signal::{self as signal, Signal};
242    /// use dasp_signal::bus::SignalBus;
243    ///
244    /// fn main() {
245    ///     let frames = [[0.1], [0.2], [0.3]];
246    ///     let bus = signal::from_iter(frames.iter().cloned()).bus();
247    ///     let signal = bus.send();
248    ///     let mut monitor = bus.send();
249    ///     assert_eq!(signal.take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
250    ///     assert_eq!(monitor.pending_frames(), 3);
251    ///     assert_eq!(monitor.next(), [0.1]);
252    ///     assert_eq!(monitor.pending_frames(), 2);
253    /// }
254    /// ```
255    ///
256    /// ### Required Features
257    ///
258    /// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
259    /// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
260    #[inline]
261    pub fn pending_frames(&self) -> usize {
262        self.node.borrow().pending_frames(self.key)
263    }
264}
265
266impl<T> SignalBus for T where T: Signal {}
267
268impl<S> Signal for Output<S>
269where
270    S: Signal,
271{
272    type Frame = S::Frame;
273
274    #[inline]
275    fn next(&mut self) -> Self::Frame {
276        self.node.borrow_mut().next_frame(self.key)
277    }
278
279    #[inline]
280    fn is_exhausted(&self) -> bool {
281        let node = self.node.borrow();
282        node.pending_frames(self.key) == 0 && node.signal.is_exhausted()
283    }
284}
285
286impl<S> Drop for Output<S>
287where
288    S: Signal,
289{
290    fn drop(&mut self) {
291        self.node.borrow_mut().drop_output(self.key)
292    }
293}