Skip to main content

firewheel_pool/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3#[cfg(not(feature = "std"))]
4use bevy_platform::prelude::Vec;
5
6use firewheel_core::{
7    channel_config::NonZeroChannelCount,
8    node::{AudioNode, NodeID},
9};
10use firewheel_graph::{backend::AudioBackend, ContextQueue, FirewheelCtx};
11use smallvec::SmallVec;
12use thunderdome::Arena;
13
14#[cfg(feature = "scheduled_events")]
15use firewheel_core::clock::EventInstant;
16
17#[cfg(feature = "sampler")]
18mod sampler;
19#[cfg(feature = "sampler")]
20pub use sampler::SamplerPool;
21
22mod volume_pan;
23pub use volume_pan::VolumePanChain;
24
25#[cfg(feature = "spatial_basic")]
26mod spatial_basic;
27#[cfg(feature = "spatial_basic")]
28pub use spatial_basic::SpatialBasicChain;
29
/// An [`AudioNodePool`] that uses [`SamplerPool`] as its poolable first node and
/// [`VolumePanChain`] as the FX chain attached to every worker.
///
/// Only available when the `sampler` feature is enabled.
#[cfg(feature = "sampler")]
pub type SamplerPoolVolumePan = AudioNodePool<SamplerPool, VolumePanChain>;
/// An [`AudioNodePool`] that uses [`SamplerPool`] as its poolable first node and
/// [`SpatialBasicChain`] as the FX chain attached to every worker.
///
/// Only available when both the `sampler` and `spatial_basic` features are enabled.
#[cfg(all(feature = "sampler", feature = "spatial_basic"))]
pub type SamplerPoolSpatialBasic = AudioNodePool<SamplerPool, SpatialBasicChain>;
34
35/// A trait describing an "FX chain" for use in an [`AudioNodePool`].
36pub trait FxChain: Default {
37    /// Construct the nodes in the FX chain and connect them, returning a list of the
38    /// new node ids.
39    ///
40    /// * `first_node_id` - The ID of the first node in this fx chain instance.
41    /// * `first_node_num_out_channels` - The number of output channels in the first node.
42    /// * `dst_node_id` - The ID of the node that the last node in this FX chain should
43    /// connect to.
44    /// * `dst_num_channels` - The number of input channels on `dst_node_id`.
45    /// * `cx` - The firewheel context.
46    fn construct_and_connect<B: AudioBackend>(
47        &mut self,
48        first_node_id: NodeID,
49        first_node_num_out_channels: NonZeroChannelCount,
50        dst_node_id: NodeID,
51        dst_num_channels: NonZeroChannelCount,
52        cx: &mut FirewheelCtx<B>,
53    ) -> Vec<NodeID>;
54}
55
56struct Worker<N: PoolableNode, FX: FxChain> {
57    first_node_params: N::AudioNode,
58    first_node_id: NodeID,
59
60    fx_state: FxChainState<FX>,
61
62    assigned_worker_id: Option<WorkerID>,
63}
64
65#[derive(Debug)]
66pub struct FxChainState<FX: FxChain> {
67    pub fx_chain: FX,
68    pub node_ids: Vec<NodeID>,
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
72pub struct WorkerID(thunderdome::Index);
73
74impl WorkerID {
75    pub const DANGLING: Self = Self(thunderdome::Index::DANGLING);
76}
77
78impl Default for WorkerID {
79    fn default() -> Self {
80        Self::DANGLING
81    }
82}
83
84/// A trait describing the first node in an [`AudioNodePool`].
85pub trait PoolableNode {
86    /// The node parameters
87    type AudioNode: AudioNode + Clone + 'static;
88
89    /// Return the number of output channels for the given configuration.
90    fn num_output_channels(
91        config: Option<&<Self::AudioNode as AudioNode>::Configuration>,
92    ) -> NonZeroChannelCount;
93
94    /// Return `true` if the given parameters signify that the sequence is stopped,
95    /// `false` otherwise.
96    fn params_stopped(params: &Self::AudioNode) -> bool;
97    /// Return `true` if the node state of the given node is stopped.
98    ///
99    /// Return an error if the given `node_id` is invalid.
100    fn node_is_stopped<B: AudioBackend>(
101        node_id: NodeID,
102        cx: &FirewheelCtx<B>,
103    ) -> Result<bool, PoolError>;
104
105    /// Return a score of how ready this node is to accept new work.
106    ///
107    /// The worker with the highest worker score will be chosen for the new work.
108    ///
109    /// Return an error if the given `node_id` is invalid.
110    fn worker_score<B: AudioBackend>(
111        params: &Self::AudioNode,
112        node_id: NodeID,
113        cx: &mut FirewheelCtx<B>,
114    ) -> Result<u64, PoolError>;
115
116    /// Diff the new parameters and push the changes into the event queue.
117    fn diff<B: AudioBackend>(
118        baseline: &Self::AudioNode,
119        new: &Self::AudioNode,
120        event_queue: &mut ContextQueue<B>,
121    );
122
123    /// Notify the node state that a sequence is playing.
124    ///
125    /// This is used to account for the delay between sending an event to the node
126    /// and the node receiving the event.
127    ///
128    /// Return an error if the given `node_id` is invalid.
129    fn mark_playing<B: AudioBackend>(
130        node_id: NodeID,
131        cx: &mut FirewheelCtx<B>,
132    ) -> Result<(), PoolError>;
133
134    /// Pause the sequence in the node parameters
135    fn pause(params: &mut Self::AudioNode);
136    /// Resume the sequence in the node parameters
137    fn resume(params: &mut Self::AudioNode);
138    /// Stop the sequence in the node parameters
139    fn stop(params: &mut Self::AudioNode);
140}
141
142/// A pool of audio node chains that can dynamically be assigned work.
143pub struct AudioNodePool<N: PoolableNode, FX: FxChain> {
144    workers: Vec<Worker<N, FX>>,
145    worker_ids: Arena<usize>,
146    num_active_workers: usize,
147}
148
149impl<N: PoolableNode, FX: FxChain> AudioNodePool<N, FX>
150where
151    <N::AudioNode as AudioNode>::Configuration: Clone,
152{
153    /// Construct a new sampler pool.
154    ///
155    /// * `num_workers` - The total number of workers that can work in parallel. More workers
156    /// will allow more samples to be played concurrently, but will also increase processing
157    /// overhead. A value of `16` is a good place to start.
158    /// * `first_node` - The state of the first node in each FX chain instance.
159    /// * `first_node_config` - The configuration of the first node in each FX chain instance.
160    /// * `first_node_num_out_channels` - The number of output channels in the first node.
161    /// * `dst_node_id` - The ID of the node that the last effect in each fx chain instance
162    /// will connect to.
163    /// * `dst_num_channels` - The number of input channels in `dst_node_id`.
164    /// * `cx` - The firewheel context.
165    pub fn new<B: AudioBackend>(
166        num_workers: usize,
167        first_node: N::AudioNode,
168        first_node_config: Option<<N::AudioNode as AudioNode>::Configuration>,
169        dst_node_id: NodeID,
170        dst_num_channels: NonZeroChannelCount,
171        cx: &mut FirewheelCtx<B>,
172    ) -> Self {
173        assert_ne!(num_workers, 0);
174
175        let first_node_num_out_channels = N::num_output_channels(first_node_config.as_ref());
176
177        Self {
178            workers: (0..num_workers)
179                .map(|_| {
180                    let first_node_id = cx.add_node(first_node.clone(), first_node_config.clone());
181
182                    let mut fx_chain = FX::default();
183
184                    let fx_ids = fx_chain.construct_and_connect(
185                        first_node_id,
186                        first_node_num_out_channels,
187                        dst_node_id,
188                        dst_num_channels,
189                        cx,
190                    );
191
192                    Worker {
193                        first_node_params: first_node.clone(),
194                        first_node_id,
195
196                        fx_state: FxChainState {
197                            fx_chain,
198                            node_ids: fx_ids,
199                        },
200
201                        assigned_worker_id: None,
202                    }
203                })
204                .collect(),
205            worker_ids: Arena::with_capacity(num_workers),
206            num_active_workers: 0,
207        }
208    }
209
210    pub fn num_workers(&self) -> usize {
211        self.workers.len()
212    }
213
214    /// Queue a new work to play a sequence.
215    ///
216    /// * `params` - The parameters of the sequence to play.
217    /// * `time` - The instant these new parameters should take effect. If this
218    /// is `None`, then the parameters will take effect as soon as the node receives
219    /// the event.
220    /// * `steal` - If this is `true`, then if there are no more workers left in
221    /// in the pool, the oldest one will be stopped and replaced with this new
222    /// one. If this is `false`, then an error will be returned if no more workers
223    /// are left.
224    /// * `cx` - The Firewheel context.
225    /// * `fx_chain` - A closure to add additional nodes to this worker instance.
226    ///
227    /// This will return an error if `params.playback == PlaybackState::Stop`.
228    pub fn new_worker<B: AudioBackend>(
229        &mut self,
230        params: &N::AudioNode,
231        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
232        steal: bool,
233        cx: &mut FirewheelCtx<B>,
234        fx_chain: impl FnOnce(&mut FxChainState<FX>, &mut FirewheelCtx<B>),
235    ) -> Result<NewWorkerResult, NewWorkerError> {
236        if N::params_stopped(params) {
237            return Err(NewWorkerError::ParameterStateIsStop);
238        }
239
240        if !steal && self.num_active_workers == self.workers.len() {
241            return Err(NewWorkerError::NoMoreWorkers);
242        }
243
244        let mut idx = 0;
245        let mut max_score = 0;
246        for (i, worker) in self.workers.iter().enumerate() {
247            if worker.assigned_worker_id.is_none() {
248                idx = i;
249                break;
250            }
251
252            let score =
253                N::worker_score(&worker.first_node_params, worker.first_node_id, cx).unwrap();
254
255            if score == u64::MAX {
256                idx = i;
257                break;
258            }
259
260            if score > max_score {
261                max_score = score;
262                idx = i;
263            }
264        }
265
266        let worker_id = WorkerID(self.worker_ids.insert(idx));
267
268        let worker = &mut self.workers[idx];
269
270        let old_worker_id = worker.assigned_worker_id.take();
271        let was_playing_sequence = if let Some(old_worker_id) = old_worker_id {
272            self.worker_ids.remove(old_worker_id.0);
273
274            !(N::params_stopped(params) || N::node_is_stopped(worker.first_node_id, cx).unwrap())
275        } else {
276            false
277        };
278
279        worker.assigned_worker_id = Some(worker_id);
280        self.num_active_workers += 1;
281
282        #[cfg(not(feature = "scheduled_events"))]
283        let mut event_queue = cx.event_queue(worker.first_node_id);
284        #[cfg(feature = "scheduled_events")]
285        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
286
287        N::diff(&worker.first_node_params, params, &mut event_queue);
288
289        worker.first_node_params = params.clone();
290
291        N::mark_playing(worker.first_node_id, cx).unwrap();
292
293        (fx_chain)(&mut worker.fx_state, cx);
294
295        Ok(NewWorkerResult {
296            worker_id,
297            old_worker_id,
298            was_playing_sequence,
299        })
300    }
301
302    /// Sync the parameters for the given worker.
303    ///
304    /// * `worker_id` - The ID of the worker
305    /// * `params` - The new parameter state to sync
306    /// * `time` - The instant these new parameters should take effect. If this
307    /// is `None`, then the parameters will take effect as soon as the node receives
308    /// the event.
309    /// * `cx` - The Firewheel context
310    ///
311    /// If the parameters signify that the seuquence is stopped, then this worker
312    /// will be removed and the `worker_id` will be invalidated.
313    ///
314    /// Returns `true` if a worker with the given ID exists, `false` otherwise.
315    pub fn sync_worker_params<B: AudioBackend>(
316        &mut self,
317        worker_id: WorkerID,
318        params: &N::AudioNode,
319        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
320        cx: &mut FirewheelCtx<B>,
321    ) -> bool {
322        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
323            return false;
324        };
325
326        let worker = &mut self.workers[idx];
327
328        #[cfg(not(feature = "scheduled_events"))]
329        let mut event_queue = cx.event_queue(worker.first_node_id);
330        #[cfg(feature = "scheduled_events")]
331        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
332
333        N::diff(&worker.first_node_params, params, &mut event_queue);
334
335        worker.first_node_params = params.clone();
336
337        if N::params_stopped(params) {
338            self.worker_ids.remove(worker_id.0);
339            worker.assigned_worker_id = None;
340            self.num_active_workers -= 1;
341        }
342
343        true
344    }
345
346    /// Pause the given worker.
347    ///
348    /// * `worker_id` - The ID of the worker
349    /// * `time` - The instant that the pause should take effect. If this is
350    /// `None`, then the parameters will take effect as soon as the node receives
351    /// the event.
352    /// * `cx` - The Firewheel context
353    ///
354    /// Returns `true` if a worker with the given ID exists, `false` otherwise.
355    pub fn pause<B: AudioBackend>(
356        &mut self,
357        worker_id: WorkerID,
358        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
359        cx: &mut FirewheelCtx<B>,
360    ) -> bool {
361        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
362            return false;
363        };
364
365        let worker = &mut self.workers[idx];
366
367        let mut new_params = worker.first_node_params.clone();
368        N::pause(&mut new_params);
369
370        #[cfg(not(feature = "scheduled_events"))]
371        let mut event_queue = cx.event_queue(worker.first_node_id);
372        #[cfg(feature = "scheduled_events")]
373        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
374
375        N::diff(&worker.first_node_params, &new_params, &mut event_queue);
376
377        true
378    }
379
380    /// Resume the given worker.
381    ///
382    /// * `worker_id` - The ID of the worker
383    /// * `time` - The instant that the resume should take effect. If this is
384    /// `None`, then the parameters will take effect as soon as the node receives
385    /// the event.
386    /// * `cx` - The Firewheel context
387    ///
388    /// Returns `true` if a worker with the given ID exists, `false` otherwise.
389    pub fn resume<B: AudioBackend>(
390        &mut self,
391        worker_id: WorkerID,
392        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
393        cx: &mut FirewheelCtx<B>,
394    ) -> bool {
395        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
396            return false;
397        };
398
399        let worker = &mut self.workers[idx];
400
401        let mut new_params = worker.first_node_params.clone();
402        N::resume(&mut new_params);
403
404        #[cfg(not(feature = "scheduled_events"))]
405        let mut event_queue = cx.event_queue(worker.first_node_id);
406        #[cfg(feature = "scheduled_events")]
407        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
408
409        N::diff(&worker.first_node_params, &new_params, &mut event_queue);
410
411        true
412    }
413
414    /// Stop the given worker.
415    ///
416    /// * `worker_id` - The ID of the worker
417    /// * `time` - The instant that the stop should take effect. If this is
418    /// `None`, then the parameters will take effect as soon as the node receives
419    /// the event.
420    /// * `cx` - The Firewheel context
421    ///
422    /// This will remove the worker and invalidate the given `worker_id`.
423    ///
424    /// Returns `true` if a worker with the given ID exists and was stopped.
425    pub fn stop<B: AudioBackend>(
426        &mut self,
427        worker_id: WorkerID,
428        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
429        cx: &mut FirewheelCtx<B>,
430    ) -> bool {
431        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
432            return false;
433        };
434
435        let worker = &mut self.workers[idx];
436
437        let mut new_params = worker.first_node_params.clone();
438        N::stop(&mut new_params);
439
440        #[cfg(not(feature = "scheduled_events"))]
441        let mut event_queue = cx.event_queue(worker.first_node_id);
442        #[cfg(feature = "scheduled_events")]
443        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
444
445        N::diff(&worker.first_node_params, &new_params, &mut event_queue);
446
447        self.worker_ids.remove(worker_id.0);
448        worker.assigned_worker_id = None;
449        self.num_active_workers -= 1;
450
451        true
452    }
453
454    /// Pause all workers.
455    ///
456    /// * `time` - The instant that the stop should take effect. If this is
457    /// `None`, then the parameters will take effect as soon as the node receives
458    /// the event.
459    pub fn pause_all<B: AudioBackend>(
460        &mut self,
461        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
462        cx: &mut FirewheelCtx<B>,
463    ) {
464        for worker in self.workers.iter_mut() {
465            if worker.assigned_worker_id.is_some() {
466                let mut new_params = worker.first_node_params.clone();
467                N::pause(&mut new_params);
468
469                #[cfg(not(feature = "scheduled_events"))]
470                let mut event_queue = cx.event_queue(worker.first_node_id);
471                #[cfg(feature = "scheduled_events")]
472                let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
473
474                N::diff(&worker.first_node_params, &new_params, &mut event_queue);
475            }
476        }
477    }
478
479    /// Resume all workers.
480    ///
481    /// * `time` - The instant that the stop should take effect. If this is
482    /// `None`, then the parameters will take effect as soon as the node receives
483    /// the event.
484    pub fn resume_all<B: AudioBackend>(
485        &mut self,
486        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
487        cx: &mut FirewheelCtx<B>,
488    ) {
489        for worker in self.workers.iter_mut() {
490            if worker.assigned_worker_id.is_some() {
491                let mut new_params = worker.first_node_params.clone();
492                N::resume(&mut new_params);
493
494                #[cfg(not(feature = "scheduled_events"))]
495                let mut event_queue = cx.event_queue(worker.first_node_id);
496                #[cfg(feature = "scheduled_events")]
497                let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
498
499                N::diff(&worker.first_node_params, &new_params, &mut event_queue);
500            }
501        }
502    }
503
504    /// Stop all workers.
505    ///
506    /// * `time` - The instant that the stop should take effect. If this is
507    /// `None`, then the parameters will take effect as soon as the node receives
508    /// the event.
509    pub fn stop_all<B: AudioBackend>(
510        &mut self,
511        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
512        cx: &mut FirewheelCtx<B>,
513    ) {
514        for worker in self.workers.iter_mut() {
515            if worker.assigned_worker_id.is_some() {
516                let mut new_params = worker.first_node_params.clone();
517                N::stop(&mut new_params);
518
519                #[cfg(not(feature = "scheduled_events"))]
520                let mut event_queue = cx.event_queue(worker.first_node_id);
521                #[cfg(feature = "scheduled_events")]
522                let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
523
524                N::diff(&worker.first_node_params, &new_params, &mut event_queue);
525
526                worker.assigned_worker_id = None;
527            }
528        }
529
530        self.worker_ids.clear();
531        self.num_active_workers = 0;
532    }
533
534    /// Get the first node parameters of the given worker.
535    pub fn first_node(&self, worker_id: WorkerID) -> Option<&N::AudioNode> {
536        self.worker_ids
537            .get(worker_id.0)
538            .map(|idx| &self.workers[*idx].first_node_params)
539    }
540
541    /// Get an immutable reference to the state of the first node of the given worker.
542    pub fn first_node_state<'a, T: 'static, B: AudioBackend>(
543        &self,
544        worker_id: WorkerID,
545        cx: &'a FirewheelCtx<B>,
546    ) -> Option<&'a T> {
547        self.worker_ids
548            .get(worker_id.0)
549            .and_then(|idx| cx.node_state::<T>(self.workers[*idx].first_node_id))
550    }
551
552    /// Get a mutable reference to the state of the first node of the given worker.
553    pub fn first_node_state_mut<'a, T: 'static, B: AudioBackend>(
554        &self,
555        worker_id: WorkerID,
556        cx: &'a mut FirewheelCtx<B>,
557    ) -> Option<&'a mut T> {
558        self.worker_ids
559            .get(worker_id.0)
560            .and_then(|idx| cx.node_state_mut::<T>(self.workers[*idx].first_node_id))
561    }
562
563    pub fn fx_chain(&self, worker_id: WorkerID) -> Option<&FxChainState<FX>> {
564        self.worker_ids
565            .get(worker_id.0)
566            .map(|idx| &self.workers[*idx].fx_state)
567    }
568
569    pub fn fx_chain_mut(&mut self, worker_id: WorkerID) -> Option<&mut FxChainState<FX>> {
570        self.worker_ids
571            .get(worker_id.0)
572            .map(|idx| &mut self.workers[*idx].fx_state)
573    }
574
575    /// Returns `true` if the sequence has either not started playing yet or has finished
576    /// playing.
577    pub fn has_stopped<B: AudioBackend>(&self, worker_id: WorkerID, cx: &FirewheelCtx<B>) -> bool {
578        self.worker_ids
579            .get(worker_id.0)
580            .map(|idx| N::node_is_stopped(self.workers[*idx].first_node_id, cx).unwrap())
581            .unwrap_or(true)
582    }
583
584    /// Poll for the current number of active workers, and return a list of
585    /// workers which have finished playing.
586    ///
587    /// Calling this method is optional.
588    pub fn poll<B: AudioBackend>(&mut self, cx: &FirewheelCtx<B>) -> PollResult {
589        self.num_active_workers = 0;
590        let mut finished_workers = SmallVec::new();
591
592        for worker in self.workers.iter_mut() {
593            if worker.assigned_worker_id.is_some() {
594                if N::node_is_stopped(worker.first_node_id, cx).unwrap() {
595                    let id = worker.assigned_worker_id.take().unwrap();
596                    self.worker_ids.remove(id.0);
597                    finished_workers.push(id);
598                } else {
599                    self.num_active_workers += 1;
600                }
601            }
602        }
603
604        PollResult { finished_workers }
605    }
606
607    /// The total number of active workers.
608    pub fn num_active_workers(&self) -> usize {
609        self.num_active_workers
610    }
611}
612
613#[derive(Debug, Clone, PartialEq)]
614pub struct PollResult {
615    /// The worker IDs which have finished playing. These IDs are now
616    /// invalidated.
617    pub finished_workers: SmallVec<[WorkerID; 4]>,
618}
619
620/// The result of calling [`AudioNodePool::new_worker`].
621#[derive(Debug, Clone, PartialEq, Eq)]
622pub struct NewWorkerResult {
623    /// The new ID of the worker assigned to play this sequence.
624    pub worker_id: WorkerID,
625
626    /// The ID that was previously assigned to this worker.
627    pub old_worker_id: Option<WorkerID>,
628
629    /// If this is `true`, then this worker was already playing a sequence, and that
630    /// sequence has been stopped.
631    pub was_playing_sequence: bool,
632}
633
634#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
635pub enum NewWorkerError {
636    #[error("Could not create new audio node pool worker: the given parameters signify a stopped sequence")]
637    ParameterStateIsStop,
638    #[error("Could not create new audio node pool worker: the worker pool is full")]
639    NoMoreWorkers,
640}
641
642#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
643pub enum PoolError {
644    #[error("A node with ID {0:?} does not exist in this pool")]
645    InvalidNodeID(NodeID),
646}