async_task_ffi/
runnable.rs

1use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem;
5use core::ptr::NonNull;
6use core::sync::atomic::Ordering;
7use core::task::Waker;
8
9use crate::header::Header;
10use crate::raw::RawTask;
11use crate::state::*;
12use crate::Task;
13
14#[cfg(feature = "std")]
15use crate::utils::checked::{Checked, CheckedFuture};
16
17/// Creates a new task.
18///
19/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is
20/// used to await its output.
21///
22/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the
23/// [`Runnable`] vanishes and only reappears when its [`Waker`] wakes the task,
24/// thus scheduling it to be run again.
25///
26/// When the task is woken, its [`Runnable`] is passed to the `schedule`
27/// function. The `schedule` function should not attempt to run the [`Runnable`]
28/// nor to drop it. Instead, it should push it into a task queue so that it can
29/// be processed later.
30///
31/// If you need to spawn a future that does not implement [`Send`] or isn't
32/// `'static`, consider using [`spawn_local()`] or [`spawn_unchecked()`]
33/// instead.
34///
35/// If you need to attach arbitrary data to the task, consider using
36/// [`spawn_with()`].
37///
38/// # Examples
39///
40/// ```
41/// // The future inside the task.
42/// let future = async {
43///     println!("Hello, world!");
44/// };
45///
46/// // A function that schedules the task when it gets woken up.
47/// let (s, r) = flume::unbounded();
48/// let schedule = move |runnable| s.send(runnable).unwrap();
49///
50/// // Create a task with the future and the schedule function.
51/// let (runnable, task) = async_task_ffi::spawn(future, schedule);
52/// ```
53pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
54where
55    F: Future + Send + 'static,
56    F::Output: Send + 'static,
57    S: Fn(Runnable) + Send + Sync + 'static,
58{
59    spawn_with(future, schedule, ())
60}
61
62/// Creates a new task with associated data.
63///
64/// This function is the same as [`spawn()`], except it makes it possible to
65/// attach arbitrary data to the task. This makes it possible to benefit from
66/// the single allocation design of `async_task_ffi` without having to write a
67/// specialized implementation.
68///
69/// The data can be accessed using [`Runnable::data`] or [`Runnable::data_mut`].
70///
71/// # Examples
72///
73/// ```
74/// use async_task_ffi::Runnable;
75///
76/// // The future inside the task.
77/// let future = async {
78///     println!("Hello, world!");
79/// };
80///
81/// // A function that schedules the task when it gets woken up
82/// // and counts the amount of times it has been scheduled.
83/// let (s, r) = flume::unbounded();
84/// let schedule = move |mut runnable: Runnable<usize>| {
85///     *runnable.data_mut() += 1;
86///     s.send(runnable).unwrap();
87/// };
88///
89/// // Create a task with the future, the schedule function
90/// // and the initial data.
91/// let (runnable, task) = async_task_ffi::spawn_with(future, schedule, 0);
92/// ```
93pub fn spawn_with<F, S, D>(future: F, schedule: S, data: D) -> (Runnable<D>, Task<F::Output>)
94where
95    F: Future + Send + 'static,
96    F::Output: Send + 'static,
97    S: Fn(Runnable<D>) + Send + Sync + 'static,
98    D: Send + 'static,
99{
100    unsafe { spawn_unchecked_with(future, schedule, data) }
101}
102
/// Spawns `future` as a new thread-local task.
///
/// Works like [`spawn()`] but drops the [`Send`] requirement on `future`.
/// In exchange, using or dropping the [`Runnable`] on a thread other than
/// the one that spawned it causes a panic.
///
/// Only available when this crate's `std` feature is enabled.
///
/// # Examples
///
/// ```
/// use async_task_ffi::Runnable;
/// use flume::{Receiver, Sender};
/// use std::rc::Rc;
///
/// thread_local! {
///     // A queue that holds scheduled tasks.
///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
/// }
///
/// // Make a non-Send future.
/// let msg: Rc<str> = "Hello, world!".into();
/// let future = async move {
///     println!("{}", msg);
/// };
///
/// // A function that schedules the task when it gets woken up.
/// let s = QUEUE.with(|(s, _)| s.clone());
/// let schedule = move |runnable| s.send(runnable).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (runnable, task) = async_task_ffi::spawn_local(future, schedule);
/// ```
#[cfg(feature = "std")]
pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
where
    F: Future + 'static,
    F::Output: 'static,
    S: Fn(Runnable) + Send + Sync + 'static,
{
    // SAFETY: `schedule` is `Send + Sync + 'static` and `future` is `'static`
    // by the bounds above. The possibly non-`Send` future is wrapped in
    // `CheckedFuture`, which checks which thread it is on.
    unsafe { spawn_unchecked(CheckedFuture::new(future), schedule) }
}
149
/// Creates a new thread-local task with associated data.
///
/// This function is a combination of [`spawn_local()`] and [`spawn_with()`],
/// except it does not require [`Send`] on `data`. The data is wrapped in a
/// type that implements [`Deref`][std::ops::Deref] and
/// [`DerefMut`][std::ops::DerefMut] and panics if used from another thread.
///
/// As a consequence, the returned handle is a `Runnable<Checked<D>>` rather
/// than a `Runnable<D>`, and `schedule` receives that same wrapped type.
///
/// This function is only available when the `std` feature for this crate is
/// enabled.
#[cfg(feature = "std")]
pub fn spawn_local_with<F, S, D>(
    future: F,
    schedule: S,
    data: D,
) -> (Runnable<Checked<D>>, Task<F::Output>)
where
    F: Future + 'static,
    F::Output: 'static,
    S: Fn(Runnable<Checked<D>>) + Send + Sync + 'static,
    D: 'static,
{
    // Wrap the future into one that checks which thread it's on.
    let checked_future = CheckedFuture::new(future);

    // Guard the data with the same kind of thread check.
    let checked_data = Checked::new(data);

    // SAFETY: `schedule` is `Send + Sync + 'static` and both `future` and
    // `data` are `'static` by the bounds above. The possibly non-`Send`
    // future and data are only handed over inside their thread-checking
    // wrappers.
    unsafe { spawn_unchecked_with(checked_future, schedule, checked_data) }
}
179
180/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
181///
182/// This function is same as [`spawn()`], except it does not require [`Send`],
183/// [`Sync`], and `'static` on `future` and `schedule`.
184///
185/// # Safety
186///
187/// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on
188///   the original thread.
189/// - If `future` is not `'static`, borrowed variables must outlive its
190///   [`Runnable`].
191/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be
192///   used and dropped on the original thread.
193/// - If `schedule` is not `'static`, borrowed variables must outlive the task's
194///   [`Waker`].
195///
196/// # Examples
197///
198/// ```
199/// // The future inside the task.
200/// let future = async {
201///     println!("Hello, world!");
202/// };
203///
204/// // If the task gets woken up, it will be sent into this channel.
205/// let (s, r) = flume::unbounded();
206/// let schedule = move |runnable| s.send(runnable).unwrap();
207///
208/// // Create a task with the future and the schedule function.
209/// let (runnable, task) = unsafe { async_task_ffi::spawn_unchecked(future, schedule) };
210/// ```
211pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
212where
213    F: Future,
214    S: Fn(Runnable),
215{
216    spawn_unchecked_with(future, schedule, ())
217}
218
219/// Creates a new task with associated data and without [`Send`], [`Sync`], and
220/// `'static` bounds.
221///
222/// This function is a combination of [`spawn_unchecked()`] and
223/// [`spawn_with()`], except it does not require [`Send`] and `'static` on
224/// `data`.
225///
226/// # Safety
227///
228/// - All of the requirements from [`spawn_unchecked`].
229/// - If `data` is not [`Send`], it must be used and dropped on the original
230///   thread.
231/// - If `data` is not `'static`, borrowed variables must outlive its
232///   [`Runnable`].
233pub unsafe fn spawn_unchecked_with<F, S, D>(
234    future: F,
235    schedule: S,
236    data: D,
237) -> (Runnable<D>, Task<F::Output>)
238where
239    F: Future,
240    S: Fn(Runnable<D>),
241{
242    // Allocate large futures on the heap.
243    let ptr = if mem::size_of::<F>() >= 2048 {
244        let future = alloc::boxed::Box::pin(future);
245        RawTask::<_, F::Output, S, D>::allocate(future, schedule, data)
246    } else {
247        RawTask::<F, F::Output, S, D>::allocate(future, schedule, data)
248    };
249
250    let runnable = Runnable {
251        ptr,
252        _marker: Default::default(),
253    };
254    let task = Task {
255        ptr,
256        _marker: PhantomData,
257    };
258    (runnable, task)
259}
260
/// A handle to a runnable task.
///
/// Every spawned task has a single [`Runnable`] handle, which only exists when
/// the task is scheduled for running.
///
/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the
/// [`Runnable`] vanishes and only reappears when its [`Waker`] wakes the task,
/// thus scheduling it to be run again.
///
/// Dropping a [`Runnable`] cancels the task, which means its future won't be
/// polled again, and awaiting the [`Task`] after that will result in a panic.
///
/// The type parameter `D` is the type of the user data attached to the task;
/// it defaults to `()` for tasks spawned without data.
///
/// # Examples
///
/// ```
/// use async_task_ffi::Runnable;
/// use once_cell::sync::Lazy;
/// use std::{panic, thread};
///
/// // A simple executor.
/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
///     let (sender, receiver) = flume::unbounded::<Runnable>();
///     thread::spawn(|| {
///         for runnable in receiver {
///             let _ignore_panic = panic::catch_unwind(|| runnable.run());
///         }
///     });
///     sender
/// });
///
/// // Create a task with a simple future.
/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
/// let (runnable, task) = async_task_ffi::spawn(async { 1 + 2 }, schedule);
///
/// // Schedule the task and await its output.
/// runnable.schedule();
/// assert_eq!(smol::future::block_on(task), 3);
/// ```
pub struct Runnable<D = ()> {
    /// A type-erased pointer to the heap-allocated task.
    pub(crate) ptr: NonNull<()>,

    /// A zero-sized marker capturing generic type `D`; no `D` is stored in
    /// the handle itself.
    pub(crate) _marker: PhantomData<D>,
}
306
// `Runnable` stores a `NonNull` pointer, so it is neither `Send` nor `Sync`
// automatically; these impls opt in whenever the attached data `D` is `Sync`.
//
// NOTE(review): `Send` is gated on `D: Sync` rather than `D: Send`, although
// `data_mut()` hands out `&mut D` and `spawn_with()` only requires
// `D: Send`. Confirm that this bound is intentional.
unsafe impl<D: Sync> Send for Runnable<D> {}
unsafe impl<D: Sync> Sync for Runnable<D> {}

// Explicit unwind-safety impls, compiled only with the `std` feature and
// conditional on `D: RefUnwindSafe`.
#[cfg(feature = "std")]
impl<D: std::panic::RefUnwindSafe> std::panic::UnwindSafe for Runnable<D> {}
#[cfg(feature = "std")]
impl<D: std::panic::RefUnwindSafe> std::panic::RefUnwindSafe for Runnable<D> {}
314
315impl<D> Runnable<D> {
316    /// Schedules the task.
317    ///
318    /// This is a convenience method that passes the [`Runnable`] to the
319    /// schedule function.
320    ///
321    /// # Examples
322    ///
323    /// ```
324    /// // A function that schedules the task when it gets woken up.
325    /// let (s, r) = flume::unbounded();
326    /// let schedule = move |runnable| s.send(runnable).unwrap();
327    ///
328    /// // Create a task with a simple future and the schedule function.
329    /// let (runnable, task) = async_task_ffi::spawn(async {}, schedule);
330    ///
331    /// // Schedule the task.
332    /// assert_eq!(r.len(), 0);
333    /// runnable.schedule();
334    /// assert_eq!(r.len(), 1);
335    /// ```
336    pub fn schedule(self) {
337        let ptr = self.ptr.as_ptr();
338        let header = ptr as *const Header;
339        mem::forget(self);
340
341        unsafe {
342            ((*header).vtable.schedule)(ptr);
343        }
344    }
345
346    /// Runs the task by polling its future.
347    ///
348    /// Returns `true` if the task was woken while running, in which case the
349    /// [`Runnable`] gets rescheduled at the end of this method invocation.
350    /// Otherwise, returns `false` and the [`Runnable`] vanishes until the
351    /// task is woken. The return value is just a hint: `true` usually
352    /// indicates that the task has yielded, i.e. it woke itself and then
353    /// gave the control back to the executor.
354    ///
355    /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`]
356    /// was called, then this method simply destroys the task.
357    ///
358    /// If the polled future panics, this method propagates the panic, and
359    /// awaiting the [`Task`] after that will also result in a panic.
360    ///
361    /// # Examples
362    ///
363    /// ```
364    /// // A function that schedules the task when it gets woken up.
365    /// let (s, r) = flume::unbounded();
366    /// let schedule = move |runnable| s.send(runnable).unwrap();
367    ///
368    /// // Create a task with a simple future and the schedule function.
369    /// let (runnable, task) = async_task_ffi::spawn(async { 1 + 2 }, schedule);
370    ///
371    /// // Run the task and check its output.
372    /// runnable.run();
373    /// assert_eq!(smol::future::block_on(task), 3);
374    /// ```
375    pub fn run(self) -> bool {
376        let ptr = self.ptr.as_ptr();
377        let header = ptr as *const Header;
378        mem::forget(self);
379
380        unsafe { ((*header).vtable.run)(ptr) }
381    }
382
383    /// Returns a waker associated with this task.
384    ///
385    /// # Examples
386    ///
387    /// ```
388    /// use smol::future;
389    ///
390    /// // A function that schedules the task when it gets woken up.
391    /// let (s, r) = flume::unbounded();
392    /// let schedule = move |runnable| s.send(runnable).unwrap();
393    ///
394    /// // Create a task with a simple future and the schedule function.
395    /// let (runnable, task) = async_task_ffi::spawn(future::pending::<()>(), schedule);
396    ///
397    /// // Take a waker and run the task.
398    /// let waker = runnable.waker();
399    /// runnable.run();
400    ///
401    /// // Reschedule the task by waking it.
402    /// assert_eq!(r.len(), 0);
403    /// waker.wake();
404    /// assert_eq!(r.len(), 1);
405    /// ```
406    pub fn waker(&self) -> Waker {
407        let ptr = self.ptr.as_ptr();
408        let header = ptr as *const Header;
409
410        unsafe {
411            let raw_waker = ((*header).vtable.clone_waker)(ptr);
412            Waker::from_raw(raw_waker)
413        }
414    }
415
416    /// Returns a reference to the user data associated with this task.
417    ///
418    /// For mutable access see [`data_mut`].
419    ///
420    /// # Examples
421    ///
422    /// ```
423    /// use async_task_ffi::Runnable;
424    ///
425    /// // A function that schedules the task and prints a message.
426    /// let (s, r) = flume::unbounded();
427    /// let schedule = move |runnable: Runnable<&'static str>| {
428    ///     println!("{}", runnable.data());
429    ///     s.send(runnable).unwrap();
430    /// };
431    ///
432    /// // Create a task with a simple future, the schedule function and a message.
433    /// let (runnable, task) = async_task_ffi::spawn_with(
434    ///     async {},
435    ///     schedule,
436    ///     "Hello from the schedule function!",
437    /// );
438    ///
439    /// // Schedule the task.
440    /// runnable.schedule();
441    /// ```
442    pub fn data(&self) -> &D {
443        let ptr = self.ptr.as_ptr();
444        let header = ptr as *const Header;
445
446        unsafe {
447            let data = ((*header).vtable.get_data)(ptr) as *const D;
448            &*data
449        }
450    }
451
452    /// Returns a mutable reference to the user data associated with this task.
453    ///
454    /// For immutable access see [`data`].
455    ///
456    /// # Examples
457    ///
458    /// ```
459    /// use async_task_ffi::Runnable;
460    ///
461    /// // A function that schedules the task and
462    /// // counts the amount of times it has been.
463    /// let (s, r) = flume::unbounded();
464    /// let schedule = move |mut runnable: Runnable<usize>| {
465    ///     let counter = runnable.data_mut();
466    ///     println!("{}", counter);
467    ///     *counter += 1;
468    ///     s.send(runnable).unwrap();
469    /// };
470    ///
471    /// // Create a task with a simple future,
472    /// // the schedule function and the initial counter value.
473    /// let (mut runnable, task) = async_task_ffi::spawn_with(async {}, schedule, 0);
474    ///
475    /// // Schedule the task.
476    /// *runnable.data_mut() += 1;
477    /// runnable.schedule();
478    /// ```
479    pub fn data_mut(&mut self) -> &mut D {
480        let ptr = self.ptr.as_ptr();
481        let header = ptr as *const Header;
482
483        unsafe {
484            let data = ((*header).vtable.get_data)(ptr) as *mut D;
485            &mut *data
486        }
487    }
488
489    /// Consumes the [`Runnable`], returning a pointer to the raw task.
490    ///
491    /// The raw pointer must eventually be converted back into a [`Runnable`]
492    /// by calling [`Runnable::from_raw`] in order to free up the task's
493    /// resources.
494    pub fn into_raw(self) -> *mut () {
495        let ptr = self.ptr;
496        mem::forget(self);
497        ptr.as_ptr()
498    }
499
500    /// Constructs a [`Runnable`] from a raw task pointer.
501    ///
502    /// The raw pointer must have been previously returned by a call to
503    /// [`into_raw`].
504    ///
505    /// # Safety
506    ///
507    /// This function has the same safety requirements as [`spawn_unchecked`]
508    /// and [`spawn_unchecked_with`] on top of the previously mentioned one.
509    pub unsafe fn from_raw(ptr: *mut ()) -> Runnable<D> {
510        Runnable {
511            ptr: NonNull::new_unchecked(ptr),
512            _marker: Default::default(),
513        }
514    }
515}
516
517impl<D> Drop for Runnable<D> {
518    fn drop(&mut self) {
519        let ptr = self.ptr.as_ptr() as *mut ();
520        let header = ptr as *const Header;
521
522        unsafe {
523            let mut state = (*header).state.load(Ordering::Acquire);
524
525            loop {
526                // If the task has been completed or closed, it can't be canceled.
527                if state & (COMPLETED | CLOSED) != 0 {
528                    break;
529                }
530
531                // Mark the task as closed.
532                match (*header).state.compare_exchange_weak(
533                    state,
534                    state | CLOSED,
535                    Ordering::AcqRel,
536                    Ordering::Acquire,
537                ) {
538                    Ok(_) => break,
539                    Err(s) => state = s,
540                }
541            }
542
543            // Drop the future.
544            ((*header).vtable.drop_future)(ptr);
545
546            // Mark the task as unscheduled.
547            let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
548
549            // Notify the awaiter that the future has been dropped.
550            if state & AWAITER != 0 {
551                (*header).notify(None);
552            }
553
554            // Drop the task reference.
555            ((*header).vtable.drop_ref)(ptr);
556        }
557    }
558}
559
560impl<D> fmt::Debug for Runnable<D> {
561    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
562        let ptr = self.ptr.as_ptr();
563        let header = ptr as *const Header;
564
565        f.debug_struct("Runnable")
566            .field("header", unsafe { &(*header) })
567            .finish()
568    }
569}