use crate::{
SeedlingSystems,
context::{PreStreamRestartEvent, SampleRate, StreamRestartEvent},
edge::{PendingConnections, PendingEdge},
error::SeedlingError,
node::{AudioState, DiffTimestamp, EffectId, FirewheelNode, RegisterNode},
pool::label::PoolLabelContainer,
prelude::{AudioEvents, PoolLabel},
sample::{OnComplete, PlaybackSettings, QueuedSample, SamplePlayer},
time::{Audio, AudioTime},
};
use bevy_app::prelude::*;
use bevy_asset::prelude::*;
use bevy_ecs::{
component::ComponentId, entity::EntityCloner, lifecycle::HookContext, prelude::*,
system::QueryLens, world::DeferredWorld,
};
use core::ops::{Deref, RangeInclusive};
use firewheel::{
clock::{DurationSamples, DurationSeconds},
nodes::{
sampler::{PlayFrom, SamplerConfig, SamplerNode, SamplerState},
volume::VolumeNode,
},
};
use queue::SkipTimer;
use sample_effects::{EffectOf, SampleEffects};
pub mod dynamic;
pub mod label;
mod queue;
pub mod sample_effects;
pub(crate) struct SamplePoolPlugin;
impl Plugin for SamplePoolPlugin {
fn build(&self, app: &mut App) {
app.register_node::<SamplerNode>()
.register_node_state::<SamplerNode, SamplerState>()
.add_systems(
Last,
(
(
queue::assign_default,
dynamic::update_dynamic_pools,
populate_pool,
queue::grow_pools,
)
.chain()
.before(SeedlingSystems::Acquire),
poll_finished
.before(SeedlingSystems::Pool)
.after(SeedlingSystems::Connect),
watch_sample_players
.before(SeedlingSystems::Queue)
.after(SeedlingSystems::Pool),
(queue::assign_work, queue::update_followers)
.chain()
.in_set(SeedlingSystems::Pool),
(queue::tick_skipped, queue::mark_skipped)
.chain()
.after(SeedlingSystems::Pool),
),
)
.add_observer(remove_finished)
.add_observer(generate_snapshots)
.add_observer(apply_snapshots)
.add_observer(Sampler::observe_replace)
.add_plugins(dynamic::DynamicPlugin);
}
}
#[derive(Debug, Component)]
#[component(immutable, on_insert = Self::on_insert_hook)]
#[require(PoolMarker, SamplerConfig)]
#[cfg_attr(feature = "reflect", derive(bevy_reflect::Reflect))]
pub struct SamplerPool<T: PoolLabel + Component + Clone>(pub T);
impl<T: PoolLabel + Component + Clone> SamplerPool<T> {
fn on_insert_hook(mut world: DeferredWorld, context: HookContext) {
world.commands().queue(move |world: &mut World| {
let id = match world.component_id::<T>() {
Some(id) => id,
None => world.register_component::<T>(),
};
let Some(value) = world.get::<SamplerPool<T>>(context.entity) else {
return;
};
let container = PoolLabelContainer::new(&value.0, id);
world.entity_mut(context.entity).insert(container);
});
}
}
#[derive(Component, Default)]
struct PoolMarker;
#[derive(Debug, Component)]
#[relationship(relationship_target = PoolSamplers)]
struct PoolSamplerOf(pub Entity);
#[derive(Debug, Component)]
#[relationship_target(relationship = PoolSamplerOf, linked_spawn)]
struct PoolSamplers(Vec<Entity>);
#[derive(Debug, Component)]
#[relationship(relationship_target = Sampler)]
#[component(on_remove = Self::on_remove_hook)]
#[cfg_attr(feature = "reflect", derive(bevy_reflect::Reflect))]
pub struct SamplerOf(pub Entity);
impl SamplerOf {
fn on_remove_hook(mut world: DeferredWorld, context: HookContext) {
if let Some(mut sampler) = world.get_mut::<SamplerNode>(context.entity) {
sampler.stop();
}
}
}
#[derive(Component)]
#[relationship_target(relationship = SamplerOf)]
#[component(on_insert = Self::on_insert_hook)]
#[cfg_attr(feature = "reflect", derive(bevy_reflect::Reflect))]
#[cfg_attr(feature = "reflect", reflect(from_reflect = false))]
pub struct Sampler {
#[relationship]
sampler: Entity,
sample_rate: Option<SampleRate>,
#[cfg_attr(feature = "reflect", reflect(ignore))]
state: Option<SamplerState>,
}
impl Sampler {
pub fn sampler(&self) -> Entity {
self.sampler
}
pub fn is_playing(&self) -> bool {
self.state
.as_ref()
.map(|s| !s.stopped())
.unwrap_or_default()
}
pub fn playhead_frames(&self) -> DurationSamples {
self.try_playhead_frames().unwrap()
}
pub fn try_playhead_frames(&self) -> Option<DurationSamples> {
self.state.as_ref().map(|s| s.playhead_frames())
}
pub fn playhead_seconds(&self) -> DurationSeconds {
self.try_playhead_seconds().unwrap()
}
pub fn try_playhead_seconds(&self) -> Option<DurationSeconds> {
let state = self.state.as_ref()?;
let sample_rate = self.sample_rate.as_ref()?;
Some(state.playhead_seconds(sample_rate.get()))
}
}
impl core::fmt::Debug for Sampler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SamplerAssignment")
.field("sampler", &self.sampler)
.finish_non_exhaustive()
}
}
impl Sampler {
fn on_insert_hook(mut world: DeferredWorld, context: HookContext) {
let sampler = world.get::<Sampler>(context.entity).unwrap().sampler;
let sample_rate = world.resource::<SampleRate>().clone();
if let Some(state) = world
.get::<AudioState<SamplerState>>(sampler)
.map(|s| s.0.clone())
{
let mut sampler = world.get_mut::<Sampler>(context.entity).unwrap();
sampler.state = Some(state);
sampler.sample_rate = Some(sample_rate);
}
}
fn observe_replace(
trigger: On<Replace, Self>,
target: Query<&SampleEffects>,
mut commands: Commands,
) {
let Ok(effects) = target.get(trigger.entity) else {
return;
};
for effect in effects.iter() {
if let Ok(mut entity) = commands.get_entity(effect) {
entity.try_remove::<crate::node::follower::Followers>();
}
}
}
}
#[derive(Component)]
struct SamplerSnapshot {
pub playhead: f64,
}
fn generate_snapshots(
_: On<PreStreamRestartEvent>,
sample_players: Query<(Entity, Option<&Sampler>), With<SamplePlayer>>,
mut commands: Commands,
) {
for (entity, sampler) in &sample_players {
let playhead = sampler
.and_then(|s| s.try_playhead_seconds())
.unwrap_or_default();
commands.entity(entity).insert(SamplerSnapshot {
playhead: playhead.0,
});
}
}
fn apply_snapshots(
trigger: On<StreamRestartEvent>,
mut sample_players: Query<(
Entity,
&SamplerSnapshot,
&SamplePlayer,
&mut PlaybackSettings,
Has<QueuedSample>,
Has<Sampler>,
)>,
server: Res<AssetServer>,
mut assets: ResMut<Assets<crate::sample::AudioSample>>,
mut commands: Commands,
) {
let rates_changed = trigger.previous_rate != trigger.current_rate;
for (entity, snapshot, player, mut settings, has_queued, has_sampler) in &mut sample_players {
let active = has_queued || has_sampler;
let mut commands = commands.entity(entity);
if rates_changed && active {
let new_player = player.clone();
if let Some(sample) = new_player.sample.path() {
assets.remove(&new_player.sample);
server.reload(sample);
}
settings.play();
settings.play_from = PlayFrom::Seconds(snapshot.playhead);
commands.insert(new_player).remove::<Sampler>();
}
commands.remove::<SamplerSnapshot>();
}
}
#[derive(Component)]
struct PoolShape(Vec<ComponentId>);
fn fetch_effect_ids(
effects: &[Entity],
lens: &mut QueryLens<&EffectId>,
) -> core::result::Result<Vec<ComponentId>, SeedlingError> {
let query = lens.query();
let mut effect_ids = Vec::new();
effect_ids.reserve_exact(effects.len());
for entity in effects {
let id = query
.get(*entity)
.map_err(|_| SeedlingError::MissingEffect {
empty_entity: *entity,
})?;
effect_ids.push(id.0);
}
Ok(effect_ids)
}
fn watch_sample_players(
mut q: Query<(Entity, &mut SamplerNode, &mut AudioEvents, &SamplerOf)>,
mut samples: Query<
(
&mut PlaybackSettings,
&mut AudioEvents,
Option<&DiffTimestamp>,
),
Without<SamplerOf>,
>,
time: Res<bevy_time::Time<Audio>>,
mut commands: Commands,
) -> Result {
let render_range = time.render_range();
for (sampler_entity, mut sampler_node, mut events, sample) in q.iter_mut() {
let Ok((mut settings, mut source_events, timestamp)) = samples.get_mut(sample.0) else {
continue;
};
sampler_node.play = settings.play;
sampler_node.play_from = settings.play_from;
sampler_node.speed = settings.speed;
if source_events.active_within(render_range.start, render_range.end) {
source_events.value_at(render_range.start, render_range.end, settings.as_mut())?;
}
events.merge_timelines_and_clear(&mut source_events, time.now());
if let Some(timestamp) = timestamp {
commands.entity(sampler_entity).insert(timestamp.clone());
commands.entity(sample.0).remove::<DiffTimestamp>();
}
}
Ok(())
}
fn spawn_chain(
bus: Entity,
config: Option<SamplerConfig>,
effects: &[Entity],
commands: &mut Commands,
) -> Entity {
let sampler = commands
.spawn((
SamplerNode::default(),
config.unwrap_or_default(),
PoolSamplerOf(bus),
))
.id();
let effects = effects.to_vec();
commands.queue(move |world: &mut World| -> Result {
let mut cloner = EntityCloner::build_opt_out(world);
cloner.deny::<EffectOf>();
let mut cloner = cloner.finish();
let mut chain = Vec::new();
chain.reserve_exact(effects.len() + 1);
for effect in effects {
chain.push(cloner.spawn_clone(world, effect));
}
chain.push(bus);
world
.get_entity_mut(sampler)?
.add_children(&chain)
.entry::<PendingConnections>()
.or_default()
.into_mut()
.push(PendingEdge::new(chain[0], None));
for pair in chain.windows(2) {
world
.get_entity_mut(pair[0])?
.entry::<PendingConnections>()
.or_default()
.into_mut()
.push(PendingEdge::new(pair[1], None));
}
Ok(())
});
sampler
}
#[derive(Debug, Clone, Component)]
#[cfg_attr(feature = "reflect", derive(bevy_reflect::Reflect))]
pub struct PoolSize(pub RangeInclusive<usize>);
#[derive(Debug, Clone, Resource)]
#[cfg_attr(feature = "reflect", derive(bevy_reflect::Reflect))]
pub struct DefaultPoolSize(pub RangeInclusive<usize>);
impl Default for DefaultPoolSize {
fn default() -> Self {
Self(4..=32)
}
}
fn populate_pool(
q: Query<
(
Entity,
&SamplerConfig,
Option<&PoolSize>,
Option<&SampleEffects>,
Option<&EffectId>,
),
(
With<PoolLabelContainer>,
With<PoolMarker>,
Without<PoolSamplers>,
),
>,
mut effects: Query<&EffectId>,
default_pool_size: Res<DefaultPoolSize>,
mut commands: Commands,
) -> Result {
for (pool, config, size, pool_effects, effect_id) in &q {
if effect_id.is_none() {
commands.entity(pool).insert(VolumeNode::default());
}
let component_ids = fetch_effect_ids(
pool_effects.map(|e| e.deref()).unwrap_or(&[]),
&mut effects.as_query_lens(),
)?;
let size = size
.map(|p| p.0.clone())
.unwrap_or(default_pool_size.0.clone());
commands
.entity(pool)
.insert((PoolShape(component_ids), PoolSize(size.clone())));
let size = (*size.start()).max(1);
for _ in 0..size {
spawn_chain(
pool,
Some(*config),
pool_effects.map(|e| e.deref()).unwrap_or(&[]),
&mut commands,
);
}
}
Ok(())
}
#[derive(Debug, EntityEvent)]
#[cfg_attr(feature = "reflect", derive(bevy_reflect::Reflect))]
pub struct PlaybackCompletion {
pub entity: Entity,
pub reason: CompletionReason,
}
#[derive(Debug)]
#[cfg_attr(feature = "reflect", derive(bevy_reflect::Reflect))]
pub enum CompletionReason {
PlaybackComplete,
PlaybackInterrupted,
QueueLifetimeElapsed,
}
fn remove_finished(
trigger: On<PlaybackCompletion>,
samples: Query<&PlaybackSettings>,
mut commands: Commands,
) -> Result {
let sample_entity = trigger.event_target();
let (Ok(mut entity), Ok(settings)) = (
commands.get_entity(sample_entity),
samples.get(sample_entity),
) else {
return Ok(());
};
match settings.on_complete {
OnComplete::Preserve => {
entity.remove::<(Sampler, QueuedSample, SkipTimer)>();
}
OnComplete::Remove => {
entity
.despawn_related::<SampleEffects>()
.remove_with_requires::<(
SamplePlayer,
PoolLabelContainer,
Sampler,
QueuedSample,
SkipTimer,
AudioEvents,
)>();
}
OnComplete::Despawn => {
entity.despawn();
}
}
Ok(())
}
fn poll_finished(
nodes: Query<(&SamplerNode, &SamplerOf, &AudioState<SamplerState>)>,
mut commands: Commands,
) {
for (node, active, state) in nodes.iter() {
let finished = *node.play && state.0.finished() == node.play.id();
if finished {
commands.trigger(PlaybackCompletion {
entity: active.0,
reason: CompletionReason::PlaybackComplete,
});
}
}
}
#[derive(Debug)]
pub struct PoolDespawn<T>(T);
impl<T: PoolLabel + Component + Clone> PoolDespawn<T> {
pub fn new(label: T) -> Self {
Self(label)
}
}
impl<T: PoolLabel + Component + Clone> Command for PoolDespawn<T> {
fn apply(self, world: &mut World) {
let mut roots = world.query_filtered::<(Entity, &PoolLabelContainer), (
With<SamplerPool<T>>,
With<PoolSamplers>,
With<FirewheelNode>,
)>();
let roots: Vec<_> = roots
.iter(world)
.map(|(root, label)| (root, label.clone()))
.collect();
let mut commands = world.commands();
let interned = self.0.intern();
for (root, label) in roots {
if label.label == interned {
commands.entity(root).despawn();
}
}
}
}
pub trait PoolCommands {
fn despawn_pool<T: PoolLabel + Component + Clone>(&mut self, label: T);
}
impl PoolCommands for Commands<'_, '_> {
fn despawn_pool<T: PoolLabel + Component + Clone>(&mut self, label: T) {
self.queue(PoolDespawn::new(label));
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{
prelude::*,
sample_effects,
test::{prepare_app, run},
};
use bevy_seedling_macros::PoolLabel;
#[derive(PoolLabel, Clone, Debug, PartialEq, Eq, Hash)]
struct TestPool;
#[test]
fn test_spawn() {
let mut app = prepare_app(|mut commands: Commands| {
commands.spawn((
SamplerPool(TestPool),
sample_effects![LowPassNode::default()],
));
});
run(
&mut app,
|q: Query<&PoolSamplers, With<SamplerPool<TestPool>>>| {
assert_eq!(q.iter().len(), 1);
},
);
}
#[test]
fn test_despawn() {
let mut app = prepare_app(|mut commands: Commands| {
commands.spawn((
SamplerPool(TestPool),
PoolSize(4..=32),
sample_effects![LowPassNode::default()],
));
});
run(&mut app, |pool_nodes: Query<&FirewheelNode>| {
assert_eq!(pool_nodes.iter().count(), 11);
});
run(&mut app, |mut commands: Commands| {
commands.despawn_pool(TestPool);
});
app.update();
run(&mut app, |pool_nodes: Query<&FirewheelNode>| {
assert_eq!(pool_nodes.iter().count(), 2);
});
}
#[test]
fn test_playback_starts() {
let mut app = prepare_app(|mut commands: Commands, server: Res<AssetServer>| {
commands.spawn((
SamplerPool(TestPool),
sample_effects![LowPassNode::default()],
));
commands.spawn((
TestPool,
SamplePlayer::new(server.load("caw.ogg")).looping(),
EmptyComponent,
));
});
loop {
let players = run(
&mut app,
|q: Query<Entity, (With<SamplePlayer>, With<Sampler>)>| q.iter().len(),
);
if players == 1 {
break;
}
app.update();
}
}
#[derive(Component)]
struct EmptyComponent;
#[test]
fn test_remove_in_dynamic() {
let mut app = prepare_app(|mut commands: Commands, server: Res<AssetServer>| {
commands.spawn((VolumeNode::default(), dynamic::DynamicBus));
commands.spawn((
SamplePlayer::new(server.load("sine_440hz_1ms.wav")),
EmptyComponent,
PlaybackSettings::default().remove(),
sample_effects![LowPassNode::default()],
));
});
loop {
let players = run(
&mut app,
|q: Query<Entity, (With<SamplePlayer>, With<EmptyComponent>)>| q.iter().len(),
);
if players == 0 {
break;
}
app.update();
}
let world = app.world_mut();
let mut q = world.query_filtered::<EntityRef, With<EmptyComponent>>();
let entity = q.single(world).unwrap();
let archetype = entity.archetype();
assert_eq!(archetype.components().len(), 1);
assert!(entity.contains::<EmptyComponent>());
}
#[test]
fn test_remove_in_pool() {
let mut app = prepare_app(|mut commands: Commands, server: Res<AssetServer>| {
commands.spawn((
SamplerPool(TestPool),
sample_effects![LowPassNode::default()],
));
commands.spawn((
TestPool,
SamplePlayer::new(server.load("sine_440hz_1ms.wav")),
EmptyComponent,
PlaybackSettings {
on_complete: OnComplete::Remove,
..Default::default()
},
));
});
loop {
let players = run(
&mut app,
|q: Query<Entity, (With<SamplePlayer>, With<EmptyComponent>)>| q.iter().len(),
);
if players == 0 {
break;
}
app.update();
}
let world = app.world_mut();
let mut q = world.query_filtered::<EntityRef, With<EmptyComponent>>();
let entity = q.single(world).unwrap();
let archetype = entity.archetype();
assert_eq!(archetype.components().len(), 1);
assert!(entity.contains::<EmptyComponent>());
let total_lpfs = run(&mut app, |fx: Query<&LowPassNode>| fx.iter().len());
assert_eq!(total_lpfs, 5);
}
#[test]
fn test_remove_stolen_players() {
let mut app = prepare_app(|mut commands: Commands, server: Res<AssetServer>| {
commands.spawn((SamplerPool(TestPool), PoolSize(4..=4)));
commands
.spawn((VolumeNode::default(), MainBus))
.connect(crate::edge::AudioGraphOutput);
for _ in 0..8 {
commands.spawn((TestPool, SamplePlayer::new(server.load("caw.ogg"))));
}
});
loop {
let world = app.world_mut();
let mut q = world.query_filtered::<Entity, With<Sampler>>();
if q.iter(world).len() != 0 {
break;
}
app.update();
}
for _ in 0..2 {
app.update();
}
let world = app.world_mut();
let mut q = world.query_filtered::<Entity, With<SamplePlayer>>();
assert_eq!(q.iter(world).len(), 4);
}
}