1#![cfg_attr(not(feature = "std"), no_std)]
2
3#[cfg(not(feature = "std"))]
4use bevy_platform::prelude::Vec;
5
6use firewheel_core::{
7 channel_config::NonZeroChannelCount,
8 node::{AudioNode, NodeID},
9};
10use firewheel_graph::{backend::AudioBackend, ContextQueue, FirewheelCtx};
11use smallvec::SmallVec;
12use thunderdome::Arena;
13
14#[cfg(feature = "scheduled_events")]
15use firewheel_core::clock::EventInstant;
16
17#[cfg(feature = "sampler")]
18mod sampler;
19#[cfg(feature = "sampler")]
20pub use sampler::SamplerPool;
21
22mod volume_pan;
23pub use volume_pan::VolumePanChain;
24
25#[cfg(feature = "spatial_basic")]
26mod spatial_basic;
27#[cfg(feature = "spatial_basic")]
28pub use spatial_basic::SpatialBasicChain;
29
/// An [`AudioNodePool`] whose workers pair a [`SamplerPool`] first node with a
/// [`VolumePanChain`] effect chain.
///
/// Only available when the `sampler` feature is enabled.
#[cfg(feature = "sampler")]
pub type SamplerPoolVolumePan = AudioNodePool<SamplerPool, VolumePanChain>;
/// An [`AudioNodePool`] whose workers pair a [`SamplerPool`] first node with a
/// [`SpatialBasicChain`] effect chain.
///
/// Only available when both the `sampler` and `spatial_basic` features are
/// enabled.
#[cfg(all(feature = "sampler", feature = "spatial_basic"))]
pub type SamplerPoolSpatialBasic = AudioNodePool<SamplerPool, SpatialBasicChain>;
34
/// A chain of nodes placed between a pool worker's first node and a
/// destination node.
///
/// [`AudioNodePool::new`] creates one chain per worker with [`Default`] and
/// then calls [`FxChain::construct_and_connect`] on it exactly once.
pub trait FxChain: Default {
    /// Sets up this chain for a single worker.
    ///
    /// NOTE(review): judging by the name and arguments, implementations are
    /// expected to add their nodes to `cx` and connect
    /// `first_node_id -> chain -> dst_node_id`; the implementations are not
    /// visible in this file, so confirm against them.
    ///
    /// * `first_node_id` - ID of the worker's first node.
    /// * `first_node_num_out_channels` - Number of output channels of the
    ///   first node, as reported by [`PoolableNode::num_output_channels`].
    /// * `dst_node_id` - ID of the node the worker is routed to.
    /// * `dst_num_channels` - Number of channels of the destination node.
    /// * `cx` - The Firewheel context.
    ///
    /// Returns a list of node IDs, which the pool stores in
    /// [`FxChainState::node_ids`].
    fn construct_and_connect<B: AudioBackend>(
        &mut self,
        first_node_id: NodeID,
        first_node_num_out_channels: NonZeroChannelCount,
        dst_node_id: NodeID,
        dst_num_channels: NonZeroChannelCount,
        cx: &mut FirewheelCtx<B>,
    ) -> Vec<NodeID>;
}
55
/// One slot of an [`AudioNodePool`]: a first node plus its effect chain.
struct Worker<N: PoolableNode, FX: FxChain> {
    /// The pool's local copy of the first node's parameters. It is used as
    /// the baseline when diffing against newly requested parameters.
    first_node_params: N::AudioNode,
    /// ID of the first node inside the Firewheel context.
    first_node_id: NodeID,

    /// The effect chain attached to the first node and the IDs of its nodes.
    fx_state: FxChainState<FX>,

    /// The ID currently handed out for this worker, or `None` when the worker
    /// is free.
    assigned_worker_id: Option<WorkerID>,
}
64
/// The effect chain of a single worker together with the node IDs that
/// [`FxChain::construct_and_connect`] returned for it.
#[derive(Debug)]
pub struct FxChainState<FX: FxChain> {
    /// The effect chain itself.
    pub fx_chain: FX,
    /// The node IDs returned by [`FxChain::construct_and_connect`] when the
    /// pool was created.
    pub node_ids: Vec<NodeID>,
}
70
/// Handle identifying a worker that was handed out by
/// [`AudioNodePool::new_worker`].
///
/// Internally this wraps an index into the pool's `thunderdome` arena. Once
/// the worker is released, lookups with the old ID fail and the pool's
/// methods report that (`false` / `None`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct WorkerID(thunderdome::Index);

impl WorkerID {
    /// A placeholder ID built from `thunderdome::Index::DANGLING`.
    ///
    /// NOTE(review): per thunderdome's documentation this index is merely
    /// unlikely to be in use; treat it as a placeholder, not as a guaranteed
    /// invalid ID.
    pub const DANGLING: Self = Self(thunderdome::Index::DANGLING);
}

impl Default for WorkerID {
    /// Returns [`WorkerID::DANGLING`].
    fn default() -> Self {
        Self::DANGLING
    }
}
83
/// Describes a node type that can be used as the first node of the workers in
/// an [`AudioNodePool`].
///
/// All functions are associated functions (no `self`); the pool calls them
/// with the parameters and node IDs it tracks for each worker.
pub trait PoolableNode {
    /// The node type that is added to the graph as each worker's first node.
    /// The pool also keeps a clone of it per worker as its parameter state.
    type AudioNode: AudioNode + Clone + 'static;

    /// Returns the number of output channels of the first node for the given
    /// (optional) configuration.
    ///
    /// The pool forwards this value to [`FxChain::construct_and_connect`].
    fn num_output_channels(
        config: Option<&<Self::AudioNode as AudioNode>::Configuration>,
    ) -> NonZeroChannelCount;

    /// Returns `true` if `params` describe a stopped state.
    ///
    /// The pool refuses to start a worker with such parameters and releases a
    /// worker when such parameters are synced to it.
    fn params_stopped(params: &Self::AudioNode) -> bool;
    /// Returns whether the node with the given ID is currently stopped,
    /// judging by what is reachable through `cx`.
    ///
    /// NOTE(review): presumably fails with [`PoolError::InvalidNodeID`] when
    /// `node_id` is unknown to `cx` — confirm against implementations. The
    /// pool unwraps the result.
    fn node_is_stopped<B: AudioBackend>(
        node_id: NodeID,
        cx: &FirewheelCtx<B>,
    ) -> Result<bool, PoolError>;

    /// Returns a score expressing how suitable a busy worker is for being
    /// taken over by a new request.
    ///
    /// The pool picks the busy worker with the highest score; a score of
    /// `u64::MAX` makes the pool pick that worker immediately.
    fn worker_score<B: AudioBackend>(
        params: &Self::AudioNode,
        node_id: NodeID,
        cx: &mut FirewheelCtx<B>,
    ) -> Result<u64, PoolError>;

    /// Compares `new` against `baseline` using the given event queue.
    ///
    /// The pool passes the worker's stored parameters as `baseline` and the
    /// requested parameters as `new`. NOTE(review): presumably this pushes the
    /// events needed to bring the node from `baseline` to `new` onto
    /// `event_queue` — confirm against implementations.
    fn diff<B: AudioBackend>(
        baseline: &Self::AudioNode,
        new: &Self::AudioNode,
        event_queue: &mut ContextQueue<B>,
    );

    /// Called by the pool right after the parameter diff for a newly claimed
    /// worker has been queued.
    ///
    /// NOTE(review): presumably marks the node as playing so that
    /// [`PoolableNode::node_is_stopped`] does not report a stale stop before
    /// the queued events are processed — confirm against implementations.
    fn mark_playing<B: AudioBackend>(
        node_id: NodeID,
        cx: &mut FirewheelCtx<B>,
    ) -> Result<(), PoolError>;

    /// Modifies `params` in place for a pause request.
    fn pause(params: &mut Self::AudioNode);
    /// Modifies `params` in place for a resume request.
    fn resume(params: &mut Self::AudioNode);
    /// Modifies `params` in place for a stop request.
    fn stop(params: &mut Self::AudioNode);
}
141
/// A fixed-size pool of workers, each consisting of a first node of type
/// `N::AudioNode` followed by an effect chain of type `FX`.
///
/// Workers are claimed with [`AudioNodePool::new_worker`], which hands out a
/// [`WorkerID`] used to address the worker afterwards.
pub struct AudioNodePool<N: PoolableNode, FX: FxChain> {
    /// All workers; the length is fixed at construction time.
    workers: Vec<Worker<N, FX>>,
    /// Maps each handed-out [`WorkerID`] to an index into `workers`.
    worker_ids: Arena<usize>,
    /// Number of workers counted as in use. `poll` recomputes this from
    /// scratch.
    num_active_workers: usize,
}
148
149impl<N: PoolableNode, FX: FxChain> AudioNodePool<N, FX>
150where
151 <N::AudioNode as AudioNode>::Configuration: Clone,
152{
153 pub fn new<B: AudioBackend>(
166 num_workers: usize,
167 first_node: N::AudioNode,
168 first_node_config: Option<<N::AudioNode as AudioNode>::Configuration>,
169 dst_node_id: NodeID,
170 dst_num_channels: NonZeroChannelCount,
171 cx: &mut FirewheelCtx<B>,
172 ) -> Self {
173 assert_ne!(num_workers, 0);
174
175 let first_node_num_out_channels = N::num_output_channels(first_node_config.as_ref());
176
177 Self {
178 workers: (0..num_workers)
179 .map(|_| {
180 let first_node_id = cx.add_node(first_node.clone(), first_node_config.clone());
181
182 let mut fx_chain = FX::default();
183
184 let fx_ids = fx_chain.construct_and_connect(
185 first_node_id,
186 first_node_num_out_channels,
187 dst_node_id,
188 dst_num_channels,
189 cx,
190 );
191
192 Worker {
193 first_node_params: first_node.clone(),
194 first_node_id,
195
196 fx_state: FxChainState {
197 fx_chain,
198 node_ids: fx_ids,
199 },
200
201 assigned_worker_id: None,
202 }
203 })
204 .collect(),
205 worker_ids: Arena::with_capacity(num_workers),
206 num_active_workers: 0,
207 }
208 }
209
210 pub fn num_workers(&self) -> usize {
211 self.workers.len()
212 }
213
214 pub fn new_worker<B: AudioBackend>(
229 &mut self,
230 params: &N::AudioNode,
231 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
232 steal: bool,
233 cx: &mut FirewheelCtx<B>,
234 fx_chain: impl FnOnce(&mut FxChainState<FX>, &mut FirewheelCtx<B>),
235 ) -> Result<NewWorkerResult, NewWorkerError> {
236 if N::params_stopped(params) {
237 return Err(NewWorkerError::ParameterStateIsStop);
238 }
239
240 if !steal && self.num_active_workers == self.workers.len() {
241 return Err(NewWorkerError::NoMoreWorkers);
242 }
243
244 let mut idx = 0;
245 let mut max_score = 0;
246 for (i, worker) in self.workers.iter().enumerate() {
247 if worker.assigned_worker_id.is_none() {
248 idx = i;
249 break;
250 }
251
252 let score =
253 N::worker_score(&worker.first_node_params, worker.first_node_id, cx).unwrap();
254
255 if score == u64::MAX {
256 idx = i;
257 break;
258 }
259
260 if score > max_score {
261 max_score = score;
262 idx = i;
263 }
264 }
265
266 let worker_id = WorkerID(self.worker_ids.insert(idx));
267
268 let worker = &mut self.workers[idx];
269
270 let old_worker_id = worker.assigned_worker_id.take();
271 let was_playing_sequence = if let Some(old_worker_id) = old_worker_id {
272 self.worker_ids.remove(old_worker_id.0);
273
274 !(N::params_stopped(params) || N::node_is_stopped(worker.first_node_id, cx).unwrap())
275 } else {
276 false
277 };
278
279 worker.assigned_worker_id = Some(worker_id);
280 self.num_active_workers += 1;
281
282 #[cfg(not(feature = "scheduled_events"))]
283 let mut event_queue = cx.event_queue(worker.first_node_id);
284 #[cfg(feature = "scheduled_events")]
285 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
286
287 N::diff(&worker.first_node_params, params, &mut event_queue);
288
289 worker.first_node_params = params.clone();
290
291 N::mark_playing(worker.first_node_id, cx).unwrap();
292
293 (fx_chain)(&mut worker.fx_state, cx);
294
295 Ok(NewWorkerResult {
296 worker_id,
297 old_worker_id,
298 was_playing_sequence,
299 })
300 }
301
302 pub fn sync_worker_params<B: AudioBackend>(
316 &mut self,
317 worker_id: WorkerID,
318 params: &N::AudioNode,
319 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
320 cx: &mut FirewheelCtx<B>,
321 ) -> bool {
322 let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
323 return false;
324 };
325
326 let worker = &mut self.workers[idx];
327
328 #[cfg(not(feature = "scheduled_events"))]
329 let mut event_queue = cx.event_queue(worker.first_node_id);
330 #[cfg(feature = "scheduled_events")]
331 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
332
333 N::diff(&worker.first_node_params, params, &mut event_queue);
334
335 worker.first_node_params = params.clone();
336
337 if N::params_stopped(params) {
338 self.worker_ids.remove(worker_id.0);
339 worker.assigned_worker_id = None;
340 self.num_active_workers -= 1;
341 }
342
343 true
344 }
345
346 pub fn pause<B: AudioBackend>(
356 &mut self,
357 worker_id: WorkerID,
358 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
359 cx: &mut FirewheelCtx<B>,
360 ) -> bool {
361 let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
362 return false;
363 };
364
365 let worker = &mut self.workers[idx];
366
367 let mut new_params = worker.first_node_params.clone();
368 N::pause(&mut new_params);
369
370 #[cfg(not(feature = "scheduled_events"))]
371 let mut event_queue = cx.event_queue(worker.first_node_id);
372 #[cfg(feature = "scheduled_events")]
373 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
374
375 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
376
377 true
378 }
379
380 pub fn resume<B: AudioBackend>(
390 &mut self,
391 worker_id: WorkerID,
392 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
393 cx: &mut FirewheelCtx<B>,
394 ) -> bool {
395 let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
396 return false;
397 };
398
399 let worker = &mut self.workers[idx];
400
401 let mut new_params = worker.first_node_params.clone();
402 N::resume(&mut new_params);
403
404 #[cfg(not(feature = "scheduled_events"))]
405 let mut event_queue = cx.event_queue(worker.first_node_id);
406 #[cfg(feature = "scheduled_events")]
407 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
408
409 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
410
411 true
412 }
413
414 pub fn stop<B: AudioBackend>(
426 &mut self,
427 worker_id: WorkerID,
428 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
429 cx: &mut FirewheelCtx<B>,
430 ) -> bool {
431 let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
432 return false;
433 };
434
435 let worker = &mut self.workers[idx];
436
437 let mut new_params = worker.first_node_params.clone();
438 N::stop(&mut new_params);
439
440 #[cfg(not(feature = "scheduled_events"))]
441 let mut event_queue = cx.event_queue(worker.first_node_id);
442 #[cfg(feature = "scheduled_events")]
443 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
444
445 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
446
447 self.worker_ids.remove(worker_id.0);
448 worker.assigned_worker_id = None;
449 self.num_active_workers -= 1;
450
451 true
452 }
453
454 pub fn pause_all<B: AudioBackend>(
460 &mut self,
461 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
462 cx: &mut FirewheelCtx<B>,
463 ) {
464 for worker in self.workers.iter_mut() {
465 if worker.assigned_worker_id.is_some() {
466 let mut new_params = worker.first_node_params.clone();
467 N::pause(&mut new_params);
468
469 #[cfg(not(feature = "scheduled_events"))]
470 let mut event_queue = cx.event_queue(worker.first_node_id);
471 #[cfg(feature = "scheduled_events")]
472 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
473
474 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
475 }
476 }
477 }
478
479 pub fn resume_all<B: AudioBackend>(
485 &mut self,
486 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
487 cx: &mut FirewheelCtx<B>,
488 ) {
489 for worker in self.workers.iter_mut() {
490 if worker.assigned_worker_id.is_some() {
491 let mut new_params = worker.first_node_params.clone();
492 N::resume(&mut new_params);
493
494 #[cfg(not(feature = "scheduled_events"))]
495 let mut event_queue = cx.event_queue(worker.first_node_id);
496 #[cfg(feature = "scheduled_events")]
497 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
498
499 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
500 }
501 }
502 }
503
504 pub fn stop_all<B: AudioBackend>(
510 &mut self,
511 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
512 cx: &mut FirewheelCtx<B>,
513 ) {
514 for worker in self.workers.iter_mut() {
515 if worker.assigned_worker_id.is_some() {
516 let mut new_params = worker.first_node_params.clone();
517 N::stop(&mut new_params);
518
519 #[cfg(not(feature = "scheduled_events"))]
520 let mut event_queue = cx.event_queue(worker.first_node_id);
521 #[cfg(feature = "scheduled_events")]
522 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
523
524 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
525
526 worker.assigned_worker_id = None;
527 }
528 }
529
530 self.worker_ids.clear();
531 self.num_active_workers = 0;
532 }
533
534 pub fn first_node(&self, worker_id: WorkerID) -> Option<&N::AudioNode> {
536 self.worker_ids
537 .get(worker_id.0)
538 .map(|idx| &self.workers[*idx].first_node_params)
539 }
540
541 pub fn first_node_state<'a, T: 'static, B: AudioBackend>(
543 &self,
544 worker_id: WorkerID,
545 cx: &'a FirewheelCtx<B>,
546 ) -> Option<&'a T> {
547 self.worker_ids
548 .get(worker_id.0)
549 .and_then(|idx| cx.node_state::<T>(self.workers[*idx].first_node_id))
550 }
551
552 pub fn first_node_state_mut<'a, T: 'static, B: AudioBackend>(
554 &self,
555 worker_id: WorkerID,
556 cx: &'a mut FirewheelCtx<B>,
557 ) -> Option<&'a mut T> {
558 self.worker_ids
559 .get(worker_id.0)
560 .and_then(|idx| cx.node_state_mut::<T>(self.workers[*idx].first_node_id))
561 }
562
563 pub fn fx_chain(&self, worker_id: WorkerID) -> Option<&FxChainState<FX>> {
564 self.worker_ids
565 .get(worker_id.0)
566 .map(|idx| &self.workers[*idx].fx_state)
567 }
568
569 pub fn fx_chain_mut(&mut self, worker_id: WorkerID) -> Option<&mut FxChainState<FX>> {
570 self.worker_ids
571 .get(worker_id.0)
572 .map(|idx| &mut self.workers[*idx].fx_state)
573 }
574
575 pub fn has_stopped<B: AudioBackend>(&self, worker_id: WorkerID, cx: &FirewheelCtx<B>) -> bool {
578 self.worker_ids
579 .get(worker_id.0)
580 .map(|idx| N::node_is_stopped(self.workers[*idx].first_node_id, cx).unwrap())
581 .unwrap_or(true)
582 }
583
584 pub fn poll<B: AudioBackend>(&mut self, cx: &FirewheelCtx<B>) -> PollResult {
589 self.num_active_workers = 0;
590 let mut finished_workers = SmallVec::new();
591
592 for worker in self.workers.iter_mut() {
593 if worker.assigned_worker_id.is_some() {
594 if N::node_is_stopped(worker.first_node_id, cx).unwrap() {
595 let id = worker.assigned_worker_id.take().unwrap();
596 self.worker_ids.remove(id.0);
597 finished_workers.push(id);
598 } else {
599 self.num_active_workers += 1;
600 }
601 }
602 }
603
604 PollResult { finished_workers }
605 }
606
607 pub fn num_active_workers(&self) -> usize {
609 self.num_active_workers
610 }
611}
612
/// The result of [`AudioNodePool::poll`].
#[derive(Debug, Clone, PartialEq)]
pub struct PollResult {
    /// IDs of the workers that were found stopped and released during the
    /// poll. These IDs no longer refer to a worker.
    pub finished_workers: SmallVec<[WorkerID; 4]>,
}
619
/// The successful result of [`AudioNodePool::new_worker`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewWorkerResult {
    /// The ID of the newly claimed worker.
    pub worker_id: WorkerID,

    /// If an in-use worker was taken over, the ID it had before. That ID no
    /// longer refers to a worker.
    pub old_worker_id: Option<WorkerID>,

    /// `true` if an in-use worker was taken over and its first node was not
    /// reported as stopped at that moment. Always `false` when a free worker
    /// was used.
    pub was_playing_sequence: bool,
}
633
/// The reasons [`AudioNodePool::new_worker`] can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum NewWorkerError {
    /// The parameters passed in describe a stopped state
    /// (`PoolableNode::params_stopped` returned `true`).
    #[error("Could not create new audio node pool worker: the given parameters signify a stopped sequence")]
    ParameterStateIsStop,
    /// Every worker is in use and stealing was not requested.
    #[error("Could not create new audio node pool worker: the worker pool is full")]
    NoMoreWorkers,
}
641
/// Errors returned by the node-level functions of [`PoolableNode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum PoolError {
    /// No node with the contained ID exists.
    #[error("A node with ID {0:?} does not exist in this pool")]
    InvalidNodeID(NodeID),
}