bevy_async_runner/
runner.rs

1use std::future::Future;
2use bevy::prelude::{warn, Commands, IntoSystem, Resource, SystemInput};
3use bevy::tasks::IoTaskPool;
4use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
5use tokio::sync::mpsc::error::TryRecvError;
6
7type ExecuteSystemFn = Box<dyn FnOnce(&mut Commands) + Send + Sync>;
8
9#[derive(Resource)]
10pub struct AsyncRunner {
11    channel: (
12        UnboundedSender<ExecuteSystemFn>,
13        UnboundedReceiver<ExecuteSystemFn>
14    ),
15    #[cfg(not(all(feature = "tokio-runtime", not(target_arch = "wasm32"))))]
16    pool: &'static IoTaskPool,
17    #[cfg(all(feature = "tokio-runtime", not(target_arch = "wasm32")))]
18    pool: tokio::runtime::Runtime,
19}
20
21impl AsyncRunner {
22    pub fn new() -> AsyncRunner {
23        AsyncRunner {
24            channel: unbounded_channel(),
25
26            #[cfg(not(all(feature = "tokio-runtime", not(target_arch = "wasm32"))))]
27            pool: IoTaskPool::get(),
28            #[cfg(all(feature = "tokio-runtime", not(target_arch = "wasm32")))]
29            pool: tokio::runtime::Builder::new_multi_thread()
30                .enable_all()
31                .build()
32                .unwrap(),
33        }
34    }
35
36    /// Takes a join handle, and runs an ECS system with its value once completed
37    pub fn schedule<
38        S: IntoSystem<I, (), M> + Send + Sync + 'static,
39        M: 'static,
40        I: SystemInput<Inner<'static>: Send + Sync> + Send + Sync + 'static
41    >(
42        &self,
43        task: impl Future<Output = I::Inner<'static>> + Sync + Send + 'static,
44        system: S,
45    ) {
46        let task = self.pool.spawn((async |sender: UnboundedSender<ExecuteSystemFn>| {
47            let result = task.await;
48
49            let boxed_result = Box::new(result);
50
51            let execute = move |commands: &mut Commands| {
52                commands.run_system_cached_with(
53                    system,
54                    *boxed_result
55                );
56            };
57
58            sender.send(Box::new(execute)).unwrap();
59        })(self.channel.0.clone()));
60
61        // IoTaskPool won't finish it unless we detach
62        #[cfg(not(all(feature = "tokio-runtime", not(target_arch = "wasm32"))))]
63        task.detach();
64    }
65
66    /// Loop over all completed join handles and run the systems
67    pub fn run(&mut self, mut commands: Commands) {
68        loop {
69            match self.channel.1.try_recv() {
70                Ok(execute) => {
71                    execute(&mut commands);
72                }
73                Err(e) => {
74                    match e {
75                        TryRecvError::Empty => {}
76                        TryRecvError::Disconnected => {
77                            warn!("AsyncRunner communication channel terminated");
78                            break;
79                        }
80                    }
81
82                    break;
83                }
84            }
85        }
86    }
87}