1use alloc::{
2 ffi::CString,
3 sync::{Arc, Weak},
4 vec::Vec,
5};
6use core::{ptr::NonNull, result::Result as StdResult, task::Poll};
7#[cfg(feature = "std")]
8use std::println;
9
10#[cfg(feature = "parallel")]
11use std::sync::mpsc::{self, Receiver, Sender};
12
13use async_lock::Mutex;
14
15use super::{
16 opaque::Opaque, raw::RawRuntime, schedular::SchedularPoll, spawner::DriveFuture,
17 InterruptHandler, MemoryUsage, PromiseHook, RejectionTracker,
18};
19use crate::allocator::Allocator;
20#[cfg(feature = "loader")]
21use crate::loader::{Loader, Resolver};
22#[cfg(feature = "parallel")]
23use crate::util::{AssertSendFuture, AssertSyncFuture};
24use crate::{
25 context::AsyncContext, qjs, result::AsyncJobException, util::ManualPoll, Ctx, Exception, Result,
26};
27
28#[derive(Debug)]
29pub(crate) struct InnerRuntime {
30 pub runtime: RawRuntime,
31 #[cfg(feature = "parallel")]
32 pub drop_recv: Receiver<NonNull<qjs::JSContext>>,
33}
34
35impl InnerRuntime {
36 pub fn drop_pending(&self) {
37 #[cfg(feature = "parallel")]
38 while let Ok(x) = self.drop_recv.try_recv() {
39 unsafe { qjs::JS_FreeContext(x.as_ptr()) }
40 }
41 }
42}
43
44impl Drop for InnerRuntime {
45 fn drop(&mut self) {
46 self.drop_pending();
47 }
48}
49
// NOTE(review): this impl is a manual assertion, not something the compiler
// checks. Within this file `InnerRuntime` is only reachable through
// `Arc<Mutex<InnerRuntime>>`; presumably that exclusive access is what makes
// moving it between threads sound - confirm against `RawRuntime`.
#[cfg(feature = "parallel")]
unsafe impl Send for InnerRuntime {}
52
53#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
57#[derive(Clone)]
58pub struct AsyncWeakRuntime {
59 inner: Weak<Mutex<InnerRuntime>>,
60 #[cfg(feature = "parallel")]
61 drop_send: Sender<NonNull<qjs::JSContext>>,
62}
63
64impl AsyncWeakRuntime {
65 pub fn try_ref(&self) -> Option<AsyncRuntime> {
66 self.inner.upgrade().map(|inner| AsyncRuntime {
67 inner,
68 #[cfg(feature = "parallel")]
69 drop_send: self.drop_send.clone(),
70 })
71 }
72}
73
74#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
76#[derive(Clone)]
77pub struct AsyncRuntime {
78 pub(crate) inner: Arc<Mutex<InnerRuntime>>,
80 #[cfg(feature = "parallel")]
81 pub(crate) drop_send: Sender<NonNull<qjs::JSContext>>,
82}
83
// Both handle types carry a `Sender<NonNull<..>>`; `NonNull` is neither `Send`
// nor `Sync`, so the auto traits are not derived and are asserted by hand.
// NOTE(review): presumably sound because the runtime itself is only touched
// while the async mutex is held and the queued pointers are only dereferenced
// by `InnerRuntime::drop_pending` - confirm.
#[cfg(feature = "parallel")]
unsafe impl Send for AsyncRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Sync for AsyncRuntime {}

#[cfg(feature = "parallel")]
unsafe impl Send for AsyncWeakRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Sync for AsyncWeakRuntime {}
98
99impl AsyncRuntime {
100 #[allow(clippy::arc_with_non_send_sync)]
108 pub fn new() -> Result<Self> {
109 let opaque = Opaque::with_spawner();
110 let runtime = unsafe { RawRuntime::new(opaque) }?;
111
112 #[cfg(feature = "parallel")]
113 let (drop_send, drop_recv) = mpsc::channel();
114
115 Ok(Self {
116 inner: Arc::new(Mutex::new(InnerRuntime {
117 runtime,
118 #[cfg(feature = "parallel")]
119 drop_recv,
120 })),
121 #[cfg(feature = "parallel")]
122 drop_send,
123 })
124 }
125
126 #[allow(clippy::arc_with_non_send_sync)]
131 pub fn new_with_alloc<A>(allocator: A) -> Result<Self>
132 where
133 A: Allocator + 'static,
134 {
135 let opaque = Opaque::with_spawner();
136 let runtime = unsafe { RawRuntime::new_with_allocator(opaque, allocator) }?;
137
138 #[cfg(feature = "parallel")]
139 let (drop_send, drop_recv) = mpsc::channel();
140
141 Ok(Self {
142 inner: Arc::new(Mutex::new(InnerRuntime {
143 runtime,
144 #[cfg(feature = "parallel")]
145 drop_recv,
146 })),
147 #[cfg(feature = "parallel")]
148 drop_send,
149 })
150 }
151
152 pub fn weak(&self) -> AsyncWeakRuntime {
154 AsyncWeakRuntime {
155 inner: Arc::downgrade(&self.inner),
156 #[cfg(feature = "parallel")]
157 drop_send: self.drop_send.clone(),
158 }
159 }
160
161 #[inline]
163 pub async fn set_host_promise_rejection_tracker(&self, tracker: Option<RejectionTracker>) {
164 unsafe {
165 self.inner
166 .lock()
167 .await
168 .runtime
169 .set_host_promise_rejection_tracker(tracker);
170 }
171 }
172
173 #[inline]
175 pub async fn set_promise_hook(&self, tracker: Option<PromiseHook>) {
176 unsafe {
177 self.inner.lock().await.runtime.set_promise_hook(tracker);
178 }
179 }
180
181 #[inline]
185 pub async fn set_interrupt_handler(&self, handler: Option<InterruptHandler>) {
186 unsafe {
187 self.inner
188 .lock()
189 .await
190 .runtime
191 .set_interrupt_handler(handler);
192 }
193 }
194
195 #[cfg(feature = "loader")]
197 #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "loader")))]
198 pub async fn set_loader<R, L>(&self, resolver: R, loader: L)
199 where
200 R: Resolver + 'static,
201 L: Loader + 'static,
202 {
203 unsafe {
204 self.inner.lock().await.runtime.set_loader(resolver, loader);
205 }
206 }
207
208 pub async fn set_info<S: Into<Vec<u8>>>(&self, info: S) -> Result<()> {
210 let string = CString::new(info)?;
211 unsafe {
212 self.inner.lock().await.runtime.set_info(string);
213 }
214 Ok(())
215 }
216
217 pub async fn set_memory_limit(&self, limit: usize) {
224 unsafe {
225 self.inner.lock().await.runtime.set_memory_limit(limit);
226 }
227 }
228
229 pub async fn set_max_stack_size(&self, limit: usize) {
233 unsafe {
234 self.inner.lock().await.runtime.set_max_stack_size(limit);
235 }
236 }
237
238 pub async fn set_gc_threshold(&self, threshold: usize) {
240 unsafe {
241 self.inner.lock().await.runtime.set_gc_threshold(threshold);
242 }
243 }
244
245 pub async fn run_gc(&self) {
252 unsafe {
253 let mut lock = self.inner.lock().await;
254 lock.drop_pending();
255 lock.runtime.run_gc();
256 }
257 }
258
259 pub async fn memory_usage(&self) -> MemoryUsage {
261 unsafe { self.inner.lock().await.runtime.memory_usage() }
262 }
263
264 #[inline]
268 pub async fn is_job_pending(&self) -> bool {
269 let lock = self.inner.lock().await;
270
271 lock.runtime.is_job_pending() || !lock.runtime.get_opaque().spawner_is_empty()
272 }
273
274 #[inline]
278 pub async fn execute_pending_job(&self) -> StdResult<bool, AsyncJobException> {
279 let mut lock = self.inner.lock().await;
280 lock.runtime.update_stack_top();
281 lock.drop_pending();
282
283 let f = ManualPoll::new(|cx| {
284 let job_res = lock.runtime.execute_pending_job().map_err(|e| {
285 let ptr = NonNull::new(e)
286 .expect("executing pending job returned a null context on error");
287 unsafe { qjs::JS_DupContext(ptr.as_ptr()) };
290 AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
291 })?;
292
293 if job_res {
294 return Poll::Ready(Ok(true));
295 }
296
297 match lock.runtime.get_opaque().poll(cx) {
298 SchedularPoll::ShouldYield => Poll::Pending,
299 SchedularPoll::Empty => Poll::Ready(Ok(false)),
300 SchedularPoll::Pending => Poll::Ready(Ok(false)),
301 SchedularPoll::PendingProgress => Poll::Ready(Ok(true)),
302 }
303 });
304
305 #[cfg(feature = "parallel")]
306 let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };
307
308 f.await
309 }
310
311 #[inline]
313 pub async fn idle(&self) {
314 let mut lock = self.inner.lock().await;
315 lock.runtime.update_stack_top();
316 lock.drop_pending();
317
318 let f = ManualPoll::new(|cx| {
319 loop {
320 let pending = lock.runtime.execute_pending_job().map_err(|e| {
321 let ptr = NonNull::new(e)
322 .expect("executing pending job returned a null context on error");
323 unsafe { qjs::JS_DupContext(ptr.as_ptr()) };
326 AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
327 });
328 match pending {
329 Err(e) => {
330 let ctx = unsafe { Ctx::from_ptr(e.0 .0.ctx().as_ptr()) };
332 let err = ctx.catch();
333 if let Some(_x) = err.clone().into_object().and_then(Exception::from_object)
334 {
335 #[cfg(feature = "std")]
337 println!("error executing job: {}", _x);
338 } else {
339 #[cfg(feature = "std")]
340 println!("error executing job: {:?}", err);
341 }
342 }
343 Ok(true) => continue,
344 Ok(false) => {}
345 }
346
347 match lock.runtime.get_opaque().poll(cx) {
348 SchedularPoll::ShouldYield => return Poll::Pending,
349 SchedularPoll::Empty => return Poll::Ready(()),
350 SchedularPoll::Pending => return Poll::Pending,
351 SchedularPoll::PendingProgress => {}
352 }
353 }
354 });
355
356 #[cfg(feature = "parallel")]
357 let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };
358
359 f.await
360 }
361
362 pub fn drive(&self) -> DriveFuture {
366 DriveFuture::new(self.weak())
367 }
368}
369
/// Declares a `#[test]` function named `$name` that runs the given async body
/// on a tokio runtime.
///
/// Inside the body, `$rt` is a fresh `AsyncRuntime` and `$ctx` a full
/// `AsyncContext` created from it. With the `parallel` feature the body runs
/// on a multi-threaded tokio runtime; without it, on a current-thread runtime
/// inside a `LocalSet`.
#[cfg(test)]
macro_rules! async_test_case {
    ($name:ident => ($rt:ident,$ctx:ident) { $($t:tt)* }) => {
        #[test]
        fn $name() {
            #[cfg(feature = "parallel")]
            let mut builder = tokio::runtime::Builder::new_multi_thread();
            #[cfg(not(feature = "parallel"))]
            let mut builder = tokio::runtime::Builder::new_current_thread();

            let executor = builder.enable_all().build().unwrap();

            // The test body is identical in both configurations; only the way
            // it is driven differs.
            let body = async {
                let $rt = crate::AsyncRuntime::new().unwrap();
                let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();

                $($t)*
            };

            #[cfg(feature = "parallel")]
            executor.block_on(body);
            #[cfg(not(feature = "parallel"))]
            tokio::task::LocalSet::new().block_on(&executor, body);
        }
    };
}
409
#[cfg(test)]
mod test {
    use std::time::Duration;

    use crate::*;

    use self::context::EvalOptions;

    // Evaluating a trivial expression inside `async_with` yields its value.
    async_test_case!(basic => (_rt, ctx) {
        ctx.async_with(async |ctx| {
            let sum: i32 = ctx.eval("1 + 1").unwrap();
            assert_eq!(sum, 2i32);
        })
        .await;
    });

    // The closure passed to `async_with` may borrow from the enclosing scope
    // and await non-JavaScript futures.
    async_test_case!(sleep_closure => (_rt, ctx) {
        let mut value = 1;
        let value_ref = &mut value;

        ctx.async_with(async |ctx| {
            tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
            ctx.globals().set("foo", "bar").unwrap();
            *value_ref += 1;
        })
        .await;

        assert_eq!(value, 2);
    });

    // With the drive future running in the background, a spawned future makes
    // progress once it is unblocked.
    async_test_case!(drive => (rt, ctx) {
        use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};

        #[cfg(feature = "parallel")]
        tokio::spawn(rt.drive());
        #[cfg(not(feature = "parallel"))]
        tokio::task::spawn_local(rt.drive());

        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;

        let counter = Arc::new(AtomicUsize::new(0));
        let gate = Arc::new(tokio::sync::Notify::new());
        let done = Arc::new(tokio::sync::Notify::new());

        let task_counter = Arc::clone(&counter);
        let task_gate = Arc::clone(&gate);
        let task_done = Arc::clone(&done);

        ctx.async_with(async |ctx| {
            ctx.spawn(async move {
                task_gate.notified().await;
                task_counter.store(1, Ordering::SeqCst);
                task_done.notify_one();
            });
        })
        .await;

        // The task is still parked on the gate.
        assert_eq!(counter.load(Ordering::SeqCst), 0);

        gate.notify_one();
        done.notified().await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    });

    // Without anything driving the runtime a spawned future never completes.
    async_test_case!(no_drive => (rt, ctx) {
        use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};

        let counter = Arc::new(AtomicUsize::new(0));
        let task_counter = Arc::clone(&counter);

        ctx.async_with(async |ctx| {
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                task_counter.store(1, Ordering::SeqCst);
            });
        })
        .await;

        assert_eq!(counter.load(Ordering::SeqCst), 0);
        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 0);
    });

    // `idle` runs spawned futures to completion.
    async_test_case!(idle => (rt, ctx) {
        use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};

        let counter = Arc::new(AtomicUsize::new(0));
        let task_counter = Arc::clone(&counter);

        ctx.async_with(async |ctx| {
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                task_counter.store(1, Ordering::SeqCst);
            });
        })
        .await;

        assert_eq!(counter.load(Ordering::SeqCst), 0);
        rt.idle().await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    });

    // Futures spawned from within spawned futures are picked up as well.
    async_test_case!(recursive_spawn => (rt, ctx) {
        use tokio::sync::oneshot;

        ctx.async_with(async |ctx| {
            let outer_ctx = ctx.clone();
            let (first_tx, first_rx) = oneshot::channel::<()>();
            let (second_tx, second_rx) = oneshot::channel::<()>();

            ctx.spawn(async move {
                tokio::task::yield_now().await;

                let inner_ctx = outer_ctx.clone();

                outer_ctx.spawn(async move {
                    tokio::task::yield_now().await;
                    inner_ctx.spawn(async move {
                        tokio::task::yield_now().await;
                        second_tx.send(()).unwrap();
                        tokio::task::yield_now().await;
                    });
                    tokio::task::yield_now().await;
                    first_tx.send(()).unwrap();
                });

                // Queue a batch of trivial futures next to the nested ones.
                for _ in 0..32 {
                    outer_ctx.spawn(async move {})
                }
            });

            tokio::time::timeout(Duration::from_millis(500), first_rx)
                .await
                .unwrap()
                .unwrap();
            tokio::time::timeout(Duration::from_millis(500), second_rx)
                .await
                .unwrap()
                .unwrap();
        })
        .await;
    });

    // A script-level `setTimeout` implemented with `ctx.spawn` can schedule
    // further timeouts from inside its own callback.
    async_test_case!(recursive_spawn_from_script => (rt, ctx) {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use crate::prelude::Func;

        static COUNT: AtomicUsize = AtomicUsize::new(0);
        static SCRIPT: &str = r#"

    async function main() {

      setTimeout(() => {
        inc_count()
        setTimeout(async () => {
          inc_count()
        }, 100);
      }, 100);
    }

    main().catch(print);


    "#;

        fn inc_count() {
            COUNT.fetch_add(1, Ordering::Relaxed);
        }

        fn set_timeout_spawn<'js>(
            ctx: Ctx<'js>,
            callback: Function<'js>,
            millis: usize,
        ) -> Result<()> {
            ctx.spawn(async move {
                tokio::time::sleep(Duration::from_millis(millis as u64)).await;
                callback.call::<_, ()>(()).unwrap();
            });

            Ok(())
        }

        ctx.async_with(async |ctx| {
            let res: Result<Promise> = (|| {
                let globals = ctx.globals();

                globals.set("inc_count", Func::from(inc_count))?;

                globals.set("setTimeout", Func::from(set_timeout_spawn))?;
                let options = EvalOptions {
                    promise: true,
                    strict: false,
                    ..EvalOptions::default()
                };

                ctx.eval_with_options(SCRIPT, options)?
            })();

            match res.catch(&ctx) {
                Ok(promise) => {
                    if let Err(err) = promise.into_future::<Value>().await.catch(&ctx) {
                        eprintln!("{}", err)
                    }
                }
                Err(err) => {
                    eprintln!("{}", err)
                }
            };
        })
        .await;

        rt.idle().await;

        assert_eq!(COUNT.load(Ordering::Relaxed), 2);
    });

    // `idle` returns even though the script loops forever, because the
    // interrupt handler starts returning `true` after the timeout.
    async_test_case!(interrupt_handler_idle => (rt, ctx) {
        use std::time::Instant;

        let timeout = Duration::from_millis(100);
        let started_at = Instant::now();

        rt.set_interrupt_handler(Some(Box::new(move || started_at.elapsed() >= timeout)))
            .await;

        let _ = ctx
            .async_with(async |ctx| {
                ctx.eval::<(), _>(
                    r#"
                async function example() {
                    while (true) {
                        await Promise.resolve();
                    }
                }
                example();
            "#,
                )
            })
            .await;

        rt.idle().await;
    });

    /// Compiles only when `T` is `Send`; returns its argument unchanged.
    #[cfg(feature = "parallel")]
    fn assert_is_send<T: Send>(t: T) -> T {
        t
    }

    /// Returns its argument unchanged.
    ///
    /// NOTE(review): despite the name the bound is `Send`, not `Sync`, so this
    /// currently checks the same thing as `assert_is_send`. The futures built
    /// below hold a mutex guard across an await, so tightening the bound may
    /// not compile - confirm before changing it.
    #[cfg(feature = "parallel")]
    fn assert_is_sync<T: Send>(t: T) -> T {
        t
    }

    // Compile-time check only: the futures are created and dropped without
    // ever being polled.
    #[cfg(feature = "parallel")]
    #[tokio::test]
    async fn ensure_types_are_send_sync() {
        let rt = AsyncRuntime::new().unwrap();

        std::mem::drop(assert_is_sync(rt.idle()));
        std::mem::drop(assert_is_send(rt.idle()));

        std::mem::drop(assert_is_sync(rt.execute_pending_job()));
        std::mem::drop(assert_is_send(rt.execute_pending_job()));

        std::mem::drop(assert_is_sync(rt.drive()));
        std::mem::drop(assert_is_send(rt.drive()));
    }
}