use std::{
ffi::CString,
ptr::NonNull,
result::Result as StdResult,
sync::{Arc, Weak},
task::Poll,
};
#[cfg(feature = "parallel")]
use std::sync::mpsc::{self, Receiver, Sender};
use async_lock::Mutex;
use super::{
raw::{Opaque, RawRuntime},
schedular::SchedularPoll,
spawner::DriveFuture,
InterruptHandler, MemoryUsage,
};
#[cfg(feature = "allocator")]
use crate::allocator::Allocator;
#[cfg(feature = "loader")]
use crate::loader::{Loader, Resolver};
use crate::{
context::AsyncContext, result::AsyncJobException, util::ManualPoll, Ctx, Error, Exception,
Result,
};
#[cfg(feature = "parallel")]
use crate::{
qjs,
util::{AssertSendFuture, AssertSyncFuture},
};
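/// The shared state behind an [`AsyncRuntime`]: the raw QuickJS runtime plus, with the
/// `parallel` feature, a channel receiving contexts that were dropped on other threads
/// and still need to be freed while the runtime lock is held.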
#[derive(Debug)]
pub(crate) struct InnerRuntime {
pub runtime: RawRuntime,
#[cfg(feature = "parallel")]
pub drop_recv: Receiver<NonNull<qjs::JSContext>>,
}
impl InnerRuntime {
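/// Free any contexts queued for dropping on other threads (a no-op without the
/// `parallel` feature).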
pub fn drop_pending(&self) {
#[cfg(feature = "parallel")]
while let Ok(x) = self.drop_recv.try_recv() {
unsafe { qjs::JS_FreeContext(x.as_ptr()) }
}
}
}
impl Drop for InnerRuntime {
fn drop(&mut self) {
self.drop_pending();
}
}
#[cfg(feature = "parallel")]
unsafe impl Send for InnerRuntime {}
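/// A weak handle to an [`AsyncRuntime`].
///
/// Holding this handle does not keep the runtime alive; upgrade it with
/// [`AsyncWeakRuntime::try_ref`] to obtain a usable runtime again.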
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
#[derive(Clone)]
pub struct AsyncWeakRuntime {
inner: Weak<Mutex<InnerRuntime>>,
#[cfg(feature = "parallel")]
drop_send: Sender<NonNull<qjs::JSContext>>,
}
impl AsyncWeakRuntime {
pub fn try_ref(&self) -> Option<AsyncRuntime> {
self.inner.upgrade().map(|inner| AsyncRuntime {
inner,
#[cfg(feature = "parallel")]
drop_send: self.drop_send.clone(),
})
}
}
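/// An asynchronous QuickJS runtime.
///
/// All methods lock the runtime behind an async mutex, and Rust futures spawned onto its
/// contexts are driven whenever pending jobs are executed.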
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
#[derive(Clone)]
pub struct AsyncRuntime {
pub(crate) inner: Arc<Mutex<InnerRuntime>>,
#[cfg(feature = "parallel")]
pub(crate) drop_send: Sender<NonNull<qjs::JSContext>>,
}
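// SAFETY: the raw runtime is only ever touched while the async mutex is held, and
// contexts dropped on other threads are routed back through the drop channel, so these
// handles can be shared across threads when the `parallel` feature is enabled.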
#[cfg(feature = "parallel")]
unsafe impl Send for AsyncRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Send for AsyncWeakRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Sync for AsyncRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Sync for AsyncWeakRuntime {}
impl AsyncRuntime {
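/// Create a new async runtime.
///
/// Fails with [`Error::Allocation`] if the underlying QuickJS runtime could not be
/// created.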
#[allow(clippy::arc_with_non_send_sync)]
pub fn new() -> Result<Self> {
let opaque = Opaque::with_spawner();
let runtime = unsafe { RawRuntime::new(opaque) }.ok_or(Error::Allocation)?;
#[cfg(feature = "parallel")]
let (drop_send, drop_recv) = mpsc::channel();
Ok(Self {
inner: Arc::new(Mutex::new(InnerRuntime {
runtime,
#[cfg(feature = "parallel")]
drop_recv,
})),
#[cfg(feature = "parallel")]
drop_send,
})
}
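/// Create a new async runtime that uses a custom allocator.
///
/// Fails with [`Error::Allocation`] if the underlying QuickJS runtime could not be
/// created.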
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "allocator")))]
#[cfg(feature = "allocator")]
#[allow(clippy::arc_with_non_send_sync)]
pub fn new_with_alloc<A>(allocator: A) -> Result<Self>
where
A: Allocator + 'static,
{
let opaque = Opaque::with_spawner();
let runtime = unsafe { RawRuntime::new_with_allocator(opaque, allocator) }
.ok_or(Error::Allocation)?;
#[cfg(feature = "parallel")]
let (drop_send, drop_recv) = mpsc::channel();
Ok(Self {
inner: Arc::new(Mutex::new(InnerRuntime {
runtime,
#[cfg(feature = "parallel")]
drop_recv,
})),
#[cfg(feature = "parallel")]
drop_send,
})
}
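/// Create a weak handle to this runtime which does not keep it alive.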
pub fn weak(&self) -> AsyncWeakRuntime {
AsyncWeakRuntime {
inner: Arc::downgrade(&self.inner),
#[cfg(feature = "parallel")]
drop_send: self.drop_send.clone(),
}
}
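/// Set a callback which is invoked periodically while JavaScript executes, allowing
/// long-running scripts to be interrupted. Pass `None` to remove the handler.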
#[inline]
pub async fn set_interrupt_handler(&self, handler: Option<InterruptHandler>) {
unsafe {
self.inner
.lock()
.await
.runtime
.set_interrupt_handler(handler);
}
}
#[cfg(feature = "loader")]
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "loader")))]
pub async fn set_loader<R, L>(&self, resolver: R, loader: L)
where
R: Resolver + 'static,
L: Loader + 'static,
{
unsafe {
self.inner.lock().await.runtime.set_loader(resolver, loader);
}
}
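/// Set the info string associated with this runtime.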
pub async fn set_info<S: Into<Vec<u8>>>(&self, info: S) -> Result<()> {
let string = CString::new(info)?;
unsafe {
self.inner.lock().await.runtime.set_info(string);
}
Ok(())
}
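/// Set the maximum amount of memory, in bytes, that the runtime may allocate.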
pub async fn set_memory_limit(&self, limit: usize) {
unsafe {
self.inner.lock().await.runtime.set_memory_limit(limit);
}
}
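/// Set the maximum stack size, in bytes, that the runtime may use.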
pub async fn set_max_stack_size(&self, limit: usize) {
unsafe {
self.inner.lock().await.runtime.set_max_stack_size(limit);
}
}
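/// Set the amount of allocated memory, in bytes, above which the garbage collector is
/// triggered automatically.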
pub async fn set_gc_threshold(&self, threshold: usize) {
unsafe {
self.inner.lock().await.runtime.set_gc_threshold(threshold);
}
}
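/// Manually run the garbage collector, freeing any contexts queued for dropping first.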
pub async fn run_gc(&self) {
unsafe {
let mut lock = self.inner.lock().await;
lock.drop_pending();
lock.runtime.run_gc();
}
}
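/// Return memory usage statistics for this runtime.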
pub async fn memory_usage(&self) -> MemoryUsage {
unsafe { self.inner.lock().await.runtime.memory_usage() }
}
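/// Returns `true` if there are pending jobs (such as promise reactions) or spawned
/// futures left to run.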
#[inline]
pub async fn is_job_pending(&self) -> bool {
let mut lock = self.inner.lock().await;
lock.runtime.is_job_pending()
|| !unsafe { lock.runtime.get_opaque_mut().spawner() }.is_empty()
}
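/// Execute a single unit of pending work.
///
/// Returns `Ok(true)` if a job was executed or a spawned future made progress,
/// `Ok(false)` if nothing was ready to run, and an [`AsyncJobException`] carrying the
/// offending context if a job raised an uncaught exception.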
#[inline]
pub async fn execute_pending_job(&self) -> StdResult<bool, AsyncJobException> {
let mut lock = self.inner.lock().await;
lock.runtime.update_stack_top();
lock.drop_pending();
let f = ManualPoll::new(|cx| {
let job_res = lock.runtime.execute_pending_job().map_err(|e| {
let ptr = NonNull::new(e)
.expect("executing pending job returned a null context on error");
AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
})?;
if job_res {
return Poll::Ready(Ok(true));
}
match unsafe { lock.runtime.get_opaque_mut() }.spawner().poll(cx) {
SchedularPoll::ShouldYield => Poll::Pending,
SchedularPoll::Empty => Poll::Ready(Ok(false)),
SchedularPoll::Pending => Poll::Ready(Ok(false)),
SchedularPoll::PendingProgress => Poll::Ready(Ok(true)),
}
});
#[cfg(feature = "parallel")]
let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };
f.await
}
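/// Run pending jobs and spawned futures until the runtime has no more work to do.
/// Exceptions raised by jobs are caught and printed.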
#[inline]
pub async fn idle(&self) {
let mut lock = self.inner.lock().await;
lock.runtime.update_stack_top();
lock.drop_pending();
let f = ManualPoll::new(|cx| {
loop {
let pending = lock.runtime.execute_pending_job().map_err(|e| {
let ptr = NonNull::new(e)
.expect("executing pending job returned a null context on error");
AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
});
match pending {
Err(e) => {
let ctx = unsafe { Ctx::from_ptr(e.0 .0.ctx.as_ptr()) };
let err = ctx.catch();
if let Some(x) = err.clone().into_object().and_then(Exception::from_object)
{
println!("error executing job: {}", x);
} else {
println!("error executing job: {:?}", err);
}
}
Ok(true) => continue,
Ok(false) => {}
}
match unsafe { lock.runtime.get_opaque_mut() }.spawner().poll(cx) {
SchedularPoll::ShouldYield => return Poll::Pending,
SchedularPoll::Empty => return Poll::Ready(()),
SchedularPoll::Pending => return Poll::Pending,
SchedularPoll::PendingProgress => {}
}
}
});
#[cfg(feature = "parallel")]
let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };
f.await
}
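/// Returns a future which, while polled, keeps driving the runtime by executing pending
/// jobs and spawned futures.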
pub fn drive(&self) -> DriveFuture {
DriveFuture::new(self.weak())
}
}
#[cfg(test)]
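// Runs an async test body on a multi-threaded tokio runtime when the `parallel` feature
// is enabled, otherwise on a current-thread runtime inside a `LocalSet`.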
macro_rules! async_test_case {
($name:ident => ($rt:ident,$ctx:ident) { $($t:tt)* }) => {
#[test]
fn $name() {
let rt = if cfg!(feature = "parallel") {
tokio::runtime::Builder::new_multi_thread()
} else {
tokio::runtime::Builder::new_current_thread()
}
.enable_all()
.build()
.unwrap();
#[cfg(feature = "parallel")]
{
rt.block_on(async {
let $rt = crate::AsyncRuntime::new().unwrap();
let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();
$($t)*
})
}
#[cfg(not(feature = "parallel"))]
{
let set = tokio::task::LocalSet::new();
set.block_on(&rt, async {
let $rt = crate::AsyncRuntime::new().unwrap();
let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();
$($t)*
})
}
}
};
}
#[cfg(test)]
mod test {
use std::time::Duration;
use crate::*;
use self::context::EvalOptions;
async_test_case!(basic => (_rt,ctx){
async_with!(&ctx => |ctx|{
let res: i32 = ctx.eval("1 + 1").unwrap();
assert_eq!(res,2i32);
}).await;
});
async_test_case!(sleep_closure => (_rt,ctx){
let mut a = 1;
let a_ref = &mut a;
async_with!(&ctx => |ctx|{
tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
ctx.globals().set("foo","bar").unwrap();
*a_ref += 1;
}).await;
assert_eq!(a,2);
});
async_test_case!(drive => (rt,ctx){
use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};
#[cfg(feature = "parallel")]
tokio::spawn(rt.drive());
#[cfg(not(feature = "parallel"))]
tokio::task::spawn_local(rt.drive());
tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
let number = Arc::new(AtomicUsize::new(0));
let number_clone = number.clone();
async_with!(&ctx => |ctx|{
ctx.spawn(async move {
tokio::task::yield_now().await;
number_clone.store(1,Ordering::SeqCst);
});
}).await;
assert_eq!(number.load(Ordering::SeqCst),0);
tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
assert_eq!(number.load(Ordering::SeqCst),1);
});
async_test_case!(no_drive => (rt,ctx){
use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};
let number = Arc::new(AtomicUsize::new(0));
let number_clone = number.clone();
async_with!(&ctx => |ctx|{
ctx.spawn(async move {
tokio::task::yield_now().await;
number_clone.store(1,Ordering::SeqCst);
});
}).await;
assert_eq!(number.load(Ordering::SeqCst),0);
tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
assert_eq!(number.load(Ordering::SeqCst),0);
});
async_test_case!(idle => (rt,ctx){
use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};
let number = Arc::new(AtomicUsize::new(0));
let number_clone = number.clone();
async_with!(&ctx => |ctx|{
ctx.spawn(async move {
tokio::task::yield_now().await;
number_clone.store(1,Ordering::SeqCst);
});
}).await;
assert_eq!(number.load(Ordering::SeqCst),0);
rt.idle().await;
assert_eq!(number.load(Ordering::SeqCst),1);
});
async_test_case!(recursive_spawn => (rt,ctx){
use tokio::sync::oneshot;
async_with!(&ctx => |ctx|{
let ctx_clone = ctx.clone();
let (tx,rx) = oneshot::channel::<()>();
let (tx2,rx2) = oneshot::channel::<()>();
ctx.spawn(async move {
tokio::task::yield_now().await;
let ctx = ctx_clone.clone();
ctx_clone.spawn(async move {
tokio::task::yield_now().await;
ctx.spawn(async move {
tokio::task::yield_now().await;
tx2.send(()).unwrap();
tokio::task::yield_now().await;
});
tokio::task::yield_now().await;
tx.send(()).unwrap();
});
for _ in 0..32{
ctx_clone.spawn(async move {})
}
});
tokio::time::timeout(Duration::from_millis(500), rx).await.unwrap().unwrap();
tokio::time::timeout(Duration::from_millis(500), rx2).await.unwrap().unwrap();
}).await;
});
async_test_case!(recursive_spawn_from_script => (rt,ctx) {
use std::sync::atomic::{Ordering, AtomicUsize};
use crate::prelude::Func;
static COUNT: AtomicUsize = AtomicUsize::new(0);
static SCRIPT: &str = r#"
async function main() {
setTimeout(() => {
inc_count()
setTimeout(async () => {
inc_count()
}, 100);
}, 100);
}
main().catch(print);
"#;
fn inc_count(){
COUNT.fetch_add(1,Ordering::Relaxed);
}
fn set_timeout_spawn<'js>(ctx: Ctx<'js>, callback: Function<'js>, millis: usize) -> Result<()> {
ctx.spawn(async move {
tokio::time::sleep(Duration::from_millis(millis as u64)).await;
callback.call::<_, ()>(()).unwrap();
});
Ok(())
}
async_with!(ctx => |ctx|{
let res: Result<Promise> = (|| {
let globals = ctx.globals();
globals.set("inc_count", Func::from(inc_count))?;
globals.set("setTimeout", Func::from(set_timeout_spawn))?;
let options = EvalOptions{
promise: true,
strict: false,
..EvalOptions::default()
};
ctx.eval_with_options(SCRIPT, options)
})();
match res.catch(&ctx){
Ok(promise) => {
if let Err(err) = promise.into_future::<Value>().await.catch(&ctx){
eprintln!("{}", err)
}
},
Err(err) => {
eprintln!("{}", err)
},
};
})
.await;
rt.idle().await;
assert_eq!(COUNT.load(Ordering::Relaxed),2);
});
#[cfg(feature = "parallel")]
fn assert_is_send<T: Send>(t: T) -> T {
t
}
#[cfg(feature = "parallel")]
fn assert_is_sync<T: Sync>(t: T) -> T {
t
}
#[cfg(feature = "parallel")]
#[tokio::test]
async fn ensure_types_are_send_sync() {
let rt = AsyncRuntime::new().unwrap();
std::mem::drop(assert_is_sync(rt.idle()));
std::mem::drop(assert_is_sync(rt.execute_pending_job()));
std::mem::drop(assert_is_sync(rt.drive()));
std::mem::drop(assert_is_send(rt.idle()));
std::mem::drop(assert_is_send(rt.execute_pending_job()));
std::mem::drop(assert_is_send(rt.drive()));
}
}