bevy_async_runner/
runner.rs1use std::future::Future;
2use bevy::prelude::{warn, trace, Commands, IntoSystem, Resource, SystemInput};
3use bevy::tasks::IoTaskPool;
4use maybe_sync::{MaybeSend, MaybeSync};
5use tokio::sync::mpsc::{unbounded_channel, UnboundedSender, UnboundedReceiver};
6use tokio::sync::mpsc::error::TryRecvError;
7
8type ExecuteSystemFn = Box<dyn FnOnce(&mut Commands) + Send + Sync>;
9
10#[derive(Resource)]
11pub struct AsyncRunner {
12 channel: (
13 UnboundedSender<ExecuteSystemFn>,
14 UnboundedReceiver<ExecuteSystemFn>
15 ),
16 #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-runtime-multi-thread")))]
17 pool: &'static IoTaskPool,
18 #[cfg(any(feature = "tokio-runtime-multi-thread", feature = "tokio-runtime"))]
19 pool: tokio::runtime::Runtime,
20}
21
22impl AsyncRunner {
23 pub fn new() -> AsyncRunner {
24
25 #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-runtime-multi-thread")))]
26 let pool = {
27 trace!("Fetching IoTaskPool runtime via IoTaskPool::get()");
28 IoTaskPool::get()
29 };
30 #[cfg(feature = "tokio-runtime-multi-thread")]
31 let pool= {
32 trace!("Starting tokio runtime via Builder::new_multi_thread()");
33 tokio::runtime::Builder::new_multi_thread()
34 .enable_all()
35 .build()
36 .unwrap()
37 };
38 #[cfg(all(not(feature = "tokio-runtime-multi-thread"), feature = "tokio-runtime"))]
39 let pool = {
40 trace!("Starting tokio runtime via Builder::new_current_thread()");
41 tokio::runtime::Builder::new_current_thread()
42 .enable_all()
43 .build()
44 .unwrap()
45 };
46
47 AsyncRunner {
48 channel: unbounded_channel(),
49 pool
50 }
51 }
52
53 pub fn schedule<
55 S: IntoSystem<I, (), M> + Send + Sync + 'static,
56 M: 'static,
57 I: SystemInput<Inner<'static>: Send + Sync> + Send + Sync + 'static
58 >(
59 &self,
60 task: impl Future<Output = I::Inner<'static>> + MaybeSend + MaybeSync + 'static,
61 system: S,
62 ) {
63 let task = self.pool.spawn((async |sender: UnboundedSender<ExecuteSystemFn>| {
64 let result = task.await;
65
66 let boxed_result = Box::new(result);
67
68 let execute = move |commands: &mut Commands| {
69 commands.run_system_cached_with(
70 system,
71 *boxed_result
72 );
73 };
74
75 sender.send(Box::new(execute)).unwrap();
76 })(self.channel.0.clone()));
77
78 #[cfg(all(not(feature = "tokio-runtime"), not(feature = "tokio-runtime-multi-thread")))]
80 task.detach();
81 }
82
83 pub fn run(&mut self, mut commands: Commands) {
85 loop {
86 match self.channel.1.try_recv() {
87 Ok(execute) => {
88 execute(&mut commands);
89 }
90 Err(e) => {
91 match e {
92 TryRecvError::Empty => {}
93 TryRecvError::Disconnected => {
94 warn!("AsyncRunner communication channel terminated");
95 break;
96 }
97 }
98
99 break;
100 }
101 }
102 }
103 }
104}