medea_jason/platform/dart/executor/task.rs
1//! [`Task`] for execution by a [`platform::dart::executor`].
2//!
3//! [`platform::dart::executor`]: crate::platform::executor
4
5use std::{
6 cell::RefCell,
7 sync::{
8 Arc,
9 atomic::{AtomicBool, Ordering},
10 },
11 task::{Context, Waker},
12 thread::{self, ThreadId},
13};
14
15use derive_more::with_trait::Debug;
16use futures::{
17 future::LocalBoxFuture,
18 task::{self, ArcWake},
19};
20
21use crate::platform::dart::executor::task_wake;
22
23/// Inner [`Task`]'s data.
24#[derive(Debug)]
25struct Inner {
26 /// An actual [`Future`] that this [`Task`] is driving.
27 #[debug(skip)]
28 future: LocalBoxFuture<'static, ()>,
29
30 /// Handle for waking up this [`Task`].
31 waker: Waker,
32}
33
34/// Wrapper for a [`Future`] that can be polled by an external single threaded
35/// Dart executor.
36#[derive(Debug)]
37pub struct Task {
38 /// [`Task`]'s inner data containing an actual [`Future`] and its
39 /// [`Waker`]. Dropped on the [`Task`] completion.
40 inner: RefCell<Option<Inner>>,
41
42 /// Indicates whether there is a [`Poll::Pending`] awake request of this
43 /// [`Task`].
44 is_scheduled: AtomicBool,
45
46 /// ID of the thread this [`Task`] was created on and must be polled on.
47 thread_id: ThreadId,
48}
49
50/// [`Task`] can be sent across threads safely because it ensures that the
51/// underlying [`Future`] will only be touched from a single thread it was
52/// created on.
53unsafe impl Send for Task {}
54/// [`Task`] can be shared across threads safely because it ensures that the
55/// underlying [`Future`] will only be touched from a single thread it was
56/// created on.
57unsafe impl Sync for Task {}
58
59impl ArcWake for Task {
60 /// Commands an external Dart executor to poll this [`Task`] if it's
61 /// incomplete and there are no [`Poll::Pending`] awake requests already.
62 ///
63 /// [`Poll::Pending`]: task::Poll::Pending
64 fn wake_by_ref(arc_self: &Arc<Self>) {
65 if !arc_self.is_scheduled.swap(true, Ordering::AcqRel) {
66 task_wake(Arc::clone(arc_self));
67 }
68 }
69}
70
71impl Task {
72 /// Spawns a new [`Task`] that will drive the given [`Future`].
73 ///
74 /// Must be called on the same thread where the [`Task`] will be polled,
75 /// otherwise polling will panic.
76 pub fn spawn(future: LocalBoxFuture<'static, ()>) {
77 let this = Arc::new(Self {
78 inner: RefCell::new(None),
79 is_scheduled: AtomicBool::new(false),
80 thread_id: thread::current().id(),
81 });
82
83 let waker = task::waker(Arc::clone(&this));
84 drop(this.inner.borrow_mut().replace(Inner { future, waker }));
85
86 Self::wake_by_ref(&this);
87 }
88
89 /// Polls the underlying [`Future`].
90 ///
91 /// Polling after [`Future`]'s completion is no-op.
92 ///
93 /// # Panics
94 ///
95 /// If called not on the same thread where this [`Task`] was originally
96 /// created.
97 pub fn poll(&self) {
98 assert_eq!(
99 self.thread_id,
100 thread::current().id(),
101 "`dart::executor::Task` can only be polled on the same thread \
102 where it was originally created",
103 );
104
105 let mut borrow = self.inner.borrow_mut();
106
107 // Just ignore poll request if the `Future` is completed.
108 let Some(inner) = borrow.as_mut() else {
109 return;
110 };
111
112 self.is_scheduled.store(false, Ordering::Release);
113
114 let poll = {
115 let mut cx = Context::from_waker(&inner.waker);
116 inner.future.as_mut().poll(&mut cx)
117 };
118
119 // Cleanup resources if future is ready.
120 if poll.is_ready() {
121 *borrow = None;
122 }
123 }
124}