bevy_async_task/
task_runner.rs1use crate::{AsyncReceiver, AsyncTask, TimedAsyncTask, TimeoutError};
2use bevy_ecs::{
3 component::Tick,
4 system::{ExclusiveSystemParam, ReadOnlySystemParam, SystemMeta, SystemParam},
5 world::{World, unsafe_world_cell::UnsafeWorldCell},
6};
7use bevy_tasks::{AsyncComputeTaskPool, ConditionalSend};
8use bevy_utils::synccell::SyncCell;
9use std::{
10 ops::{Deref, DerefMut},
11 sync::atomic::Ordering,
12 task::Poll,
13};
14
15#[derive(Debug)]
17pub struct TaskRunner<'s, T>(pub(crate) &'s mut Option<AsyncReceiver<T>>);
18
19impl<T> Deref for TaskRunner<'_, T> {
20 type Target = Option<AsyncReceiver<T>>;
21
22 fn deref(&self) -> &Self::Target {
23 self.0
24 }
25}
26impl<T> DerefMut for TaskRunner<'_, T> {
27 fn deref_mut(&mut self) -> &mut Self::Target {
28 self.0
29 }
30}
31
32impl<T> TaskRunner<'_, T>
33where
34 T: ConditionalSend + 'static,
35{
36 pub fn is_idle(&self) -> bool {
38 self.is_none()
39 }
40
41 pub fn is_pending(&self) -> bool {
43 if let Some(rx) = &self.0 {
44 !rx.received.load(Ordering::Relaxed)
45 } else {
46 false
47 }
48 }
49
50 pub fn is_finished(&self) -> bool {
52 if let Some(rx) = &self.0 {
53 rx.received.load(Ordering::Relaxed)
54 } else {
55 false
56 }
57 }
58
59 pub fn start(&mut self, task: impl Into<AsyncTask<T>>) {
63 let task = task.into();
64 let (fut, rx) = task.split();
65 let task_pool = AsyncComputeTaskPool::get();
66 let handle = task_pool.spawn(fut);
67 handle.detach();
68 self.0.replace(rx);
69 }
70
71 pub fn poll(&mut self) -> Poll<T> {
74 match self.0.as_mut() {
75 Some(rx) => match rx.try_recv() {
76 Some(v) => {
77 self.0.take();
78 Poll::Ready(v)
79 }
80 None => Poll::Pending,
81 },
82 None => Poll::Pending,
83 }
84 }
85}
86
87impl<T: Send + 'static> ExclusiveSystemParam for TaskRunner<'_, T> {
88 type State = SyncCell<Option<AsyncReceiver<T>>>;
89 type Item<'s> = TaskRunner<'s, T>;
90
91 fn init(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State {
92 SyncCell::new(None)
93 }
94
95 fn get_param<'s>(state: &'s mut Self::State, _system_meta: &SystemMeta) -> Self::Item<'s> {
96 TaskRunner(state.get())
97 }
98}
99unsafe impl<T: Send + 'static> ReadOnlySystemParam for TaskRunner<'_, T> {}
101unsafe impl<T: Send + 'static> SystemParam for TaskRunner<'_, T> {
103 type State = SyncCell<Option<AsyncReceiver<T>>>;
104 type Item<'w, 's> = TaskRunner<'s, T>;
105
106 fn init_state(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State {
107 SyncCell::new(None)
108 }
109
110 #[inline]
111 unsafe fn get_param<'w, 's>(
112 state: &'s mut Self::State,
113 _system_meta: &SystemMeta,
114 _world: UnsafeWorldCell<'w>,
115 _change_tick: Tick,
116 ) -> Self::Item<'w, 's> {
117 TaskRunner(state.get())
118 }
119}
120
121#[derive(Debug)]
123pub struct TimedTaskRunner<'s, T>(
124 pub(crate) &'s mut Option<AsyncReceiver<Result<T, TimeoutError>>>,
125);
126
127impl<T> Deref for TimedTaskRunner<'_, T> {
128 type Target = Option<AsyncReceiver<Result<T, TimeoutError>>>;
129
130 fn deref(&self) -> &Self::Target {
131 self.0
132 }
133}
134impl<T> DerefMut for TimedTaskRunner<'_, T> {
135 fn deref_mut(&mut self) -> &mut Self::Target {
136 self.0
137 }
138}
139
140impl<T> TimedTaskRunner<'_, T>
141where
142 T: ConditionalSend + 'static,
143{
144 pub fn is_idle(&self) -> bool {
146 self.is_none()
147 }
148
149 pub fn is_pending(&self) -> bool {
151 if let Some(rx) = &self.0 {
152 !rx.received.load(Ordering::Relaxed)
153 } else {
154 false
155 }
156 }
157
158 pub fn is_finished(&self) -> bool {
160 if let Some(rx) = &self.0 {
161 rx.received.load(Ordering::Relaxed)
162 } else {
163 false
164 }
165 }
166
167 pub fn start(&mut self, task: impl Into<TimedAsyncTask<T>>) {
171 let task = task.into();
172 let (fut, rx) = task.split();
173 let task_pool = AsyncComputeTaskPool::get();
174 let handle = task_pool.spawn(fut);
175 handle.detach();
176 self.0.replace(rx);
177 }
178
179 pub fn poll(&mut self) -> Poll<Result<T, TimeoutError>> {
182 match self.0.as_mut() {
183 Some(rx) => match rx.try_recv() {
184 Some(v) => {
185 self.0.take();
186 Poll::Ready(v)
187 }
188 None => Poll::Pending,
189 },
190 None => Poll::Pending,
191 }
192 }
193}
194
195impl<T: Send + 'static> ExclusiveSystemParam for TimedTaskRunner<'_, T> {
196 type State = SyncCell<Option<AsyncReceiver<Result<T, TimeoutError>>>>;
197 type Item<'s> = TimedTaskRunner<'s, T>;
198
199 fn init(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State {
200 SyncCell::new(None)
201 }
202
203 fn get_param<'s>(state: &'s mut Self::State, _system_meta: &SystemMeta) -> Self::Item<'s> {
204 TimedTaskRunner(state.get())
205 }
206}
207unsafe impl<T: Send + 'static> ReadOnlySystemParam for TimedTaskRunner<'_, T> {}
209unsafe impl<T: Send + 'static> SystemParam for TimedTaskRunner<'_, T> {
211 type State = SyncCell<Option<AsyncReceiver<Result<T, TimeoutError>>>>;
212 type Item<'w, 's> = TimedTaskRunner<'s, T>;
213
214 fn init_state(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State {
215 SyncCell::new(None)
216 }
217
218 #[inline]
219 unsafe fn get_param<'w, 's>(
220 state: &'s mut Self::State,
221 _system_meta: &SystemMeta,
222 _world: UnsafeWorldCell<'w>,
223 _change_tick: Tick,
224 ) -> Self::Item<'w, 's> {
225 TimedTaskRunner(state.get())
226 }
227}