async_task_ffi/
task.rs

1use core::fmt;
2use core::future::Future;
3use core::marker::{PhantomData, Unpin};
4use core::mem;
5use core::pin::Pin;
6use core::ptr::NonNull;
7use core::sync::atomic::Ordering;
8use core::task::{Context, Poll};
9
10use crate::header::Header;
11use crate::state::*;
12
13/// A spawned task.
14///
15/// A [`Task`] can be awaited to retrieve the output of its future.
16///
17/// Dropping a [`Task`] cancels it, which means its future won't be polled
18/// again. To drop the [`Task`] handle without canceling it, use
19/// [`detach()`][`Task::detach()`] instead. To cancel a task gracefully and wait
20/// until it is fully destroyed, use the [`cancel()`][Task::cancel()] method.
21///
22/// Note that canceling a task actually wakes it and reschedules one last time.
23/// Then, the executor can destroy the task by simply dropping its
24/// [`Runnable`][`super::Runnable`] or by invoking [`run()`][`super::Runnable::
25/// run()`].
26///
27/// # Examples
28///
29/// ```
30/// use smol::{future, Executor};
31/// use std::thread;
32///
33/// let ex = Executor::new();
34///
35/// // Spawn a future onto the executor.
36/// let task = ex.spawn(async {
37///     println!("Hello from a task!");
38///     1 + 2
39/// });
40///
41/// // Run an executor thread.
42/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
43///
44/// // Wait for the task's output.
45/// assert_eq!(future::block_on(task), 3);
46/// ```
47#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
48pub struct Task<T> {
49    /// A raw task pointer.
50    pub(crate) ptr: NonNull<()>,
51
52    /// A marker capturing generic type `T`.
53    pub(crate) _marker: PhantomData<T>,
54}
55
56unsafe impl<T: Send> Send for Task<T> {}
57unsafe impl<T> Sync for Task<T> {}
58
59impl<T> Unpin for Task<T> {}
60
61#[cfg(feature = "std")]
62impl<T> std::panic::UnwindSafe for Task<T> {}
63#[cfg(feature = "std")]
64impl<T> std::panic::RefUnwindSafe for Task<T> {}
65
66impl<T> Task<T> {
67    /// Detaches the task to let it keep running in the background.
68    ///
69    /// # Examples
70    ///
71    /// ```
72    /// use smol::{Executor, Timer};
73    /// use std::time::Duration;
74    ///
75    /// let ex = Executor::new();
76    ///
77    /// // Spawn a deamon future.
78    /// ex.spawn(async {
79    ///     loop {
80    ///         println!("I'm a daemon task looping forever.");
81    ///         Timer::after(Duration::from_secs(1)).await;
82    ///     }
83    /// })
84    /// .detach();
85    /// ```
86    pub fn detach(self) {
87        let mut this = self;
88        let _out = this.set_detached();
89        mem::forget(this);
90    }
91
92    /// Cancels the task and waits for it to stop running.
93    ///
94    /// Returns the task's output if it was completed just before it got
95    /// canceled, or [`None`] if it didn't complete.
96    ///
97    /// While it's possible to simply drop the [`Task`] to cancel it, this is a
98    /// cleaner way of canceling because it also waits for the task to stop
99    /// running.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// use smol::{future, Executor, Timer};
105    /// use std::thread;
106    /// use std::time::Duration;
107    ///
108    /// let ex = Executor::new();
109    ///
110    /// // Spawn a deamon future.
111    /// let task = ex.spawn(async {
112    ///     loop {
113    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
114    ///         Timer::after(Duration::from_secs(1)).await;
115    ///     }
116    /// });
117    ///
118    /// // Run an executor thread.
119    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
120    ///
121    /// future::block_on(async {
122    ///     Timer::after(Duration::from_secs(3)).await;
123    ///     task.cancel().await;
124    /// });
125    /// ```
126    pub async fn cancel(self) -> Option<T> {
127        let mut this = self;
128        this.set_canceled();
129
130        struct Fut<T>(Task<T>);
131
132        impl<T> Future for Fut<T> {
133            type Output = Option<T>;
134
135            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136                self.0.poll_task(cx)
137            }
138        }
139
140        Fut(this).await
141    }
142
143    /// Consumes the [`Task`], returning a pointer to the raw task.
144    ///
145    /// The raw pointer must eventually be converted back into a [`Task`]
146    /// by calling [`Task::from_raw`] in order to free up its resources.
147    pub fn into_raw(self) -> *mut () {
148        let ptr = self.ptr;
149        mem::forget(self);
150        ptr.as_ptr()
151    }
152
153    /// Constructs a [`Task`] from a raw task pointer.
154    ///
155    /// The raw pointer must have been previously returned by a call to
156    /// [`into_raw`].
157    ///
158    /// # Safety
159    ///
160    /// See above.
161    pub unsafe fn from_raw(ptr: *mut ()) -> Task<T> {
162        Task {
163            ptr: NonNull::new_unchecked(ptr),
164            _marker: Default::default(),
165        }
166    }
167
168    /// Puts the task in canceled state.
169    fn set_canceled(&mut self) {
170        let ptr = self.ptr.as_ptr();
171        let header = ptr as *const Header;
172
173        unsafe {
174            let mut state = (*header).state.load(Ordering::Acquire);
175
176            loop {
177                // If the task has been completed or closed, it can't be canceled.
178                if state & (COMPLETED | CLOSED) != 0 {
179                    break;
180                }
181
182                // If the task is not scheduled nor running, we'll need to schedule it.
183                let new = if state & (SCHEDULED | RUNNING) == 0 {
184                    (state | SCHEDULED | CLOSED) + REFERENCE
185                } else {
186                    state | CLOSED
187                };
188
189                // Mark the task as closed.
190                match (*header).state.compare_exchange_weak(
191                    state,
192                    new,
193                    Ordering::AcqRel,
194                    Ordering::Acquire,
195                ) {
196                    Ok(_) => {
197                        // If the task is not scheduled nor running, schedule it one more time so
198                        // that its future gets dropped by the executor.
199                        if state & (SCHEDULED | RUNNING) == 0 {
200                            ((*header).vtable.schedule)(ptr);
201                        }
202
203                        // Notify the awaiter that the task has been closed.
204                        if state & AWAITER != 0 {
205                            (*header).notify(None);
206                        }
207
208                        break;
209                    }
210                    Err(s) => state = s,
211                }
212            }
213        }
214    }
215
216    /// Puts the task in detached state.
217    fn set_detached(&mut self) -> Option<T> {
218        let ptr = self.ptr.as_ptr();
219        let header = ptr as *const Header;
220
221        unsafe {
222            // A place where the output will be stored in case it needs to be dropped.
223            let mut output = None;
224
225            // Optimistically assume the `Task` is being detached just after creating the
226            // task. This is a common case so if the `Task` is datached, the
227            // overhead of it is only one compare-exchange operation.
228            if let Err(mut state) = (*header).state.compare_exchange_weak(
229                SCHEDULED | TASK | REFERENCE,
230                SCHEDULED | REFERENCE,
231                Ordering::AcqRel,
232                Ordering::Acquire,
233            ) {
234                loop {
235                    // If the task has been completed but not yet closed, that means its output
236                    // must be dropped.
237                    if state & COMPLETED != 0 && state & CLOSED == 0 {
238                        // Mark the task as closed in order to grab its output.
239                        match (*header).state.compare_exchange_weak(
240                            state,
241                            state | CLOSED,
242                            Ordering::AcqRel,
243                            Ordering::Acquire,
244                        ) {
245                            Ok(_) => {
246                                // Read the output.
247                                output =
248                                    Some((((*header).vtable.get_output)(ptr) as *mut T).read());
249
250                                // Update the state variable because we're continuing the loop.
251                                state |= CLOSED;
252                            }
253                            Err(s) => state = s,
254                        }
255                    } else {
256                        // If this is the last reference to the task and it's not closed, then
257                        // close it and schedule one more time so that its future gets dropped by
258                        // the executor.
259                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
260                            SCHEDULED | CLOSED | REFERENCE
261                        } else {
262                            state & !TASK
263                        };
264
265                        // Unset the `TASK` flag.
266                        match (*header).state.compare_exchange_weak(
267                            state,
268                            new,
269                            Ordering::AcqRel,
270                            Ordering::Acquire,
271                        ) {
272                            Ok(_) => {
273                                // If this is the last reference to the task, we need to either
274                                // schedule dropping its future or destroy it.
275                                if state & !(REFERENCE - 1) == 0 {
276                                    if state & CLOSED == 0 {
277                                        ((*header).vtable.schedule)(ptr);
278                                    } else {
279                                        ((*header).vtable.destroy)(ptr);
280                                    }
281                                }
282
283                                break;
284                            }
285                            Err(s) => state = s,
286                        }
287                    }
288                }
289            }
290
291            output
292        }
293    }
294
295    /// Polls the task to retrieve its output.
296    ///
297    /// Returns `Some` if the task has completed or `None` if it was closed.
298    ///
299    /// A task becomes closed in the following cases:
300    ///
301    /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or
302    /// `Task::cancel()`. 2. Its output gets awaited by the `Task`.
303    /// 3. It panics while polling the future.
304    /// 4. It is completed and the `Task` gets dropped.
305    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
306        let ptr = self.ptr.as_ptr();
307        let header = ptr as *const Header;
308
309        unsafe {
310            let mut state = (*header).state.load(Ordering::Acquire);
311
312            loop {
313                // If the task has been closed, notify the awaiter and return `None`.
314                if state & CLOSED != 0 {
315                    // If the task is scheduled or running, we need to wait until its future is
316                    // dropped.
317                    if state & (SCHEDULED | RUNNING) != 0 {
318                        // Replace the waker with one associated with the current task.
319                        (*header).register(cx.waker());
320
321                        // Reload the state after registering. It is possible changes occurred just
322                        // before registration so we need to check for that.
323                        state = (*header).state.load(Ordering::Acquire);
324
325                        // If the task is still scheduled or running, we need to wait because its
326                        // future is not dropped yet.
327                        if state & (SCHEDULED | RUNNING) != 0 {
328                            return Poll::Pending;
329                        }
330                    }
331
332                    // Even though the awaiter is most likely the current task, it could also be
333                    // another task.
334                    (*header).notify(Some(cx.waker()));
335                    return Poll::Ready(None);
336                }
337
338                // If the task is not completed, register the current task.
339                if state & COMPLETED == 0 {
340                    // Replace the waker with one associated with the current task.
341                    (*header).register(cx.waker());
342
343                    // Reload the state after registering. It is possible that the task became
344                    // completed or closed just before registration so we need to check for that.
345                    state = (*header).state.load(Ordering::Acquire);
346
347                    // If the task has been closed, restart.
348                    if state & CLOSED != 0 {
349                        continue;
350                    }
351
352                    // If the task is still not completed, we're blocked on it.
353                    if state & COMPLETED == 0 {
354                        return Poll::Pending;
355                    }
356                }
357
358                // Since the task is now completed, mark it as closed in order to grab its
359                // output.
360                match (*header).state.compare_exchange(
361                    state,
362                    state | CLOSED,
363                    Ordering::AcqRel,
364                    Ordering::Acquire,
365                ) {
366                    Ok(_) => {
367                        // Notify the awaiter. Even though the awaiter is most likely the current
368                        // task, it could also be another task.
369                        if state & AWAITER != 0 {
370                            (*header).notify(Some(cx.waker()));
371                        }
372
373                        // Take the output from the task.
374                        let output = ((*header).vtable.get_output)(ptr) as *mut T;
375                        return Poll::Ready(Some(output.read()));
376                    }
377                    Err(s) => state = s,
378                }
379            }
380        }
381    }
382}
383
384impl<T> Drop for Task<T> {
385    fn drop(&mut self) {
386        self.set_canceled();
387        self.set_detached();
388    }
389}
390
391impl<T> Future for Task<T> {
392    type Output = T;
393
394    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
395        match self.poll_task(cx) {
396            Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
397            Poll::Pending => Poll::Pending,
398        }
399    }
400}
401
402impl<T> fmt::Debug for Task<T> {
403    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404        let ptr = self.ptr.as_ptr();
405        let header = ptr as *const Header;
406
407        f.debug_struct("Task")
408            .field("header", unsafe { &(*header) })
409            .finish()
410    }
411}