bevy_defer/
lib.rs

1#![doc=include_str!("../README.md")]
2#![allow(clippy::type_complexity)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4use bevy::app::{App, First, Plugin, PostUpdate, PreUpdate, Update};
5use bevy::ecs::component::Component;
6use bevy::ecs::intern::Interned;
7use bevy::ecs::message::Message;
8use bevy::ecs::query::{QueryFilter, ReadOnlyQueryData, ReleaseStateQueryData};
9use bevy::ecs::schedule::IntoScheduleConfigs as _;
10use bevy::ecs::system::Command;
11use bevy::prelude::EntityCommands;
12use bevy::state::prelude::State;
13use bevy::state::state::States;
14use bevy::time::TimeSystems;
15use std::fmt::Formatter;
16use std::{any::type_name, pin::Pin};
17
18pub mod access;
19pub mod cancellation;
20mod commands;
21mod entity_commands;
22mod errors;
23mod event;
24mod executor;
25pub mod ext;
26mod fetch;
27mod inspect;
28mod queue;
29pub use inspect::{EntityInspectors, InspectEntity};
30pub mod reactors;
31pub mod signals;
32mod spawn;
33pub(crate) mod sync;
34pub mod tween;
35pub use access::async_asset::AssetSet;
36pub use access::async_query::{OwnedQueryState, OwnedReadonlyQueryState};
37pub use access::AsyncWorld;
38pub use async_executor::Task;
39use bevy::ecs::{
40    schedule::{ScheduleLabel, SystemSet},
41    system::Commands,
42    world::World,
43};
44use bevy::reflect::std_traits::ReflectDefault;
45pub use errors::AccessError;
46pub use event::EventChannel;
47pub use executor::{in_async_context, AsyncExecutor};
48#[doc(hidden)]
49pub use fetch::{fetch, fetch0, fetch1, fetch2, FetchEntity, FetchOne, FetchWorld};
50pub use queue::LoopForFrameData;
51pub use queue::QueryQueue;
52use reactors::Reactors;
53pub use spawn::ScopedTasks;
54#[doc(hidden)]
55#[cfg(feature = "spawn_macro")]
56pub mod spawn_macro;
57
/// Systems in `bevy_defer`.
///
/// This module only re-exports systems from the modules that define them,
/// giving them a single public path (for example when ordering other systems
/// against [`run_async_executor`](systems::run_async_executor)).
pub mod systems {
    pub use crate::event::react_to_message;
    pub use crate::executor::run_async_executor;
    pub use crate::queue::{run_fixed_queue, run_time_series, run_watch_queries};
    pub use crate::reactors::{react_to_component_change, react_to_state};

    // Re-exported only when the `bevy_animation` feature is enabled.
    #[cfg(feature = "bevy_animation")]
    pub use crate::ext::anim::react_to_animation;
    #[cfg(feature = "bevy_animation")]
    pub use crate::ext::anim::react_to_main_animation_change;
    // Re-exported only when the `bevy_scene` feature is enabled.
    #[cfg(feature = "bevy_scene")]
    pub use crate::ext::scene::react_to_scene_load;
}
72
73pub use crate::sync::oneshot::channel;
74use std::future::Future;
75
/// Crate-internal message text for a channel that closed unexpectedly.
// NOTE(review): presumably used as an `expect`/panic message by the channel code; the call sites are not in this file.
pub(crate) static CHANNEL_CLOSED: &str = "channel closed unexpectedly";
77
78#[doc(hidden)]
79pub use bevy::ecs::entity::Entity;
80#[doc(hidden)]
81pub use bevy::ecs::system::{NonSend, Res, SystemParam};
82#[doc(hidden)]
83pub use bevy::log::error;
84#[doc(hidden)]
85pub use ref_cast::RefCast;
86
87use queue::run_fixed_queue;
88use signals::Signals;
89
90#[cfg(feature = "derive")]
91pub use bevy_defer_derive::{async_access, async_dyn};
92
/// Result type of spawned tasks.
///
/// The success type defaults to `()`, the error type is always [`AccessError`].
pub type AccessResult<T = ()> = Result<T, AccessError>;

/// A pinned, heap-allocated future that resolves to an [`AccessResult`].
///
/// This alias carries no `Send` or `Sync` bound.
pub type BoxedFuture = Pin<Box<dyn Future<Output = AccessResult>>>;

/// A pinned, heap-allocated future that resolves to an [`AccessResult`]
/// and is additionally bounded by `Send + Sync`.
pub type BoxedSharedFuture = Pin<Box<dyn Future<Output = AccessResult> + Send + Sync>>;
99
/// The core `bevy_defer` plugin that does not run its executors.
///
/// It sets up the resources, schedule and helper systems of `bevy_defer`
/// but never schedules the async executor itself.
///
/// You should almost always use [`AsyncPlugin::empty`] or [`AsyncPlugin::default_settings`] instead.
#[derive(Debug, Default, Clone, Copy)]
pub struct CoreAsyncPlugin;
106
107impl Plugin for CoreAsyncPlugin {
108    fn build(&self, app: &mut App) {
109        app.init_non_send_resource::<QueryQueue>()
110            .init_non_send_resource::<AsyncExecutor>()
111            .init_resource::<Reactors>()
112            .init_resource::<EntityInspectors>()
113            .register_type::<Signals>()
114            .register_type_data::<Signals, ReflectDefault>()
115            .init_schedule(BeforeAsyncExecutor)
116            .add_systems(First, systems::run_time_series.after(TimeSystems))
117            .add_systems(Update, run_fixed_queue)
118            .add_systems(BeforeAsyncExecutor, systems::run_watch_queries);
119
120        #[cfg(feature = "bevy_scene")]
121        app.add_systems(BeforeAsyncExecutor, systems::react_to_scene_load);
122        #[cfg(feature = "bevy_animation")]
123        app.add_systems(BeforeAsyncExecutor, systems::react_to_animation);
124        #[cfg(feature = "bevy_animation")]
125        app.add_systems(BeforeAsyncExecutor, systems::react_to_main_animation_change);
126    }
127}
128
/// A schedule that runs before [`run_async_executor`](systems::run_async_executor).
///
/// By default this runs `watch` queries and reactors.
/// The schedule is executed by [`run_before_async_executor`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ScheduleLabel)]
pub struct BeforeAsyncExecutor;
134
135/// Runs the [`BeforeAsyncExecutor`] schedule.
136///
137/// By default this runs `watch` queries and reactors.
138pub fn run_before_async_executor(world: &mut World) {
139    world.run_schedule(BeforeAsyncExecutor)
140}
141
/// A `bevy_defer` plugin that can run the executor through user configuration.
///
/// This plugin is not unique and can be used repeatedly to add runs.
#[derive(Debug)]
pub struct AsyncPlugin {
    /// Where the executor runs: a schedule, optionally paired with a system set
    /// inside that schedule.
    schedules: Vec<(Interned<dyn ScheduleLabel>, Option<Interned<dyn SystemSet>>)>,
}
149
150impl AsyncPlugin {
151    /// Equivalent to [`CoreAsyncPlugin`].
152    ///
153    /// Use [`AsyncPlugin::run_in`] and [`AsyncPlugin::run_in_set`] to add runs.
154    pub fn empty() -> Self {
155        AsyncPlugin {
156            schedules: Vec::new(),
157        }
158    }
159
160    /// Run in [`Update`] once.
161    ///
162    /// This is usually enough, be sure to order your
163    /// systems against [`run_async_executor`](systems::run_async_executor) correctly if needed.
164    pub fn default_settings() -> Self {
165        AsyncPlugin {
166            schedules: vec![(Interned(Box::leak(Box::new(Update))), None)],
167        }
168    }
169
170    /// Run in [`PreUpdate`], [`Update`] and [`PostUpdate`].
171    pub fn busy_schedule() -> Self {
172        AsyncPlugin {
173            schedules: vec![
174                (Interned(Box::leak(Box::new(PreUpdate))), None),
175                (Interned(Box::leak(Box::new(Update))), None),
176                (Interned(Box::leak(Box::new(PostUpdate))), None),
177            ],
178        }
179    }
180}
181
182impl AsyncPlugin {
183    /// Run the executor in a specific `Schedule`.
184    pub fn run_in(mut self, schedule: impl ScheduleLabel) -> Self {
185        self.schedules
186            .push((Interned(Box::leak(Box::new(schedule))), None));
187        self
188    }
189
190    /// Run the executor in a specific `Schedule` and `SystemSet`.
191    pub fn run_in_set(mut self, schedule: impl ScheduleLabel, set: impl SystemSet) -> Self {
192        self.schedules.push((
193            Interned(Box::leak(Box::new(schedule))),
194            Some(Interned(Box::leak(Box::new(set)))),
195        ));
196        self
197    }
198}
199
200impl Plugin for AsyncPlugin {
201    fn build(&self, app: &mut App) {
202        use crate::systems::*;
203        if !app.is_plugin_added::<CoreAsyncPlugin>() {
204            app.add_plugins(CoreAsyncPlugin);
205        }
206        for (schedule, set) in &self.schedules {
207            if let Some(set) = set {
208                app.add_systems(
209                    *schedule,
210                    (
211                        run_before_async_executor.before(run_async_executor),
212                        run_async_executor,
213                    )
214                        .in_set(*set),
215                );
216            } else {
217                app.add_systems(
218                    *schedule,
219                    (
220                        run_before_async_executor.before(run_async_executor),
221                        run_async_executor,
222                    ),
223                );
224            }
225        }
226    }
227
228    fn is_unique(&self) -> bool {
229        false
230    }
231}
232
/// Extension for [`World`] and [`App`].
pub trait AsyncExtension {
    /// Spawn a task to be run on the [`AsyncExecutor`].
    ///
    /// Returns `self` so calls can be chained.
    fn spawn_task(&mut self, f: impl Future<Output = AccessResult> + 'static) -> &mut Self;

    /// Spawn a `bevy_defer` compatible future, the future is constrained to a [`States`]
    /// and will be cancelled upon exiting the state.
    ///
    /// The [`World`] implementation stores the task in `ScopedTasks<S>`; if that
    /// resource is missing (see [`AppReactorExtension::react_to_state`]) an error
    /// is logged and `Ok(())` is still returned.
    ///
    /// # Errors
    ///
    /// If not in the specified state.
    fn spawn_state_scoped<S: States>(
        &mut self,
        state: S,
        fut: impl Future<Output = AccessResult> + 'static,
    ) -> AccessResult;

    /// Initialize [`EventChannel<E>`].
    fn register_oneshot_event<E: Send + Sync + 'static>(&mut self) -> &mut Self;

    /// Registers a method that prints an entity in `bevy_defer`.
    ///
    /// This method will be used for printing [`AccessError`].
    /// [`InspectEntity`] can be used for custom debug printing
    /// in `bevy_defer`'s scope.
    ///
    /// Only the first successful formatting function will be called according to their priorities.
    /// A [`Name`](bevy::core::Name) based method is automatically added at priority `0`.
    fn register_inspect_entity_by_component<C: Component>(
        &mut self,
        priority: i32,
        f: impl Fn(Entity, &C, &mut Formatter) + Send + Sync + 'static,
    ) -> &mut Self;

    /// Registers a method that prints an entity in `bevy_defer`,
    /// based on a read-only query `Q` with filter `F`.
    ///
    /// This method will be used for printing [`AccessError`].
    /// [`InspectEntity`] can be used for custom debug printing
    /// in `bevy_defer`'s scope.
    ///
    /// Only the first successful formatting function will be called according to their priorities.
    /// A [`Name`](bevy::core::Name) based method is automatically added at priority `0`.
    fn register_inspect_entity_by_query<
        Q: ReadOnlyQueryData + ReleaseStateQueryData + 'static,
        F: QueryFilter + 'static,
    >(
        &mut self,
        priority: i32,
        f: impl Fn(Q::Item<'_, '_>, &mut Formatter) + Send + Sync + 'static,
    ) -> &mut Self;
}
284
285impl AsyncExtension for World {
286    fn spawn_task(&mut self, f: impl Future<Output = AccessResult> + 'static) -> &mut Self {
287        self.non_send_resource::<AsyncExecutor>().spawn(f);
288        self
289    }
290
291    fn spawn_state_scoped<S: States>(
292        &mut self,
293        state: S,
294        fut: impl Future<Output = AccessResult> + 'static,
295    ) -> AccessResult {
296        match self.get_resource::<State<S>>() {
297            Some(s) if s.get() == &state => (),
298            _ => {
299                return Err(AccessError::NotInState {
300                    ty: type_name::<S>(),
301                })
302            }
303        };
304        let task = self.non_send_resource::<AsyncExecutor>().spawn_task(fut);
305        if let Some(mut res) = self.get_resource_mut::<ScopedTasks<S>>() {
306            res.tasks.entry(state).or_default().push(task);
307        } else {
308            error!(
309                "Cannot spawn state scoped futures without `react_to_state::<{}>`.",
310                type_name::<S>()
311            )
312        }
313        Ok(())
314    }
315
316    fn register_inspect_entity_by_component<C: Component>(
317        &mut self,
318        priority: i32,
319        f: impl Fn(Entity, &C, &mut Formatter) + Send + Sync + 'static,
320    ) -> &mut Self {
321        self.resource_mut::<EntityInspectors>().push(priority, f);
322        self
323    }
324
325    fn register_inspect_entity_by_query<
326        Q: ReadOnlyQueryData + ReleaseStateQueryData + 'static,
327        F: QueryFilter + 'static,
328    >(
329        &mut self,
330        priority: i32,
331        f: impl Fn(Q::Item<'_, '_>, &mut Formatter) + Send + Sync + 'static,
332    ) -> &mut Self {
333        self.resource_mut::<EntityInspectors>()
334            .push_query::<Q, F>(priority, f);
335        self
336    }
337
338    fn register_oneshot_event<E: Send + Sync + 'static>(&mut self) -> &mut Self {
339        self.init_resource::<EventChannel<E>>();
340        self
341    }
342}
343
344impl AsyncExtension for App {
345    fn spawn_task(&mut self, f: impl Future<Output = AccessResult> + 'static) -> &mut Self {
346        self.world().non_send_resource::<AsyncExecutor>().spawn(f);
347        self
348    }
349
350    fn spawn_state_scoped<S: States>(
351        &mut self,
352        state: S,
353        fut: impl Future<Output = AccessResult> + 'static,
354    ) -> AccessResult {
355        self.world_mut().spawn_state_scoped(state, fut)
356    }
357
358    fn register_oneshot_event<E: Send + Sync + 'static>(&mut self) -> &mut Self {
359        self.world_mut().register_oneshot_event::<E>();
360        self
361    }
362
363    fn register_inspect_entity_by_component<C: Component>(
364        &mut self,
365        priority: i32,
366        f: impl Fn(Entity, &C, &mut Formatter) + Send + Sync + 'static,
367    ) -> &mut Self {
368        self.world_mut()
369            .register_inspect_entity_by_component(priority, f);
370        self
371    }
372
373    fn register_inspect_entity_by_query<
374        Q: ReadOnlyQueryData + ReleaseStateQueryData + 'static,
375        F: QueryFilter + 'static,
376    >(
377        &mut self,
378        priority: i32,
379        f: impl Fn(Q::Item<'_, '_>, &mut Formatter) + Send + Sync + 'static,
380    ) -> &mut Self {
381        self.world_mut()
382            .register_inspect_entity_by_query::<Q, F>(priority, f);
383        self
384    }
385}
386
/// Extension for [`App`] to add reactors.
pub trait AppReactorExtension {
    /// React to changes in a [`Message`] by duplicating events to [`EventChannel<E>`].
    ///
    /// Initializes the resource [`EventChannel<E>`].
    fn react_to_message<E: Message + Clone>(&mut self) -> &mut Self;

    /// React to changes in a [`States`].
    ///
    /// The [`App`] implementation also initializes `ScopedTasks<S>`, which
    /// state scoped spawning relies on.
    fn react_to_state<S: States>(&mut self) -> &mut Self;

    /// React to changes in a [`Component`].
    fn react_to_component_change<C: Component + Eq + Clone + Default>(&mut self) -> &mut Self;
}
400
401impl AppReactorExtension for App {
402    fn react_to_message<E: Message + Clone>(&mut self) -> &mut Self {
403        self.register_oneshot_event::<E>();
404        self.add_systems(BeforeAsyncExecutor, systems::react_to_message::<E>);
405        self
406    }
407
408    fn react_to_state<S: States>(&mut self) -> &mut Self {
409        self.add_systems(BeforeAsyncExecutor, systems::react_to_state::<S>);
410        self.init_resource::<ScopedTasks<S>>();
411        self
412    }
413
414    fn react_to_component_change<C: Component + Eq + Clone + Default>(&mut self) -> &mut Self {
415        self.add_systems(BeforeAsyncExecutor, systems::react_to_component_change::<C>);
416        self
417    }
418}
419
/// Extension for [`Commands`].
pub trait AsyncCommandsExtension {
    /// Spawn a task to be run on the [`AsyncExecutor`].
    ///
    /// Unlike [`AsyncExtension::spawn_task`] this accepts a closure so
    /// that users can smuggle `!Send` futures across thread boundaries.
    ///
    /// ```rust
    /// # /*
    /// move || async move {
    ///     ...
    /// }
    /// # */
    /// ```
    fn spawn_task<F: Future<Output = AccessResult> + 'static>(
        &mut self,
        f: impl FnOnce() -> F + Send + 'static,
    ) -> &mut Self;

    /// Spawn a `bevy_defer` compatible future, the future is constrained to a [`States`]
    /// and will be cancelled upon exiting the state.
    ///
    /// # Note
    ///
    /// This method returns `&mut Self` and cannot report failure. The
    /// [`StateScopedSpawnFn`] command queued by the [`Commands`] implementation
    /// discards the result of the spawn, so nothing is reported if the world is
    /// not in the specified state when the command is applied.
    fn spawn_state_scoped<S: States, F: Future<Output = AccessResult> + 'static>(
        &mut self,
        state: S,
        fut: impl FnOnce() -> F + Send + 'static,
    ) -> &mut Self;
}
451
452impl AsyncCommandsExtension for Commands<'_, '_> {
453    fn spawn_task<F: Future<Output = AccessResult> + 'static>(
454        &mut self,
455        f: impl (FnOnce() -> F) + Send + 'static,
456    ) -> &mut Self {
457        self.queue(SpawnFn::new(f));
458        self
459    }
460
461    fn spawn_state_scoped<S: States, F: Future<Output = AccessResult> + 'static>(
462        &mut self,
463        state: S,
464        f: impl FnOnce() -> F + Send + 'static,
465    ) -> &mut Self {
466        self.queue(StateScopedSpawnFn::new(state, f));
467        self
468    }
469}
470
/// Extension for [`EntityCommands`].
pub trait AsyncEntityCommandsExtension {
    /// Spawn a task to be run on the [`AsyncExecutor`].
    ///
    /// The closure receives the [`Entity`] these commands target.
    ///
    /// Unlike [`AsyncExtension::spawn_task`] this accepts a closure so
    /// that users can smuggle `!Send` futures across thread boundaries.
    ///
    /// ```rust
    /// # /*
    /// move |entity| async move {
    ///     ...
    /// }
    /// # */
    /// ```
    fn spawn_task<F: Future<Output = AccessResult> + 'static>(
        &mut self,
        f: impl FnOnce(Entity) -> F + Send + 'static,
    ) -> &mut Self;

    /// Spawn a `bevy_defer` compatible future, the future is constrained to a [`States`]
    /// and will be cancelled upon exiting the state.
    ///
    /// The closure receives the [`Entity`] these commands target.
    ///
    /// # Note
    ///
    /// This method returns `&mut Self` and cannot report failure. The
    /// [`StateScopedSpawnFn`] command queued by the [`EntityCommands`]
    /// implementation discards the result of the spawn, so nothing is reported
    /// if the world is not in the specified state when the command is applied.
    fn spawn_state_scoped<S: States, F: Future<Output = AccessResult> + 'static>(
        &mut self,
        state: S,
        fut: impl FnOnce(Entity) -> F + Send + 'static,
    ) -> &mut Self;
}
502
503impl AsyncEntityCommandsExtension for EntityCommands<'_> {
504    fn spawn_task<F: Future<Output = AccessResult> + 'static>(
505        &mut self,
506        f: impl (FnOnce(Entity) -> F) + Send + 'static,
507    ) -> &mut Self {
508        let entity = self.id();
509        self.commands().queue(SpawnFn::new(move || f(entity)));
510        self
511    }
512
513    fn spawn_state_scoped<S: States, F: Future<Output = AccessResult> + 'static>(
514        &mut self,
515        state: S,
516        f: impl FnOnce(Entity) -> F + Send + 'static,
517    ) -> &mut Self {
518        let entity = self.id();
519        self.commands()
520            .queue(StateScopedSpawnFn::new(state, move || f(entity)));
521        self
522    }
523}
524
/// [`Command`] for spawning a task.
///
/// Wraps a `Send` closure that produces the (not necessarily `Send`) future
/// once the command is applied to the [`World`].
pub struct SpawnFn(
    Box<dyn (FnOnce() -> Pin<Box<dyn Future<Output = AccessResult>>>) + Send + 'static>,
);
529
530impl SpawnFn {
531    fn new<F: Future<Output = AccessResult> + 'static>(
532        f: impl (FnOnce() -> F) + Send + 'static,
533    ) -> Self {
534        Self(Box::new(move || Box::pin(f())))
535    }
536}
537
538impl Command for SpawnFn {
539    fn apply(self, world: &mut World) {
540        world.spawn_task(self.0());
541    }
542}
543
/// [`Command`] for spawning a task that is scoped to a state `S`.
pub struct StateScopedSpawnFn<S: States> {
    /// `Send` closure that produces the future when the command is applied.
    future: Box<dyn (FnOnce() -> Pin<Box<dyn Future<Output = AccessResult>>>) + Send + 'static>,
    /// The state value the task is scoped to.
    state: S,
}
549
550impl<S: States> StateScopedSpawnFn<S> {
551    fn new<F: Future<Output = AccessResult> + 'static>(
552        state: S,
553        f: impl (FnOnce() -> F) + Send + 'static,
554    ) -> Self {
555        Self {
556            future: Box::new(move || Box::pin(f())),
557            state,
558        }
559    }
560}
561
562impl<S: States> Command for StateScopedSpawnFn<S> {
563    fn apply(self, world: &mut World) {
564        let _ = world.spawn_state_scoped(self.state, (self.future)());
565    }
566}
567
/// Guard produced by the `defer!` macro; it holds a closure that its `Drop`
/// implementation invokes.
///
/// The closure is kept in an `Option` so it can be moved out during `drop`.
#[doc(hidden)]
#[must_use = "Defer must not be dropped immediately."]
pub struct Defer<F: FnOnce() -> AccessResult>(pub Option<F>);
571
572impl<F: FnOnce() -> AccessResult> Drop for Defer<F> {
573    fn drop(&mut self) {
574        if in_async_context() {
575            let _ = (self.0.take().unwrap())();
576        }
577    }
578}
579
/// Defer a set of statements that automatically triggers when the future is either completed
/// or dropped (i.e. via select!)
///
/// # Returns
///
/// Returns a variable that triggers these statements on drop.
///
/// # Note
///
/// If the world is dropped, these statements will NOT be run since `bevy_defer` functions
/// will likely panic if the world is no longer available.
///
/// The statements are wrapped in a closure returning `Ok(())`, so `?` can be used
/// inside the block; the closure's result is discarded when it runs.
///
/// # Example
///
/// ```
/// # use bevy_defer::{AccessResult, defer};
/// # async fn dance(entity: i32) -> AccessResult { Ok(()) }
/// # fn stop_dancing(entity: i32) -> AccessResult { Ok(()) }
/// # async fn test() -> AccessResult {
/// # let entity = 1;
/// // Must be bound to a variable and not `_` so it lives until the end of the future.
/// let _deferred = defer! {
///     // stop dancing when the future is dropped
///     stop_dancing(entity)?;
/// };
/// dance(entity).await?;
/// # Ok(()) }
/// ```
#[macro_export]
macro_rules! defer {
    ($($tt: tt)*) => {
        $crate::Defer(Some(|| {
            let _ = {$($tt)*};
            Ok(())
        }))
    };
}
617
/// Try to run a potentially async block of statements with result type [`AccessResult`],
/// always discarding the result without logging the error.
///
/// The statements are wrapped in an `async` block that is awaited immediately,
/// so the macro must be used inside an async context. A `?` inside the block
/// only exits the block, not the surrounding function.
///
/// # Example
///
/// ```
/// # use bevy_defer::{AccessResult, attempt};
/// # async fn dance(entity: i32) -> AccessResult { Ok(()) }
/// # async fn stop_dancing(entity: i32) -> AccessResult { Ok(()) }
/// # async fn test() -> AccessResult {
/// # let entity = 1;
/// dance(entity).await?;
/// // An error here is swallowed and execution continues after the block.
/// attempt! {
///     stop_dancing(entity).await?;
/// };
/// # Ok(()) }
/// ```
#[macro_export]
macro_rules! attempt {
    ($($tt:tt)*) => {
        let _: $crate::AccessResult<()> = async {
            let _ = {$($tt)*};
            Ok(())
        }.await;
    };
}
645
/// For doctests only.
///
/// Builds a minimal app with a few sample components, resources, non-send
/// resources and a state, spawns `$expr` as a task that quits the app once it
/// has finished, then runs the app.
#[doc(hidden)]
#[allow(unused)]
#[macro_export]
macro_rules! test_spawn {
    ($expr: expr) => {{
        use ::bevy::prelude::*;
        use ::bevy_defer::access::*;
        use ::bevy_defer::*;
        use bevy::state::app::StatesPlugin;
        // Sample data types available to the tested expression.
        #[derive(Debug, Clone, Copy, Component, Resource, Event, Asset, TypePath)]
        pub struct Int(i32);

        #[derive(Debug, Clone, Copy, Component, Resource, Event, Asset, TypePath)]
        pub struct Str(&'static str);

        #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, States)]
        pub enum MyState {
            A,
            B,
            C,
        };

        // App setup: plugins first, then the sample world contents.
        let mut app = ::bevy::app::App::new();
        app.add_plugins(MinimalPlugins);
        app.add_plugins(StatesPlugin);
        app.add_plugins(AssetPlugin::default());
        app.init_asset::<Image>();
        app.add_plugins(bevy_defer::AsyncPlugin::default_settings());
        app.world_mut().spawn(Int(4));
        app.world_mut().spawn(Str("Ferris"));
        app.insert_resource(Int(4));
        app.insert_resource(Str("Ferris"));
        app.insert_non_send_resource(Int(4));
        app.insert_non_send_resource(Str("Ferris"));
        app.insert_state(MyState::A);
        // Run the expression as a task, then ask the app to quit.
        app.spawn_task(async move {
            $expr;
            AsyncWorld.quit();
            Ok(())
        });
        app.run();
    }};
}