medea_jason/platform/dart/executor/
task.rs

1//! [`Task`] for execution by a [`platform::dart::executor`].
2//!
3//! [`platform::dart::executor`]: crate::platform::executor
4
5use std::{
6    cell::RefCell,
7    sync::{
8        Arc,
9        atomic::{AtomicBool, Ordering},
10    },
11    task::{Context, Waker},
12    thread::{self, ThreadId},
13};
14
15use derive_more::with_trait::Debug;
16use futures::{
17    future::LocalBoxFuture,
18    task::{self, ArcWake},
19};
20
21use crate::platform::dart::executor::task_wake;
22
23/// Inner [`Task`]'s data.
24#[derive(Debug)]
25struct Inner {
26    /// An actual [`Future`] that this [`Task`] is driving.
27    #[debug(skip)]
28    future: LocalBoxFuture<'static, ()>,
29
30    /// Handle for waking up this [`Task`].
31    waker: Waker,
32}
33
34/// Wrapper for a [`Future`] that can be polled by an external single threaded
35/// Dart executor.
36#[derive(Debug)]
37pub struct Task {
38    /// [`Task`]'s inner data containing an actual [`Future`] and its
39    /// [`Waker`]. Dropped on the [`Task`] completion.
40    inner: RefCell<Option<Inner>>,
41
42    /// Indicates whether there is a [`Poll::Pending`] awake request of this
43    /// [`Task`].
44    is_scheduled: AtomicBool,
45
46    /// ID of the thread this [`Task`] was created on and must be polled on.
47    thread_id: ThreadId,
48}
49
50/// [`Task`] can be sent across threads safely because it ensures that the
51/// underlying [`Future`] will only be touched from a single thread it was
52/// created on.
53unsafe impl Send for Task {}
54/// [`Task`] can be shared across threads safely because it ensures that the
55/// underlying [`Future`] will only be touched from a single thread it was
56/// created on.
57unsafe impl Sync for Task {}
58
59impl ArcWake for Task {
60    /// Commands an external Dart executor to poll this [`Task`] if it's
61    /// incomplete and there are no [`Poll::Pending`] awake requests already.
62    ///
63    /// [`Poll::Pending`]: task::Poll::Pending
64    fn wake_by_ref(arc_self: &Arc<Self>) {
65        if !arc_self.is_scheduled.swap(true, Ordering::AcqRel) {
66            task_wake(Arc::clone(arc_self));
67        }
68    }
69}
70
71impl Task {
72    /// Spawns a new [`Task`] that will drive the given [`Future`].
73    ///
74    /// Must be called on the same thread where the [`Task`] will be polled,
75    /// otherwise polling will panic.
76    pub fn spawn(future: LocalBoxFuture<'static, ()>) {
77        let this = Arc::new(Self {
78            inner: RefCell::new(None),
79            is_scheduled: AtomicBool::new(false),
80            thread_id: thread::current().id(),
81        });
82
83        let waker = task::waker(Arc::clone(&this));
84        drop(this.inner.borrow_mut().replace(Inner { future, waker }));
85
86        Self::wake_by_ref(&this);
87    }
88
89    /// Polls the underlying [`Future`].
90    ///
91    /// Polling after [`Future`]'s completion is no-op.
92    ///
93    /// # Panics
94    ///
95    /// If called not on the same thread where this [`Task`] was originally
96    /// created.
97    pub fn poll(&self) {
98        assert_eq!(
99            self.thread_id,
100            thread::current().id(),
101            "`dart::executor::Task` can only be polled on the same thread \
102             where it was originally created",
103        );
104
105        let mut borrow = self.inner.borrow_mut();
106
107        // Just ignore poll request if the `Future` is completed.
108        let Some(inner) = borrow.as_mut() else {
109            return;
110        };
111
112        self.is_scheduled.store(false, Ordering::Release);
113
114        let poll = {
115            let mut cx = Context::from_waker(&inner.waker);
116            inner.future.as_mut().poll(&mut cx)
117        };
118
119        // Cleanup resources if future is ready.
120        if poll.is_ready() {
121            *borrow = None;
122        }
123    }
124}