dasp_signal/bus.rs
1//! An extension to the **Signal** trait that enables multiple signal outputs.
2//!
3//! ### Required Features
4//!
5//! - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
6//! - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
7
8use crate::{Rc, Signal};
9
10#[cfg(not(feature = "std"))]
11type BTreeMap<K, V> = alloc::collections::btree_map::BTreeMap<K, V>;
12#[cfg(feature = "std")]
13type BTreeMap<K, V> = std::collections::BTreeMap<K, V>;
14
15#[cfg(not(feature = "std"))]
16type VecDeque<T> = alloc::collections::vec_deque::VecDeque<T>;
17#[cfg(feature = "std")]
18type VecDeque<T> = std::collections::vec_deque::VecDeque<T>;
19
20/// An extension to the **Signal** trait that enables multiple signal outputs.
21///
22/// ### Required Features
23///
24/// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
25/// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
26pub trait SignalBus: Signal {
27 /// Moves the `Signal` into a `Bus` from which its output may be divided into multiple other
28 /// `Signal`s in the form of `Output`s.
29 ///
30 /// This method allows to create more complex directed acyclic graph structures that
31 /// incorporate concepts like sends, side-chaining, etc, rather than being restricted to tree
32 /// structures where signals can only ever be joined but never divided.
33 ///
34 /// Note: When using multiple `Output`s in this fashion, you will need to be sure to pull the
35 /// frames from each `Output` in sync (whether per frame or per buffer). This is because when
36 /// output A requests `Frame`s before output B, those frames must remain available for output
37 /// B and in turn must be stored in an intermediary ring buffer.
38 ///
39 /// # Example
40 ///
41 /// ```rust
42 /// use dasp_signal::{self as signal, Signal};
43 /// use dasp_signal::bus::SignalBus;
44 ///
45 /// fn main() {
46 /// let frames = [[0.1], [0.2], [0.3], [0.4], [0.5], [0.6]];
47 /// let signal = signal::from_iter(frames.iter().cloned());
48 /// let bus = signal.bus();
49 /// let mut a = bus.send();
50 /// let mut b = bus.send();
51 /// assert_eq!(a.by_ref().take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
52 /// assert_eq!(b.by_ref().take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
53 ///
54 /// let c = bus.send();
55 /// assert_eq!(c.take(3).collect::<Vec<_>>(), vec![[0.4], [0.5], [0.6]]);
56 /// assert_eq!(b.take(3).collect::<Vec<_>>(), vec![[0.4], [0.5], [0.6]]);
57 /// assert_eq!(a.take(3).collect::<Vec<_>>(), vec![[0.4], [0.5], [0.6]]);
58 /// }
59 /// ```
60 ///
61 /// ### Required Features
62 ///
63 /// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
64 /// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
65 fn bus(self) -> Bus<Self>
66 where
67 Self: Sized,
68 {
69 Bus::new(self, BTreeMap::new())
70 }
71}
72
73/// The data shared between each `Output`.
74struct SharedNode<S>
75where
76 S: Signal,
77{
78 signal: S,
79 // The buffer of frames that have not yet been consumed by all outputs.
80 buffer: VecDeque<S::Frame>,
81 // The number of frames in `buffer` that have already been read for each output.
82 frames_read: BTreeMap<usize, usize>,
83 // The next output key.
84 next_key: usize,
85}
86
87/// A type which allows for `send`ing a single `Signal` to multiple outputs.
88///
89/// ### Required Features
90///
91/// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
92/// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
93pub struct Bus<S>
94where
95 S: Signal,
96{
97 node: Rc<core::cell::RefCell<SharedNode<S>>>,
98}
99
100/// An output node to which some signal `S` is `Output`ing its frames.
101///
102/// It may be more accurate to say that the `Output` "pull"s frames from the signal.
103///
104/// ### Required Features
105///
106/// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
107/// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
108pub struct Output<S>
109where
110 S: Signal,
111{
112 key: usize,
113 node: Rc<core::cell::RefCell<SharedNode<S>>>,
114}
115
116impl<S> Bus<S>
117where
118 S: Signal,
119{
120 fn new(signal: S, frames_read: BTreeMap<usize, usize>) -> Self {
121 Bus {
122 node: Rc::new(core::cell::RefCell::new(SharedNode {
123 signal: signal,
124 buffer: VecDeque::new(),
125 frames_read: frames_read,
126 next_key: 0,
127 })),
128 }
129 }
130
131 /// Produce a new Output node to which the signal `S` will output its frames.
132 ///
133 /// ### Required Features
134 ///
135 /// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
136 /// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
137 #[inline]
138 pub fn send(&self) -> Output<S> {
139 let mut node = self.node.borrow_mut();
140
141 // Get the key and increment for the next output.
142 let key = node.next_key;
143 node.next_key = node.next_key.wrapping_add(1);
144
145 // Insert the number of frames read by the new output.
146 let num_frames = node.buffer.len();
147 node.frames_read.insert(key, num_frames);
148
149 Output {
150 key: key,
151 node: self.node.clone(),
152 }
153 }
154}
155
156impl<S> SharedNode<S>
157where
158 S: Signal,
159{
160 // Requests the next frame for the `Output` at the given key.
161 //
162 // If there are no frames pending for the output, a new frame will be requested from the
163 // signal and appended to the ring buffer to be received by the other outputs.
164 fn next_frame(&mut self, key: usize) -> S::Frame {
165 let num_frames = self.buffer.len();
166 let frames_read = self
167 .frames_read
168 .remove(&key)
169 .expect("no frames_read for Output");
170
171 let frame = if frames_read < num_frames {
172 self.buffer[frames_read]
173 } else {
174 let frame = self.signal.next();
175 self.buffer.push_back(frame);
176 frame
177 };
178
179 // If the number of frames read by this output is the lowest, then we can pop the frame
180 // from the front.
181 let least_frames_read = !self
182 .frames_read
183 .values()
184 .any(|&other_frames_read| other_frames_read <= frames_read);
185
186 // If this output had read the least number of frames, pop the front frame and decrement
187 // the frames read counters for each of the other outputs.
188 let new_frames_read = if least_frames_read {
189 self.buffer.pop_front();
190 for other_frames_read in self.frames_read.values_mut() {
191 *other_frames_read -= 1;
192 }
193 frames_read
194 } else {
195 frames_read + 1
196 };
197
198 self.frames_read.insert(key, new_frames_read);
199
200 frame
201 }
202
203 #[inline]
204 fn pending_frames(&self, key: usize) -> usize {
205 self.buffer.len() - self.frames_read[&key]
206 }
207
208 // Drop the given output from the `Bus`.
209 //
210 // Called by the `Output::drop` implementation.
211 fn drop_output(&mut self, key: usize) {
212 self.frames_read.remove(&key);
213 let least_frames_read = self
214 .frames_read
215 .values()
216 .fold(self.buffer.len(), |a, &b| core::cmp::min(a, b));
217 if least_frames_read > 0 {
218 for frames_read in self.frames_read.values_mut() {
219 *frames_read -= least_frames_read;
220 }
221 for _ in 0..least_frames_read {
222 self.buffer.pop_front();
223 }
224 }
225 }
226}
227
228impl<S> Output<S>
229where
230 S: Signal,
231{
232 /// The number of frames that have been requested from the `Signal` `S` by some other `Output`
233 /// that have not yet been requested by this `Output`.
234 ///
235 /// This is useful when using an `Output` to "monitor" some signal, allowing the user to drain
236 /// only frames that have already been requested by some other `Output`.
237 ///
238 /// # Example
239 ///
240 /// ```
241 /// use dasp_signal::{self as signal, Signal};
242 /// use dasp_signal::bus::SignalBus;
243 ///
244 /// fn main() {
245 /// let frames = [[0.1], [0.2], [0.3]];
246 /// let bus = signal::from_iter(frames.iter().cloned()).bus();
247 /// let signal = bus.send();
248 /// let mut monitor = bus.send();
249 /// assert_eq!(signal.take(3).collect::<Vec<_>>(), vec![[0.1], [0.2], [0.3]]);
250 /// assert_eq!(monitor.pending_frames(), 3);
251 /// assert_eq!(monitor.next(), [0.1]);
252 /// assert_eq!(monitor.pending_frames(), 2);
253 /// }
254 /// ```
255 ///
256 /// ### Required Features
257 ///
258 /// - When using `dasp_signal`, this item requires the **bus** feature to be enabled.
259 /// - When using `dasp`, this item requires the **signal-bus** feature to be enabled.
260 #[inline]
261 pub fn pending_frames(&self) -> usize {
262 self.node.borrow().pending_frames(self.key)
263 }
264}
265
266impl<T> SignalBus for T where T: Signal {}
267
268impl<S> Signal for Output<S>
269where
270 S: Signal,
271{
272 type Frame = S::Frame;
273
274 #[inline]
275 fn next(&mut self) -> Self::Frame {
276 self.node.borrow_mut().next_frame(self.key)
277 }
278
279 #[inline]
280 fn is_exhausted(&self) -> bool {
281 let node = self.node.borrow();
282 node.pending_frames(self.key) == 0 && node.signal.is_exhausted()
283 }
284}
285
286impl<S> Drop for Output<S>
287where
288 S: Signal,
289{
290 fn drop(&mut self) {
291 self.node.borrow_mut().drop_output(self.key)
292 }
293}