firewheel_pool/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3use firewheel_core::{
4    channel_config::NonZeroChannelCount,
5    node::{AudioNode, NodeID},
6};
7use firewheel_graph::{backend::AudioBackend, ContextQueue, FirewheelCtx};
8use smallvec::SmallVec;
9use thunderdome::Arena;
10
11#[cfg(feature = "scheduled_events")]
12use firewheel_core::clock::EventInstant;
13
14#[cfg(feature = "sampler")]
15mod sampler;
16#[cfg(feature = "sampler")]
17pub use sampler::SamplerPool;
18
19mod volume_pan;
20pub use volume_pan::VolumePanChain;
21
22#[cfg(feature = "spatial_basic")]
23mod spatial_basic;
24#[cfg(feature = "spatial_basic")]
25pub use spatial_basic::SpatialBasicChain;
26
/// An [`AudioNodePool`] that uses [`SamplerPool`] as its first node and
/// [`VolumePanChain`] as its FX chain.
///
/// Only available with the `sampler` feature.
#[cfg(feature = "sampler")]
pub type SamplerPoolVolumePan = AudioNodePool<SamplerPool, VolumePanChain>;
/// An [`AudioNodePool`] that uses [`SamplerPool`] as its first node and
/// [`SpatialBasicChain`] as its FX chain.
///
/// Only available when both the `sampler` and `spatial_basic` features are enabled.
#[cfg(all(feature = "sampler", feature = "spatial_basic"))]
pub type SamplerPoolSpatialBasic = AudioNodePool<SamplerPool, SpatialBasicChain>;
31
/// A trait describing an "FX chain" for use in an [`AudioNodePool`].
///
/// The pool creates one chain per worker with [`Default::default`] and then calls
/// [`FxChain::construct_and_connect`] on it once, right after the worker's first
/// node has been added to the context.
pub trait FxChain: Default {
    /// Construct the nodes in the FX chain and connect them, returning a list of the
    /// new node ids.
    ///
    /// * `first_node_id` - The ID of the first node in this fx chain instance.
    /// * `first_node_num_out_channels` - The number of output channels in the first node.
    /// * `dst_node_id` - The ID of the node that the last node in this FX chain should
    ///   connect to.
    /// * `dst_num_channels` - The number of input channels on `dst_node_id`.
    /// * `cx` - The firewheel context.
    ///
    /// The returned IDs are stored by the pool in [`FxChainState::node_ids`].
    fn construct_and_connect<B: AudioBackend>(
        &mut self,
        first_node_id: NodeID,
        first_node_num_out_channels: NonZeroChannelCount,
        dst_node_id: NodeID,
        dst_num_channels: NonZeroChannelCount,
        cx: &mut FirewheelCtx<B>,
    ) -> Vec<NodeID>;
}
52
/// One slot in an [`AudioNodePool`]: a first node plus its FX chain.
struct Worker<N: PoolableNode, FX: FxChain> {
    /// The parameters stored for the first node. Used as the baseline when
    /// diffing new parameters into the event queue.
    first_node_params: N::AudioNode,
    /// The ID of the first node in the Firewheel context.
    first_node_id: NodeID,

    /// The FX chain instance and the IDs of the nodes it created.
    fx_state: FxChainState<FX>,

    /// The ID currently handed out for this slot, or `None` if the slot is not
    /// assigned to any work.
    assigned_worker_id: Option<WorkerID>,
}
61
/// The FX chain belonging to a single worker in an [`AudioNodePool`].
#[derive(Debug)]
pub struct FxChainState<FX: FxChain> {
    /// The FX chain instance.
    pub fx_chain: FX,
    /// The node IDs returned by [`FxChain::construct_and_connect`] for this chain.
    pub node_ids: Vec<NodeID>,
}
67
/// A handle to work assigned to a worker in an [`AudioNodePool`].
///
/// This wraps a [`thunderdome::Index`] into the pool's internal ID arena. An ID
/// stops resolving once the pool removes it (for example when the worker is
/// stopped or reassigned).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct WorkerID(thunderdome::Index);
70
impl WorkerID {
    /// A placeholder ID that wraps [`thunderdome::Index::DANGLING`].
    ///
    /// This is also the value returned by [`WorkerID::default`].
    // NOTE(review): presumably this never resolves to a live worker — confirm
    // against the thunderdome documentation for `Index::DANGLING`.
    pub const DANGLING: Self = Self(thunderdome::Index::DANGLING);
}
74
75impl Default for WorkerID {
76    fn default() -> Self {
77        Self::DANGLING
78    }
79}
80
/// A trait describing the first node in an [`AudioNodePool`].
///
/// All methods are associated functions: the pool stores the node parameters
/// ([`PoolableNode::AudioNode`]) and node IDs itself and passes them in.
pub trait PoolableNode {
    /// The node parameters. The pool clones these for every worker and keeps a
    /// copy per worker as the baseline for [`PoolableNode::diff`].
    type AudioNode: AudioNode + Clone + 'static;

    /// Return the number of output channels for the given configuration.
    ///
    /// `config` is `None` when the pool was constructed without a configuration.
    fn num_output_channels(
        config: Option<&<Self::AudioNode as AudioNode>::Configuration>,
    ) -> NonZeroChannelCount;

    /// Return `true` if the given parameters signify that the sequence is stopped,
    /// `false` otherwise.
    fn params_stopped(params: &Self::AudioNode) -> bool;
    /// Return `true` if the node state of the given node is stopped.
    ///
    /// Return an error if the given `node_id` is invalid.
    fn node_is_stopped<B: AudioBackend>(
        node_id: NodeID,
        cx: &FirewheelCtx<B>,
    ) -> Result<bool, PoolError>;

    /// Return a score of how ready this node is to accept new work.
    ///
    /// The worker with the highest worker score will be chosen for the new work.
    /// A score of `u64::MAX` makes the pool pick this worker immediately without
    /// scoring the remaining workers.
    ///
    /// Return an error if the given `node_id` is invalid.
    fn worker_score<B: AudioBackend>(
        params: &Self::AudioNode,
        node_id: NodeID,
        cx: &mut FirewheelCtx<B>,
    ) -> Result<u64, PoolError>;

    /// Diff the new parameters and push the changes into the event queue.
    ///
    /// * `baseline` - The parameters the pool currently has stored for the worker.
    /// * `new` - The parameters to diff against `baseline`.
    /// * `event_queue` - The queue for the worker's first node.
    fn diff<B: AudioBackend>(
        baseline: &Self::AudioNode,
        new: &Self::AudioNode,
        event_queue: &mut ContextQueue<B>,
    );

    /// Notify the node state that a sequence is playing.
    ///
    /// This is used to account for the delay between sending an event to the node
    /// and the node receiving the event.
    ///
    /// Return an error if the given `node_id` is invalid.
    fn mark_playing<B: AudioBackend>(
        node_id: NodeID,
        cx: &mut FirewheelCtx<B>,
    ) -> Result<(), PoolError>;

    /// Pause the sequence in the node parameters.
    fn pause(params: &mut Self::AudioNode);
    /// Resume the sequence in the node parameters.
    fn resume(params: &mut Self::AudioNode);
    /// Stop the sequence in the node parameters.
    fn stop(params: &mut Self::AudioNode);
}
138
/// A pool of audio node chains that can dynamically be assigned work.
///
/// Each worker consists of a first node (described by `N`) followed by an FX chain
/// (described by `FX`). All workers are created up front in [`AudioNodePool::new`].
pub struct AudioNodePool<N: PoolableNode, FX: FxChain> {
    /// The fixed set of workers, created in `new`.
    workers: Vec<Worker<N, FX>>,
    /// Maps each live [`WorkerID`] to the index of its worker in `workers`.
    worker_ids: Arena<usize>,
    /// Cached count of active workers; recomputed by `poll`.
    num_active_workers: usize,
}
145
146impl<N: PoolableNode, FX: FxChain> AudioNodePool<N, FX>
147where
148    <N::AudioNode as AudioNode>::Configuration: Clone,
149{
150    /// Construct a new sampler pool.
151    ///
152    /// * `num_workers` - The total number of workers that can work in parallel. More workers
153    /// will allow more samples to be played concurrently, but will also increase processing
154    /// overhead. A value of `16` is a good place to start.
155    /// * `first_node` - The state of the first node in each FX chain instance.
156    /// * `first_node_config` - The configuration of the first node in each FX chain instance.
157    /// * `first_node_num_out_channels` - The number of output channels in the first node.
158    /// * `dst_node_id` - The ID of the node that the last effect in each fx chain instance
159    /// will connect to.
160    /// * `dst_num_channels` - The number of input channels in `dst_node_id`.
161    /// * `cx` - The firewheel context.
162    pub fn new<B: AudioBackend>(
163        num_workers: usize,
164        first_node: N::AudioNode,
165        first_node_config: Option<<N::AudioNode as AudioNode>::Configuration>,
166        dst_node_id: NodeID,
167        dst_num_channels: NonZeroChannelCount,
168        cx: &mut FirewheelCtx<B>,
169    ) -> Self {
170        assert_ne!(num_workers, 0);
171
172        let first_node_num_out_channels = N::num_output_channels(first_node_config.as_ref());
173
174        Self {
175            workers: (0..num_workers)
176                .map(|_| {
177                    let first_node_id = cx.add_node(first_node.clone(), first_node_config.clone());
178
179                    let mut fx_chain = FX::default();
180
181                    let fx_ids = fx_chain.construct_and_connect(
182                        first_node_id,
183                        first_node_num_out_channels,
184                        dst_node_id,
185                        dst_num_channels,
186                        cx,
187                    );
188
189                    Worker {
190                        first_node_params: first_node.clone(),
191                        first_node_id,
192
193                        fx_state: FxChainState {
194                            fx_chain,
195                            node_ids: fx_ids,
196                        },
197
198                        assigned_worker_id: None,
199                    }
200                })
201                .collect(),
202            worker_ids: Arena::with_capacity(num_workers),
203            num_active_workers: 0,
204        }
205    }
206
    /// The total number of workers in this pool, whether or not they are
    /// currently assigned work.
    pub fn num_workers(&self) -> usize {
        self.workers.len()
    }
210
211    /// Queue a new work to play a sequence.
212    ///
213    /// * `params` - The parameters of the sequence to play.
214    /// * `time` - The instant these new parameters should take effect. If this
215    /// is `None`, then the parameters will take effect as soon as the node receives
216    /// the event.
217    /// * `steal` - If this is `true`, then if there are no more workers left in
218    /// in the pool, the oldest one will be stopped and replaced with this new
219    /// one. If this is `false`, then an error will be returned if no more workers
220    /// are left.
221    /// * `cx` - The Firewheel context.
222    /// * `fx_chain` - A closure to add additional nodes to this worker instance.
223    ///
224    /// This will return an error if `params.playback == PlaybackState::Stop`.
225    pub fn new_worker<B: AudioBackend>(
226        &mut self,
227        params: &N::AudioNode,
228        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
229        steal: bool,
230        cx: &mut FirewheelCtx<B>,
231        fx_chain: impl FnOnce(&mut FxChainState<FX>, &mut FirewheelCtx<B>),
232    ) -> Result<NewWorkerResult, NewWorkerError> {
233        if N::params_stopped(params) {
234            return Err(NewWorkerError::ParameterStateIsStop);
235        }
236
237        if !steal && self.num_active_workers == self.workers.len() {
238            return Err(NewWorkerError::NoMoreWorkers);
239        }
240
241        let mut idx = 0;
242        let mut max_score = 0;
243        for (i, worker) in self.workers.iter().enumerate() {
244            if worker.assigned_worker_id.is_none() {
245                idx = i;
246                break;
247            }
248
249            let score =
250                N::worker_score(&worker.first_node_params, worker.first_node_id, cx).unwrap();
251
252            if score == u64::MAX {
253                idx = i;
254                break;
255            }
256
257            if score > max_score {
258                max_score = score;
259                idx = i;
260            }
261        }
262
263        let worker_id = WorkerID(self.worker_ids.insert(idx));
264
265        let worker = &mut self.workers[idx];
266
267        let old_worker_id = worker.assigned_worker_id.take();
268        let was_playing_sequence = if let Some(old_worker_id) = old_worker_id {
269            self.worker_ids.remove(old_worker_id.0);
270
271            !(N::params_stopped(params) || N::node_is_stopped(worker.first_node_id, cx).unwrap())
272        } else {
273            false
274        };
275
276        worker.assigned_worker_id = Some(worker_id);
277        self.num_active_workers += 1;
278
279        #[cfg(not(feature = "scheduled_events"))]
280        let mut event_queue = cx.event_queue(worker.first_node_id);
281        #[cfg(feature = "scheduled_events")]
282        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
283
284        N::diff(&worker.first_node_params, params, &mut event_queue);
285
286        worker.first_node_params = params.clone();
287
288        N::mark_playing(worker.first_node_id, cx).unwrap();
289
290        (fx_chain)(&mut worker.fx_state, cx);
291
292        Ok(NewWorkerResult {
293            worker_id,
294            old_worker_id,
295            was_playing_sequence,
296        })
297    }
298
299    /// Sync the parameters for the given worker.
300    ///
301    /// * `worker_id` - The ID of the worker
302    /// * `params` - The new parameter state to sync
303    /// * `time` - The instant these new parameters should take effect. If this
304    /// is `None`, then the parameters will take effect as soon as the node receives
305    /// the event.
306    /// * `cx` - The Firewheel context
307    ///
308    /// If the parameters signify that the seuquence is stopped, then this worker
309    /// will be removed and the `worker_id` will be invalidated.
310    ///
311    /// Returns `true` if a worker with the given ID exists, `false` otherwise.
312    pub fn sync_worker_params<B: AudioBackend>(
313        &mut self,
314        worker_id: WorkerID,
315        params: &N::AudioNode,
316        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
317        cx: &mut FirewheelCtx<B>,
318    ) -> bool {
319        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
320            return false;
321        };
322
323        let worker = &mut self.workers[idx];
324
325        #[cfg(not(feature = "scheduled_events"))]
326        let mut event_queue = cx.event_queue(worker.first_node_id);
327        #[cfg(feature = "scheduled_events")]
328        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
329
330        N::diff(&worker.first_node_params, params, &mut event_queue);
331
332        worker.first_node_params = params.clone();
333
334        if N::params_stopped(params) {
335            self.worker_ids.remove(worker_id.0);
336            worker.assigned_worker_id = None;
337            self.num_active_workers -= 1;
338        }
339
340        true
341    }
342
343    /// Pause the given worker.
344    ///
345    /// * `worker_id` - The ID of the worker
346    /// * `time` - The instant that the pause should take effect. If this is
347    /// `None`, then the parameters will take effect as soon as the node receives
348    /// the event.
349    /// * `cx` - The Firewheel context
350    ///
351    /// Returns `true` if a worker with the given ID exists, `false` otherwise.
352    pub fn pause<B: AudioBackend>(
353        &mut self,
354        worker_id: WorkerID,
355        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
356        cx: &mut FirewheelCtx<B>,
357    ) -> bool {
358        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
359            return false;
360        };
361
362        let worker = &mut self.workers[idx];
363
364        let mut new_params = worker.first_node_params.clone();
365        N::pause(&mut new_params);
366
367        #[cfg(not(feature = "scheduled_events"))]
368        let mut event_queue = cx.event_queue(worker.first_node_id);
369        #[cfg(feature = "scheduled_events")]
370        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
371
372        N::diff(&worker.first_node_params, &new_params, &mut event_queue);
373
374        true
375    }
376
377    /// Resume the given worker.
378    ///
379    /// * `worker_id` - The ID of the worker
380    /// * `time` - The instant that the resume should take effect. If this is
381    /// `None`, then the parameters will take effect as soon as the node receives
382    /// the event.
383    /// * `cx` - The Firewheel context
384    ///
385    /// Returns `true` if a worker with the given ID exists, `false` otherwise.
386    pub fn resume<B: AudioBackend>(
387        &mut self,
388        worker_id: WorkerID,
389        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
390        cx: &mut FirewheelCtx<B>,
391    ) -> bool {
392        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
393            return false;
394        };
395
396        let worker = &mut self.workers[idx];
397
398        let mut new_params = worker.first_node_params.clone();
399        N::resume(&mut new_params);
400
401        #[cfg(not(feature = "scheduled_events"))]
402        let mut event_queue = cx.event_queue(worker.first_node_id);
403        #[cfg(feature = "scheduled_events")]
404        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
405
406        N::diff(&worker.first_node_params, &new_params, &mut event_queue);
407
408        true
409    }
410
411    /// Stop the given worker.
412    ///
413    /// * `worker_id` - The ID of the worker
414    /// * `time` - The instant that the stop should take effect. If this is
415    /// `None`, then the parameters will take effect as soon as the node receives
416    /// the event.
417    /// * `cx` - The Firewheel context
418    ///
419    /// This will remove the worker and invalidate the given `worker_id`.
420    ///
421    /// Returns `true` if a worker with the given ID exists and was stopped.
422    pub fn stop<B: AudioBackend>(
423        &mut self,
424        worker_id: WorkerID,
425        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
426        cx: &mut FirewheelCtx<B>,
427    ) -> bool {
428        let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
429            return false;
430        };
431
432        let worker = &mut self.workers[idx];
433
434        let mut new_params = worker.first_node_params.clone();
435        N::stop(&mut new_params);
436
437        #[cfg(not(feature = "scheduled_events"))]
438        let mut event_queue = cx.event_queue(worker.first_node_id);
439        #[cfg(feature = "scheduled_events")]
440        let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
441
442        N::diff(&worker.first_node_params, &new_params, &mut event_queue);
443
444        self.worker_ids.remove(worker_id.0);
445        worker.assigned_worker_id = None;
446        self.num_active_workers -= 1;
447
448        true
449    }
450
451    /// Pause all workers.
452    ///
453    /// * `time` - The instant that the stop should take effect. If this is
454    /// `None`, then the parameters will take effect as soon as the node receives
455    /// the event.
456    pub fn pause_all<B: AudioBackend>(
457        &mut self,
458        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
459        cx: &mut FirewheelCtx<B>,
460    ) {
461        for worker in self.workers.iter_mut() {
462            if worker.assigned_worker_id.is_some() {
463                let mut new_params = worker.first_node_params.clone();
464                N::pause(&mut new_params);
465
466                #[cfg(not(feature = "scheduled_events"))]
467                let mut event_queue = cx.event_queue(worker.first_node_id);
468                #[cfg(feature = "scheduled_events")]
469                let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
470
471                N::diff(&worker.first_node_params, &new_params, &mut event_queue);
472            }
473        }
474    }
475
476    /// Resume all workers.
477    ///
478    /// * `time` - The instant that the stop should take effect. If this is
479    /// `None`, then the parameters will take effect as soon as the node receives
480    /// the event.
481    pub fn resume_all<B: AudioBackend>(
482        &mut self,
483        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
484        cx: &mut FirewheelCtx<B>,
485    ) {
486        for worker in self.workers.iter_mut() {
487            if worker.assigned_worker_id.is_some() {
488                let mut new_params = worker.first_node_params.clone();
489                N::resume(&mut new_params);
490
491                #[cfg(not(feature = "scheduled_events"))]
492                let mut event_queue = cx.event_queue(worker.first_node_id);
493                #[cfg(feature = "scheduled_events")]
494                let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
495
496                N::diff(&worker.first_node_params, &new_params, &mut event_queue);
497            }
498        }
499    }
500
501    /// Stop all workers.
502    ///
503    /// * `time` - The instant that the stop should take effect. If this is
504    /// `None`, then the parameters will take effect as soon as the node receives
505    /// the event.
506    pub fn stop_all<B: AudioBackend>(
507        &mut self,
508        #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
509        cx: &mut FirewheelCtx<B>,
510    ) {
511        for worker in self.workers.iter_mut() {
512            if worker.assigned_worker_id.is_some() {
513                let mut new_params = worker.first_node_params.clone();
514                N::stop(&mut new_params);
515
516                #[cfg(not(feature = "scheduled_events"))]
517                let mut event_queue = cx.event_queue(worker.first_node_id);
518                #[cfg(feature = "scheduled_events")]
519                let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
520
521                N::diff(&worker.first_node_params, &new_params, &mut event_queue);
522
523                worker.assigned_worker_id = None;
524            }
525        }
526
527        self.worker_ids.clear();
528        self.num_active_workers = 0;
529    }
530
531    /// Get the first node parameters of the given worker.
532    pub fn first_node(&self, worker_id: WorkerID) -> Option<&N::AudioNode> {
533        self.worker_ids
534            .get(worker_id.0)
535            .map(|idx| &self.workers[*idx].first_node_params)
536    }
537
538    /// Get an immutable reference to the state of the first node of the given worker.
539    pub fn first_node_state<'a, T: 'static, B: AudioBackend>(
540        &self,
541        worker_id: WorkerID,
542        cx: &'a FirewheelCtx<B>,
543    ) -> Option<&'a T> {
544        self.worker_ids
545            .get(worker_id.0)
546            .and_then(|idx| cx.node_state::<T>(self.workers[*idx].first_node_id))
547    }
548
549    /// Get a mutable reference to the state of the first node of the given worker.
550    pub fn first_node_state_mut<'a, T: 'static, B: AudioBackend>(
551        &self,
552        worker_id: WorkerID,
553        cx: &'a mut FirewheelCtx<B>,
554    ) -> Option<&'a mut T> {
555        self.worker_ids
556            .get(worker_id.0)
557            .and_then(|idx| cx.node_state_mut::<T>(self.workers[*idx].first_node_id))
558    }
559
560    pub fn fx_chain(&self, worker_id: WorkerID) -> Option<&FxChainState<FX>> {
561        self.worker_ids
562            .get(worker_id.0)
563            .map(|idx| &self.workers[*idx].fx_state)
564    }
565
566    pub fn fx_chain_mut(&mut self, worker_id: WorkerID) -> Option<&mut FxChainState<FX>> {
567        self.worker_ids
568            .get(worker_id.0)
569            .map(|idx| &mut self.workers[*idx].fx_state)
570    }
571
572    /// Returns `true` if the sequence has either not started playing yet or has finished
573    /// playing.
574    pub fn has_stopped<B: AudioBackend>(&self, worker_id: WorkerID, cx: &FirewheelCtx<B>) -> bool {
575        self.worker_ids
576            .get(worker_id.0)
577            .map(|idx| N::node_is_stopped(self.workers[*idx].first_node_id, cx).unwrap())
578            .unwrap_or(true)
579    }
580
581    /// Poll for the current number of active workers, and return a list of
582    /// workers which have finished playing.
583    ///
584    /// Calling this method is optional.
585    pub fn poll<B: AudioBackend>(&mut self, cx: &FirewheelCtx<B>) -> PollResult {
586        self.num_active_workers = 0;
587        let mut finished_workers = SmallVec::new();
588
589        for worker in self.workers.iter_mut() {
590            if worker.assigned_worker_id.is_some() {
591                if N::node_is_stopped(worker.first_node_id, cx).unwrap() {
592                    let id = worker.assigned_worker_id.take().unwrap();
593                    self.worker_ids.remove(id.0);
594                    finished_workers.push(id);
595                } else {
596                    self.num_active_workers += 1;
597                }
598            }
599        }
600
601        PollResult { finished_workers }
602    }
603
    /// The total number of active workers.
    ///
    /// This is a cached count. Workers whose sequence finished on its own stay
    /// counted until [`AudioNodePool::poll`] is called, which recomputes the count.
    pub fn num_active_workers(&self) -> usize {
        self.num_active_workers
    }
608}
609
/// The result of calling [`AudioNodePool::poll`].
#[derive(Debug, Clone, PartialEq)]
pub struct PollResult {
    /// The worker IDs which have finished playing. These IDs are now
    /// invalidated.
    pub finished_workers: SmallVec<[WorkerID; 4]>,
}
616
/// The result of calling [`AudioNodePool::new_worker`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewWorkerResult {
    /// The new ID of the worker assigned to play this sequence.
    pub worker_id: WorkerID,

    /// The ID that was previously assigned to this worker, if any. That ID has
    /// been invalidated.
    pub old_worker_id: Option<WorkerID>,

    /// If this is `true`, then this worker was already playing a sequence, and that
    /// sequence has been stopped.
    pub was_playing_sequence: bool,
}
630
/// An error returned by [`AudioNodePool::new_worker`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum NewWorkerError {
    /// The parameters passed to `new_worker` signify a stopped sequence, so there
    /// is nothing to play.
    #[error("Could not create new audio node pool worker: the given parameters signify a stopped sequence")]
    ParameterStateIsStop,
    /// Stealing was not allowed and all workers are counted as active.
    #[error("Could not create new audio node pool worker: the worker pool is full")]
    NoMoreWorkers,
}
638
/// An error returned by [`PoolableNode`] methods that look up a node by ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum PoolError {
    /// The given node ID could not be resolved.
    #[error("A node with ID {0:?} does not exist in this pool")]
    InvalidNodeID(NodeID),
}
643}