1#![cfg_attr(not(feature = "std"), no_std)]
2
3use firewheel_core::{
4 channel_config::NonZeroChannelCount,
5 node::{AudioNode, NodeID},
6};
7use firewheel_graph::{backend::AudioBackend, ContextQueue, FirewheelCtx};
8use smallvec::SmallVec;
9use thunderdome::Arena;
10
11#[cfg(feature = "scheduled_events")]
12use firewheel_core::clock::EventInstant;
13
14#[cfg(feature = "sampler")]
15mod sampler;
16#[cfg(feature = "sampler")]
17pub use sampler::SamplerPool;
18
19mod volume_pan;
20pub use volume_pan::VolumePanChain;
21
22#[cfg(feature = "spatial_basic")]
23mod spatial_basic;
24#[cfg(feature = "spatial_basic")]
25pub use spatial_basic::SpatialBasicChain;
26
/// An [`AudioNodePool`] whose workers use [`SamplerPool`] as the first node
/// and a [`VolumePanChain`] as the effect chain.
///
/// Only available when the `sampler` feature is enabled.
#[cfg(feature = "sampler")]
pub type SamplerPoolVolumePan = AudioNodePool<SamplerPool, VolumePanChain>;
/// An [`AudioNodePool`] whose workers use [`SamplerPool`] as the first node
/// and a [`SpatialBasicChain`] as the effect chain.
///
/// Only available when both the `sampler` and `spatial_basic` features are
/// enabled.
#[cfg(all(feature = "sampler", feature = "spatial_basic"))]
pub type SamplerPoolSpatialBasic = AudioNodePool<SamplerPool, SpatialBasicChain>;
31
/// A chain of nodes placed after a pool worker's first node.
///
/// [`AudioNodePool::new`] creates one chain per worker via [`Default`] and
/// then calls [`FxChain::construct_and_connect`] on it exactly once.
pub trait FxChain: Default {
    /// Sets up this chain for one worker and returns the IDs of its nodes.
    ///
    /// The pool stores the returned IDs in [`FxChainState::node_ids`].
    ///
    /// NOTE(review): judging by the name and parameters, implementations are
    /// expected to add their nodes to `cx` and wire
    /// `first_node_id -> chain -> dst_node_id`; the pool itself makes no
    /// connections. Confirm against the concrete implementations.
    ///
    /// * `first_node_id` - The ID of the worker's first node, as returned by
    ///   `cx.add_node`.
    /// * `first_node_num_out_channels` - The value reported by
    ///   [`PoolableNode::num_output_channels`] for the first node.
    /// * `dst_node_id` - The destination node ID given to
    ///   [`AudioNodePool::new`].
    /// * `dst_num_channels` - The destination channel count given to
    ///   [`AudioNodePool::new`].
    /// * `cx` - The Firewheel context the worker's first node was added to.
    fn construct_and_connect<B: AudioBackend>(
        &mut self,
        first_node_id: NodeID,
        first_node_num_out_channels: NonZeroChannelCount,
        dst_node_id: NodeID,
        dst_num_channels: NonZeroChannelCount,
        cx: &mut FirewheelCtx<B>,
    ) -> Vec<NodeID>;
}
52
/// One slot of an [`AudioNodePool`]: a first node plus its effect chain.
struct Worker<N: PoolableNode, FX: FxChain> {
    /// The parameters most recently stored for the first node. Used as the
    /// baseline when diffing against new parameters.
    first_node_params: N::AudioNode,
    /// The ID of the first node in the Firewheel context.
    first_node_id: NodeID,

    /// The effect chain belonging to this worker and the IDs of its nodes.
    fx_state: FxChainState<FX>,

    /// The [`WorkerID`] currently assigned to this slot, or `None` if the
    /// slot is free.
    assigned_worker_id: Option<WorkerID>,
}
61
/// The effect chain of a single pool worker together with its node IDs.
#[derive(Debug)]
pub struct FxChainState<FX: FxChain> {
    /// The effect chain instance created for this worker.
    pub fx_chain: FX,
    /// The node IDs returned by [`FxChain::construct_and_connect`] when the
    /// worker was created.
    pub node_ids: Vec<NodeID>,
}
67
/// A handle to a unit of work assigned to a worker in an [`AudioNodePool`].
///
/// Wraps a [`thunderdome::Index`] into the pool's internal ID arena. A handle
/// stops resolving once the pool removes it from that arena (for example when
/// the work is stopped, stolen, or found finished by `poll`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct WorkerID(thunderdome::Index);

impl WorkerID {
    /// A placeholder ID built from [`thunderdome::Index::DANGLING`].
    ///
    /// NOTE(review): presumably this never resolves to a live worker; see the
    /// `thunderdome` documentation for the exact guarantee.
    pub const DANGLING: Self = Self(thunderdome::Index::DANGLING);
}

impl Default for WorkerID {
    /// Returns [`WorkerID::DANGLING`].
    fn default() -> Self {
        Self::DANGLING
    }
}
80
/// Describes a node type that can serve as the first node of the workers in
/// an [`AudioNodePool`].
///
/// All methods are associated functions; the pool never holds a value of the
/// implementing type itself.
pub trait PoolableNode {
    /// The node type added to the Firewheel context for every worker. The
    /// pool also keeps a clone of it per worker as the current parameters.
    type AudioNode: AudioNode + Clone + 'static;

    /// Returns the number of output channels of the first node for the given
    /// (optional) configuration.
    ///
    /// The pool forwards this value to [`FxChain::construct_and_connect`].
    fn num_output_channels(
        config: Option<&<Self::AudioNode as AudioNode>::Configuration>,
    ) -> NonZeroChannelCount;

    /// Returns `true` if the given parameters signify a stopped sequence.
    ///
    /// The pool refuses to start new work with such parameters and releases
    /// a worker whose parameters are synced to such a state.
    fn params_stopped(params: &Self::AudioNode) -> bool;
    /// Returns whether the node with the given ID has stopped, based on what
    /// can be read from `cx`.
    ///
    /// The pool uses this to detect finished workers.
    fn node_is_stopped<B: AudioBackend>(
        node_id: NodeID,
        cx: &FirewheelCtx<B>,
    ) -> Result<bool, PoolError>;

    /// Returns a score used to pick a worker when new work is requested.
    ///
    /// Among assigned workers the one with the highest score is chosen, and a
    /// score of `u64::MAX` causes the worker to be chosen immediately.
    fn worker_score<B: AudioBackend>(
        params: &Self::AudioNode,
        node_id: NodeID,
        cx: &mut FirewheelCtx<B>,
    ) -> Result<u64, PoolError>;

    /// Compares `new` against `baseline`, with access to the event queue of
    /// the worker's first node.
    ///
    /// NOTE(review): presumably this pushes the events needed to bring the
    /// node from `baseline` to `new`; confirm against the implementations.
    fn diff<B: AudioBackend>(
        baseline: &Self::AudioNode,
        new: &Self::AudioNode,
        event_queue: &mut ContextQueue<B>,
    );

    /// Called by the pool for a worker's first node right after new work has
    /// been diffed onto it.
    ///
    /// NOTE(review): the name suggests it flags the node's state as playing
    /// so that it is not immediately reported as stopped; confirm against the
    /// implementations.
    fn mark_playing<B: AudioBackend>(
        node_id: NodeID,
        cx: &mut FirewheelCtx<B>,
    ) -> Result<(), PoolError>;

    /// Mutates `params` into their paused state.
    fn pause(params: &mut Self::AudioNode);
    /// Mutates `params` into their resumed state.
    fn resume(params: &mut Self::AudioNode);
    /// Mutates `params` into their stopped state.
    fn stop(params: &mut Self::AudioNode);
}
138
/// A fixed-size pool of workers, each consisting of a first node of type
/// `N::AudioNode` followed by an effect chain of type `FX`.
pub struct AudioNodePool<N: PoolableNode, FX: FxChain> {
    /// All workers of the pool, created once in `new`.
    workers: Vec<Worker<N, FX>>,
    /// Maps each live [`WorkerID`] to the index of its worker in `workers`.
    worker_ids: Arena<usize>,
    /// Cached number of workers that currently have work assigned.
    num_active_workers: usize,
}
145
146impl<N: PoolableNode, FX: FxChain> AudioNodePool<N, FX>
147where
148 <N::AudioNode as AudioNode>::Configuration: Clone,
149{
150 pub fn new<B: AudioBackend>(
163 num_workers: usize,
164 first_node: N::AudioNode,
165 first_node_config: Option<<N::AudioNode as AudioNode>::Configuration>,
166 dst_node_id: NodeID,
167 dst_num_channels: NonZeroChannelCount,
168 cx: &mut FirewheelCtx<B>,
169 ) -> Self {
170 assert_ne!(num_workers, 0);
171
172 let first_node_num_out_channels = N::num_output_channels(first_node_config.as_ref());
173
174 Self {
175 workers: (0..num_workers)
176 .map(|_| {
177 let first_node_id = cx.add_node(first_node.clone(), first_node_config.clone());
178
179 let mut fx_chain = FX::default();
180
181 let fx_ids = fx_chain.construct_and_connect(
182 first_node_id,
183 first_node_num_out_channels,
184 dst_node_id,
185 dst_num_channels,
186 cx,
187 );
188
189 Worker {
190 first_node_params: first_node.clone(),
191 first_node_id,
192
193 fx_state: FxChainState {
194 fx_chain,
195 node_ids: fx_ids,
196 },
197
198 assigned_worker_id: None,
199 }
200 })
201 .collect(),
202 worker_ids: Arena::with_capacity(num_workers),
203 num_active_workers: 0,
204 }
205 }
206
    /// Returns the total number of workers in this pool, assigned or not.
    pub fn num_workers(&self) -> usize {
        self.workers.len()
    }
210
211 pub fn new_worker<B: AudioBackend>(
226 &mut self,
227 params: &N::AudioNode,
228 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
229 steal: bool,
230 cx: &mut FirewheelCtx<B>,
231 fx_chain: impl FnOnce(&mut FxChainState<FX>, &mut FirewheelCtx<B>),
232 ) -> Result<NewWorkerResult, NewWorkerError> {
233 if N::params_stopped(params) {
234 return Err(NewWorkerError::ParameterStateIsStop);
235 }
236
237 if !steal && self.num_active_workers == self.workers.len() {
238 return Err(NewWorkerError::NoMoreWorkers);
239 }
240
241 let mut idx = 0;
242 let mut max_score = 0;
243 for (i, worker) in self.workers.iter().enumerate() {
244 if worker.assigned_worker_id.is_none() {
245 idx = i;
246 break;
247 }
248
249 let score =
250 N::worker_score(&worker.first_node_params, worker.first_node_id, cx).unwrap();
251
252 if score == u64::MAX {
253 idx = i;
254 break;
255 }
256
257 if score > max_score {
258 max_score = score;
259 idx = i;
260 }
261 }
262
263 let worker_id = WorkerID(self.worker_ids.insert(idx));
264
265 let worker = &mut self.workers[idx];
266
267 let old_worker_id = worker.assigned_worker_id.take();
268 let was_playing_sequence = if let Some(old_worker_id) = old_worker_id {
269 self.worker_ids.remove(old_worker_id.0);
270
271 !(N::params_stopped(params) || N::node_is_stopped(worker.first_node_id, cx).unwrap())
272 } else {
273 false
274 };
275
276 worker.assigned_worker_id = Some(worker_id);
277 self.num_active_workers += 1;
278
279 #[cfg(not(feature = "scheduled_events"))]
280 let mut event_queue = cx.event_queue(worker.first_node_id);
281 #[cfg(feature = "scheduled_events")]
282 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
283
284 N::diff(&worker.first_node_params, params, &mut event_queue);
285
286 worker.first_node_params = params.clone();
287
288 N::mark_playing(worker.first_node_id, cx).unwrap();
289
290 (fx_chain)(&mut worker.fx_state, cx);
291
292 Ok(NewWorkerResult {
293 worker_id,
294 old_worker_id,
295 was_playing_sequence,
296 })
297 }
298
299 pub fn sync_worker_params<B: AudioBackend>(
313 &mut self,
314 worker_id: WorkerID,
315 params: &N::AudioNode,
316 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
317 cx: &mut FirewheelCtx<B>,
318 ) -> bool {
319 let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
320 return false;
321 };
322
323 let worker = &mut self.workers[idx];
324
325 #[cfg(not(feature = "scheduled_events"))]
326 let mut event_queue = cx.event_queue(worker.first_node_id);
327 #[cfg(feature = "scheduled_events")]
328 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
329
330 N::diff(&worker.first_node_params, params, &mut event_queue);
331
332 worker.first_node_params = params.clone();
333
334 if N::params_stopped(params) {
335 self.worker_ids.remove(worker_id.0);
336 worker.assigned_worker_id = None;
337 self.num_active_workers -= 1;
338 }
339
340 true
341 }
342
343 pub fn pause<B: AudioBackend>(
353 &mut self,
354 worker_id: WorkerID,
355 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
356 cx: &mut FirewheelCtx<B>,
357 ) -> bool {
358 let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
359 return false;
360 };
361
362 let worker = &mut self.workers[idx];
363
364 let mut new_params = worker.first_node_params.clone();
365 N::pause(&mut new_params);
366
367 #[cfg(not(feature = "scheduled_events"))]
368 let mut event_queue = cx.event_queue(worker.first_node_id);
369 #[cfg(feature = "scheduled_events")]
370 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
371
372 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
373
374 true
375 }
376
377 pub fn resume<B: AudioBackend>(
387 &mut self,
388 worker_id: WorkerID,
389 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
390 cx: &mut FirewheelCtx<B>,
391 ) -> bool {
392 let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
393 return false;
394 };
395
396 let worker = &mut self.workers[idx];
397
398 let mut new_params = worker.first_node_params.clone();
399 N::resume(&mut new_params);
400
401 #[cfg(not(feature = "scheduled_events"))]
402 let mut event_queue = cx.event_queue(worker.first_node_id);
403 #[cfg(feature = "scheduled_events")]
404 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
405
406 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
407
408 true
409 }
410
411 pub fn stop<B: AudioBackend>(
423 &mut self,
424 worker_id: WorkerID,
425 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
426 cx: &mut FirewheelCtx<B>,
427 ) -> bool {
428 let Some(idx) = self.worker_ids.get(worker_id.0).copied() else {
429 return false;
430 };
431
432 let worker = &mut self.workers[idx];
433
434 let mut new_params = worker.first_node_params.clone();
435 N::stop(&mut new_params);
436
437 #[cfg(not(feature = "scheduled_events"))]
438 let mut event_queue = cx.event_queue(worker.first_node_id);
439 #[cfg(feature = "scheduled_events")]
440 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
441
442 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
443
444 self.worker_ids.remove(worker_id.0);
445 worker.assigned_worker_id = None;
446 self.num_active_workers -= 1;
447
448 true
449 }
450
451 pub fn pause_all<B: AudioBackend>(
457 &mut self,
458 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
459 cx: &mut FirewheelCtx<B>,
460 ) {
461 for worker in self.workers.iter_mut() {
462 if worker.assigned_worker_id.is_some() {
463 let mut new_params = worker.first_node_params.clone();
464 N::pause(&mut new_params);
465
466 #[cfg(not(feature = "scheduled_events"))]
467 let mut event_queue = cx.event_queue(worker.first_node_id);
468 #[cfg(feature = "scheduled_events")]
469 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
470
471 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
472 }
473 }
474 }
475
476 pub fn resume_all<B: AudioBackend>(
482 &mut self,
483 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
484 cx: &mut FirewheelCtx<B>,
485 ) {
486 for worker in self.workers.iter_mut() {
487 if worker.assigned_worker_id.is_some() {
488 let mut new_params = worker.first_node_params.clone();
489 N::resume(&mut new_params);
490
491 #[cfg(not(feature = "scheduled_events"))]
492 let mut event_queue = cx.event_queue(worker.first_node_id);
493 #[cfg(feature = "scheduled_events")]
494 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
495
496 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
497 }
498 }
499 }
500
501 pub fn stop_all<B: AudioBackend>(
507 &mut self,
508 #[cfg(feature = "scheduled_events")] time: Option<EventInstant>,
509 cx: &mut FirewheelCtx<B>,
510 ) {
511 for worker in self.workers.iter_mut() {
512 if worker.assigned_worker_id.is_some() {
513 let mut new_params = worker.first_node_params.clone();
514 N::stop(&mut new_params);
515
516 #[cfg(not(feature = "scheduled_events"))]
517 let mut event_queue = cx.event_queue(worker.first_node_id);
518 #[cfg(feature = "scheduled_events")]
519 let mut event_queue = cx.event_queue_scheduled(worker.first_node_id, time);
520
521 N::diff(&worker.first_node_params, &new_params, &mut event_queue);
522
523 worker.assigned_worker_id = None;
524 }
525 }
526
527 self.worker_ids.clear();
528 self.num_active_workers = 0;
529 }
530
531 pub fn first_node(&self, worker_id: WorkerID) -> Option<&N::AudioNode> {
533 self.worker_ids
534 .get(worker_id.0)
535 .map(|idx| &self.workers[*idx].first_node_params)
536 }
537
538 pub fn first_node_state<'a, T: 'static, B: AudioBackend>(
540 &self,
541 worker_id: WorkerID,
542 cx: &'a FirewheelCtx<B>,
543 ) -> Option<&'a T> {
544 self.worker_ids
545 .get(worker_id.0)
546 .and_then(|idx| cx.node_state::<T>(self.workers[*idx].first_node_id))
547 }
548
549 pub fn first_node_state_mut<'a, T: 'static, B: AudioBackend>(
551 &self,
552 worker_id: WorkerID,
553 cx: &'a mut FirewheelCtx<B>,
554 ) -> Option<&'a mut T> {
555 self.worker_ids
556 .get(worker_id.0)
557 .and_then(|idx| cx.node_state_mut::<T>(self.workers[*idx].first_node_id))
558 }
559
560 pub fn fx_chain(&self, worker_id: WorkerID) -> Option<&FxChainState<FX>> {
561 self.worker_ids
562 .get(worker_id.0)
563 .map(|idx| &self.workers[*idx].fx_state)
564 }
565
566 pub fn fx_chain_mut(&mut self, worker_id: WorkerID) -> Option<&mut FxChainState<FX>> {
567 self.worker_ids
568 .get(worker_id.0)
569 .map(|idx| &mut self.workers[*idx].fx_state)
570 }
571
572 pub fn has_stopped<B: AudioBackend>(&self, worker_id: WorkerID, cx: &FirewheelCtx<B>) -> bool {
575 self.worker_ids
576 .get(worker_id.0)
577 .map(|idx| N::node_is_stopped(self.workers[*idx].first_node_id, cx).unwrap())
578 .unwrap_or(true)
579 }
580
581 pub fn poll<B: AudioBackend>(&mut self, cx: &FirewheelCtx<B>) -> PollResult {
586 self.num_active_workers = 0;
587 let mut finished_workers = SmallVec::new();
588
589 for worker in self.workers.iter_mut() {
590 if worker.assigned_worker_id.is_some() {
591 if N::node_is_stopped(worker.first_node_id, cx).unwrap() {
592 let id = worker.assigned_worker_id.take().unwrap();
593 self.worker_ids.remove(id.0);
594 finished_workers.push(id);
595 } else {
596 self.num_active_workers += 1;
597 }
598 }
599 }
600
601 PollResult { finished_workers }
602 }
603
    /// Returns the cached number of workers that currently have work
    /// assigned.
    ///
    /// The value is recomputed by `poll`; workers whose node has stopped are
    /// still counted until the next `poll` releases them.
    pub fn num_active_workers(&self) -> usize {
        self.num_active_workers
    }
608}
609
610#[derive(Debug, Clone, PartialEq)]
611pub struct PollResult {
612 pub finished_workers: SmallVec<[WorkerID; 4]>,
615}
616
/// The result of a successful [`AudioNodePool::new_worker`] call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewWorkerResult {
    /// The ID assigned to the new work.
    pub worker_id: WorkerID,

    /// The ID that was previously assigned to the chosen worker, if the
    /// worker was taken over from earlier work. That ID is no longer valid.
    pub old_worker_id: Option<WorkerID>,

    /// Whether the chosen worker was taken over while its node had not yet
    /// stopped. Always `false` when `old_worker_id` is `None`.
    pub was_playing_sequence: bool,
}
630
/// The reasons [`AudioNodePool::new_worker`] can refuse to assign new work.
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum NewWorkerError {
    /// The given parameters are in the stopped state according to
    /// [`PoolableNode::params_stopped`].
    #[error("Could not create new audio node pool worker: the given parameters signify a stopped sequence")]
    ParameterStateIsStop,
    /// Every worker already has work assigned and stealing was not allowed.
    #[error("Could not create new audio node pool worker: the worker pool is full")]
    NoMoreWorkers,
}
638
/// An error returned by the node-querying methods of [`PoolableNode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
pub enum PoolError {
    /// No node with the given ID exists in the pool.
    #[error("A node with ID {0:?} does not exist in this pool")]
    InvalidNodeID(NodeID),
}