rw_deno_core/tasks.rs
1// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
2
3use futures::task::AtomicWaker;
4use std::marker::PhantomData;
5use std::ops::DerefMut;
6use std::sync::atomic::AtomicBool;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::sync::Mutex;
10use std::task::Context;
11use std::task::Poll;
12
13type UnsendTask = Box<dyn FnOnce(&mut v8::HandleScope) + 'static>;
14type SendTask = Box<dyn FnOnce(&mut v8::HandleScope) + Send + 'static>;
15
16static_assertions::assert_not_impl_any!(V8TaskSpawnerFactory: Send);
17static_assertions::assert_not_impl_any!(V8TaskSpawner: Send);
18static_assertions::assert_impl_all!(V8CrossThreadTaskSpawner: Send);
19
20/// The [`V8TaskSpawnerFactory`] must be created on the same thread as the thread that runs tasks.
21///
22/// This factory is not [`Send`] because it may contain `!Send` tasks submitted by a
23/// [`V8CrossThreadTaskSpawner`]. It is only safe to send this object to another thread if you plan on
24/// submitting [`Send`] tasks to it, which is what [`V8CrossThreadTaskSpawner`] does.
25#[derive(Default)]
26pub(crate) struct V8TaskSpawnerFactory {
27 // TODO(mmastrac): ideally we wouldn't box if we could use arena allocation and a max submission size
28 // TODO(mmastrac): we may want to split the Send and !Send tasks
29 /// The set of tasks, non-empty if `has_tasks` is set.
30 tasks: Mutex<Vec<SendTask>>,
31 /// A flag we can poll without any locks.
32 has_tasks: AtomicBool,
33 /// The polled waker, woken on task submission.
34 waker: AtomicWaker,
35 /// Mark as `!Send`. See note above.
36 _unsend_marker: PhantomData<*const ()>,
37}
38
39impl V8TaskSpawnerFactory {
40 pub fn new_same_thread_spawner(self: Arc<Self>) -> V8TaskSpawner {
41 V8TaskSpawner {
42 tasks: self,
43 _unsend_marker: PhantomData,
44 }
45 }
46
47 pub fn new_cross_thread_spawner(self: Arc<Self>) -> V8CrossThreadTaskSpawner {
48 V8CrossThreadTaskSpawner { tasks: self }
49 }
50
51 /// `false` guarantees that there are no queued tasks, while `true` means that it is likely (but not guaranteed)
52 /// that tasks exist.
53 ///
54 /// Calls should prefer using the waker, but this is left while we rework the event loop.
55 pub fn has_pending_tasks(&self) -> bool {
56 // Ensure that all reads after this point happen-after we load from the atomic
57 self.has_tasks.load(Ordering::Acquire)
58 }
59
60 /// Poll this set of tasks, returning a non-empty set of tasks if there have
61 /// been any queued, or registering the waker if not.
62 pub fn poll_inner(&self, cx: &mut Context) -> Poll<Vec<UnsendTask>> {
63 // Check the flag first -- if it's false we definitely have no tasks. AcqRel semantics ensure
64 // that this read happens-before the vector read below, and that any writes happen-after the success
65 // case.
66 if self
67 .has_tasks
68 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
69 .is_err()
70 {
71 self.waker.register(cx.waker());
72 return Poll::Pending;
73 }
74
75 let mut lock = self.tasks.lock().unwrap();
76 let tasks = std::mem::take(lock.deref_mut());
77 if tasks.is_empty() {
78 // Unlikely race lost -- the task submission to the queue and flag are not atomic, so it's
79 // possible we ended up with an extra poll here. This only shows up under Miri, but as it is
80 // possible we do need to handle it.
81 self.waker.register(cx.waker());
82 return Poll::Pending;
83 }
84
85 // SAFETY: we are removing the Send trait as we return the tasks here to prevent
86 // these tasks from accidentally leaking to another thread.
87 let tasks = unsafe { std::mem::transmute(tasks) };
88 Poll::Ready(tasks)
89 }
90
91 fn spawn(&self, task: SendTask) {
92 self.tasks.lock().unwrap().push(task);
93 // TODO(mmastrac): can we skip the mutex here?
94 // Release ordering means that the writes in the above lock happen-before the atomic store
95 self.has_tasks.store(true, Ordering::Release);
96 self.waker.wake();
97 }
98}
99
100/// Allows for submission of v8 tasks on the same thread.
101#[derive(Clone)]
102pub struct V8TaskSpawner {
103 // TODO(mmastrac): can we split the waker into a send and !send one?
104 tasks: Arc<V8TaskSpawnerFactory>,
105 _unsend_marker: PhantomData<*const ()>,
106}
107
108impl V8TaskSpawner {
109 /// Spawn a task that runs within the [`crate::JsRuntime`] event loop from the same thread
110 /// that the runtime is running on. This function is re-entrant-safe and may be called from
111 /// ops, from outside of a [`v8::HandleScope`] in a plain `async`` task, or even from within
112 /// another, previously-spawned task.
113 ///
114 /// The task is handed off to be run the next time the event loop is polled, and there are
115 /// no guarantees as to when this may happen.
116 ///
117 /// # Safety
118 ///
119 /// The task shares the same [`v8::HandleScope`] as the core event loop, which means that it
120 /// must maintain the scope in a valid state to avoid corrupting or destroying the runtime.
121 ///
122 /// For example, if the code called by this task can raise an exception, the task must ensure
123 /// that it calls that code within a new [`v8::TryCatch`] to avoid the exception leaking to the
124 /// event loop's [`v8::HandleScope`].
125 pub fn spawn<F>(&self, f: F)
126 where
127 F: FnOnce(&mut v8::HandleScope) + 'static,
128 {
129 let task: Box<dyn FnOnce(&mut v8::HandleScope<'_>)> = Box::new(f);
130 // SAFETY: we are transmuting Send into a !Send handle but we guarantee this object will never
131 // leave the current thread because `V8TaskSpawner` is !Send.
132 let task: Box<dyn FnOnce(&mut v8::HandleScope<'_>) + Send> =
133 unsafe { std::mem::transmute(task) };
134 self.tasks.spawn(task)
135 }
136}
137
138/// Allows for submission of v8 tasks on any thread.
139#[derive(Clone)]
140pub struct V8CrossThreadTaskSpawner {
141 tasks: Arc<V8TaskSpawnerFactory>,
142}
143
144// SAFETY: the underlying V8TaskSpawnerFactory is not Send, but we always submit Send tasks
145// to it from this spawner.
146unsafe impl Send for V8CrossThreadTaskSpawner {}
147
148impl V8CrossThreadTaskSpawner {
149 /// Spawn a task that runs within the [`crate::JsRuntime`] event loop, potentially (but not
150 /// required to be) from a different thread than the runtime is running on.
151 ///
152 /// The task is handed off to be run the next time the event loop is polled, and there are
153 /// no guarantees as to when this may happen.
154 ///
155 /// # Safety
156 ///
157 /// The task shares the same [`v8::HandleScope`] as the core event loop, which means that it
158 /// must maintain the scope in a valid state to avoid corrupting or destroying the runtime.
159 ///
160 /// For example, if the code called by this task can raise an exception, the task must ensure
161 /// that it calls that code within a new [`v8::TryCatch`] to avoid the exception leaking to the
162 /// event loop's [`v8::HandleScope`].
163 pub fn spawn<F>(&self, f: F)
164 where
165 F: FnOnce(&mut v8::HandleScope) + Send + 'static,
166 {
167 self.tasks.spawn(Box::new(f))
168 }
169
170 /// Spawn a task that runs within the [`crate::JsRuntime`] event loop from a different thread
171 /// than the runtime is running on.
172 ///
173 /// This function will deadlock if called from the same thread as the [`crate::JsRuntime`], and
174 /// there are no checks for this case.
175 ///
176 /// As this function blocks until the task has run to completion (or panics/deadlocks), it is
177 /// safe to borrow data from the local environment and use it within the closure.
178 ///
179 /// The task is handed off to be run the next time the event loop is polled, and there are
180 /// no guarantees as to when this may happen, however the function will not return until the
181 /// task has been fully run to completion.
182 ///
183 /// # Safety
184 ///
185 /// The task shares the same [`v8::HandleScope`] as the core event loop, which means that it
186 /// must maintain the scope in a valid state to avoid corrupting or destroying the runtime.
187 ///
188 /// For example, if the code called by this task can raise an exception, the task must ensure
189 /// that it calls that code within a new [`v8::TryCatch`] to avoid the exception leaking to the
190 /// event loop's [`v8::HandleScope`].
191 pub fn spawn_blocking<'a, F, T>(&self, f: F) -> T
192 where
193 F: FnOnce(&mut v8::HandleScope) -> T + Send + 'a,
194 T: Send + 'a,
195 {
196 let (tx, rx) = std::sync::mpsc::sync_channel(0);
197 let task: Box<dyn FnOnce(&mut v8::HandleScope<'_>) + Send> =
198 Box::new(|scope| {
199 let r = f(scope);
200 _ = tx.send(r);
201 });
202 // SAFETY: We can safely transmute to the 'static lifetime because we guarantee this method will either
203 // complete fully by the time it returns, deadlock or panic.
204 let task: SendTask = unsafe { std::mem::transmute(task) };
205 self.tasks.spawn(task);
206 rx.recv().unwrap()
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213 use std::future::poll_fn;
214 use tokio::task::LocalSet;
215
216 // https://github.com/tokio-rs/tokio/issues/6155
217 #[test]
218 #[cfg(not(all(miri, target_os = "linux")))]
219 fn test_spawner_serial() {
220 let runtime = tokio::runtime::Builder::new_multi_thread()
221 .worker_threads(1)
222 .build()
223 .unwrap();
224 runtime.block_on(async {
225 let factory = Arc::<V8TaskSpawnerFactory>::default();
226 let cross_thread_spawner = factory.clone().new_cross_thread_spawner();
227 let local_set = LocalSet::new();
228
229 const COUNT: usize = 1000;
230
231 let task = runtime.spawn(async move {
232 for _ in 0..COUNT {
233 cross_thread_spawner.spawn(|_| {});
234 }
235 });
236
237 local_set.spawn_local(async move {
238 let mut count = 0;
239 loop {
240 count += poll_fn(|cx| factory.poll_inner(cx)).await.len();
241 if count >= COUNT {
242 break;
243 }
244 }
245 });
246
247 local_set.await;
248 _ = task.await;
249 });
250 }
251
252 // https://github.com/tokio-rs/tokio/issues/6155
253 #[test]
254 #[cfg(not(all(miri, target_os = "linux")))]
255 fn test_spawner_parallel() {
256 let runtime = tokio::runtime::Builder::new_multi_thread()
257 .worker_threads(1)
258 .build()
259 .unwrap();
260 runtime.block_on(async {
261 let factory = Arc::<V8TaskSpawnerFactory>::default();
262 let cross_thread_spawner = factory.clone().new_cross_thread_spawner();
263 let local_set = LocalSet::new();
264
265 const COUNT: usize = 100;
266 let mut tasks = vec![];
267 for _ in 0..COUNT {
268 let cross_thread_spawner = cross_thread_spawner.clone();
269 tasks.push(runtime.spawn(async move {
270 cross_thread_spawner.spawn(|_| {});
271 }));
272 }
273
274 local_set.spawn_local(async move {
275 let mut count = 0;
276 loop {
277 count += poll_fn(|cx| factory.poll_inner(cx)).await.len();
278 if count >= COUNT {
279 break;
280 }
281 }
282 });
283
284 local_set.await;
285 for task in tasks.drain(..) {
286 _ = task.await;
287 }
288 });
289 }
290}