1#![doc=include_str!("../README.md")]
2#![allow(clippy::type_complexity)]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4use bevy::app::{App, First, Plugin, PostUpdate, PreUpdate, Update};
5use bevy::ecs::component::Component;
6use bevy::ecs::intern::Interned;
7use bevy::ecs::message::Message;
8use bevy::ecs::query::{QueryFilter, ReadOnlyQueryData, ReleaseStateQueryData};
9use bevy::ecs::schedule::IntoScheduleConfigs as _;
10use bevy::ecs::system::Command;
11use bevy::prelude::EntityCommands;
12use bevy::state::prelude::State;
13use bevy::state::state::States;
14use bevy::time::TimeSystems;
15use std::fmt::Formatter;
16use std::{any::type_name, pin::Pin};
17
18pub mod access;
19pub mod cancellation;
20mod commands;
21mod entity_commands;
22mod errors;
23mod event;
24mod executor;
25pub mod ext;
26mod fetch;
27mod inspect;
28mod queue;
29pub use inspect::{EntityInspectors, InspectEntity};
30pub mod reactors;
31pub mod signals;
32mod spawn;
33pub(crate) mod sync;
34pub mod tween;
35pub use access::async_asset::AssetSet;
36pub use access::async_query::{OwnedQueryState, OwnedReadonlyQueryState};
37pub use access::AsyncWorld;
38pub use async_executor::Task;
39use bevy::ecs::{
40 schedule::{ScheduleLabel, SystemSet},
41 system::Commands,
42 world::World,
43};
44use bevy::reflect::std_traits::ReflectDefault;
45pub use errors::AccessError;
46pub use event::EventChannel;
47pub use executor::{in_async_context, AsyncExecutor};
48#[doc(hidden)]
49pub use fetch::{fetch, fetch0, fetch1, fetch2, FetchEntity, FetchOne, FetchWorld};
50pub use queue::LoopForFrameData;
51pub use queue::QueryQueue;
52use reactors::Reactors;
53pub use spawn::ScopedTasks;
54#[doc(hidden)]
55#[cfg(feature = "spawn_macro")]
56pub mod spawn_macro;
57
58pub mod systems {
60 pub use crate::event::react_to_message;
61 pub use crate::executor::run_async_executor;
62 pub use crate::queue::{run_fixed_queue, run_time_series, run_watch_queries};
63 pub use crate::reactors::{react_to_component_change, react_to_state};
64
65 #[cfg(feature = "bevy_animation")]
66 pub use crate::ext::anim::react_to_animation;
67 #[cfg(feature = "bevy_animation")]
68 pub use crate::ext::anim::react_to_main_animation_change;
69 #[cfg(feature = "bevy_scene")]
70 pub use crate::ext::scene::react_to_scene_load;
71}
72
73pub use crate::sync::oneshot::channel;
74use std::future::Future;
75
76pub(crate) static CHANNEL_CLOSED: &str = "channel closed unexpectedly";
77
78#[doc(hidden)]
79pub use bevy::ecs::entity::Entity;
80#[doc(hidden)]
81pub use bevy::ecs::system::{NonSend, Res, SystemParam};
82#[doc(hidden)]
83pub use bevy::log::error;
84#[doc(hidden)]
85pub use ref_cast::RefCast;
86
87use queue::run_fixed_queue;
88use signals::Signals;
89
90#[cfg(feature = "derive")]
91pub use bevy_defer_derive::{async_access, async_dyn};
92
93pub type AccessResult<T = ()> = Result<T, AccessError>;
95
96pub type BoxedFuture = Pin<Box<dyn Future<Output = AccessResult>>>;
97
98pub type BoxedSharedFuture = Pin<Box<dyn Future<Output = AccessResult> + Send + Sync>>;
99
100#[derive(Debug, Default, Clone, Copy)]
101
102pub struct CoreAsyncPlugin;
106
107impl Plugin for CoreAsyncPlugin {
108 fn build(&self, app: &mut App) {
109 app.init_non_send_resource::<QueryQueue>()
110 .init_non_send_resource::<AsyncExecutor>()
111 .init_resource::<Reactors>()
112 .init_resource::<EntityInspectors>()
113 .register_type::<Signals>()
114 .register_type_data::<Signals, ReflectDefault>()
115 .init_schedule(BeforeAsyncExecutor)
116 .add_systems(First, systems::run_time_series.after(TimeSystems))
117 .add_systems(Update, run_fixed_queue)
118 .add_systems(BeforeAsyncExecutor, systems::run_watch_queries);
119
120 #[cfg(feature = "bevy_scene")]
121 app.add_systems(BeforeAsyncExecutor, systems::react_to_scene_load);
122 #[cfg(feature = "bevy_animation")]
123 app.add_systems(BeforeAsyncExecutor, systems::react_to_animation);
124 #[cfg(feature = "bevy_animation")]
125 app.add_systems(BeforeAsyncExecutor, systems::react_to_main_animation_change);
126 }
127}
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ScheduleLabel)]
133pub struct BeforeAsyncExecutor;
134
135pub fn run_before_async_executor(world: &mut World) {
139 world.run_schedule(BeforeAsyncExecutor)
140}
141
142#[derive(Debug)]
146pub struct AsyncPlugin {
147 schedules: Vec<(Interned<dyn ScheduleLabel>, Option<Interned<dyn SystemSet>>)>,
148}
149
150impl AsyncPlugin {
151 pub fn empty() -> Self {
155 AsyncPlugin {
156 schedules: Vec::new(),
157 }
158 }
159
160 pub fn default_settings() -> Self {
165 AsyncPlugin {
166 schedules: vec![(Interned(Box::leak(Box::new(Update))), None)],
167 }
168 }
169
170 pub fn busy_schedule() -> Self {
172 AsyncPlugin {
173 schedules: vec![
174 (Interned(Box::leak(Box::new(PreUpdate))), None),
175 (Interned(Box::leak(Box::new(Update))), None),
176 (Interned(Box::leak(Box::new(PostUpdate))), None),
177 ],
178 }
179 }
180}
181
182impl AsyncPlugin {
183 pub fn run_in(mut self, schedule: impl ScheduleLabel) -> Self {
185 self.schedules
186 .push((Interned(Box::leak(Box::new(schedule))), None));
187 self
188 }
189
190 pub fn run_in_set(mut self, schedule: impl ScheduleLabel, set: impl SystemSet) -> Self {
192 self.schedules.push((
193 Interned(Box::leak(Box::new(schedule))),
194 Some(Interned(Box::leak(Box::new(set)))),
195 ));
196 self
197 }
198}
199
200impl Plugin for AsyncPlugin {
201 fn build(&self, app: &mut App) {
202 use crate::systems::*;
203 if !app.is_plugin_added::<CoreAsyncPlugin>() {
204 app.add_plugins(CoreAsyncPlugin);
205 }
206 for (schedule, set) in &self.schedules {
207 if let Some(set) = set {
208 app.add_systems(
209 *schedule,
210 (
211 run_before_async_executor.before(run_async_executor),
212 run_async_executor,
213 )
214 .in_set(*set),
215 );
216 } else {
217 app.add_systems(
218 *schedule,
219 (
220 run_before_async_executor.before(run_async_executor),
221 run_async_executor,
222 ),
223 );
224 }
225 }
226 }
227
228 fn is_unique(&self) -> bool {
229 false
230 }
231}
232
233pub trait AsyncExtension {
235 fn spawn_task(&mut self, f: impl Future<Output = AccessResult> + 'static) -> &mut Self;
237
238 fn spawn_state_scoped<S: States>(
245 &mut self,
246 state: S,
247 fut: impl Future<Output = AccessResult> + 'static,
248 ) -> AccessResult;
249
250 fn register_oneshot_event<E: Send + Sync + 'static>(&mut self) -> &mut Self;
252
253 fn register_inspect_entity_by_component<C: Component>(
262 &mut self,
263 priority: i32,
264 f: impl Fn(Entity, &C, &mut Formatter) + Send + Sync + 'static,
265 ) -> &mut Self;
266
267 fn register_inspect_entity_by_query<
276 Q: ReadOnlyQueryData + ReleaseStateQueryData + 'static,
277 F: QueryFilter + 'static,
278 >(
279 &mut self,
280 priority: i32,
281 f: impl Fn(Q::Item<'_, '_>, &mut Formatter) + Send + Sync + 'static,
282 ) -> &mut Self;
283}
284
285impl AsyncExtension for World {
286 fn spawn_task(&mut self, f: impl Future<Output = AccessResult> + 'static) -> &mut Self {
287 self.non_send_resource::<AsyncExecutor>().spawn(f);
288 self
289 }
290
291 fn spawn_state_scoped<S: States>(
292 &mut self,
293 state: S,
294 fut: impl Future<Output = AccessResult> + 'static,
295 ) -> AccessResult {
296 match self.get_resource::<State<S>>() {
297 Some(s) if s.get() == &state => (),
298 _ => {
299 return Err(AccessError::NotInState {
300 ty: type_name::<S>(),
301 })
302 }
303 };
304 let task = self.non_send_resource::<AsyncExecutor>().spawn_task(fut);
305 if let Some(mut res) = self.get_resource_mut::<ScopedTasks<S>>() {
306 res.tasks.entry(state).or_default().push(task);
307 } else {
308 error!(
309 "Cannot spawn state scoped futures without `react_to_state::<{}>`.",
310 type_name::<S>()
311 )
312 }
313 Ok(())
314 }
315
316 fn register_inspect_entity_by_component<C: Component>(
317 &mut self,
318 priority: i32,
319 f: impl Fn(Entity, &C, &mut Formatter) + Send + Sync + 'static,
320 ) -> &mut Self {
321 self.resource_mut::<EntityInspectors>().push(priority, f);
322 self
323 }
324
325 fn register_inspect_entity_by_query<
326 Q: ReadOnlyQueryData + ReleaseStateQueryData + 'static,
327 F: QueryFilter + 'static,
328 >(
329 &mut self,
330 priority: i32,
331 f: impl Fn(Q::Item<'_, '_>, &mut Formatter) + Send + Sync + 'static,
332 ) -> &mut Self {
333 self.resource_mut::<EntityInspectors>()
334 .push_query::<Q, F>(priority, f);
335 self
336 }
337
338 fn register_oneshot_event<E: Send + Sync + 'static>(&mut self) -> &mut Self {
339 self.init_resource::<EventChannel<E>>();
340 self
341 }
342}
343
344impl AsyncExtension for App {
345 fn spawn_task(&mut self, f: impl Future<Output = AccessResult> + 'static) -> &mut Self {
346 self.world().non_send_resource::<AsyncExecutor>().spawn(f);
347 self
348 }
349
350 fn spawn_state_scoped<S: States>(
351 &mut self,
352 state: S,
353 fut: impl Future<Output = AccessResult> + 'static,
354 ) -> AccessResult {
355 self.world_mut().spawn_state_scoped(state, fut)
356 }
357
358 fn register_oneshot_event<E: Send + Sync + 'static>(&mut self) -> &mut Self {
359 self.world_mut().register_oneshot_event::<E>();
360 self
361 }
362
363 fn register_inspect_entity_by_component<C: Component>(
364 &mut self,
365 priority: i32,
366 f: impl Fn(Entity, &C, &mut Formatter) + Send + Sync + 'static,
367 ) -> &mut Self {
368 self.world_mut()
369 .register_inspect_entity_by_component(priority, f);
370 self
371 }
372
373 fn register_inspect_entity_by_query<
374 Q: ReadOnlyQueryData + ReleaseStateQueryData + 'static,
375 F: QueryFilter + 'static,
376 >(
377 &mut self,
378 priority: i32,
379 f: impl Fn(Q::Item<'_, '_>, &mut Formatter) + Send + Sync + 'static,
380 ) -> &mut Self {
381 self.world_mut()
382 .register_inspect_entity_by_query::<Q, F>(priority, f);
383 self
384 }
385}
386
387pub trait AppReactorExtension {
389 fn react_to_message<E: Message + Clone>(&mut self) -> &mut Self;
393
394 fn react_to_state<S: States>(&mut self) -> &mut Self;
396
397 fn react_to_component_change<C: Component + Eq + Clone + Default>(&mut self) -> &mut Self;
399}
400
401impl AppReactorExtension for App {
402 fn react_to_message<E: Message + Clone>(&mut self) -> &mut Self {
403 self.register_oneshot_event::<E>();
404 self.add_systems(BeforeAsyncExecutor, systems::react_to_message::<E>);
405 self
406 }
407
408 fn react_to_state<S: States>(&mut self) -> &mut Self {
409 self.add_systems(BeforeAsyncExecutor, systems::react_to_state::<S>);
410 self.init_resource::<ScopedTasks<S>>();
411 self
412 }
413
414 fn react_to_component_change<C: Component + Eq + Clone + Default>(&mut self) -> &mut Self {
415 self.add_systems(BeforeAsyncExecutor, systems::react_to_component_change::<C>);
416 self
417 }
418}
419
420pub trait AsyncCommandsExtension {
422 fn spawn_task<F: Future<Output = AccessResult> + 'static>(
435 &mut self,
436 f: impl FnOnce() -> F + Send + 'static,
437 ) -> &mut Self;
438
439 fn spawn_state_scoped<S: States, F: Future<Output = AccessResult> + 'static>(
446 &mut self,
447 state: S,
448 fut: impl FnOnce() -> F + Send + 'static,
449 ) -> &mut Self;
450}
451
452impl AsyncCommandsExtension for Commands<'_, '_> {
453 fn spawn_task<F: Future<Output = AccessResult> + 'static>(
454 &mut self,
455 f: impl (FnOnce() -> F) + Send + 'static,
456 ) -> &mut Self {
457 self.queue(SpawnFn::new(f));
458 self
459 }
460
461 fn spawn_state_scoped<S: States, F: Future<Output = AccessResult> + 'static>(
462 &mut self,
463 state: S,
464 f: impl FnOnce() -> F + Send + 'static,
465 ) -> &mut Self {
466 self.queue(StateScopedSpawnFn::new(state, f));
467 self
468 }
469}
470
471pub trait AsyncEntityCommandsExtension {
473 fn spawn_task<F: Future<Output = AccessResult> + 'static>(
486 &mut self,
487 f: impl FnOnce(Entity) -> F + Send + 'static,
488 ) -> &mut Self;
489
490 fn spawn_state_scoped<S: States, F: Future<Output = AccessResult> + 'static>(
497 &mut self,
498 state: S,
499 fut: impl FnOnce(Entity) -> F + Send + 'static,
500 ) -> &mut Self;
501}
502
503impl AsyncEntityCommandsExtension for EntityCommands<'_> {
504 fn spawn_task<F: Future<Output = AccessResult> + 'static>(
505 &mut self,
506 f: impl (FnOnce(Entity) -> F) + Send + 'static,
507 ) -> &mut Self {
508 let entity = self.id();
509 self.commands().queue(SpawnFn::new(move || f(entity)));
510 self
511 }
512
513 fn spawn_state_scoped<S: States, F: Future<Output = AccessResult> + 'static>(
514 &mut self,
515 state: S,
516 f: impl FnOnce(Entity) -> F + Send + 'static,
517 ) -> &mut Self {
518 let entity = self.id();
519 self.commands()
520 .queue(StateScopedSpawnFn::new(state, move || f(entity)));
521 self
522 }
523}
524
525pub struct SpawnFn(
527 Box<dyn (FnOnce() -> Pin<Box<dyn Future<Output = AccessResult>>>) + Send + 'static>,
528);
529
530impl SpawnFn {
531 fn new<F: Future<Output = AccessResult> + 'static>(
532 f: impl (FnOnce() -> F) + Send + 'static,
533 ) -> Self {
534 Self(Box::new(move || Box::pin(f())))
535 }
536}
537
538impl Command for SpawnFn {
539 fn apply(self, world: &mut World) {
540 world.spawn_task(self.0());
541 }
542}
543
544pub struct StateScopedSpawnFn<S: States> {
546 future: Box<dyn (FnOnce() -> Pin<Box<dyn Future<Output = AccessResult>>>) + Send + 'static>,
547 state: S,
548}
549
550impl<S: States> StateScopedSpawnFn<S> {
551 fn new<F: Future<Output = AccessResult> + 'static>(
552 state: S,
553 f: impl (FnOnce() -> F) + Send + 'static,
554 ) -> Self {
555 Self {
556 future: Box::new(move || Box::pin(f())),
557 state,
558 }
559 }
560}
561
562impl<S: States> Command for StateScopedSpawnFn<S> {
563 fn apply(self, world: &mut World) {
564 let _ = world.spawn_state_scoped(self.state, (self.future)());
565 }
566}
567
568#[doc(hidden)]
569#[must_use = "Defer must not be dropped immediately."]
570pub struct Defer<F: FnOnce() -> AccessResult>(pub Option<F>);
571
572impl<F: FnOnce() -> AccessResult> Drop for Defer<F> {
573 fn drop(&mut self) {
574 if in_async_context() {
575 let _ = (self.0.take().unwrap())();
576 }
577 }
578}
579
580#[macro_export]
609macro_rules! defer {
610 ($($tt: tt)*) => {
611 $crate::Defer(Some(|| {
612 let _ = {$($tt)*};
613 Ok(())
614 }))
615 };
616}
617
618#[macro_export]
637macro_rules! attempt {
638 ($($tt:tt)*) => {
639 let _: $crate::AccessResult<()> = async {
640 let _ = {$($tt)*};
641 Ok(())
642 }.await;
643 };
644}
645
646#[doc(hidden)]
648#[allow(unused)]
649#[macro_export]
650macro_rules! test_spawn {
651 ($expr: expr) => {{
652 use ::bevy::prelude::*;
653 use ::bevy_defer::access::*;
654 use ::bevy_defer::*;
655 use bevy::state::app::StatesPlugin;
656 #[derive(Debug, Clone, Copy, Component, Resource, Event, Asset, TypePath)]
657 pub struct Int(i32);
658
659 #[derive(Debug, Clone, Copy, Component, Resource, Event, Asset, TypePath)]
660 pub struct Str(&'static str);
661
662 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, States)]
663 pub enum MyState {
664 A,
665 B,
666 C,
667 };
668
669 let mut app = ::bevy::app::App::new();
670 app.add_plugins(MinimalPlugins);
671 app.add_plugins(StatesPlugin);
672 app.add_plugins(AssetPlugin::default());
673 app.init_asset::<Image>();
674 app.add_plugins(bevy_defer::AsyncPlugin::default_settings());
675 app.world_mut().spawn(Int(4));
676 app.world_mut().spawn(Str("Ferris"));
677 app.insert_resource(Int(4));
678 app.insert_resource(Str("Ferris"));
679 app.insert_non_send_resource(Int(4));
680 app.insert_non_send_resource(Str("Ferris"));
681 app.insert_state(MyState::A);
682 app.spawn_task(async move {
683 $expr;
684 AsyncWorld.quit();
685 Ok(())
686 });
687 app.run();
688 }};
689}