use alloc::{
    ffi::CString,
    sync::{Arc, Weak},
    vec::Vec,
};
use core::{ptr::NonNull, result::Result as StdResult, task::Poll};
#[cfg(feature = "std")]
use std::println;

#[cfg(feature = "parallel")]
use std::sync::mpsc::{self, Receiver, Sender};

use async_lock::Mutex;

use super::{
    opaque::Opaque, raw::RawRuntime, schedular::SchedularPoll, spawner::DriveFuture,
    InterruptHandler, MemoryUsage, PromiseHook, RejectionTracker,
};
use crate::allocator::Allocator;
#[cfg(feature = "loader")]
use crate::loader::{Loader, Resolver};
use crate::{
    context::AsyncContext, result::AsyncJobException, util::ManualPoll, Ctx, Exception, Result,
};
#[cfg(feature = "parallel")]
use crate::{
    qjs,
    util::{AssertSendFuture, AssertSyncFuture},
};

#[derive(Debug)]
pub(crate) struct InnerRuntime {
    pub runtime: RawRuntime,
    #[cfg(feature = "parallel")]
    pub drop_recv: Receiver<NonNull<qjs::JSContext>>,
}

impl InnerRuntime {
    pub fn drop_pending(&self) {
        // Free any contexts whose handles were dropped on other threads.
        #[cfg(feature = "parallel")]
        while let Ok(x) = self.drop_recv.try_recv() {
            unsafe { qjs::JS_FreeContext(x.as_ptr()) }
        }
    }
}

impl Drop for InnerRuntime {
    fn drop(&mut self) {
        self.drop_pending();
    }
}

#[cfg(feature = "parallel")]
unsafe impl Send for InnerRuntime {}

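/// A weak handle to the async runtime.
///
/// Holding on to this handle does not prevent the runtime from being dropped.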
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
#[derive(Clone)]
pub struct AsyncWeakRuntime {
    inner: Weak<Mutex<InnerRuntime>>,
    #[cfg(feature = "parallel")]
    drop_send: Sender<NonNull<qjs::JSContext>>,
}

impl AsyncWeakRuntime {
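    /// Try to upgrade to a strong [`AsyncRuntime`] handle, returning `None` if the
    /// runtime has already been dropped.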
    pub fn try_ref(&self) -> Option<AsyncRuntime> {
        self.inner.upgrade().map(|inner| AsyncRuntime {
            inner,
            #[cfg(feature = "parallel")]
            drop_send: self.drop_send.clone(),
        })
    }
}

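/// Asynchronous QuickJS runtime, entry point of the async API.
///
/// Cloning is cheap; all clones refer to the same underlying runtime.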
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
#[derive(Clone)]
pub struct AsyncRuntime {
    // Arc so the runtime can be shared between handles and kept alive by contexts.
    pub(crate) inner: Arc<Mutex<InnerRuntime>>,
    #[cfg(feature = "parallel")]
    pub(crate) drop_send: Sender<NonNull<qjs::JSContext>>,
}

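// SAFETY: every operation that touches the runtime first takes the lock,
// so moving these handles between threads is sound.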
#[cfg(feature = "parallel")]
unsafe impl Send for AsyncRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Send for AsyncWeakRuntime {}

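// SAFETY: simultaneous access from multiple threads is synchronized behind
// the runtime lock, so shared references are sound as well.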
#[cfg(feature = "parallel")]
unsafe impl Sync for AsyncRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Sync for AsyncWeakRuntime {}

impl AsyncRuntime {
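    /// Create a new async runtime.
    ///
    /// Will generally only fail if not enough memory was available.
    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the `futures` feature and an async executor
    /// such as tokio (error handling elided):
    ///
    /// ```ignore
    /// use rquickjs::{async_with, AsyncContext, AsyncRuntime};
    ///
    /// let rt = AsyncRuntime::new().unwrap();
    /// let ctx = AsyncContext::full(&rt).await.unwrap();
    /// async_with!(&ctx => |ctx| {
    ///     let n: i32 = ctx.eval("6 * 7").unwrap();
    ///     assert_eq!(n, 42);
    /// })
    /// .await;
    /// ```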
    #[allow(clippy::arc_with_non_send_sync)]
    pub fn new() -> Result<Self> {
        let opaque = Opaque::with_spawner();
        let runtime = unsafe { RawRuntime::new(opaque) }?;

        #[cfg(feature = "parallel")]
        let (drop_send, drop_recv) = mpsc::channel();

        Ok(Self {
            inner: Arc::new(Mutex::new(InnerRuntime {
                runtime,
                #[cfg(feature = "parallel")]
                drop_recv,
            })),
            #[cfg(feature = "parallel")]
            drop_send,
        })
    }

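    /// Create a new async runtime using the specified allocator.
    ///
    /// Will generally only fail if not enough memory was available.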
    #[allow(clippy::arc_with_non_send_sync)]
    pub fn new_with_alloc<A>(allocator: A) -> Result<Self>
    where
        A: Allocator + 'static,
    {
        let opaque = Opaque::with_spawner();
        let runtime = unsafe { RawRuntime::new_with_allocator(opaque, allocator) }?;

        #[cfg(feature = "parallel")]
        let (drop_send, drop_recv) = mpsc::channel();

        Ok(Self {
            inner: Arc::new(Mutex::new(InnerRuntime {
                runtime,
                #[cfg(feature = "parallel")]
                drop_recv,
            })),
            #[cfg(feature = "parallel")]
            drop_send,
        })
    }

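    /// Get a weak handle to this runtime.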
    pub fn weak(&self) -> AsyncWeakRuntime {
        AsyncWeakRuntime {
            inner: Arc::downgrade(&self.inner),
            #[cfg(feature = "parallel")]
            drop_send: self.drop_send.clone(),
        }
    }

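    /// Set the callback which is invoked when a promise rejection is not handled.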
    #[inline]
    pub async fn set_host_promise_rejection_tracker(&self, tracker: Option<RejectionTracker>) {
        unsafe {
            self.inner
                .lock()
                .await
                .runtime
                .set_host_promise_rejection_tracker(tracker);
        }
    }

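    /// Set a hook which is invoked on promise lifecycle events such as creation
    /// and resolution.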
    #[inline]
    pub async fn set_promise_hook(&self, tracker: Option<PromiseHook>) {
        unsafe {
            self.inner.lock().await.runtime.set_promise_hook(tracker);
        }
    }

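    /// Set a closure which is regularly called by the engine while it is executing code.
    ///
    /// If the closure returns `true`, the interpreter raises an uncatchable exception
    /// and returns control to the caller.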
    #[inline]
    pub async fn set_interrupt_handler(&self, handler: Option<InterruptHandler>) {
        unsafe {
            self.inner
                .lock()
                .await
                .runtime
                .set_interrupt_handler(handler);
        }
    }

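    /// Set the module resolver and loader used for importing ES modules.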
    #[cfg(feature = "loader")]
    #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "loader")))]
    pub async fn set_loader<R, L>(&self, resolver: R, loader: L)
    where
        R: Resolver + 'static,
        L: Loader + 'static,
    {
        unsafe {
            self.inner.lock().await.runtime.set_loader(resolver, loader);
        }
    }

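    /// Set the info string of the runtime.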
    pub async fn set_info<S: Into<Vec<u8>>>(&self, info: S) -> Result<()> {
        let string = CString::new(info)?;
        unsafe {
            self.inner.lock().await.runtime.set_info(string);
        }
        Ok(())
    }

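    /// Set a limit, in bytes, on the maximum amount of memory the runtime will use.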
    pub async fn set_memory_limit(&self, limit: usize) {
        unsafe {
            self.inner.lock().await.runtime.set_memory_limit(limit);
        }
    }

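    /// Set a limit, in bytes, on the maximum stack size the runtime will use.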
    pub async fn set_max_stack_size(&self, limit: usize) {
        unsafe {
            self.inner.lock().await.runtime.set_max_stack_size(limit);
        }
    }

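    /// Set the memory threshold, in bytes, at which garbage collection is triggered.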
    pub async fn set_gc_threshold(&self, threshold: usize) {
        unsafe {
            self.inner.lock().await.runtime.set_gc_threshold(threshold);
        }
    }

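    /// Manually run the garbage collector.
    ///
    /// This is usually unnecessary, as the runtime runs the garbage collector
    /// automatically when needed.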
    pub async fn run_gc(&self) {
        unsafe {
            let mut lock = self.inner.lock().await;
            lock.drop_pending();
            lock.runtime.run_gc();
        }
    }

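    /// Get memory usage statistics for the runtime.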
    pub async fn memory_usage(&self) -> MemoryUsage {
        unsafe { self.inner.lock().await.runtime.memory_usage() }
    }

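    /// Test whether there is work left to do.
    ///
    /// Returns `true` when at least one job or spawned future is pending.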
    #[inline]
    pub async fn is_job_pending(&self) -> bool {
        let lock = self.inner.lock().await;

        lock.runtime.is_job_pending() || !lock.runtime.get_opaque().spawner_is_empty()
    }

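    /// Execute a single pending job, or poll the spawned futures when no job is pending.
    ///
    /// Returns `Ok(true)` when progress was made, `Ok(false)` when nothing was pending,
    /// and an error carrying the offending context when a job raised an exception.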
    #[inline]
    pub async fn execute_pending_job(&self) -> StdResult<bool, AsyncJobException> {
        let mut lock = self.inner.lock().await;
        lock.runtime.update_stack_top();
        lock.drop_pending();

        let f = ManualPoll::new(|cx| {
            let job_res = lock.runtime.execute_pending_job().map_err(|e| {
                let ptr = NonNull::new(e)
                    .expect("executing pending job returned a null context on error");
                AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
            })?;

            if job_res {
                return Poll::Ready(Ok(true));
            }

            match lock.runtime.get_opaque().poll(cx) {
                SchedularPoll::ShouldYield => Poll::Pending,
                SchedularPoll::Empty => Poll::Ready(Ok(false)),
                SchedularPoll::Pending => Poll::Ready(Ok(false)),
                SchedularPoll::PendingProgress => Poll::Ready(Ok(true)),
            }
        });

        #[cfg(feature = "parallel")]
        let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };

        f.await
    }

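    /// Run all pending jobs and spawned futures until none remain.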
    #[inline]
    pub async fn idle(&self) {
        let mut lock = self.inner.lock().await;
        lock.runtime.update_stack_top();
        lock.drop_pending();

        let f = ManualPoll::new(|cx| {
            loop {
                let pending = lock.runtime.execute_pending_job().map_err(|e| {
                    let ptr = NonNull::new(e)
                        .expect("executing pending job returned a null context on error");
                    AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
                });
                match pending {
                    Err(e) => {
                        // The error can't be propagated from here, so report it instead.
                        let ctx = unsafe { Ctx::from_ptr(e.0 .0.ctx().as_ptr()) };
                        let err = ctx.catch();
                        if let Some(_x) = err.clone().into_object().and_then(Exception::from_object)
                        {
                            #[cfg(feature = "std")]
                            println!("error executing job: {}", _x);
                        } else {
                            #[cfg(feature = "std")]
                            println!("error executing job: {:?}", err);
                        }
                    }
                    Ok(true) => continue,
                    Ok(false) => {}
                }

                match lock.runtime.get_opaque().poll(cx) {
                    SchedularPoll::ShouldYield => return Poll::Pending,
                    SchedularPoll::Empty => return Poll::Ready(()),
                    SchedularPoll::Pending => return Poll::Pending,
                    SchedularPoll::PendingProgress => {}
                }
            }
        });

        #[cfg(feature = "parallel")]
        let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };

        f.await
    }

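    /// Returns a future that completes when this runtime is dropped.
    ///
    /// While the future is polled, it drives futures spawned inside the runtime,
    /// completing them even when the runtime is not otherwise in use.
    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a tokio multi-threaded executor with the
    /// `parallel` feature enabled (use a `LocalSet` otherwise):
    ///
    /// ```ignore
    /// let rt = AsyncRuntime::new().unwrap();
    /// // Poll spawned futures in the background for the lifetime of the runtime.
    /// tokio::spawn(rt.drive());
    /// ```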
    pub fn drive(&self) -> DriveFuture {
        DriveFuture::new(self.weak())
    }
}

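// Runs an async test body on a tokio runtime: multi-threaded when the `parallel`
// feature is enabled, otherwise a current-thread runtime driving a `LocalSet`.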
#[cfg(test)]
macro_rules! async_test_case {
    ($name:ident => ($rt:ident,$ctx:ident) { $($t:tt)* }) => {
        #[test]
        fn $name() {
            #[cfg(feature = "parallel")]
            let mut new_thread = tokio::runtime::Builder::new_multi_thread();

            #[cfg(not(feature = "parallel"))]
            let mut new_thread = tokio::runtime::Builder::new_current_thread();

            let rt = new_thread.enable_all().build().unwrap();

            #[cfg(feature = "parallel")]
            {
                rt.block_on(async {
                    let $rt = crate::AsyncRuntime::new().unwrap();
                    let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();

                    $($t)*
                })
            }
            #[cfg(not(feature = "parallel"))]
            {
                let set = tokio::task::LocalSet::new();
                set.block_on(&rt, async {
                    let $rt = crate::AsyncRuntime::new().unwrap();
                    let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();

                    $($t)*
                })
            }
        }
    };
}

#[cfg(test)]
mod test {
    use std::time::Duration;

    use crate::*;

    use self::context::EvalOptions;

    async_test_case!(basic => (_rt,ctx){
        async_with!(&ctx => |ctx|{
            let res: i32 = ctx.eval("1 + 1").unwrap();
            assert_eq!(res, 2i32);
        }).await;
    });

    async_test_case!(sleep_closure => (_rt,ctx){
        let mut a = 1;
        let a_ref = &mut a;

        async_with!(&ctx => |ctx|{
            tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
            ctx.globals().set("foo", "bar").unwrap();
            *a_ref += 1;
        }).await;
        assert_eq!(a, 2);
    });

    async_test_case!(drive => (rt,ctx){
        use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};

        #[cfg(feature = "parallel")]
        tokio::spawn(rt.drive());
        #[cfg(not(feature = "parallel"))]
        tokio::task::spawn_local(rt.drive());

        // Give the drive future a chance to start polling.
        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;

        let number = Arc::new(AtomicUsize::new(0));
        let number_clone = number.clone();

        async_with!(&ctx => |ctx|{
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                number_clone.store(1, Ordering::SeqCst);
            });
        }).await;
        assert_eq!(number.load(Ordering::SeqCst), 0);
        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
        assert_eq!(number.load(Ordering::SeqCst), 1);
    });

    async_test_case!(no_drive => (_rt,ctx){
        use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};

        let number = Arc::new(AtomicUsize::new(0));
        let number_clone = number.clone();

        async_with!(&ctx => |ctx|{
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                number_clone.store(1, Ordering::SeqCst);
            });
        }).await;
        assert_eq!(number.load(Ordering::SeqCst), 0);
        // Without anything driving the runtime, the spawned future never completes.
        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
        assert_eq!(number.load(Ordering::SeqCst), 0);
    });

    async_test_case!(idle => (rt,ctx){
        use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};

        let number = Arc::new(AtomicUsize::new(0));
        let number_clone = number.clone();

        async_with!(&ctx => |ctx|{
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                number_clone.store(1, Ordering::SeqCst);
            });
        }).await;
        assert_eq!(number.load(Ordering::SeqCst), 0);
        rt.idle().await;
        assert_eq!(number.load(Ordering::SeqCst), 1);
    });

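    // A sketch of draining the job queue by hand instead of awaiting `idle`:
    // `execute_pending_job` makes at most one step of progress and reports
    // whether it did any work. The script and global name are illustrative.
    async_test_case!(manual_job_drain => (rt,ctx){
        async_with!(&ctx => |ctx|{
            // Queue a microtask through a promise reaction.
            ctx.eval::<(), _>("globalThis.flag = false; Promise.resolve().then(() => { globalThis.flag = true; });").unwrap();
        }).await;

        // Keep executing jobs until the runtime reports no more progress.
        while matches!(rt.execute_pending_job().await, Ok(true)) {}

        async_with!(&ctx => |ctx|{
            let flag: bool = ctx.globals().get("flag").unwrap();
            assert!(flag);
        }).await;
    });
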
    async_test_case!(recursive_spawn => (_rt,ctx){
        use tokio::sync::oneshot;

        async_with!(&ctx => |ctx|{
            let ctx_clone = ctx.clone();
            let (tx, rx) = oneshot::channel::<()>();
            let (tx2, rx2) = oneshot::channel::<()>();
            ctx.spawn(async move {
                tokio::task::yield_now().await;

                let ctx = ctx_clone.clone();

                ctx_clone.spawn(async move {
                    tokio::task::yield_now().await;
                    ctx.spawn(async move {
                        tokio::task::yield_now().await;
                        tx2.send(()).unwrap();
                        tokio::task::yield_now().await;
                    });
                    tokio::task::yield_now().await;
                    tx.send(()).unwrap();
                });

                // Spawn a batch of trivial futures to put some pressure on the scheduler.
                for _ in 0..32 {
                    ctx_clone.spawn(async move {})
                }
            });
            tokio::time::timeout(Duration::from_millis(500), rx).await.unwrap().unwrap();
            tokio::time::timeout(Duration::from_millis(500), rx2).await.unwrap().unwrap();
        }).await;
    });

    async_test_case!(recursive_spawn_from_script => (rt,ctx) {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use crate::prelude::Func;

        static COUNT: AtomicUsize = AtomicUsize::new(0);
        static SCRIPT: &str = r#"
        async function main() {
            setTimeout(() => {
                inc_count()
                setTimeout(async () => {
                    inc_count()
                }, 100);
            }, 100);
        }

        main().catch(print);
        "#;

        fn inc_count() {
            COUNT.fetch_add(1, Ordering::Relaxed);
        }

        fn set_timeout_spawn<'js>(ctx: Ctx<'js>, callback: Function<'js>, millis: usize) -> Result<()> {
            ctx.spawn(async move {
                tokio::time::sleep(Duration::from_millis(millis as u64)).await;
                callback.call::<_, ()>(()).unwrap();
            });

            Ok(())
        }

        async_with!(ctx => |ctx|{
            let res: Result<Promise> = (|| {
                let globals = ctx.globals();

                globals.set("inc_count", Func::from(inc_count))?;
                globals.set("setTimeout", Func::from(set_timeout_spawn))?;

                let options = EvalOptions {
                    promise: true,
                    strict: false,
                    ..EvalOptions::default()
                };

                // The closure returns the `Result` directly, so no `?` here.
                ctx.eval_with_options(SCRIPT, options)
            })();

            match res.catch(&ctx) {
                Ok(promise) => {
                    if let Err(err) = promise.into_future::<Value>().await.catch(&ctx) {
                        eprintln!("{}", err)
                    }
                }
                Err(err) => {
                    eprintln!("{}", err)
                }
            };
        })
        .await;

        rt.idle().await;

        assert_eq!(COUNT.load(Ordering::Relaxed), 2);
    });

    #[cfg(feature = "parallel")]
    fn assert_is_send<T: Send>(t: T) -> T {
        t
    }

    #[cfg(feature = "parallel")]
    fn assert_is_sync<T: Sync>(t: T) -> T {
        t
    }

    #[cfg(feature = "parallel")]
    #[tokio::test]
    async fn ensure_types_are_send_sync() {
        let rt = AsyncRuntime::new().unwrap();

        std::mem::drop(assert_is_sync(rt.idle()));
        std::mem::drop(assert_is_sync(rt.execute_pending_job()));
        std::mem::drop(assert_is_sync(rt.drive()));

        std::mem::drop(assert_is_send(rt.idle()));
        std::mem::drop(assert_is_send(rt.execute_pending_job()));
        std::mem::drop(assert_is_send(rt.drive()));
    }
}
630}