use crate::context::time::{JsDuration, JsInstant};
use crate::sys::time;
use crate::{
Context, JsResult, JsValue,
object::{JsFunction, NativeObject},
realm::Realm,
};
use boa_gc::{Finalize, Trace};
use futures_concurrency::future::FutureGroup;
use futures_lite::{StreamExt, future};
use std::any::Any;
use std::cell::Cell;
use std::collections::BTreeMap;
use std::mem;
use std::rc::Rc;
use std::{cell::RefCell, collections::VecDeque, fmt::Debug, future::Future, pin::Pin};
pub struct NativeJob {
#[allow(clippy::type_complexity)]
f: Box<dyn FnOnce(&mut Context) -> JsResult<JsValue>>,
realm: Option<Realm>,
}
impl Debug for NativeJob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NativeJob").finish_non_exhaustive()
}
}
impl NativeJob {
pub fn new<F>(f: F) -> Self
where
F: FnOnce(&mut Context) -> JsResult<JsValue> + 'static,
{
Self {
f: Box::new(f),
realm: None,
}
}
pub fn with_realm<F>(f: F, realm: Realm) -> Self
where
F: FnOnce(&mut Context) -> JsResult<JsValue> + 'static,
{
Self {
f: Box::new(f),
realm: Some(realm),
}
}
#[must_use]
pub const fn realm(&self) -> Option<&Realm> {
self.realm.as_ref()
}
pub fn call(self, context: &mut Context) -> JsResult<JsValue> {
if let Some(realm) = self.realm {
let old_realm = context.enter_realm(realm);
let result = (self.f)(context);
context.enter_realm(old_realm);
result
} else {
(self.f)(context)
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct OnceFlag(Rc<Cell<bool>>);
impl OnceFlag {
pub(crate) fn new() -> Self {
Self(Rc::new(Cell::new(false)))
}
pub(crate) fn set(&self) {
self.0.set(true);
}
pub(crate) fn is_set(&self) -> bool {
self.0.get()
}
}
#[derive(Debug)]
pub struct TimeoutJob {
timeout: JsDuration,
job: NativeJob,
cancelled: OnceFlag,
recurring: bool,
}
impl TimeoutJob {
#[must_use]
pub fn new(job: NativeJob, timeout_in_millis: u64) -> Self {
Self {
timeout: JsDuration::from_millis(timeout_in_millis),
job,
cancelled: OnceFlag::new(),
recurring: false,
}
}
#[must_use]
pub fn recurring(job: NativeJob, timeout_in_millis: u64) -> Self {
Self {
timeout: JsDuration::from_millis(timeout_in_millis),
job,
cancelled: OnceFlag::new(),
recurring: true,
}
}
#[must_use]
pub fn from_duration<F>(f: F, timeout: impl Into<JsDuration>) -> Self
where
F: FnOnce(&mut Context) -> JsResult<JsValue> + 'static,
{
Self::new(NativeJob::new(f), timeout.into().as_millis())
}
#[must_use]
pub fn with_realm<F>(f: F, realm: Realm, timeout: time::Duration) -> Self
where
F: FnOnce(&mut Context) -> JsResult<JsValue> + 'static,
{
Self::new(NativeJob::with_realm(f, realm), timeout.as_millis() as u64)
}
pub fn call(self, context: &mut Context) -> JsResult<JsValue> {
self.job.call(context)
}
#[inline]
#[must_use]
pub fn timeout(&self) -> JsDuration {
self.timeout
}
#[inline]
#[must_use]
pub fn is_cancelled(&self) -> bool {
self.cancelled.is_set()
}
pub(crate) fn cancelled_flag(&self) -> OnceFlag {
self.cancelled.clone()
}
#[must_use]
pub fn is_recurring(&self) -> bool {
self.recurring
}
}
pub struct GenericJob(NativeJob);
impl Debug for GenericJob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GenericJob").finish_non_exhaustive()
}
}
impl GenericJob {
pub fn new<F>(f: F, realm: Realm) -> Self
where
F: FnOnce(&mut Context) -> JsResult<JsValue> + 'static,
{
Self(NativeJob::with_realm(f, realm))
}
#[must_use]
pub const fn realm(&self) -> &Realm {
self.0
.realm
.as_ref()
.expect("all generic jobs must have an execution realm")
}
pub fn call(self, context: &mut Context) -> JsResult<JsValue> {
self.0.call(context)
}
}
pub type BoxedFuture<'a> = Pin<Box<dyn Future<Output = JsResult<JsValue>> + 'a>>;
#[allow(clippy::type_complexity)]
pub struct NativeAsyncJob {
f: Box<dyn for<'a> FnOnce(&'a RefCell<&mut Context>) -> BoxedFuture<'a>>,
realm: Option<Realm>,
}
impl Debug for NativeAsyncJob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NativeAsyncJob")
.field("f", &"Closure")
.finish()
}
}
impl NativeAsyncJob {
pub fn new<F>(f: F) -> Self
where
F: AsyncFnOnce(&RefCell<&mut Context>) -> JsResult<JsValue> + 'static,
{
Self {
f: Box::new(move |ctx| Box::pin(async move { f(ctx).await })),
realm: None,
}
}
pub fn with_realm<F>(f: F, realm: Realm) -> Self
where
F: AsyncFnOnce(&RefCell<&mut Context>) -> JsResult<JsValue> + 'static,
{
Self {
f: Box::new(move |ctx| Box::pin(async move { f(ctx).await })),
realm: Some(realm),
}
}
#[must_use]
pub const fn realm(&self) -> Option<&Realm> {
self.realm.as_ref()
}
pub fn call<'a, 'b>(
self,
context: &'a RefCell<&'b mut Context>,
) -> impl Future<Output = JsResult<JsValue>> + Unpin + use<'a, 'b> {
let realm = self.realm;
let mut future = if let Some(realm) = &realm {
let old_realm = context.borrow_mut().enter_realm(realm.clone());
let result = (self.f)(context);
context.borrow_mut().enter_realm(old_realm);
result
} else {
(self.f)(context)
};
std::future::poll_fn(move |cx| {
if let Some(realm) = &realm {
let old_realm = context.borrow_mut().enter_realm(realm.clone());
let poll_result = future.as_mut().poll(cx);
context.borrow_mut().enter_realm(old_realm);
poll_result
} else {
future.as_mut().poll(cx)
}
})
}
}
pub struct PromiseJob(NativeJob);
impl Debug for PromiseJob {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PromiseJob").finish_non_exhaustive()
}
}
impl PromiseJob {
pub fn new<F>(f: F) -> Self
where
F: FnOnce(&mut Context) -> JsResult<JsValue> + 'static,
{
Self(NativeJob::new(f))
}
pub fn with_realm<F>(f: F, realm: Realm) -> Self
where
F: FnOnce(&mut Context) -> JsResult<JsValue> + 'static,
{
Self(NativeJob::with_realm(f, realm))
}
#[must_use]
pub const fn realm(&self) -> Option<&Realm> {
self.0.realm()
}
pub fn call(self, context: &mut Context) -> JsResult<JsValue> {
self.0.call(context)
}
}
#[derive(Trace, Finalize)]
pub struct JobCallback {
callback: JsFunction,
host_defined: Box<dyn NativeObject>,
}
impl Debug for JobCallback {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JobCallback")
.field("callback", &self.callback)
.field("host_defined", &"dyn NativeObject")
.finish()
}
}
impl JobCallback {
#[inline]
pub fn new<T: NativeObject>(callback: JsFunction, host_defined: T) -> Self {
Self {
callback,
host_defined: Box::new(host_defined),
}
}
#[inline]
#[must_use]
pub const fn callback(&self) -> &JsFunction {
&self.callback
}
#[inline]
#[must_use]
pub fn host_defined(&self) -> &dyn NativeObject {
&*self.host_defined
}
#[inline]
pub fn host_defined_mut(&mut self) -> &mut dyn NativeObject {
&mut *self.host_defined
}
}
#[non_exhaustive]
#[derive(Debug)]
pub enum Job {
PromiseJob(PromiseJob),
AsyncJob(NativeAsyncJob),
TimeoutJob(TimeoutJob),
GenericJob(GenericJob),
}
impl From<NativeAsyncJob> for Job {
fn from(native_async_job: NativeAsyncJob) -> Self {
Job::AsyncJob(native_async_job)
}
}
impl From<PromiseJob> for Job {
fn from(promise_job: PromiseJob) -> Self {
Job::PromiseJob(promise_job)
}
}
impl From<TimeoutJob> for Job {
fn from(job: TimeoutJob) -> Self {
Job::TimeoutJob(job)
}
}
impl From<GenericJob> for Job {
fn from(job: GenericJob) -> Self {
Job::GenericJob(job)
}
}
pub trait JobExecutor: Any {
fn enqueue_job(self: Rc<Self>, job: Job, context: &mut Context);
fn run_jobs(self: Rc<Self>, context: &mut Context) -> JsResult<()>;
#[expect(async_fn_in_trait, reason = "all our APIs are single-threaded")]
#[allow(
clippy::unused_async,
reason = "this must be overridden by proper async runtimes"
)]
async fn run_jobs_async(self: Rc<Self>, context: &RefCell<&mut Context>) -> JsResult<()>
where
Self: Sized,
{
self.run_jobs(&mut context.borrow_mut())
}
}
#[derive(Debug, Clone, Copy)]
pub struct IdleJobExecutor;
impl JobExecutor for IdleJobExecutor {
fn enqueue_job(self: Rc<Self>, _: Job, _: &mut Context) {}
fn run_jobs(self: Rc<Self>, _: &mut Context) -> JsResult<()> {
Ok(())
}
}
#[allow(clippy::struct_field_names)]
#[derive(Default)]
pub struct SimpleJobExecutor {
promise_jobs: RefCell<VecDeque<PromiseJob>>,
async_jobs: RefCell<VecDeque<NativeAsyncJob>>,
timeout_jobs: RefCell<BTreeMap<JsInstant, TimeoutJob>>,
generic_jobs: RefCell<VecDeque<GenericJob>>,
}
impl SimpleJobExecutor {
fn clear(&self) {
self.promise_jobs.borrow_mut().clear();
self.async_jobs.borrow_mut().clear();
self.timeout_jobs.borrow_mut().clear();
self.generic_jobs.borrow_mut().clear();
}
}
impl Debug for SimpleJobExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SimpleJobExecutor").finish_non_exhaustive()
}
}
impl SimpleJobExecutor {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl JobExecutor for SimpleJobExecutor {
fn enqueue_job(self: Rc<Self>, job: Job, context: &mut Context) {
match job {
Job::PromiseJob(p) => self.promise_jobs.borrow_mut().push_back(p),
Job::AsyncJob(a) => self.async_jobs.borrow_mut().push_back(a),
Job::TimeoutJob(t) => {
let now = context.clock().now();
self.timeout_jobs.borrow_mut().insert(now + t.timeout(), t);
}
Job::GenericJob(g) => self.generic_jobs.borrow_mut().push_back(g),
}
}
fn run_jobs(self: Rc<Self>, context: &mut Context) -> JsResult<()> {
future::block_on(self.run_jobs_async(&RefCell::new(context)))
}
async fn run_jobs_async(self: Rc<Self>, context: &RefCell<&mut Context>) -> JsResult<()>
where
Self: Sized,
{
let mut group = FutureGroup::new();
loop {
for job in mem::take(&mut *self.async_jobs.borrow_mut()) {
group.insert(job.call(context));
}
let no_timeout_jobs_to_run = {
let now = context.borrow().clock().now();
!self.timeout_jobs.borrow().iter().any(|(t, _)| &now >= t)
};
if self.promise_jobs.borrow().is_empty()
&& self.async_jobs.borrow().is_empty()
&& self.generic_jobs.borrow().is_empty()
&& no_timeout_jobs_to_run
&& group.is_empty()
{
break;
}
if let Some(Err(err)) = future::poll_once(group.next()).await.flatten() {
self.clear();
return Err(err);
}
{
let now = context.borrow().clock().now();
let mut timeouts_borrow = self.timeout_jobs.borrow_mut();
let mut jobs_to_keep = timeouts_borrow.split_off(&now);
jobs_to_keep.retain(|_, job| !job.is_cancelled());
let jobs_to_run = mem::replace(&mut *timeouts_borrow, jobs_to_keep);
drop(timeouts_borrow);
for job in jobs_to_run.into_values() {
if let Err(err) = job.call(&mut context.borrow_mut()) {
self.clear();
return Err(err);
}
}
}
let jobs = mem::take(&mut *self.promise_jobs.borrow_mut());
for job in jobs {
if let Err(err) = job.call(&mut context.borrow_mut()) {
self.clear();
return Err(err);
}
}
let jobs = mem::take(&mut *self.generic_jobs.borrow_mut());
for job in jobs {
if let Err(err) = job.call(&mut context.borrow_mut()) {
self.clear();
return Err(err);
}
}
context.borrow_mut().clear_kept_objects();
future::yield_now().await;
}
Ok(())
}
}