bevy_async_runner/
runner.rs

1use std::future::Future;
2use bevy::prelude::{warn, trace, Commands, IntoSystem, Resource, SystemInput};
3use bevy::tasks::IoTaskPool;
4use maybe_sync::{MaybeSend, MaybeSync};
5use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
6use tokio::sync::mpsc::error::TryRecvError;
7
/// Deferred callback sent from a finished background task to [`AsyncRunner::run`].
///
/// Each callback owns the system and the task's output, and queues that system
/// through the `Commands` it is handed. It is `Send + Sync` so it can travel
/// through the channel stored inside the `AsyncRunner` resource.
type ExecuteSystemFn = Box<dyn FnOnce(&mut Commands) + Send + Sync>;
9
10#[derive(Resource)]
11pub struct AsyncRunner {
12    channel: (
13        UnboundedSender<ExecuteSystemFn>,
14        UnboundedReceiver<ExecuteSystemFn>
15    ),
16    #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-runtime-multi-thread")))]
17    pool: &'static IoTaskPool,
18    #[cfg(any(feature = "tokio-runtime-multi-thread", feature = "tokio-runtime"))]
19    pool: tokio::runtime::Runtime,
20}
21
22impl AsyncRunner {
23    pub fn new() -> AsyncRunner {
24
25        #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-runtime-multi-thread")))]
26        let pool = {
27            trace!("Fetching IoTaskPool runtime via IoTaskPool::get()");
28            IoTaskPool::get()
29        };
30        #[cfg(feature = "tokio-runtime-multi-thread")]
31        let pool= {
32            trace!("Starting tokio runtime via Builder::new_multi_thread()");
33            tokio::runtime::Builder::new_multi_thread()
34                .enable_all()
35                .build()
36                .unwrap()
37        };
38        #[cfg(all(not(feature = "tokio-runtime-multi-thread"), feature = "tokio-runtime"))]
39        let pool = {
40            trace!("Starting tokio runtime via Builder::new_current_thread()");
41            tokio::runtime::Builder::new_current_thread()
42                .enable_all()
43                .build()
44                .unwrap()
45        };
46
47        AsyncRunner {
48            channel: unbounded_channel(),
49            pool
50        }
51    }
52
53    /// Takes a join handle, and runs an ECS system with its value once completed
54    pub fn schedule<
55        S: IntoSystem<I, (), M> + Send + Sync + 'static,
56        M: 'static,
57        I: SystemInput<Inner<'static>: Send + Sync> + Send + Sync + 'static
58    >(
59        &self,
60        task: impl Future<Output = I::Inner<'static>> + MaybeSend + MaybeSync + 'static,
61        system: S,
62    ) {
63        let task = self.pool.spawn((async |sender: UnboundedSender<ExecuteSystemFn>| {
64            let result = task.await;
65
66            let boxed_result = Box::new(result);
67
68            let execute = move |commands: &mut Commands| {
69                commands.run_system_cached_with(
70                    system,
71                    *boxed_result
72                );
73            };
74
75            sender.send(Box::new(execute)).unwrap();
76        })(self.channel.0.clone()));
77
78        // IoTaskPool won't finish it unless we detach
79        #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-runtime-multi-thread")))]
80        task.detach();
81    }
82
83    /// Loop over all completed join handles and run the systems
84    pub fn run(&mut self, mut commands: Commands) {
85        loop {
86            match self.channel.1.try_recv() {
87                Ok(execute) => {
88                    execute(&mut commands);
89                }
90                Err(e) => {
91                    match e {
92                        TryRecvError::Empty => {}
93                        TryRecvError::Disconnected => {
94                            warn!("AsyncRunner communication channel terminated");
95                            break;
96                        }
97                    }
98
99                    break;
100                }
101            }
102        }
103    }
104}