async_task_ffi/runnable.rs
1use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem;
5use core::ptr::NonNull;
6use core::sync::atomic::Ordering;
7use core::task::Waker;
8
9use crate::header::Header;
10use crate::raw::RawTask;
11use crate::state::*;
12use crate::Task;
13
14#[cfg(feature = "std")]
15use crate::utils::checked::{Checked, CheckedFuture};
16
17/// Creates a new task.
18///
19/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is
20/// used to await its output.
21///
22/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the
23/// [`Runnable`] vanishes and only reappears when its [`Waker`] wakes the task,
24/// thus scheduling it to be run again.
25///
26/// When the task is woken, its [`Runnable`] is passed to the `schedule`
27/// function. The `schedule` function should not attempt to run the [`Runnable`]
28/// nor to drop it. Instead, it should push it into a task queue so that it can
29/// be processed later.
30///
31/// If you need to spawn a future that does not implement [`Send`] or isn't
32/// `'static`, consider using [`spawn_local()`] or [`spawn_unchecked()`]
33/// instead.
34///
35/// If you need to attach arbitrary data to the task, consider using
36/// [`spawn_with()`].
37///
38/// # Examples
39///
40/// ```
41/// // The future inside the task.
42/// let future = async {
43/// println!("Hello, world!");
44/// };
45///
46/// // A function that schedules the task when it gets woken up.
47/// let (s, r) = flume::unbounded();
48/// let schedule = move |runnable| s.send(runnable).unwrap();
49///
50/// // Create a task with the future and the schedule function.
51/// let (runnable, task) = async_task_ffi::spawn(future, schedule);
52/// ```
53pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
54where
55 F: Future + Send + 'static,
56 F::Output: Send + 'static,
57 S: Fn(Runnable) + Send + Sync + 'static,
58{
59 spawn_with(future, schedule, ())
60}
61
62/// Creates a new task with associated data.
63///
64/// This function is the same as [`spawn()`], except it makes it possible to
65/// attach arbitrary data to the task. This makes it possible to benefit from
66/// the single allocation design of `async_task_ffi` without having to write a
67/// specialized implementation.
68///
69/// The data can be accessed using [`Runnable::data`] or [`Runnable::data_mut`].
70///
71/// # Examples
72///
73/// ```
74/// use async_task_ffi::Runnable;
75///
76/// // The future inside the task.
77/// let future = async {
78/// println!("Hello, world!");
79/// };
80///
81/// // A function that schedules the task when it gets woken up
82/// // and counts the amount of times it has been scheduled.
83/// let (s, r) = flume::unbounded();
84/// let schedule = move |mut runnable: Runnable<usize>| {
85/// *runnable.data_mut() += 1;
86/// s.send(runnable).unwrap();
87/// };
88///
89/// // Create a task with the future, the schedule function
90/// // and the initial data.
91/// let (runnable, task) = async_task_ffi::spawn_with(future, schedule, 0);
92/// ```
93pub fn spawn_with<F, S, D>(future: F, schedule: S, data: D) -> (Runnable<D>, Task<F::Output>)
94where
95 F: Future + Send + 'static,
96 F::Output: Send + 'static,
97 S: Fn(Runnable<D>) + Send + Sync + 'static,
98 D: Send + 'static,
99{
100 unsafe { spawn_unchecked_with(future, schedule, data) }
101}
102
/// Spawns a task that is bound to the current thread.
///
/// This function is the same as [`spawn()`], except that `future` does not
/// have to be [`Send`]. Using or dropping the [`Runnable`] on a different
/// thread results in a panic.
///
/// Only available when the `std` feature of this crate is enabled.
///
/// # Examples
///
/// ```
/// use async_task_ffi::Runnable;
/// use flume::{Receiver, Sender};
/// use std::rc::Rc;
///
/// thread_local! {
///     // A queue that holds scheduled tasks.
///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
/// }
///
/// // Make a non-Send future.
/// let msg: Rc<str> = "Hello, world!".into();
/// let future = async move {
///     println!("{}", msg);
/// };
///
/// // A function that schedules the task when it gets woken up.
/// let s = QUEUE.with(|(s, _)| s.clone());
/// let schedule = move |runnable| s.send(runnable).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (runnable, task) = async_task_ffi::spawn_local(future, schedule);
/// ```
#[cfg(feature = "std")]
pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
where
    F: Future + 'static,
    F::Output: 'static,
    S: Fn(Runnable) + Send + Sync + 'static,
{
    // The future is handed over inside a wrapper that checks which thread it
    // is on, which stands in for the missing `Send` bound; `schedule` already
    // satisfies `Send + Sync + 'static` through the bounds above.
    unsafe { spawn_unchecked(CheckedFuture::new(future), schedule) }
}
149
/// Creates a new thread-local task with associated data.
///
/// This function is a combination of [`spawn_local()`] and [`spawn_with()`],
/// except it does not require [`Send`] on `data`. The data is wrapped in a
/// type that implements [`Deref`][std::ops::Deref] and
/// [`DerefMut`][std::ops::DerefMut] and panics if used from another thread.
///
/// The returned handle is therefore a `Runnable<Checked<D>>`, and the
/// `schedule` function receives that same type.
///
/// This function is only available when the `std` feature for this crate is
/// enabled.
#[cfg(feature = "std")]
pub fn spawn_local_with<F, S, D>(
    future: F,
    schedule: S,
    data: D,
) -> (Runnable<Checked<D>>, Task<F::Output>)
where
    F: Future + 'static,
    F::Output: 'static,
    S: Fn(Runnable<Checked<D>>) + Send + Sync + 'static,
    D: 'static,
{
    // Wrap the future into one that checks which thread it's on.
    let future = CheckedFuture::new(future);

    // Wrap the data the same way, since it is not required to be `Send`.
    let data = Checked::new(data);

    // SAFETY: `schedule` is `Send + Sync + 'static` and both `future` and
    // `data` are `'static` by the bounds above. Neither is required to be
    // `Send`, which is why both are handed over inside the thread-checking
    // wrappers created above.
    unsafe { spawn_unchecked_with(future, schedule, data) }
}
179
180/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
181///
182/// This function is same as [`spawn()`], except it does not require [`Send`],
183/// [`Sync`], and `'static` on `future` and `schedule`.
184///
185/// # Safety
186///
187/// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on
188/// the original thread.
189/// - If `future` is not `'static`, borrowed variables must outlive its
190/// [`Runnable`].
191/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be
192/// used and dropped on the original thread.
193/// - If `schedule` is not `'static`, borrowed variables must outlive the task's
194/// [`Waker`].
195///
196/// # Examples
197///
198/// ```
199/// // The future inside the task.
200/// let future = async {
201/// println!("Hello, world!");
202/// };
203///
204/// // If the task gets woken up, it will be sent into this channel.
205/// let (s, r) = flume::unbounded();
206/// let schedule = move |runnable| s.send(runnable).unwrap();
207///
208/// // Create a task with the future and the schedule function.
209/// let (runnable, task) = unsafe { async_task_ffi::spawn_unchecked(future, schedule) };
210/// ```
211pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
212where
213 F: Future,
214 S: Fn(Runnable),
215{
216 spawn_unchecked_with(future, schedule, ())
217}
218
219/// Creates a new task with associated data and without [`Send`], [`Sync`], and
220/// `'static` bounds.
221///
222/// This function is a combination of [`spawn_unchecked()`] and
223/// [`spawn_with()`], except it does not require [`Send`] and `'static` on
224/// `data`.
225///
226/// # Safety
227///
228/// - All of the requirements from [`spawn_unchecked`].
229/// - If `data` is not [`Send`], it must be used and dropped on the original
230/// thread.
231/// - If `data` is not `'static`, borrowed variables must outlive its
232/// [`Runnable`].
233pub unsafe fn spawn_unchecked_with<F, S, D>(
234 future: F,
235 schedule: S,
236 data: D,
237) -> (Runnable<D>, Task<F::Output>)
238where
239 F: Future,
240 S: Fn(Runnable<D>),
241{
242 // Allocate large futures on the heap.
243 let ptr = if mem::size_of::<F>() >= 2048 {
244 let future = alloc::boxed::Box::pin(future);
245 RawTask::<_, F::Output, S, D>::allocate(future, schedule, data)
246 } else {
247 RawTask::<F, F::Output, S, D>::allocate(future, schedule, data)
248 };
249
250 let runnable = Runnable {
251 ptr,
252 _marker: Default::default(),
253 };
254 let task = Task {
255 ptr,
256 _marker: PhantomData,
257 };
258 (runnable, task)
259}
260
/// A handle to a task that is ready to be run.
///
/// Each spawned task has exactly one [`Runnable`] handle, and that handle
/// only exists while the task is scheduled for running.
///
/// Calling [`run()`][`Runnable::run()`] polls the task's future once. The
/// [`Runnable`] then disappears and comes back only when the task's
/// [`Waker`] wakes the task, which schedules it to be run again.
///
/// Dropping a [`Runnable`] cancels the task: its future won't be polled
/// again, and awaiting the [`Task`] afterwards will result in a panic.
///
/// # Examples
///
/// ```
/// use async_task_ffi::Runnable;
/// use once_cell::sync::Lazy;
/// use std::{panic, thread};
///
/// // A simple executor.
/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
///     let (sender, receiver) = flume::unbounded::<Runnable>();
///     thread::spawn(|| {
///         for runnable in receiver {
///             let _ignore_panic = panic::catch_unwind(|| runnable.run());
///         }
///     });
///     sender
/// });
///
/// // Create a task with a simple future.
/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
/// let (runnable, task) = async_task_ffi::spawn(async { 1 + 2 }, schedule);
///
/// // Schedule the task and await its output.
/// runnable.schedule();
/// assert_eq!(smol::future::block_on(task), 3);
/// ```
pub struct Runnable<D = ()> {
    /// Type-erased pointer to the heap-allocated task.
    pub(crate) ptr: NonNull<()>,

    /// Zero-sized marker tying the handle to the data type `D`.
    pub(crate) _marker: PhantomData<D>,
}

// `NonNull` is neither `Send` nor `Sync`, so thread-safety of the handle is
// asserted by hand here.
//
// NOTE(review): `Send` is gated on `D: Sync` rather than `D: Send`, although
// the handle exposes `&mut D` through `data_mut()` — confirm that this bound
// is intentional.
unsafe impl<D: Sync> Send for Runnable<D> {}
unsafe impl<D: Sync> Sync for Runnable<D> {}

// Unwind-safety markers only exist in `std`, hence the feature gate.
#[cfg(feature = "std")]
impl<D: std::panic::RefUnwindSafe> std::panic::UnwindSafe for Runnable<D> {}
#[cfg(feature = "std")]
impl<D: std::panic::RefUnwindSafe> std::panic::RefUnwindSafe for Runnable<D> {}
314
315impl<D> Runnable<D> {
316 /// Schedules the task.
317 ///
318 /// This is a convenience method that passes the [`Runnable`] to the
319 /// schedule function.
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// // A function that schedules the task when it gets woken up.
325 /// let (s, r) = flume::unbounded();
326 /// let schedule = move |runnable| s.send(runnable).unwrap();
327 ///
328 /// // Create a task with a simple future and the schedule function.
329 /// let (runnable, task) = async_task_ffi::spawn(async {}, schedule);
330 ///
331 /// // Schedule the task.
332 /// assert_eq!(r.len(), 0);
333 /// runnable.schedule();
334 /// assert_eq!(r.len(), 1);
335 /// ```
336 pub fn schedule(self) {
337 let ptr = self.ptr.as_ptr();
338 let header = ptr as *const Header;
339 mem::forget(self);
340
341 unsafe {
342 ((*header).vtable.schedule)(ptr);
343 }
344 }
345
346 /// Runs the task by polling its future.
347 ///
348 /// Returns `true` if the task was woken while running, in which case the
349 /// [`Runnable`] gets rescheduled at the end of this method invocation.
350 /// Otherwise, returns `false` and the [`Runnable`] vanishes until the
351 /// task is woken. The return value is just a hint: `true` usually
352 /// indicates that the task has yielded, i.e. it woke itself and then
353 /// gave the control back to the executor.
354 ///
355 /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`]
356 /// was called, then this method simply destroys the task.
357 ///
358 /// If the polled future panics, this method propagates the panic, and
359 /// awaiting the [`Task`] after that will also result in a panic.
360 ///
361 /// # Examples
362 ///
363 /// ```
364 /// // A function that schedules the task when it gets woken up.
365 /// let (s, r) = flume::unbounded();
366 /// let schedule = move |runnable| s.send(runnable).unwrap();
367 ///
368 /// // Create a task with a simple future and the schedule function.
369 /// let (runnable, task) = async_task_ffi::spawn(async { 1 + 2 }, schedule);
370 ///
371 /// // Run the task and check its output.
372 /// runnable.run();
373 /// assert_eq!(smol::future::block_on(task), 3);
374 /// ```
375 pub fn run(self) -> bool {
376 let ptr = self.ptr.as_ptr();
377 let header = ptr as *const Header;
378 mem::forget(self);
379
380 unsafe { ((*header).vtable.run)(ptr) }
381 }
382
383 /// Returns a waker associated with this task.
384 ///
385 /// # Examples
386 ///
387 /// ```
388 /// use smol::future;
389 ///
390 /// // A function that schedules the task when it gets woken up.
391 /// let (s, r) = flume::unbounded();
392 /// let schedule = move |runnable| s.send(runnable).unwrap();
393 ///
394 /// // Create a task with a simple future and the schedule function.
395 /// let (runnable, task) = async_task_ffi::spawn(future::pending::<()>(), schedule);
396 ///
397 /// // Take a waker and run the task.
398 /// let waker = runnable.waker();
399 /// runnable.run();
400 ///
401 /// // Reschedule the task by waking it.
402 /// assert_eq!(r.len(), 0);
403 /// waker.wake();
404 /// assert_eq!(r.len(), 1);
405 /// ```
406 pub fn waker(&self) -> Waker {
407 let ptr = self.ptr.as_ptr();
408 let header = ptr as *const Header;
409
410 unsafe {
411 let raw_waker = ((*header).vtable.clone_waker)(ptr);
412 Waker::from_raw(raw_waker)
413 }
414 }
415
416 /// Returns a reference to the user data associated with this task.
417 ///
418 /// For mutable access see [`data_mut`].
419 ///
420 /// # Examples
421 ///
422 /// ```
423 /// use async_task_ffi::Runnable;
424 ///
425 /// // A function that schedules the task and prints a message.
426 /// let (s, r) = flume::unbounded();
427 /// let schedule = move |runnable: Runnable<&'static str>| {
428 /// println!("{}", runnable.data());
429 /// s.send(runnable).unwrap();
430 /// };
431 ///
432 /// // Create a task with a simple future, the schedule function and a message.
433 /// let (runnable, task) = async_task_ffi::spawn_with(
434 /// async {},
435 /// schedule,
436 /// "Hello from the schedule function!",
437 /// );
438 ///
439 /// // Schedule the task.
440 /// runnable.schedule();
441 /// ```
442 pub fn data(&self) -> &D {
443 let ptr = self.ptr.as_ptr();
444 let header = ptr as *const Header;
445
446 unsafe {
447 let data = ((*header).vtable.get_data)(ptr) as *const D;
448 &*data
449 }
450 }
451
452 /// Returns a mutable reference to the user data associated with this task.
453 ///
454 /// For immutable access see [`data`].
455 ///
456 /// # Examples
457 ///
458 /// ```
459 /// use async_task_ffi::Runnable;
460 ///
461 /// // A function that schedules the task and
462 /// // counts the amount of times it has been.
463 /// let (s, r) = flume::unbounded();
464 /// let schedule = move |mut runnable: Runnable<usize>| {
465 /// let counter = runnable.data_mut();
466 /// println!("{}", counter);
467 /// *counter += 1;
468 /// s.send(runnable).unwrap();
469 /// };
470 ///
471 /// // Create a task with a simple future,
472 /// // the schedule function and the initial counter value.
473 /// let (mut runnable, task) = async_task_ffi::spawn_with(async {}, schedule, 0);
474 ///
475 /// // Schedule the task.
476 /// *runnable.data_mut() += 1;
477 /// runnable.schedule();
478 /// ```
479 pub fn data_mut(&mut self) -> &mut D {
480 let ptr = self.ptr.as_ptr();
481 let header = ptr as *const Header;
482
483 unsafe {
484 let data = ((*header).vtable.get_data)(ptr) as *mut D;
485 &mut *data
486 }
487 }
488
489 /// Consumes the [`Runnable`], returning a pointer to the raw task.
490 ///
491 /// The raw pointer must eventually be converted back into a [`Runnable`]
492 /// by calling [`Runnable::from_raw`] in order to free up the task's
493 /// resources.
494 pub fn into_raw(self) -> *mut () {
495 let ptr = self.ptr;
496 mem::forget(self);
497 ptr.as_ptr()
498 }
499
500 /// Constructs a [`Runnable`] from a raw task pointer.
501 ///
502 /// The raw pointer must have been previously returned by a call to
503 /// [`into_raw`].
504 ///
505 /// # Safety
506 ///
507 /// This function has the same safety requirements as [`spawn_unchecked`]
508 /// and [`spawn_unchecked_with`] on top of the previously mentioned one.
509 pub unsafe fn from_raw(ptr: *mut ()) -> Runnable<D> {
510 Runnable {
511 ptr: NonNull::new_unchecked(ptr),
512 _marker: Default::default(),
513 }
514 }
515}
516
517impl<D> Drop for Runnable<D> {
518 fn drop(&mut self) {
519 let ptr = self.ptr.as_ptr() as *mut ();
520 let header = ptr as *const Header;
521
522 unsafe {
523 let mut state = (*header).state.load(Ordering::Acquire);
524
525 loop {
526 // If the task has been completed or closed, it can't be canceled.
527 if state & (COMPLETED | CLOSED) != 0 {
528 break;
529 }
530
531 // Mark the task as closed.
532 match (*header).state.compare_exchange_weak(
533 state,
534 state | CLOSED,
535 Ordering::AcqRel,
536 Ordering::Acquire,
537 ) {
538 Ok(_) => break,
539 Err(s) => state = s,
540 }
541 }
542
543 // Drop the future.
544 ((*header).vtable.drop_future)(ptr);
545
546 // Mark the task as unscheduled.
547 let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
548
549 // Notify the awaiter that the future has been dropped.
550 if state & AWAITER != 0 {
551 (*header).notify(None);
552 }
553
554 // Drop the task reference.
555 ((*header).vtable.drop_ref)(ptr);
556 }
557 }
558}
559
560impl<D> fmt::Debug for Runnable<D> {
561 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
562 let ptr = self.ptr.as_ptr();
563 let header = ptr as *const Header;
564
565 f.debug_struct("Runnable")
566 .field("header", unsafe { &(*header) })
567 .finish()
568 }
569}