deno_core 0.248.0

A modern JavaScript/TypeScript runtime built with V8, Rust, and Tokio
Documentation
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

use futures::task::AtomicWaker;
use std::marker::PhantomData;
use std::ops::DerefMut;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;

type UnsendTask = Box<dyn FnOnce(&mut v8::HandleScope) + 'static>;
type SendTask = Box<dyn FnOnce(&mut v8::HandleScope) + Send + 'static>;

static_assertions::assert_not_impl_any!(V8TaskSpawnerFactory: Send);
static_assertions::assert_not_impl_any!(V8TaskSpawner: Send);
static_assertions::assert_impl_all!(V8CrossThreadTaskSpawner: Send);

/// The [`V8TaskSpawnerFactory`] must be created on the same thread as the thread that runs tasks.
///
/// This factory is not [`Send`] because it may contain `!Send` tasks submitted by a
/// [`V8CrossThreadTaskSpawner`]. It is only safe to send this object to another thread if you plan on
/// submitting [`Send`] tasks to it, which is what [`V8CrossThreadTaskSpawner`] does.
#[derive(Default)]
pub(crate) struct V8TaskSpawnerFactory {
  // TODO(mmastrac): ideally we wouldn't box if we could use arena allocation and a max submission size
  // TODO(mmastrac): we may want to split the Send and !Send tasks
  /// The set of tasks, non-empty if `has_tasks` is set.
  tasks: Mutex<Vec<SendTask>>,
  /// A flag we can poll without any locks.
  has_tasks: AtomicBool,
  /// The polled waker, woken on task submission.
  waker: AtomicWaker,
  /// Mark as `!Send`. See note above.
  _unsend_marker: PhantomData<*const ()>,
}

impl V8TaskSpawnerFactory {
  pub fn new_same_thread_spawner(self: Arc<Self>) -> V8TaskSpawner {
    V8TaskSpawner {
      tasks: self,
      _unsend_marker: PhantomData,
    }
  }

  pub fn new_cross_thread_spawner(self: Arc<Self>) -> V8CrossThreadTaskSpawner {
    V8CrossThreadTaskSpawner { tasks: self }
  }

  /// `false` guarantees that there are no queued tasks, while `true` means that it is likely (but not guaranteed)
  /// that tasks exist.
  ///
  /// Calls should prefer using the waker, but this is left while we rework the event loop.
  pub fn has_pending_tasks(&self) -> bool {
    // Ensure that all reads after this point happen-after we load from the atomic
    self.has_tasks.load(Ordering::Acquire)
  }

  /// Poll this set of tasks, returning a non-empty set of tasks if there have
  /// been any queued, or registering the waker if not.
  pub fn poll_inner(&self, cx: &mut Context) -> Poll<Vec<UnsendTask>> {
    // Check the flag first -- if it's false we definitely have no tasks. AcqRel semantics ensure
    // that this read happens-before the vector read below, and that any writes happen-after the success
    // case.
    if self
      .has_tasks
      .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
      .is_err()
    {
      self.waker.register(cx.waker());
      return Poll::Pending;
    }

    let mut lock = self.tasks.lock().unwrap();
    let tasks = std::mem::take(lock.deref_mut());
    if tasks.is_empty() {
      // Unlikely race lost -- the task submission to the queue and flag are not atomic, so it's
      // possible we ended up with an extra poll here. This only shows up under Miri, but as it is
      // possible we do need to handle it.
      self.waker.register(cx.waker());
      return Poll::Pending;
    }

    // SAFETY: we are removing the Send trait as we return the tasks here to prevent
    // these tasks from accidentally leaking to another thread.
    let tasks = unsafe { std::mem::transmute(tasks) };
    Poll::Ready(tasks)
  }

  fn spawn(&self, task: SendTask) {
    self.tasks.lock().unwrap().push(task);
    // TODO(mmastrac): can we skip the mutex here?
    // Release ordering means that the writes in the above lock happen-before the atomic store
    self.has_tasks.store(true, Ordering::Release);
    self.waker.wake();
  }
}

/// Allows for submission of v8 tasks on the same thread.
#[derive(Clone)]
pub struct V8TaskSpawner {
  // TODO(mmastrac): can we split the waker into a send and !send one?
  tasks: Arc<V8TaskSpawnerFactory>,
  _unsend_marker: PhantomData<*const ()>,
}

impl V8TaskSpawner {
  /// Spawn a task that runs within the [`crate::JsRuntime`] event loop from the same thread
  /// that the runtime is running on. This function is re-entrant-safe and may be called from
  /// ops, from outside of a [`v8::HandleScope`] in a plain `async`` task, or even from within
  /// another, previously-spawned task.
  ///
  /// The task is handed off to be run the next time the event loop is polled, and there are
  /// no guarantees as to when this may happen.
  ///
  /// # Safety
  ///
  /// The task shares the same [`v8::HandleScope`] as the core event loop, which means that it
  /// must maintain the scope in a valid state to avoid corrupting or destroying the runtime.
  ///
  /// For example, if the code called by this task can raise an exception, the task must ensure
  /// that it calls that code within a new [`v8::TryCatch`] to avoid the exception leaking to the
  /// event loop's [`v8::HandleScope`].
  pub fn spawn<F>(&self, f: F)
  where
    F: FnOnce(&mut v8::HandleScope) + 'static,
  {
    let task: Box<dyn FnOnce(&mut v8::HandleScope<'_>)> = Box::new(f);
    // SAFETY: we are transmuting Send into a !Send handle but we guarantee this object will never
    // leave the current thread because `V8TaskSpawner` is !Send.
    let task: Box<dyn FnOnce(&mut v8::HandleScope<'_>) + Send> =
      unsafe { std::mem::transmute(task) };
    self.tasks.spawn(task)
  }
}

/// Allows for submission of v8 tasks on any thread.
#[derive(Clone)]
pub struct V8CrossThreadTaskSpawner {
  tasks: Arc<V8TaskSpawnerFactory>,
}

// SAFETY: the underlying V8TaskSpawnerFactory is not Send, but we always submit Send tasks
// to it from this spawner.
unsafe impl Send for V8CrossThreadTaskSpawner {}

impl V8CrossThreadTaskSpawner {
  /// Spawn a task that runs within the [`crate::JsRuntime`] event loop, potentially (but not
  /// required to be) from a different thread than the runtime is running on.
  ///
  /// The task is handed off to be run the next time the event loop is polled, and there are
  /// no guarantees as to when this may happen.
  ///
  /// # Safety
  ///
  /// The task shares the same [`v8::HandleScope`] as the core event loop, which means that it
  /// must maintain the scope in a valid state to avoid corrupting or destroying the runtime.
  ///
  /// For example, if the code called by this task can raise an exception, the task must ensure
  /// that it calls that code within a new [`v8::TryCatch`] to avoid the exception leaking to the
  /// event loop's [`v8::HandleScope`].
  pub fn spawn<F>(&self, f: F)
  where
    F: FnOnce(&mut v8::HandleScope) + Send + 'static,
  {
    self.tasks.spawn(Box::new(f))
  }

  /// Spawn a task that runs within the [`crate::JsRuntime`] event loop from a different thread
  /// than the runtime is running on.
  ///
  /// This function will deadlock if called from the same thread as the [`crate::JsRuntime`], and
  /// there are no checks for this case.
  ///
  /// As this function blocks until the task has run to completion (or panics/deadlocks), it is
  /// safe to borrow data from the local environment and use it within the closure.
  ///
  /// The task is handed off to be run the next time the event loop is polled, and there are
  /// no guarantees as to when this may happen, however the function will not return until the
  /// task has been fully run to completion.
  ///
  /// # Safety
  ///
  /// The task shares the same [`v8::HandleScope`] as the core event loop, which means that it
  /// must maintain the scope in a valid state to avoid corrupting or destroying the runtime.
  ///
  /// For example, if the code called by this task can raise an exception, the task must ensure
  /// that it calls that code within a new [`v8::TryCatch`] to avoid the exception leaking to the
  /// event loop's [`v8::HandleScope`].
  pub fn spawn_blocking<'a, F, T>(&self, f: F) -> T
  where
    F: FnOnce(&mut v8::HandleScope) -> T + Send + 'a,
    T: Send + 'a,
  {
    let (tx, rx) = std::sync::mpsc::sync_channel(0);
    let task: Box<dyn FnOnce(&mut v8::HandleScope<'_>) + Send> =
      Box::new(|scope| {
        let r = f(scope);
        _ = tx.send(r);
      });
    // SAFETY: We can safely transmute to the 'static lifetime because we guarantee this method will either
    // complete fully by the time it returns, deadlock or panic.
    let task: SendTask = unsafe { std::mem::transmute(task) };
    self.tasks.spawn(task);
    rx.recv().unwrap()
  }
}

#[cfg(test)]
mod tests {
  use super::*;
  use std::future::poll_fn;
  use tokio::task::LocalSet;

  // https://github.com/tokio-rs/tokio/issues/6155
  #[test]
  #[cfg(not(all(miri, target_os = "linux")))]
  fn test_spawner_serial() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
      .worker_threads(1)
      .build()
      .unwrap();
    runtime.block_on(async {
      let factory = Arc::<V8TaskSpawnerFactory>::default();
      let cross_thread_spawner = factory.clone().new_cross_thread_spawner();
      let local_set = LocalSet::new();

      const COUNT: usize = 1000;

      let task = runtime.spawn(async move {
        for _ in 0..COUNT {
          cross_thread_spawner.spawn(|_| {});
        }
      });

      local_set.spawn_local(async move {
        let mut count = 0;
        loop {
          count += poll_fn(|cx| factory.poll_inner(cx)).await.len();
          if count >= COUNT {
            break;
          }
        }
      });

      local_set.await;
      _ = task.await;
    });
  }

  // https://github.com/tokio-rs/tokio/issues/6155
  #[test]
  #[cfg(not(all(miri, target_os = "linux")))]
  fn test_spawner_parallel() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
      .worker_threads(1)
      .build()
      .unwrap();
    runtime.block_on(async {
      let factory = Arc::<V8TaskSpawnerFactory>::default();
      let cross_thread_spawner = factory.clone().new_cross_thread_spawner();
      let local_set = LocalSet::new();

      const COUNT: usize = 100;
      let mut tasks = vec![];
      for _ in 0..COUNT {
        let cross_thread_spawner = cross_thread_spawner.clone();
        tasks.push(runtime.spawn(async move {
          cross_thread_spawner.spawn(|_| {});
        }));
      }

      local_set.spawn_local(async move {
        let mut count = 0;
        loop {
          count += poll_fn(|cx| factory.poll_inner(cx)).await.len();
          if count >= COUNT {
            break;
          }
        }
      });

      local_set.await;
      for task in tasks.drain(..) {
        _ = task.await;
      }
    });
  }
}