bevy_async_task/
task_runner.rs

1use crate::{AsyncReceiver, AsyncTask, TimedAsyncTask, TimeoutError};
2use bevy_ecs::{
3    component::Tick,
4    system::{ExclusiveSystemParam, ReadOnlySystemParam, SystemMeta, SystemParam},
5    world::{World, unsafe_world_cell::UnsafeWorldCell},
6};
7use bevy_tasks::{AsyncComputeTaskPool, ConditionalSend};
8use bevy_utils::synccell::SyncCell;
9use std::{
10    ops::{Deref, DerefMut},
11    sync::atomic::Ordering,
12    task::Poll,
13};
14
15/// A Bevy [`SystemParam`] to execute async tasks in the background.
16#[derive(Debug)]
17pub struct TaskRunner<'s, T>(pub(crate) &'s mut Option<AsyncReceiver<T>>);
18
19impl<T> Deref for TaskRunner<'_, T> {
20    type Target = Option<AsyncReceiver<T>>;
21
22    fn deref(&self) -> &Self::Target {
23        self.0
24    }
25}
26impl<T> DerefMut for TaskRunner<'_, T> {
27    fn deref_mut(&mut self) -> &mut Self::Target {
28        self.0
29    }
30}
31
32impl<T> TaskRunner<'_, T>
33where
34    T: ConditionalSend + 'static,
35{
36    /// Returns whether the task runner is idle.
37    pub fn is_idle(&self) -> bool {
38        self.is_none()
39    }
40
41    /// Returns whether the task runner is pending (running, but not finished).
42    pub fn is_pending(&self) -> bool {
43        if let Some(rx) = &self.0 {
44            !rx.received.load(Ordering::Relaxed)
45        } else {
46            false
47        }
48    }
49
50    /// Returns whether the task runner is finished.
51    pub fn is_finished(&self) -> bool {
52        if let Some(rx) = &self.0 {
53            rx.received.load(Ordering::Relaxed)
54        } else {
55            false
56        }
57    }
58
59    /// Start an async task in the background. If there is an existing task
60    /// pending, it will be dropped and replaced with the given task. If you
61    /// need to run multiple tasks, use the [`TaskPool`](crate::TaskPool).
62    pub fn start(&mut self, task: impl Into<AsyncTask<T>>) {
63        let task = task.into();
64        let (fut, rx) = task.split();
65        let task_pool = AsyncComputeTaskPool::get();
66        let handle = task_pool.spawn(fut);
67        handle.detach();
68        self.0.replace(rx);
69    }
70
71    /// Poll the task runner for the current task status. Possible returns are `Pending` or
72    /// `Ready(T)`.
73    pub fn poll(&mut self) -> Poll<T> {
74        match self.0.as_mut() {
75            Some(rx) => match rx.try_recv() {
76                Some(v) => {
77                    self.0.take();
78                    Poll::Ready(v)
79                }
80                None => Poll::Pending,
81            },
82            None => Poll::Pending,
83        }
84    }
85}
86
87impl<T: Send + 'static> ExclusiveSystemParam for TaskRunner<'_, T> {
88    type State = SyncCell<Option<AsyncReceiver<T>>>;
89    type Item<'s> = TaskRunner<'s, T>;
90
91    fn init(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State {
92        SyncCell::new(None)
93    }
94
95    fn get_param<'s>(state: &'s mut Self::State, _system_meta: &SystemMeta) -> Self::Item<'s> {
96        TaskRunner(state.get())
97    }
98}
99// SAFETY: only local state is accessed
100unsafe impl<T: Send + 'static> ReadOnlySystemParam for TaskRunner<'_, T> {}
101// SAFETY: only local state is accessed
102unsafe impl<T: Send + 'static> SystemParam for TaskRunner<'_, T> {
103    type State = SyncCell<Option<AsyncReceiver<T>>>;
104    type Item<'w, 's> = TaskRunner<'s, T>;
105
106    fn init_state(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State {
107        SyncCell::new(None)
108    }
109
110    #[inline]
111    unsafe fn get_param<'w, 's>(
112        state: &'s mut Self::State,
113        _system_meta: &SystemMeta,
114        _world: UnsafeWorldCell<'w>,
115        _change_tick: Tick,
116    ) -> Self::Item<'w, 's> {
117        TaskRunner(state.get())
118    }
119}
120
121/// A Bevy [`SystemParam`] to execute async tasks in the background with a timeout.
122#[derive(Debug)]
123pub struct TimedTaskRunner<'s, T>(
124    pub(crate) &'s mut Option<AsyncReceiver<Result<T, TimeoutError>>>,
125);
126
127impl<T> Deref for TimedTaskRunner<'_, T> {
128    type Target = Option<AsyncReceiver<Result<T, TimeoutError>>>;
129
130    fn deref(&self) -> &Self::Target {
131        self.0
132    }
133}
134impl<T> DerefMut for TimedTaskRunner<'_, T> {
135    fn deref_mut(&mut self) -> &mut Self::Target {
136        self.0
137    }
138}
139
140impl<T> TimedTaskRunner<'_, T>
141where
142    T: ConditionalSend + 'static,
143{
144    /// Returns whether the task runner is idle.
145    pub fn is_idle(&self) -> bool {
146        self.is_none()
147    }
148
149    /// Returns whether the task runner is pending (running, but not finished).
150    pub fn is_pending(&self) -> bool {
151        if let Some(rx) = &self.0 {
152            !rx.received.load(Ordering::Relaxed)
153        } else {
154            false
155        }
156    }
157
158    /// Returns whether the task runner is finished.
159    pub fn is_finished(&self) -> bool {
160        if let Some(rx) = &self.0 {
161            rx.received.load(Ordering::Relaxed)
162        } else {
163            false
164        }
165    }
166
167    /// Start an async task in the background. If there is an existing task
168    /// pending, it will be dropped and replaced with the given task. If you
169    /// need to run multiple tasks, use the [`TimedTaskPool`](crate::TimedTaskPool).
170    pub fn start(&mut self, task: impl Into<TimedAsyncTask<T>>) {
171        let task = task.into();
172        let (fut, rx) = task.split();
173        let task_pool = AsyncComputeTaskPool::get();
174        let handle = task_pool.spawn(fut);
175        handle.detach();
176        self.0.replace(rx);
177    }
178
179    /// Poll the task runner for the current task status. Possible returns are `Pending` or
180    /// `Ready(T)`.
181    pub fn poll(&mut self) -> Poll<Result<T, TimeoutError>> {
182        match self.0.as_mut() {
183            Some(rx) => match rx.try_recv() {
184                Some(v) => {
185                    self.0.take();
186                    Poll::Ready(v)
187                }
188                None => Poll::Pending,
189            },
190            None => Poll::Pending,
191        }
192    }
193}
194
195impl<T: Send + 'static> ExclusiveSystemParam for TimedTaskRunner<'_, T> {
196    type State = SyncCell<Option<AsyncReceiver<Result<T, TimeoutError>>>>;
197    type Item<'s> = TimedTaskRunner<'s, T>;
198
199    fn init(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State {
200        SyncCell::new(None)
201    }
202
203    fn get_param<'s>(state: &'s mut Self::State, _system_meta: &SystemMeta) -> Self::Item<'s> {
204        TimedTaskRunner(state.get())
205    }
206}
207// SAFETY: only local state is accessed
208unsafe impl<T: Send + 'static> ReadOnlySystemParam for TimedTaskRunner<'_, T> {}
209// SAFETY: only local state is accessed
210unsafe impl<T: Send + 'static> SystemParam for TimedTaskRunner<'_, T> {
211    type State = SyncCell<Option<AsyncReceiver<Result<T, TimeoutError>>>>;
212    type Item<'w, 's> = TimedTaskRunner<'s, T>;
213
214    fn init_state(_world: &mut World, _system_meta: &mut SystemMeta) -> Self::State {
215        SyncCell::new(None)
216    }
217
218    #[inline]
219    unsafe fn get_param<'w, 's>(
220        state: &'s mut Self::State,
221        _system_meta: &SystemMeta,
222        _world: UnsafeWorldCell<'w>,
223        _change_tick: Tick,
224    ) -> Self::Item<'w, 's> {
225        TimedTaskRunner(state.get())
226    }
227}