firewheel_pool/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3use firewheel_core::{
4    channel_config::NonZeroChannelCount,
5    node::{AudioNode, NodeID},
6};
7use firewheel_graph::{backend::AudioBackend, ContextQueue, FirewheelCtx};
8use smallvec::SmallVec;
9use thunderdome::Arena;
10
11#[cfg(feature = "scheduled_events")]
12use firewheel_core::clock::EventInstant;
13
14#[cfg(feature = "sampler")]
15mod sampler;
16#[cfg(feature = "sampler")]
17pub use sampler::SamplerPool;
18
19mod volume_pan;
20pub use volume_pan::VolumePanChain;
21
22#[cfg(feature = "spatial_basic")]
23mod spatial_basic;
24#[cfg(feature = "spatial_basic")]
25pub use spatial_basic::SpatialBasicChain;
26
27#[cfg(feature = "sampler")]
28pub type SamplerPoolVolumePan = AudioNodePool<SamplerPool, VolumePanChain>;
29#[cfg(all(feature = "sampler", feature = "spatial_basic"))]
30pub type SamplerPoolSpatialBasic = AudioNodePool<SamplerPool, SpatialBasicChain>;
31
32/// A trait describing an "FX chain" for use in an [`AudioNodePool`].
33pub trait FxChain: Default {
34    /// Construct the nodes in the FX chain and connect them, returning a list of the
35    /// new node ids.
36    ///
37    /// * `first_node_id` - The ID of the first node in this fx chain instance.
38    /// * `first_node_num_out_channels` - The number of output channels in the first node.
39    /// * `dst_node_id` - The ID of the node that the last node in this FX chain should
40    /// connect to.
41    /// * `dst_num_channels` - The number of input channels on `dst_node_id`.
42    /// * `cx` - The firewheel context.
43    fn construct_and_connect<B: AudioBackend>(
44        &mut self,
45        first_node_id: NodeID,
46        first_node_num_out_channels: NonZeroChannelCount,
47        dst_node_id: NodeID,
48        dst_num_channels: NonZeroChannelCount,
49        cx: &mut FirewheelCtx<B>,
50    ) -> Vec<NodeID>;
51}
52
53struct Worker<N: PoolableNode, FX: FxChain> {
54    first_node_params: N::AudioNode,
55    first_node_id: NodeID,
56
57    fx_state: FxChainState<FX>,
58
59    assigned_worker_id: Option<WorkerID>,
60}
61
62pub struct FxChainState<FX: FxChain> {
63    pub fx_chain: FX,
64    pub node_ids: Vec<NodeID>,
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
68pub struct WorkerID(thunderdome::Index);
69
70impl WorkerID {
71    pub const DANGLING: Self = Self(thunderdome::Index::DANGLING);
72}
73
74impl Default for WorkerID {
75    fn default() -> Self {
76        Self::DANGLING
77    }
78}
79
80/// A trait describing the first node in an [`AudioNodePool`].
81pub trait PoolableNode {
82    /// The node parameters
83    type AudioNode: AudioNode + Clone + 'static;
84
85    /// Return the number of output channels for the given configuration.
86    fn num_output_channels(
87        config: Option<&<Self::AudioNode as AudioNode>::Configuration>,
88    ) -> NonZeroChannelCount;
89
90    /// Return `true` if the given parameters signify that the sequence is stopped,
91    /// `false` otherwise.
92    fn params_stopped(params: &Self::AudioNode) -> bool;
93    /// Return `true` if the node state of the given node is stopped.
94    ///
95    /// Return an error if the given `node_id` is invalid.
96    fn node_is_stopped<B: AudioBackend>(
97        node_id: NodeID,
98        cx: &FirewheelCtx<B>,
99    ) -> Result<bool, PoolError>;
100
101    /// Return a score of how ready this node is to accept new work.
102    ///
103    /// The worker with the highest worker score will be chosen for the new work.
104    ///
105    /// Return an error if the given `node_id` is invalid.
106    fn worker_score<B: AudioBackend>(
107        params: &Self::AudioNode,
108        node_id: NodeID,
109        cx: &mut FirewheelCtx<B>,
110    ) -> Result<u64, PoolError>;
111
112    /// Diff the new parameters and push the changes into the event queue.
113    fn diff<B: AudioBackend>(
114        baseline: &Self::AudioNode,
115        new: &Self::AudioNode,
116        event_queue: &mut ContextQueue<B>,
117    );
118
119    /// Notify the node state that a sequence is playing/stopped.
120    ///
121    /// If `stopped` is `true`, then the sequence has been stopped. If `stopped` is
122    /// `false`, then a new sequence has been started.
123    ///
124    /// This is used to account for the delay between sending an event to the node
125    /// and the node receiving the event.
126    ///
127    /// Return an error if the given `node_id` is invalid.
128    fn mark_stopped<B: AudioBackend>(
129        stopped: bool,
130        node_id: NodeID,
131        cx: &mut FirewheelCtx<B>,
132    ) -> Result<(), PoolError>;
133
134    /// Pause the sequence in the node parameters
135    fn pause(params: &mut Self::AudioNode);
136    /// Resume the sequence in the node parameters
137    fn resume(params: &mut Self::AudioNode);
138    /// Stop the sequence in the node parameters
139    fn stop(params: &mut Self::AudioNode);
140}
141
142/// A pool of audio node chains that can dynamically be assigned work.
143pub struct AudioNodePool<N: PoolableNode, FX: FxChain> {
144    workers: Vec<Worker<N, FX>>,
145    worker_ids: Arena<usize>,
146    num_active_workers: usize,
147}
148
149impl<N: PoolableNode, FX: FxChain> AudioNodePool<N, FX>
150where
151    <N::AudioNode as AudioNode>::Configuration: Clone,
152{
153    /// Construct a new sampler pool.
154    ///
155    /// * `num_workers` - The total number of workers that can work in parallel. More workers
156    /// will allow more samples to be played concurrently, but will also increase processing
157    /// overhead. A value of `16` is a good place to start.
158    /// * `first_node` - The state of the first node in each FX chain instance.
159    /// * `first_node_config` - The configuration of the first node in each FX chain instance.
160    /// * `first_node_num_out_channels` - The number of output channels in the first node.
161    /// * `dst_node_id` - The ID of the node that the last effect in each fx chain instance
162    /// will connect to.
163    /// * `dst_num_channels` - The number of input channels in `dst_node_id`.
164    /// * `cx` - The firewheel context.
165    pub fn new<B: AudioBackend>(
166        num_workers: usize,
167        first_node: N::AudioNode,
168        first_node_config: Option<<N::AudioNode as AudioNode>::Configuration>,
169        dst_node_id: NodeID,
170        dst_num_channels: NonZeroChannelCount,
171        cx: &mut FirewheelCtx<B>,
172    ) -> Self {
173        assert_ne!(num_workers, 0);
174
175        let first_node_num_out_channels = N::num_output_channels(first_node_config.as_ref());
176
177        Self {
178            workers: (0..num_workers)
179                .map(|_| {
180                    let first_node_id = cx.add_node(first_node.clone(), first_node_config.clone());
181
182                    let mut fx_chain = FX::default();
183
184                    let fx_ids = fx_chain.construct_and_connect(
185                        first_node_id,
186                        first_node_num_out_channels,
187                        dst_node_id,
188                        dst_num_channels,
189                        cx,
190                    );
191
192                    Worker {
193                        first_node_params: first_node.clone(),
194                        first_node_id,
195
196                        fx_state: FxChainState {
197                            fx_chain,
198                            node_ids: fx_ids,
199                        },
200
201                        assigned_worker_id: None,
202                    }
203                })
204                .collect(),
205            worker_ids: Arena::with_capacity(num_workers),
206            num_active_workers: 0,
207        }
208    }
209
210    pub fn num_workers(&self) -> usize {
211        self.workers.len()
212    }
213
214    /// Queue a new work to play a sequence.
215    ///
216    /// * `params` - The parameters of the sequence to play.
217    /// * `time` - The instant these new parameters should take effect. If this
218    /// is `None`, then the parameters will take effect as soon as the node receives
219    /// the event.
220    /// * `steal` - If this is `true`, then if there are no more workers left in
221    /// in the pool, the oldest one will be stopped and replaced with this new
222    /// one. If this is `false`, then an error will be returned if no more workers
223    /// are left.
224    /// * `cx` - The Firewheel context.
225    /// * `fx_chain` - A closure to add additional nodes to this worker instance.
226    ///
227    /// This will return an error if `params.playback == PlaybackState::Stop`.
228    pub fn new_worker<B: AudioBackend>(
229        &mut self,
230        params: &N::AudioNode,
231        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
232        steal: bool,
233        cx: &mut FirewheelCtx<B>,
234        fx_chain: impl FnOnce(&mut FxChainState<FX>, &mut FirewheelCtx<B>),
235    ) -> Result<NewWorkerResult, NewWorkerError> {
236        if N::params_stopped(params) {
237            return Err(NewWorkerError::ParameterStateIsStop);
238        }
239
240        if !steal && self.num_active_workers == self.workers.len() {
241            return Err(NewWorkerError::NoMoreWorkers);
242        }
243
244        let mut idx = 0;
245        let mut max_score = 0;
246        for (i, worker) in self.workers.iter().enumerate() {
247            if worker.assigned_worker_id.is_none() {
248                idx = i;
249                break;
250            }
251
252            let score =
253                N::worker_score(&worker.first_node_params, worker.first_node_id, cx).unwrap();
254
255            if score == u64::MAX {
256                idx = i;
257                break;
258            }
259
260            if score > max_score {
261                max_score = score;
262                idx = i;
263            }
264        }
265
266        let worker_id = WorkerID(self.worker_ids.insert(idx));
267
268        let worker = &mut self.workers[idx];
269
270        let old_worker_id = worker.assigned_worker_id.take();
271        let was_playing_sequence = if let Some(old_worker_id) = old_worker_id {
272            self.worker_ids.remove(old_worker_id.0);
273
274            !(N::params_stopped(params) || N::node_is_stopped(worker.first_node_id, cx).unwrap())
275        } else {
276            false
277        };
278
279        worker.assigned_worker_id = Some(worker_id);
280        self.num_active_workers += 1;
281
282        #[cfg(not(feature = "scheduled_events"))]
283        let mut event_queue = cx.event_queue(worker.first_node_id);
284        #[cfg(feature = "scheduled_events")]
285        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
286
287        N::diff(&worker.first_node_params, params, &mut event_queue);
288
289        worker.first_node_params = params.clone();
290
291        N::mark_stopped(false, worker.first_node_id, cx).unwrap();
292
293        (fx_chain)(&mut worker.fx_state, cx);
294
295        Ok(NewWorkerResult {
296            worker_id,
297            old_worker_id,
298            was_playing_sequence,
299        })
300    }
301
302    /// Sync the parameters for the given worker.
303    ///
304    /// * `worker_id` - The ID of the worker
305    /// * `params` - The new parameter state to sync
306    /// * `time` - The instant these new parameters should take effect. If this
307    /// is `None`, then the parameters will take effect as soon as the node receives
308    /// the event.
309    /// * `cx` - The Firewheel context
310    ///
311    /// If the parameters signify that the seuquence is stopped, then this worker
312    /// will be removed and the `worker_id` will be invalidated.
313    ///
314    /// Returns `true` if a worker with the given ID exists, `false` otherwise.
315    pub fn sync_worker_params<B: AudioBackend>(
316        &mut self,
317        worker_id: WorkerID,
318        params: &N::AudioNode,
319        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
320        cx: &mut FirewheelCtx<B>,
321    ) -> bool {
322        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
323            return false;
324        };
325
326        let worker = &mut self.workers[idx];
327
328        #[cfg(not(feature = "scheduled_events"))]
329        let mut event_queue = cx.event_queue(worker.first_node_id);
330        #[cfg(feature = "scheduled_events")]
331        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
332
333        N::diff(&worker.first_node_params, params, &mut event_queue);
334
335        worker.first_node_params = params.clone();
336
337        if N::params_stopped(params) {
338            self.worker_ids.remove(worker_id.0);
339            worker.assigned_worker_id = None;
340            self.num_active_workers -= 1;
341        }
342
343        true
344    }
345
346    /// Pause the given worker.
347    ///
348    /// * `worker_id` - The ID of the worker
349    /// * `time` - The instant that the pause should take effect. If this is
350    /// `None`, then the parameters will take effect as soon as the node receives
351    /// the event.
352    /// * `cx` - The Firewheel context
353    ///
354    /// Returns `true` if a worker with the given ID exists, `false` otherwise.
355    pub fn pause<B: AudioBackend>(
356        &mut self,
357        worker_id: WorkerID,
358        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
359        cx: &mut FirewheelCtx<B>,
360    ) -> bool {
361        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
362            return false;
363        };
364
365        let worker = &mut self.workers[idx];
366
367        let mut new_params = worker.first_node_params.clone();
368        N::pause(&mut new_params);
369
370        #[cfg(not(feature = "scheduled_events"))]
371        let mut event_queue = cx.event_queue(worker.first_node_id);
372        #[cfg(feature = "scheduled_events")]
373        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
374
375        N::diff(&worker.first_node_params, &new_params, &mut event_queue);
376
377        true
378    }
379
380    /// Resume the given worker.
381    ///
382    /// * `worker_id` - The ID of the worker
383    /// * `time` - The instant that the resume should take effect. If this is
384    /// `None`, then the parameters will take effect as soon as the node receives
385    /// the event.
386    /// * `cx` - The Firewheel context
387    ///
388    /// Returns `true` if a worker with the given ID exists, `false` otherwise.
389    pub fn resume<B: AudioBackend>(
390        &mut self,
391        worker_id: WorkerID,
392        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
393        cx: &mut FirewheelCtx<B>,
394    ) -> bool {
395        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
396            return false;
397        };
398
399        let worker = &mut self.workers[idx];
400
401        let mut new_params = worker.first_node_params.clone();
402        N::resume(&mut new_params);
403
404        #[cfg(not(feature = "scheduled_events"))]
405        let mut event_queue = cx.event_queue(worker.first_node_id);
406        #[cfg(feature = "scheduled_events")]
407        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
408
409        N::diff(&worker.first_node_params, &new_params, &mut event_queue);
410
411        true
412    }
413
414    /// Stop the given worker.
415    ///
416    /// * `worker_id` - The ID of the worker
417    /// * `time` - The instant that the stop should take effect. If this is
418    /// `None`, then the parameters will take effect as soon as the node receives
419    /// the event.
420    /// * `cx` - The Firewheel context
421    ///
422    /// This will remove the worker and invalidate the given `worker_id`.
423    ///
424    /// Returns `true` if a worker with the given ID exists and was stopped.
425    pub fn stop<B: AudioBackend>(
426        &mut self,
427        worker_id: WorkerID,
428        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
429        cx: &mut FirewheelCtx<B>,
430    ) -> bool {
431        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
432            return false;
433        };
434
435        let worker = &mut self.workers[idx];
436
437        let mut new_params = worker.first_node_params.clone();
438        N::stop(&mut new_params);
439
440        #[cfg(not(feature = "scheduled_events"))]
441        let mut event_queue = cx.event_queue(worker.first_node_id);
442        #[cfg(feature = "scheduled_events")]
443        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
444
445        N::diff(&worker.first_node_params, &new_params, &mut event_queue);
446
447        self.worker_ids.remove(worker_id.0);
448        worker.assigned_worker_id = None;
449        self.num_active_workers -= 1;
450
451        true
452    }
453
454    /// Pause all workers.
455    ///
456    /// * `time` - The instant that the stop should take effect. If this is
457    /// `None`, then the parameters will take effect as soon as the node receives
458    /// the event.
459    pub fn pause_all<B: AudioBackend>(
460        &mut self,
461        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
462        cx: &mut FirewheelCtx<B>,
463    ) {
464        for worker in self.workers.iter_mut() {
465            if worker.assigned_worker_id.is_some() {
466                let mut new_params = worker.first_node_params.clone();
467                N::pause(&mut new_params);
468
469                #[cfg(not(feature = "scheduled_events"))]
470                let mut event_queue = cx.event_queue(worker.first_node_id);
471                #[cfg(feature = "scheduled_events")]
472                let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
473
474                N::diff(&worker.first_node_params, &new_params, &mut event_queue);
475            }
476        }
477    }
478
479    /// Resume all workers.
480    ///
481    /// * `time` - The instant that the stop should take effect. If this is
482    /// `None`, then the parameters will take effect as soon as the node receives
483    /// the event.
484    pub fn resume_all<B: AudioBackend>(
485        &mut self,
486        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
487        cx: &mut FirewheelCtx<B>,
488    ) {
489        for worker in self.workers.iter_mut() {
490            if worker.assigned_worker_id.is_some() {
491                let mut new_params = worker.first_node_params.clone();
492                N::resume(&mut new_params);
493
494                #[cfg(not(feature = "scheduled_events"))]
495                let mut event_queue = cx.event_queue(worker.first_node_id);
496                #[cfg(feature = "scheduled_events")]
497                let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
498
499                N::diff(&worker.first_node_params, &new_params, &mut event_queue);
500            }
501        }
502    }
503
504    /// Stop all workers.
505    ///
506    /// * `time` - The instant that the stop should take effect. If this is
507    /// `None`, then the parameters will take effect as soon as the node receives
508    /// the event.
509    pub fn stop_all<B: AudioBackend>(
510        &mut self,
511        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
512        cx: &mut FirewheelCtx<B>,
513    ) {
514        for worker in self.workers.iter_mut() {
515            if worker.assigned_worker_id.is_some() {
516                let mut new_params = worker.first_node_params.clone();
517                N::stop(&mut new_params);
518
519                #[cfg(not(feature = "scheduled_events"))]
520                let mut event_queue = cx.event_queue(worker.first_node_id);
521                #[cfg(feature = "scheduled_events")]
522                let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
523
524                N::diff(&worker.first_node_params, &new_params, &mut event_queue);
525
526                worker.assigned_worker_id = None;
527            }
528        }
529
530        self.worker_ids.clear();
531        self.num_active_workers = 0;
532    }
533
534    /// Get the first node parameters of the given worker.
535    pub fn first_node(&self, worker_id: WorkerID) -> Option<&N::AudioNode> {
536        self.worker_ids
537            .get(worker_id.0)
538            .map(|idx| &self.workers[*idx].first_node_params)
539    }
540
541    /// Get an immutable reference to the state of the first node of the given worker.
542    pub fn first_node_state<'a, T: 'static, B: AudioBackend>(
543        &self,
544        worker_id: WorkerID,
545        cx: &'a FirewheelCtx<B>,
546    ) -> Option<&'a T> {
547        self.worker_ids
548            .get(worker_id.0)
549            .and_then(|idx| cx.node_state::<T>(self.workers[*idx].first_node_id))
550    }
551
552    /// Get a mutable reference to the state of the first node of the given worker.
553    pub fn first_node_state_mut<'a, T: 'static, B: AudioBackend>(
554        &self,
555        worker_id: WorkerID,
556        cx: &'a mut FirewheelCtx<B>,
557    ) -> Option<&'a mut T> {
558        self.worker_ids
559            .get(worker_id.0)
560            .and_then(|idx| cx.node_state_mut::<T>(self.workers[*idx].first_node_id))
561    }
562
563    pub fn fx_chain(&self, worker_id: WorkerID) -> Option<&FxChainState<FX>> {
564        self.worker_ids
565            .get(worker_id.0)
566            .map(|idx| &self.workers[*idx].fx_state)
567    }
568
569    pub fn fx_chain_mut(&mut self, worker_id: WorkerID) -> Option<&mut FxChainState<FX>> {
570        self.worker_ids
571            .get(worker_id.0)
572            .map(|idx| &mut self.workers[*idx].fx_state)
573    }
574
575    /// Returns `true` if the sequence has either not started playing yet or has finished
576    /// playing.
577    pub fn has_stopped<B: AudioBackend>(&self, worker_id: WorkerID, cx: &FirewheelCtx<B>) -> bool {
578        self.worker_ids
579            .get(worker_id.0)
580            .map(|idx| N::node_is_stopped(self.workers[*idx].first_node_id, cx).unwrap())
581            .unwrap_or(true)
582    }
583
584    /// Poll for the current number of active workers, and return a list of
585    /// workers which have finished playing.
586    ///
587    /// Calling this method is optional.
588    pub fn poll<B: AudioBackend>(&mut self, cx: &FirewheelCtx<B>) -> PollResult {
589        self.num_active_workers = 0;
590        let mut finished_workers = SmallVec::new();
591
592        for worker in self.workers.iter_mut() {
593            if worker.assigned_worker_id.is_some() {
594                if N::node_is_stopped(worker.first_node_id, cx).unwrap() {
595                    let id = worker.assigned_worker_id.take().unwrap();
596                    self.worker_ids.remove(id.0);
597                    finished_workers.push(id);
598                } else {
599                    self.num_active_workers += 1;
600                }
601            }
602        }
603
604        PollResult { finished_workers }
605    }
606
607    /// The total number of active workers.
608    pub fn num_active_workers(&self) -> usize {
609        self.num_active_workers
610    }
611}
612
613#[derive(Debug, Clone, PartialEq)]
614pub struct PollResult {
615    /// The worker IDs which have finished playing. These IDs are now
616    /// invalidated.
617    pub finished_workers: SmallVec<[WorkerID; 4]>,
618}
619
620/// The result of calling [`AudioNodePool::new_worker`].
621pub struct NewWorkerResult {
622    /// The new ID of the worker assigned to play this sequence.
623    pub worker_id: WorkerID,
624
625    /// The ID that was previously assigned to this worker.
626    pub old_worker_id: Option<WorkerID>,
627
628    /// If this is `true`, then this worker was already playing a sequence, and that
629    /// sequence has been stopped.
630    pub was_playing_sequence: bool,
631}
632
633#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
634pub enum NewWorkerError {
635    #[error("Could not create new audio node pool worker: the given parameters signify a stopped sequence")]
636    ParameterStateIsStop,
637    #[error("Could not create new audio node pool worker: the worker pool is full")]
638    NoMoreWorkers,
639}
640
641#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
642pub enum PoolError {
643    #[error("A node with ID {0:?} does not exist in this pool")]
644    InvalidNodeID(NodeID),
645}