//! [`Task`] for execution by a [`platform::dart::executor`].
//!
//! [`platform::dart::executor`]: crate::platform::executor
use std::{
    cell::{Cell, RefCell},
    fmt,
    mem::ManuallyDrop,
    rc::Rc,
    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};
use futures::future::LocalBoxFuture;
use crate::platform::dart::executor::task_wake;
/// Inner [`Task`]'s data.
struct Inner {
    /// An actual [`Future`] that this [`Task`] is driving.
    ///
    /// [`Future`]: std::future::Future
    future: LocalBoxFuture<'static, ()>,

    /// Handle for waking up this [`Task`].
    waker: Waker,
}
impl fmt::Debug for Inner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Inner")
            .field("waker", &self.waker)
            .finish_non_exhaustive()
    }
}
/// Wrapper for a [`Future`] that can be polled by an external single-threaded
/// Dart executor.
///
/// [`Future`]: std::future::Future
#[derive(Debug)]
pub struct Task {
    /// [`Task`]'s inner data containing an actual [`Future`] and its
    /// [`Waker`]. Dropped on the [`Task`] completion.
    ///
    /// [`Future`]: std::future::Future
    inner: RefCell<Option<Inner>>,

    /// Indicates whether a wake request for this [`Task`] is already pending.
    is_scheduled: Cell<bool>,
}
impl Task {
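    /// Spawns a new [`Task`] that will drive the given [`Future`].
    ///
    /// [`Future`]: std::future::Future
    pub fn spawn(future: LocalBoxFuture<'static, ()>) {
        // The `Waker` needs an `Rc` of the `Task`, so the `Task` is created
        // empty first and filled in once the `Waker` exists.
        let this = Rc::new(Self {
            inner: RefCell::new(None),
            is_scheduled: Cell::new(false),
        });

        // SAFETY: The vtable built by `into_raw_waker()` keeps the `Rc`
        // reference count balanced. The resulting `Waker` is not thread-safe,
        // so it must never leave the thread it was created on.
        let waker =
            unsafe { Waker::from_raw(Self::into_raw_waker(Rc::clone(&this))) };
        drop(this.inner.borrow_mut().replace(Inner { future, waker }));

        // Schedule the first poll.
        Self::wake_by_ref(&this);
    }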
    /// Polls the underlying [`Future`].
    ///
    /// Polling after the [`Future`]'s completion is a no-op.
    ///
    /// [`Future`]: std::future::Future
    pub fn poll(&self) -> Poll<()> {
        let mut borrow = self.inner.borrow_mut();

        // Just ignore the poll request if the `Future` is completed.
        let Some(inner) = borrow.as_mut() else {
            return Poll::Ready(());
        };

        let poll = {
            let mut cx = Context::from_waker(&inner.waker);
            inner.future.as_mut().poll(&mut cx)
        };
        self.is_scheduled.set(false);

        // Clean up resources if the `Future` is ready.
        if poll.is_ready() {
            *borrow = None;
        }

        poll
    }
    /// Schedules this [`Task`] for polling by calling the [`task_wake()`]
    /// function with the provided reference, unless a wake request is already
    /// pending.
    fn wake_by_ref(this: &Rc<Self>) {
        if !this.is_scheduled.replace(true) {
            task_wake(Rc::clone(this));
        }
    }
    /// Pretty much a copy of [`std::task::Wake`] implementation but for
    /// `Rc<?Send + ?Sync>` instead of `Arc<Send + Sync>` since we are sure
    /// that everything will run on a single thread.
    fn into_raw_waker(this: Rc<Self>) -> RawWaker {
        #![allow(clippy::missing_docs_in_private_items)]

        // Refer to `RawWakerVTable::new()` documentation for better
        // understanding of what the following functions do.
        unsafe fn raw_clone(ptr: *const ()) -> RawWaker {
            let ptr = ManuallyDrop::new(Rc::from_raw(ptr.cast::<Task>()));
            Task::into_raw_waker(Rc::clone(&(*ptr)))
        }

        unsafe fn raw_wake(ptr: *const ()) {
            let ptr = Rc::from_raw(ptr.cast::<Task>());
            Task::wake_by_ref(&ptr);
        }

        unsafe fn raw_wake_by_ref(ptr: *const ()) {
            let ptr = ManuallyDrop::new(Rc::from_raw(ptr.cast::<Task>()));
            Task::wake_by_ref(&ptr);
        }

        unsafe fn raw_drop(ptr: *const ()) {
            drop(Rc::from_raw(ptr.cast::<Task>()));
        }

        const VTABLE: RawWakerVTable =
            RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop);

        RawWaker::new(Rc::into_raw(this).cast::<()>(), &VTABLE)
    }
}