async_task_ffi/task.rs
1use core::fmt;
2use core::future::Future;
3use core::marker::{PhantomData, Unpin};
4use core::mem;
5use core::pin::Pin;
6use core::ptr::NonNull;
7use core::sync::atomic::Ordering;
8use core::task::{Context, Poll};
9
10use crate::header::Header;
11use crate::state::*;
12
/// A handle to a spawned task.
///
/// Awaiting a [`Task`] yields the output of the future it was spawned with.
///
/// Dropping the handle cancels the task, so its future will not be polled
/// again. Use [`detach()`][`Task::detach()`] to let go of the handle while the
/// task keeps running, or [`cancel()`][`Task::cancel()`] to cancel the task
/// gracefully and wait until it is fully destroyed.
///
/// Canceling wakes the task and reschedules it one last time. The executor
/// can then destroy it either by dropping its [`Runnable`][`super::Runnable`]
/// or by invoking [`run()`][`super::Runnable::run()`].
///
/// # Examples
///
/// ```
/// use smol::{future, Executor};
/// use std::thread;
///
/// let ex = Executor::new();
///
/// // Spawn a future onto the executor.
/// let task = ex.spawn(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// // Run an executor thread.
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
///
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task), 3);
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct Task<T> {
    /// Type-erased pointer to the raw task. The methods in this file cast it
    /// to `*const Header` to reach the task's state and vtable.
    pub(crate) ptr: NonNull<()>,

    /// Zero-sized marker that ties the handle to its output type `T`.
    pub(crate) _marker: PhantomData<T>,
}
55
56unsafe impl<T: Send> Send for Task<T> {}
57unsafe impl<T> Sync for Task<T> {}
58
59impl<T> Unpin for Task<T> {}
60
61#[cfg(feature = "std")]
62impl<T> std::panic::UnwindSafe for Task<T> {}
63#[cfg(feature = "std")]
64impl<T> std::panic::RefUnwindSafe for Task<T> {}
65
66impl<T> Task<T> {
67 /// Detaches the task to let it keep running in the background.
68 ///
69 /// # Examples
70 ///
71 /// ```
72 /// use smol::{Executor, Timer};
73 /// use std::time::Duration;
74 ///
75 /// let ex = Executor::new();
76 ///
77 /// // Spawn a deamon future.
78 /// ex.spawn(async {
79 /// loop {
80 /// println!("I'm a daemon task looping forever.");
81 /// Timer::after(Duration::from_secs(1)).await;
82 /// }
83 /// })
84 /// .detach();
85 /// ```
86 pub fn detach(self) {
87 let mut this = self;
88 let _out = this.set_detached();
89 mem::forget(this);
90 }
91
92 /// Cancels the task and waits for it to stop running.
93 ///
94 /// Returns the task's output if it was completed just before it got
95 /// canceled, or [`None`] if it didn't complete.
96 ///
97 /// While it's possible to simply drop the [`Task`] to cancel it, this is a
98 /// cleaner way of canceling because it also waits for the task to stop
99 /// running.
100 ///
101 /// # Examples
102 ///
103 /// ```
104 /// use smol::{future, Executor, Timer};
105 /// use std::thread;
106 /// use std::time::Duration;
107 ///
108 /// let ex = Executor::new();
109 ///
110 /// // Spawn a deamon future.
111 /// let task = ex.spawn(async {
112 /// loop {
113 /// println!("Even though I'm in an infinite loop, you can still cancel me!");
114 /// Timer::after(Duration::from_secs(1)).await;
115 /// }
116 /// });
117 ///
118 /// // Run an executor thread.
119 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
120 ///
121 /// future::block_on(async {
122 /// Timer::after(Duration::from_secs(3)).await;
123 /// task.cancel().await;
124 /// });
125 /// ```
126 pub async fn cancel(self) -> Option<T> {
127 let mut this = self;
128 this.set_canceled();
129
130 struct Fut<T>(Task<T>);
131
132 impl<T> Future for Fut<T> {
133 type Output = Option<T>;
134
135 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136 self.0.poll_task(cx)
137 }
138 }
139
140 Fut(this).await
141 }
142
143 /// Consumes the [`Task`], returning a pointer to the raw task.
144 ///
145 /// The raw pointer must eventually be converted back into a [`Task`]
146 /// by calling [`Task::from_raw`] in order to free up its resources.
147 pub fn into_raw(self) -> *mut () {
148 let ptr = self.ptr;
149 mem::forget(self);
150 ptr.as_ptr()
151 }
152
153 /// Constructs a [`Task`] from a raw task pointer.
154 ///
155 /// The raw pointer must have been previously returned by a call to
156 /// [`into_raw`].
157 ///
158 /// # Safety
159 ///
160 /// See above.
161 pub unsafe fn from_raw(ptr: *mut ()) -> Task<T> {
162 Task {
163 ptr: NonNull::new_unchecked(ptr),
164 _marker: Default::default(),
165 }
166 }
167
168 /// Puts the task in canceled state.
169 fn set_canceled(&mut self) {
170 let ptr = self.ptr.as_ptr();
171 let header = ptr as *const Header;
172
173 unsafe {
174 let mut state = (*header).state.load(Ordering::Acquire);
175
176 loop {
177 // If the task has been completed or closed, it can't be canceled.
178 if state & (COMPLETED | CLOSED) != 0 {
179 break;
180 }
181
182 // If the task is not scheduled nor running, we'll need to schedule it.
183 let new = if state & (SCHEDULED | RUNNING) == 0 {
184 (state | SCHEDULED | CLOSED) + REFERENCE
185 } else {
186 state | CLOSED
187 };
188
189 // Mark the task as closed.
190 match (*header).state.compare_exchange_weak(
191 state,
192 new,
193 Ordering::AcqRel,
194 Ordering::Acquire,
195 ) {
196 Ok(_) => {
197 // If the task is not scheduled nor running, schedule it one more time so
198 // that its future gets dropped by the executor.
199 if state & (SCHEDULED | RUNNING) == 0 {
200 ((*header).vtable.schedule)(ptr);
201 }
202
203 // Notify the awaiter that the task has been closed.
204 if state & AWAITER != 0 {
205 (*header).notify(None);
206 }
207
208 break;
209 }
210 Err(s) => state = s,
211 }
212 }
213 }
214 }
215
216 /// Puts the task in detached state.
217 fn set_detached(&mut self) -> Option<T> {
218 let ptr = self.ptr.as_ptr();
219 let header = ptr as *const Header;
220
221 unsafe {
222 // A place where the output will be stored in case it needs to be dropped.
223 let mut output = None;
224
225 // Optimistically assume the `Task` is being detached just after creating the
226 // task. This is a common case so if the `Task` is datached, the
227 // overhead of it is only one compare-exchange operation.
228 if let Err(mut state) = (*header).state.compare_exchange_weak(
229 SCHEDULED | TASK | REFERENCE,
230 SCHEDULED | REFERENCE,
231 Ordering::AcqRel,
232 Ordering::Acquire,
233 ) {
234 loop {
235 // If the task has been completed but not yet closed, that means its output
236 // must be dropped.
237 if state & COMPLETED != 0 && state & CLOSED == 0 {
238 // Mark the task as closed in order to grab its output.
239 match (*header).state.compare_exchange_weak(
240 state,
241 state | CLOSED,
242 Ordering::AcqRel,
243 Ordering::Acquire,
244 ) {
245 Ok(_) => {
246 // Read the output.
247 output =
248 Some((((*header).vtable.get_output)(ptr) as *mut T).read());
249
250 // Update the state variable because we're continuing the loop.
251 state |= CLOSED;
252 }
253 Err(s) => state = s,
254 }
255 } else {
256 // If this is the last reference to the task and it's not closed, then
257 // close it and schedule one more time so that its future gets dropped by
258 // the executor.
259 let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
260 SCHEDULED | CLOSED | REFERENCE
261 } else {
262 state & !TASK
263 };
264
265 // Unset the `TASK` flag.
266 match (*header).state.compare_exchange_weak(
267 state,
268 new,
269 Ordering::AcqRel,
270 Ordering::Acquire,
271 ) {
272 Ok(_) => {
273 // If this is the last reference to the task, we need to either
274 // schedule dropping its future or destroy it.
275 if state & !(REFERENCE - 1) == 0 {
276 if state & CLOSED == 0 {
277 ((*header).vtable.schedule)(ptr);
278 } else {
279 ((*header).vtable.destroy)(ptr);
280 }
281 }
282
283 break;
284 }
285 Err(s) => state = s,
286 }
287 }
288 }
289 }
290
291 output
292 }
293 }
294
295 /// Polls the task to retrieve its output.
296 ///
297 /// Returns `Some` if the task has completed or `None` if it was closed.
298 ///
299 /// A task becomes closed in the following cases:
300 ///
301 /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or
302 /// `Task::cancel()`. 2. Its output gets awaited by the `Task`.
303 /// 3. It panics while polling the future.
304 /// 4. It is completed and the `Task` gets dropped.
305 fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
306 let ptr = self.ptr.as_ptr();
307 let header = ptr as *const Header;
308
309 unsafe {
310 let mut state = (*header).state.load(Ordering::Acquire);
311
312 loop {
313 // If the task has been closed, notify the awaiter and return `None`.
314 if state & CLOSED != 0 {
315 // If the task is scheduled or running, we need to wait until its future is
316 // dropped.
317 if state & (SCHEDULED | RUNNING) != 0 {
318 // Replace the waker with one associated with the current task.
319 (*header).register(cx.waker());
320
321 // Reload the state after registering. It is possible changes occurred just
322 // before registration so we need to check for that.
323 state = (*header).state.load(Ordering::Acquire);
324
325 // If the task is still scheduled or running, we need to wait because its
326 // future is not dropped yet.
327 if state & (SCHEDULED | RUNNING) != 0 {
328 return Poll::Pending;
329 }
330 }
331
332 // Even though the awaiter is most likely the current task, it could also be
333 // another task.
334 (*header).notify(Some(cx.waker()));
335 return Poll::Ready(None);
336 }
337
338 // If the task is not completed, register the current task.
339 if state & COMPLETED == 0 {
340 // Replace the waker with one associated with the current task.
341 (*header).register(cx.waker());
342
343 // Reload the state after registering. It is possible that the task became
344 // completed or closed just before registration so we need to check for that.
345 state = (*header).state.load(Ordering::Acquire);
346
347 // If the task has been closed, restart.
348 if state & CLOSED != 0 {
349 continue;
350 }
351
352 // If the task is still not completed, we're blocked on it.
353 if state & COMPLETED == 0 {
354 return Poll::Pending;
355 }
356 }
357
358 // Since the task is now completed, mark it as closed in order to grab its
359 // output.
360 match (*header).state.compare_exchange(
361 state,
362 state | CLOSED,
363 Ordering::AcqRel,
364 Ordering::Acquire,
365 ) {
366 Ok(_) => {
367 // Notify the awaiter. Even though the awaiter is most likely the current
368 // task, it could also be another task.
369 if state & AWAITER != 0 {
370 (*header).notify(Some(cx.waker()));
371 }
372
373 // Take the output from the task.
374 let output = ((*header).vtable.get_output)(ptr) as *mut T;
375 return Poll::Ready(Some(output.read()));
376 }
377 Err(s) => state = s,
378 }
379 }
380 }
381 }
382}
383
384impl<T> Drop for Task<T> {
385 fn drop(&mut self) {
386 self.set_canceled();
387 self.set_detached();
388 }
389}
390
391impl<T> Future for Task<T> {
392 type Output = T;
393
394 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
395 match self.poll_task(cx) {
396 Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
397 Poll::Pending => Poll::Pending,
398 }
399 }
400}
401
402impl<T> fmt::Debug for Task<T> {
403 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404 let ptr = self.ptr.as_ptr();
405 let header = ptr as *const Header;
406
407 f.debug_struct("Task")
408 .field("header", unsafe { &(*header) })
409 .finish()
410 }
411}