rw_deno_core/
tasks.rs

1// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
2
3use futures::task::AtomicWaker;
4use std::marker::PhantomData;
5use std::ops::DerefMut;
6use std::sync::atomic::AtomicBool;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::sync::Mutex;
10use std::task::Context;
11use std::task::Poll;
12
13type UnsendTask = Box<dyn FnOnce(&mut v8::HandleScope) + 'static>;
14type SendTask = Box<dyn FnOnce(&mut v8::HandleScope) + Send + 'static>;
15
16static_assertions::assert_not_impl_any!(V8TaskSpawnerFactory: Send);
17static_assertions::assert_not_impl_any!(V8TaskSpawner: Send);
18static_assertions::assert_impl_all!(V8CrossThreadTaskSpawner: Send);
19
20/// The [`V8TaskSpawnerFactory`] must be created on the same thread as the thread that runs tasks.
21///
22/// This factory is not [`Send`] because it may contain `!Send` tasks submitted by a
23/// [`V8CrossThreadTaskSpawner`]. It is only safe to send this object to another thread if you plan on
24/// submitting [`Send`] tasks to it, which is what [`V8CrossThreadTaskSpawner`] does.
25#[derive(Default)]
26pub(crate) struct V8TaskSpawnerFactory {
27  // TODO(mmastrac): ideally we wouldn't box if we could use arena allocation and a max submission size
28  // TODO(mmastrac): we may want to split the Send and !Send tasks
29  /// The set of tasks, non-empty if `has_tasks` is set.
30  tasks: Mutex<Vec<SendTask>>,
31  /// A flag we can poll without any locks.
32  has_tasks: AtomicBool,
33  /// The polled waker, woken on task submission.
34  waker: AtomicWaker,
35  /// Mark as `!Send`. See note above.
36  _unsend_marker: PhantomData<*const ()>,
37}
38
39impl V8TaskSpawnerFactory {
40  pub fn new_same_thread_spawner(self: Arc<Self>) -> V8TaskSpawner {
41    V8TaskSpawner {
42      tasks: self,
43      _unsend_marker: PhantomData,
44    }
45  }
46
47  pub fn new_cross_thread_spawner(self: Arc<Self>) -> V8CrossThreadTaskSpawner {
48    V8CrossThreadTaskSpawner { tasks: self }
49  }
50
51  /// `false` guarantees that there are no queued tasks, while `true` means that it is likely (but not guaranteed)
52  /// that tasks exist.
53  ///
54  /// Calls should prefer using the waker, but this is left while we rework the event loop.
55  pub fn has_pending_tasks(&self) -> bool {
56    // Ensure that all reads after this point happen-after we load from the atomic
57    self.has_tasks.load(Ordering::Acquire)
58  }
59
60  /// Poll this set of tasks, returning a non-empty set of tasks if there have
61  /// been any queued, or registering the waker if not.
62  pub fn poll_inner(&self, cx: &mut Context) -> Poll<Vec<UnsendTask>> {
63    // Check the flag first -- if it's false we definitely have no tasks. AcqRel semantics ensure
64    // that this read happens-before the vector read below, and that any writes happen-after the success
65    // case.
66    if self
67      .has_tasks
68      .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
69      .is_err()
70    {
71      self.waker.register(cx.waker());
72      return Poll::Pending;
73    }
74
75    let mut lock = self.tasks.lock().unwrap();
76    let tasks = std::mem::take(lock.deref_mut());
77    if tasks.is_empty() {
78      // Unlikely race lost -- the task submission to the queue and flag are not atomic, so it's
79      // possible we ended up with an extra poll here. This only shows up under Miri, but as it is
80      // possible we do need to handle it.
81      self.waker.register(cx.waker());
82      return Poll::Pending;
83    }
84
85    // SAFETY: we are removing the Send trait as we return the tasks here to prevent
86    // these tasks from accidentally leaking to another thread.
87    let tasks = unsafe { std::mem::transmute(tasks) };
88    Poll::Ready(tasks)
89  }
90
91  fn spawn(&self, task: SendTask) {
92    self.tasks.lock().unwrap().push(task);
93    // TODO(mmastrac): can we skip the mutex here?
94    // Release ordering means that the writes in the above lock happen-before the atomic store
95    self.has_tasks.store(true, Ordering::Release);
96    self.waker.wake();
97  }
98}
99
100/// Allows for submission of v8 tasks on the same thread.
101#[derive(Clone)]
102pub struct V8TaskSpawner {
103  // TODO(mmastrac): can we split the waker into a send and !send one?
104  tasks: Arc<V8TaskSpawnerFactory>,
105  _unsend_marker: PhantomData<*const ()>,
106}
107
108impl V8TaskSpawner {
109  /// Spawn a task that runs within the [`crate::JsRuntime`] event loop from the same thread
110  /// that the runtime is running on. This function is re-entrant-safe and may be called from
111  /// ops, from outside of a [`v8::HandleScope`] in a plain `async`` task, or even from within
112  /// another, previously-spawned task.
113  ///
114  /// The task is handed off to be run the next time the event loop is polled, and there are
115  /// no guarantees as to when this may happen.
116  ///
117  /// # Safety
118  ///
119  /// The task shares the same [`v8::HandleScope`] as the core event loop, which means that it
120  /// must maintain the scope in a valid state to avoid corrupting or destroying the runtime.
121  ///
122  /// For example, if the code called by this task can raise an exception, the task must ensure
123  /// that it calls that code within a new [`v8::TryCatch`] to avoid the exception leaking to the
124  /// event loop's [`v8::HandleScope`].
125  pub fn spawn<F>(&self, f: F)
126  where
127    F: FnOnce(&mut v8::HandleScope) + 'static,
128  {
129    let task: Box<dyn FnOnce(&mut v8::HandleScope<'_>)> = Box::new(f);
130    // SAFETY: we are transmuting Send into a !Send handle but we guarantee this object will never
131    // leave the current thread because `V8TaskSpawner` is !Send.
132    let task: Box<dyn FnOnce(&mut v8::HandleScope<'_>) + Send> =
133      unsafe { std::mem::transmute(task) };
134    self.tasks.spawn(task)
135  }
136}
137
138/// Allows for submission of v8 tasks on any thread.
139#[derive(Clone)]
140pub struct V8CrossThreadTaskSpawner {
141  tasks: Arc<V8TaskSpawnerFactory>,
142}
143
144// SAFETY: the underlying V8TaskSpawnerFactory is not Send, but we always submit Send tasks
145// to it from this spawner.
146unsafe impl Send for V8CrossThreadTaskSpawner {}
147
148impl V8CrossThreadTaskSpawner {
149  /// Spawn a task that runs within the [`crate::JsRuntime`] event loop, potentially (but not
150  /// required to be) from a different thread than the runtime is running on.
151  ///
152  /// The task is handed off to be run the next time the event loop is polled, and there are
153  /// no guarantees as to when this may happen.
154  ///
155  /// # Safety
156  ///
157  /// The task shares the same [`v8::HandleScope`] as the core event loop, which means that it
158  /// must maintain the scope in a valid state to avoid corrupting or destroying the runtime.
159  ///
160  /// For example, if the code called by this task can raise an exception, the task must ensure
161  /// that it calls that code within a new [`v8::TryCatch`] to avoid the exception leaking to the
162  /// event loop's [`v8::HandleScope`].
163  pub fn spawn<F>(&self, f: F)
164  where
165    F: FnOnce(&mut v8::HandleScope) + Send + 'static,
166  {
167    self.tasks.spawn(Box::new(f))
168  }
169
170  /// Spawn a task that runs within the [`crate::JsRuntime`] event loop from a different thread
171  /// than the runtime is running on.
172  ///
173  /// This function will deadlock if called from the same thread as the [`crate::JsRuntime`], and
174  /// there are no checks for this case.
175  ///
176  /// As this function blocks until the task has run to completion (or panics/deadlocks), it is
177  /// safe to borrow data from the local environment and use it within the closure.
178  ///
179  /// The task is handed off to be run the next time the event loop is polled, and there are
180  /// no guarantees as to when this may happen, however the function will not return until the
181  /// task has been fully run to completion.
182  ///
183  /// # Safety
184  ///
185  /// The task shares the same [`v8::HandleScope`] as the core event loop, which means that it
186  /// must maintain the scope in a valid state to avoid corrupting or destroying the runtime.
187  ///
188  /// For example, if the code called by this task can raise an exception, the task must ensure
189  /// that it calls that code within a new [`v8::TryCatch`] to avoid the exception leaking to the
190  /// event loop's [`v8::HandleScope`].
191  pub fn spawn_blocking<'a, F, T>(&self, f: F) -> T
192  where
193    F: FnOnce(&mut v8::HandleScope) -> T + Send + 'a,
194    T: Send + 'a,
195  {
196    let (tx, rx) = std::sync::mpsc::sync_channel(0);
197    let task: Box<dyn FnOnce(&mut v8::HandleScope<'_>) + Send> =
198      Box::new(|scope| {
199        let r = f(scope);
200        _ = tx.send(r);
201      });
202    // SAFETY: We can safely transmute to the 'static lifetime because we guarantee this method will either
203    // complete fully by the time it returns, deadlock or panic.
204    let task: SendTask = unsafe { std::mem::transmute(task) };
205    self.tasks.spawn(task);
206    rx.recv().unwrap()
207  }
208}
209
210#[cfg(test)]
211mod tests {
212  use super::*;
213  use std::future::poll_fn;
214  use tokio::task::LocalSet;
215
216  // https://github.com/tokio-rs/tokio/issues/6155
217  #[test]
218  #[cfg(not(all(miri, target_os = "linux")))]
219  fn test_spawner_serial() {
220    let runtime = tokio::runtime::Builder::new_multi_thread()
221      .worker_threads(1)
222      .build()
223      .unwrap();
224    runtime.block_on(async {
225      let factory = Arc::<V8TaskSpawnerFactory>::default();
226      let cross_thread_spawner = factory.clone().new_cross_thread_spawner();
227      let local_set = LocalSet::new();
228
229      const COUNT: usize = 1000;
230
231      let task = runtime.spawn(async move {
232        for _ in 0..COUNT {
233          cross_thread_spawner.spawn(|_| {});
234        }
235      });
236
237      local_set.spawn_local(async move {
238        let mut count = 0;
239        loop {
240          count += poll_fn(|cx| factory.poll_inner(cx)).await.len();
241          if count >= COUNT {
242            break;
243          }
244        }
245      });
246
247      local_set.await;
248      _ = task.await;
249    });
250  }
251
252  // https://github.com/tokio-rs/tokio/issues/6155
253  #[test]
254  #[cfg(not(all(miri, target_os = "linux")))]
255  fn test_spawner_parallel() {
256    let runtime = tokio::runtime::Builder::new_multi_thread()
257      .worker_threads(1)
258      .build()
259      .unwrap();
260    runtime.block_on(async {
261      let factory = Arc::<V8TaskSpawnerFactory>::default();
262      let cross_thread_spawner = factory.clone().new_cross_thread_spawner();
263      let local_set = LocalSet::new();
264
265      const COUNT: usize = 100;
266      let mut tasks = vec![];
267      for _ in 0..COUNT {
268        let cross_thread_spawner = cross_thread_spawner.clone();
269        tasks.push(runtime.spawn(async move {
270          cross_thread_spawner.spawn(|_| {});
271        }));
272      }
273
274      local_set.spawn_local(async move {
275        let mut count = 0;
276        loop {
277          count += poll_fn(|cx| factory.poll_inner(cx)).await.len();
278          if count >= COUNT {
279            break;
280          }
281        }
282      });
283
284      local_set.await;
285      for task in tasks.drain(..) {
286        _ = task.await;
287      }
288    });
289  }
290}