Skip to main content

rquickjs_core/runtime/
async.rs

1use alloc::{
2    ffi::CString,
3    sync::{Arc, Weak},
4    vec::Vec,
5};
6use core::{ptr::NonNull, result::Result as StdResult, task::Poll};
7#[cfg(feature = "std")]
8use std::println;
9
10#[cfg(feature = "parallel")]
11use std::sync::mpsc::{self, Receiver, Sender};
12
13use async_lock::Mutex;
14
15use super::{
16    opaque::Opaque, raw::RawRuntime, schedular::SchedularPoll, spawner::DriveFuture,
17    InterruptHandler, MemoryUsage, PromiseHook, RejectionTracker,
18};
19use crate::allocator::Allocator;
20#[cfg(feature = "loader")]
21use crate::loader::{Loader, Resolver};
22#[cfg(feature = "parallel")]
23use crate::util::{AssertSendFuture, AssertSyncFuture};
24use crate::{
25    context::AsyncContext, qjs, result::AsyncJobException, util::ManualPoll, Ctx, Exception, Result,
26};
27
28#[derive(Debug)]
29pub(crate) struct InnerRuntime {
30    pub runtime: RawRuntime,
31    #[cfg(feature = "parallel")]
32    pub drop_recv: Receiver<NonNull<qjs::JSContext>>,
33}
34
35impl InnerRuntime {
36    pub fn drop_pending(&self) {
37        #[cfg(feature = "parallel")]
38        while let Ok(x) = self.drop_recv.try_recv() {
39            unsafe { qjs::JS_FreeContext(x.as_ptr()) }
40        }
41    }
42}
43
44impl Drop for InnerRuntime {
45    fn drop(&mut self) {
46        self.drop_pending();
47    }
48}
49
50#[cfg(feature = "parallel")]
51unsafe impl Send for InnerRuntime {}
52
53/// A weak handle to the async runtime.
54///
55/// Holding onto this struct does not prevent the runtime from being dropped.
56#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
57#[derive(Clone)]
58pub struct AsyncWeakRuntime {
59    inner: Weak<Mutex<InnerRuntime>>,
60    #[cfg(feature = "parallel")]
61    drop_send: Sender<NonNull<qjs::JSContext>>,
62}
63
64impl AsyncWeakRuntime {
65    pub fn try_ref(&self) -> Option<AsyncRuntime> {
66        self.inner.upgrade().map(|inner| AsyncRuntime {
67            inner,
68            #[cfg(feature = "parallel")]
69            drop_send: self.drop_send.clone(),
70        })
71    }
72}
73
74/// Asynchronous QuickJS runtime, entry point of the library.
75#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
76#[derive(Clone)]
77pub struct AsyncRuntime {
78    // use Arc instead of Ref so we can use OwnedLock
79    pub(crate) inner: Arc<Mutex<InnerRuntime>>,
80    #[cfg(feature = "parallel")]
81    pub(crate) drop_send: Sender<NonNull<qjs::JSContext>>,
82}
83
84// Since all functions which use runtime are behind a mutex
85// sending the runtime to other threads should be fine.
86#[cfg(feature = "parallel")]
87unsafe impl Send for AsyncRuntime {}
88#[cfg(feature = "parallel")]
89unsafe impl Send for AsyncWeakRuntime {}
90
91// Since a global lock needs to be locked for safe use
92// using runtime in a sync way should be safe as
93// simultaneous accesses is synchronized behind a lock.
94#[cfg(feature = "parallel")]
95unsafe impl Sync for AsyncRuntime {}
96#[cfg(feature = "parallel")]
97unsafe impl Sync for AsyncWeakRuntime {}
98
99impl AsyncRuntime {
100    /// Create a new runtime.
101    ///
102    /// Will generally only fail if not enough memory was available.
103    ///
104    /// # Features
105    /// *If the `"rust-alloc"` feature is enabled the Rust's global allocator will be used in favor of libc's one.*
106    // Annoying false positive clippy lint
107    #[allow(clippy::arc_with_non_send_sync)]
108    pub fn new() -> Result<Self> {
109        let opaque = Opaque::with_spawner();
110        let runtime = unsafe { RawRuntime::new(opaque) }?;
111
112        #[cfg(feature = "parallel")]
113        let (drop_send, drop_recv) = mpsc::channel();
114
115        Ok(Self {
116            inner: Arc::new(Mutex::new(InnerRuntime {
117                runtime,
118                #[cfg(feature = "parallel")]
119                drop_recv,
120            })),
121            #[cfg(feature = "parallel")]
122            drop_send,
123        })
124    }
125
126    /// Create a new runtime using specified allocator
127    ///
128    /// Will generally only fail if not enough memory was available.
129    // Annoying false positive clippy lint
130    #[allow(clippy::arc_with_non_send_sync)]
131    pub fn new_with_alloc<A>(allocator: A) -> Result<Self>
132    where
133        A: Allocator + 'static,
134    {
135        let opaque = Opaque::with_spawner();
136        let runtime = unsafe { RawRuntime::new_with_allocator(opaque, allocator) }?;
137
138        #[cfg(feature = "parallel")]
139        let (drop_send, drop_recv) = mpsc::channel();
140
141        Ok(Self {
142            inner: Arc::new(Mutex::new(InnerRuntime {
143                runtime,
144                #[cfg(feature = "parallel")]
145                drop_recv,
146            })),
147            #[cfg(feature = "parallel")]
148            drop_send,
149        })
150    }
151
152    /// Get weak ref to runtime
153    pub fn weak(&self) -> AsyncWeakRuntime {
154        AsyncWeakRuntime {
155            inner: Arc::downgrade(&self.inner),
156            #[cfg(feature = "parallel")]
157            drop_send: self.drop_send.clone(),
158        }
159    }
160
161    /// Set a closure which is called when a Promise is rejected.
162    #[inline]
163    pub async fn set_host_promise_rejection_tracker(&self, tracker: Option<RejectionTracker>) {
164        unsafe {
165            self.inner
166                .lock()
167                .await
168                .runtime
169                .set_host_promise_rejection_tracker(tracker);
170        }
171    }
172
173    /// Set a closure which is called when a promise is created, resolved, or chained.
174    #[inline]
175    pub async fn set_promise_hook(&self, tracker: Option<PromiseHook>) {
176        unsafe {
177            self.inner.lock().await.runtime.set_promise_hook(tracker);
178        }
179    }
180
181    /// Set a closure which is regularly called by the engine when it is executing code.
182    /// If the provided closure returns `true` the interpreter will raise and uncatchable
183    /// exception and return control flow to the caller.
184    #[inline]
185    pub async fn set_interrupt_handler(&self, handler: Option<InterruptHandler>) {
186        unsafe {
187            self.inner
188                .lock()
189                .await
190                .runtime
191                .set_interrupt_handler(handler);
192        }
193    }
194
195    /// Set the module loader
196    #[cfg(feature = "loader")]
197    #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "loader")))]
198    pub async fn set_loader<R, L>(&self, resolver: R, loader: L)
199    where
200        R: Resolver + 'static,
201        L: Loader + 'static,
202    {
203        unsafe {
204            self.inner.lock().await.runtime.set_loader(resolver, loader);
205        }
206    }
207
208    /// Set the info of the runtime
209    pub async fn set_info<S: Into<Vec<u8>>>(&self, info: S) -> Result<()> {
210        let string = CString::new(info)?;
211        unsafe {
212            self.inner.lock().await.runtime.set_info(string);
213        }
214        Ok(())
215    }
216
217    /// Set a limit on the max amount of memory the runtime will use.
218    ///
219    /// Setting the limit to 0 is equivalent to unlimited memory.
220    ///
221    /// Note that is a Noop when a custom allocator is being used,
222    /// as is the case for the "rust-alloc" or "allocator" features.
223    pub async fn set_memory_limit(&self, limit: usize) {
224        unsafe {
225            self.inner.lock().await.runtime.set_memory_limit(limit);
226        }
227    }
228
229    /// Set a limit on the max size of stack the runtime will use.
230    ///
231    /// The default values is 256x1024 bytes.
232    pub async fn set_max_stack_size(&self, limit: usize) {
233        unsafe {
234            self.inner.lock().await.runtime.set_max_stack_size(limit);
235        }
236    }
237
238    /// Set a memory threshold for garbage collection.
239    pub async fn set_gc_threshold(&self, threshold: usize) {
240        unsafe {
241            self.inner.lock().await.runtime.set_gc_threshold(threshold);
242        }
243    }
244
245    /// Manually run the garbage collection.
246    ///
247    /// Most QuickJS values are reference counted and
248    /// will automatically free themselves when they have no more
249    /// references. The garbage collector is only for collecting
250    /// cyclic references.
251    pub async fn run_gc(&self) {
252        unsafe {
253            let mut lock = self.inner.lock().await;
254            lock.drop_pending();
255            lock.runtime.run_gc();
256        }
257    }
258
259    /// Get memory usage stats
260    pub async fn memory_usage(&self) -> MemoryUsage {
261        unsafe { self.inner.lock().await.runtime.memory_usage() }
262    }
263
264    /// Test for pending jobs
265    ///
266    /// Returns true when at least one job is pending.
267    #[inline]
268    pub async fn is_job_pending(&self) -> bool {
269        let lock = self.inner.lock().await;
270
271        lock.runtime.is_job_pending() || !lock.runtime.get_opaque().spawner_is_empty()
272    }
273
274    /// Execute first pending job
275    ///
276    /// Returns true when job was executed or false when queue is empty or error when exception thrown under execution.
277    #[inline]
278    pub async fn execute_pending_job(&self) -> StdResult<bool, AsyncJobException> {
279        let mut lock = self.inner.lock().await;
280        lock.runtime.update_stack_top();
281        lock.drop_pending();
282
283        let f = ManualPoll::new(|cx| {
284            let job_res = lock.runtime.execute_pending_job().map_err(|e| {
285                let ptr = NonNull::new(e)
286                    .expect("executing pending job returned a null context on error");
287                // JS_ExecutePendingJob returns a borrowed context pointer;
288                // dup it so AsyncContext can own a reference.
289                unsafe { qjs::JS_DupContext(ptr.as_ptr()) };
290                AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
291            })?;
292
293            if job_res {
294                return Poll::Ready(Ok(true));
295            }
296
297            match lock.runtime.get_opaque().poll(cx) {
298                SchedularPoll::ShouldYield => Poll::Pending,
299                SchedularPoll::Empty => Poll::Ready(Ok(false)),
300                SchedularPoll::Pending => Poll::Ready(Ok(false)),
301                SchedularPoll::PendingProgress => Poll::Ready(Ok(true)),
302            }
303        });
304
305        #[cfg(feature = "parallel")]
306        let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };
307
308        f.await
309    }
310
311    /// Run all futures and jobs in the runtime until all are finished.
312    #[inline]
313    pub async fn idle(&self) {
314        let mut lock = self.inner.lock().await;
315        lock.runtime.update_stack_top();
316        lock.drop_pending();
317
318        let f = ManualPoll::new(|cx| {
319            loop {
320                let pending = lock.runtime.execute_pending_job().map_err(|e| {
321                    let ptr = NonNull::new(e)
322                        .expect("executing pending job returned a null context on error");
323                    // JS_ExecutePendingJob returns a borrowed context pointer;
324                    // dup it so AsyncContext can own a reference.
325                    unsafe { qjs::JS_DupContext(ptr.as_ptr()) };
326                    AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
327                });
328                match pending {
329                    Err(e) => {
330                        // SAFETY: Runtime is already locked so creating a context is safe.
331                        let ctx = unsafe { Ctx::from_ptr(e.0 .0.ctx().as_ptr()) };
332                        let err = ctx.catch();
333                        if let Some(_x) = err.clone().into_object().and_then(Exception::from_object)
334                        {
335                            // TODO do something better with errors.
336                            #[cfg(feature = "std")]
337                            println!("error executing job: {}", _x);
338                        } else {
339                            #[cfg(feature = "std")]
340                            println!("error executing job: {:?}", err);
341                        }
342                    }
343                    Ok(true) => continue,
344                    Ok(false) => {}
345                }
346
347                match lock.runtime.get_opaque().poll(cx) {
348                    SchedularPoll::ShouldYield => return Poll::Pending,
349                    SchedularPoll::Empty => return Poll::Ready(()),
350                    SchedularPoll::Pending => return Poll::Pending,
351                    SchedularPoll::PendingProgress => {}
352                }
353            }
354        });
355
356        #[cfg(feature = "parallel")]
357        let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };
358
359        f.await
360    }
361
362    /// Returns a future that completes when the runtime is dropped.
363    /// If the future is polled it will drive futures spawned inside the runtime completing them
364    /// even if runtime is currently not in use.
365    pub fn drive(&self) -> DriveFuture {
366        DriveFuture::new(self.weak())
367    }
368}
369
370#[cfg(test)]
371macro_rules! async_test_case {
372    ($name:ident => ($rt:ident,$ctx:ident) { $($t:tt)* }) => {
373    #[test]
374    fn $name() {
375        #[cfg(feature = "parallel")]
376        let mut new_thread = tokio::runtime::Builder::new_multi_thread();
377
378        #[cfg(not(feature = "parallel"))]
379        let mut new_thread = tokio::runtime::Builder::new_current_thread();
380
381        let rt = new_thread
382            .enable_all()
383            .build()
384            .unwrap();
385
386        #[cfg(feature = "parallel")]
387        {
388            rt.block_on(async {
389                let $rt = crate::AsyncRuntime::new().unwrap();
390                let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();
391
392                $($t)*
393
394            })
395        }
396        #[cfg(not(feature = "parallel"))]
397        {
398            let set = tokio::task::LocalSet::new();
399            set.block_on(&rt, async {
400                let $rt = crate::AsyncRuntime::new().unwrap();
401                let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();
402
403                $($t)*
404            })
405        }
406    }
407    };
408}
409
410#[cfg(test)]
411mod test {
412    use std::time::Duration;
413
414    use crate::*;
415
416    use self::context::EvalOptions;
417
418    async_test_case!(basic => (_rt,ctx){
419        ctx.async_with(async |ctx|{
420            let res: i32 = ctx.eval("1 + 1").unwrap();
421            assert_eq!(res,2i32);
422        }).await;
423    });
424
425    async_test_case!(sleep_closure => (_rt,ctx){
426
427        let mut a = 1;
428        let a_ref = &mut a;
429
430
431        ctx.async_with(async |ctx|{
432            tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
433            ctx.globals().set("foo","bar").unwrap();
434            *a_ref += 1;
435        }).await;
436        assert_eq!(a,2);
437    });
438
439    async_test_case!(drive => (rt,ctx){
440        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};
441
442        #[cfg(feature = "parallel")]
443        tokio::spawn(rt.drive());
444        #[cfg(not(feature = "parallel"))]
445        tokio::task::spawn_local(rt.drive());
446
447        // Give drive time to start.
448        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
449
450        let number = Arc::new(AtomicUsize::new(0));
451        let number_clone = number.clone();
452        let gate = Arc::new(tokio::sync::Notify::new());
453        let gate_clone = gate.clone();
454        let done = Arc::new(tokio::sync::Notify::new());
455        let done_clone = done.clone();
456
457        ctx.async_with(async |ctx|{
458            ctx.spawn(async move {
459                gate_clone.notified().await;
460                number_clone.store(1,Ordering::SeqCst);
461                done_clone.notify_one();
462            });
463        }).await;
464        // Task is blocked on gate, so value is definitely still 0.
465        assert_eq!(number.load(Ordering::SeqCst),0);
466        // Unblock the task and wait for it to complete.
467        gate.notify_one();
468        done.notified().await;
469        assert_eq!(number.load(Ordering::SeqCst),1);
470
471    });
472
473    async_test_case!(no_drive => (rt,ctx){
474        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};
475
476        let number = Arc::new(AtomicUsize::new(0));
477        let number_clone = number.clone();
478
479        ctx.async_with(async |ctx|{
480            ctx.spawn(async move {
481                tokio::task::yield_now().await;
482                number_clone.store(1,Ordering::SeqCst);
483            });
484        }).await;
485        assert_eq!(number.load(Ordering::SeqCst),0);
486        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
487        assert_eq!(number.load(Ordering::SeqCst),0);
488
489    });
490
491    async_test_case!(idle => (rt,ctx){
492        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};
493
494        let number = Arc::new(AtomicUsize::new(0));
495        let number_clone = number.clone();
496
497        ctx.async_with(async |ctx|{
498            ctx.spawn(async move {
499                tokio::task::yield_now().await;
500                number_clone.store(1,Ordering::SeqCst);
501            });
502        }).await;
503        assert_eq!(number.load(Ordering::SeqCst),0);
504        rt.idle().await;
505        assert_eq!(number.load(Ordering::SeqCst),1);
506
507    });
508
509    async_test_case!(recursive_spawn => (rt,ctx){
510        use tokio::sync::oneshot;
511
512        ctx.async_with(async |ctx|{
513            let ctx_clone = ctx.clone();
514            let (tx,rx) = oneshot::channel::<()>();
515            let (tx2,rx2) = oneshot::channel::<()>();
516            ctx.spawn(async move {
517                tokio::task::yield_now().await;
518
519                let ctx = ctx_clone.clone();
520
521                ctx_clone.spawn(async move {
522                    tokio::task::yield_now().await;
523                    ctx.spawn(async move {
524                        tokio::task::yield_now().await;
525                        tx2.send(()).unwrap();
526                        tokio::task::yield_now().await;
527                    });
528                    tokio::task::yield_now().await;
529                    tx.send(()).unwrap();
530                });
531
532                // Add a bunch of futures just to make sure possible segfaults are more likely to
533                // happen
534                for _ in 0..32{
535                    ctx_clone.spawn(async move {})
536                }
537
538            });
539            tokio::time::timeout(Duration::from_millis(500), rx).await.unwrap().unwrap();
540            tokio::time::timeout(Duration::from_millis(500), rx2).await.unwrap().unwrap();
541        }).await;
542
543    });
544
545    async_test_case!(recursive_spawn_from_script => (rt,ctx) {
546        use std::sync::atomic::{Ordering, AtomicUsize};
547        use crate::prelude::Func;
548
549        static COUNT: AtomicUsize = AtomicUsize::new(0);
550        static SCRIPT: &str = r#"
551
552        async function main() {
553
554          setTimeout(() => {
555            inc_count()
556            setTimeout(async () => {
557                inc_count()
558            }, 100);
559          }, 100);
560        }
561
562        main().catch(print);
563
564
565        "#;
566
567        fn inc_count(){
568            COUNT.fetch_add(1,Ordering::Relaxed);
569        }
570
571        fn set_timeout_spawn<'js>(ctx: Ctx<'js>, callback: Function<'js>, millis: usize) -> Result<()> {
572            ctx.spawn(async move {
573                tokio::time::sleep(Duration::from_millis(millis as u64)).await;
574                callback.call::<_, ()>(()).unwrap();
575            });
576
577            Ok(())
578        }
579
580
581        ctx.async_with(async |ctx|{
582
583            let res: Result<Promise> = (|| {
584                let globals = ctx.globals();
585
586                globals.set("inc_count", Func::from(inc_count))?;
587
588                globals.set("setTimeout", Func::from(set_timeout_spawn))?;
589                let options = EvalOptions{
590                    promise: true,
591                    strict: false,
592                    ..EvalOptions::default()
593                };
594
595                ctx.eval_with_options(SCRIPT, options)?
596            })();
597
598            match res.catch(&ctx){
599                Ok(promise) => {
600                    if let Err(err) = promise.into_future::<Value>().await.catch(&ctx){
601                        eprintln!("{}", err)
602                    }
603                },
604                Err(err) => {
605                    eprintln!("{}", err)
606                },
607            };
608
609        })
610        .await;
611
612        rt.idle().await;
613
614        assert_eq!(COUNT.load(Ordering::Relaxed),2);
615    });
616
617    async_test_case!(interrupt_handler_idle => (rt, ctx) {
618        use std::time::Instant;
619
620        let timeout = Duration::from_millis(100);
621        let start_time = Instant::now();
622
623        rt.set_interrupt_handler(Some(Box::new(move || start_time.elapsed() >= timeout)))
624            .await;
625
626        let _ = ctx.async_with(async |ctx| {
627            ctx.eval::<(), _>(r#"
628                async function example() {
629                    while (true) {
630                        await Promise.resolve();
631                    }
632                }
633                example();
634            "#)
635        }).await;
636
637        // This previously caused an assertion failure in gc_decref_child
638        // due to the interrupt handler corrupting reference counts during
639        // pending job execution.
640        rt.idle().await;
641    });
642
643    #[cfg(feature = "parallel")]
644    fn assert_is_send<T: Send>(t: T) -> T {
645        t
646    }
647
648    #[cfg(feature = "parallel")]
649    fn assert_is_sync<T: Send>(t: T) -> T {
650        t
651    }
652
653    #[cfg(feature = "parallel")]
654    #[tokio::test]
655    async fn ensure_types_are_send_sync() {
656        let rt = AsyncRuntime::new().unwrap();
657
658        std::mem::drop(assert_is_sync(rt.idle()));
659        std::mem::drop(assert_is_sync(rt.execute_pending_job()));
660        std::mem::drop(assert_is_sync(rt.drive()));
661
662        std::mem::drop(assert_is_send(rt.idle()));
663        std::mem::drop(assert_is_send(rt.execute_pending_job()));
664        std::mem::drop(assert_is_send(rt.drive()));
665    }
666}