rquickjs_core/runtime/
async.rs

1use alloc::{
2    ffi::CString,
3    sync::{Arc, Weak},
4    vec::Vec,
5};
6use core::{ptr::NonNull, result::Result as StdResult, task::Poll};
7#[cfg(feature = "std")]
8use std::println;
9
10#[cfg(feature = "parallel")]
11use std::sync::mpsc::{self, Receiver, Sender};
12
13use async_lock::Mutex;
14
15use super::{
16    opaque::Opaque, raw::RawRuntime, schedular::SchedularPoll, spawner::DriveFuture,
17    InterruptHandler, MemoryUsage, PromiseHook, RejectionTracker,
18};
19use crate::allocator::Allocator;
20#[cfg(feature = "loader")]
21use crate::loader::{Loader, Resolver};
22use crate::{
23    context::AsyncContext, result::AsyncJobException, util::ManualPoll, Ctx, Exception, Result,
24};
25#[cfg(feature = "parallel")]
26use crate::{
27    qjs,
28    util::{AssertSendFuture, AssertSyncFuture},
29};
30
31#[derive(Debug)]
32pub(crate) struct InnerRuntime {
33    pub runtime: RawRuntime,
34    #[cfg(feature = "parallel")]
35    pub drop_recv: Receiver<NonNull<qjs::JSContext>>,
36}
37
38impl InnerRuntime {
39    pub fn drop_pending(&self) {
40        #[cfg(feature = "parallel")]
41        while let Ok(x) = self.drop_recv.try_recv() {
42            unsafe { qjs::JS_FreeContext(x.as_ptr()) }
43        }
44    }
45}
46
47impl Drop for InnerRuntime {
48    fn drop(&mut self) {
49        self.drop_pending();
50    }
51}
52
53#[cfg(feature = "parallel")]
54unsafe impl Send for InnerRuntime {}
55
56/// A weak handle to the async runtime.
57///
58/// Holding onto this struct does not prevent the runtime from being dropped.
59#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
60#[derive(Clone)]
61pub struct AsyncWeakRuntime {
62    inner: Weak<Mutex<InnerRuntime>>,
63    #[cfg(feature = "parallel")]
64    drop_send: Sender<NonNull<qjs::JSContext>>,
65}
66
67impl AsyncWeakRuntime {
68    pub fn try_ref(&self) -> Option<AsyncRuntime> {
69        self.inner.upgrade().map(|inner| AsyncRuntime {
70            inner,
71            #[cfg(feature = "parallel")]
72            drop_send: self.drop_send.clone(),
73        })
74    }
75}
76
77/// Asynchronous QuickJS runtime, entry point of the library.
78#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
79#[derive(Clone)]
80pub struct AsyncRuntime {
81    // use Arc instead of Ref so we can use OwnedLock
82    pub(crate) inner: Arc<Mutex<InnerRuntime>>,
83    #[cfg(feature = "parallel")]
84    pub(crate) drop_send: Sender<NonNull<qjs::JSContext>>,
85}
86
87// Since all functions which use runtime are behind a mutex
88// sending the runtime to other threads should be fine.
89#[cfg(feature = "parallel")]
90unsafe impl Send for AsyncRuntime {}
91#[cfg(feature = "parallel")]
92unsafe impl Send for AsyncWeakRuntime {}
93
94// Since a global lock needs to be locked for safe use
95// using runtime in a sync way should be safe as
96// simultaneous accesses is synchronized behind a lock.
97#[cfg(feature = "parallel")]
98unsafe impl Sync for AsyncRuntime {}
99#[cfg(feature = "parallel")]
100unsafe impl Sync for AsyncWeakRuntime {}
101
102impl AsyncRuntime {
103    /// Create a new runtime.
104    ///
105    /// Will generally only fail if not enough memory was available.
106    ///
107    /// # Features
108    /// *If the `"rust-alloc"` feature is enabled the Rust's global allocator will be used in favor of libc's one.*
109    // Annoying false positive clippy lint
110    #[allow(clippy::arc_with_non_send_sync)]
111    pub fn new() -> Result<Self> {
112        let opaque = Opaque::with_spawner();
113        let runtime = unsafe { RawRuntime::new(opaque) }?;
114
115        #[cfg(feature = "parallel")]
116        let (drop_send, drop_recv) = mpsc::channel();
117
118        Ok(Self {
119            inner: Arc::new(Mutex::new(InnerRuntime {
120                runtime,
121                #[cfg(feature = "parallel")]
122                drop_recv,
123            })),
124            #[cfg(feature = "parallel")]
125            drop_send,
126        })
127    }
128
129    /// Create a new runtime using specified allocator
130    ///
131    /// Will generally only fail if not enough memory was available.
132    // Annoying false positive clippy lint
133    #[allow(clippy::arc_with_non_send_sync)]
134    pub fn new_with_alloc<A>(allocator: A) -> Result<Self>
135    where
136        A: Allocator + 'static,
137    {
138        let opaque = Opaque::with_spawner();
139        let runtime = unsafe { RawRuntime::new_with_allocator(opaque, allocator) }?;
140
141        #[cfg(feature = "parallel")]
142        let (drop_send, drop_recv) = mpsc::channel();
143
144        Ok(Self {
145            inner: Arc::new(Mutex::new(InnerRuntime {
146                runtime,
147                #[cfg(feature = "parallel")]
148                drop_recv,
149            })),
150            #[cfg(feature = "parallel")]
151            drop_send,
152        })
153    }
154
155    /// Get weak ref to runtime
156    pub fn weak(&self) -> AsyncWeakRuntime {
157        AsyncWeakRuntime {
158            inner: Arc::downgrade(&self.inner),
159            #[cfg(feature = "parallel")]
160            drop_send: self.drop_send.clone(),
161        }
162    }
163
164    /// Set a closure which is called when a Promise is rejected.
165    #[inline]
166    pub async fn set_host_promise_rejection_tracker(&self, tracker: Option<RejectionTracker>) {
167        unsafe {
168            self.inner
169                .lock()
170                .await
171                .runtime
172                .set_host_promise_rejection_tracker(tracker);
173        }
174    }
175
176    /// Set a closure which is called when a promise is created, resolved, or chained.
177    #[inline]
178    pub async fn set_promise_hook(&self, tracker: Option<PromiseHook>) {
179        unsafe {
180            self.inner.lock().await.runtime.set_promise_hook(tracker);
181        }
182    }
183
184    /// Set a closure which is regularly called by the engine when it is executing code.
185    /// If the provided closure returns `true` the interpreter will raise and uncatchable
186    /// exception and return control flow to the caller.
187    #[inline]
188    pub async fn set_interrupt_handler(&self, handler: Option<InterruptHandler>) {
189        unsafe {
190            self.inner
191                .lock()
192                .await
193                .runtime
194                .set_interrupt_handler(handler);
195        }
196    }
197
198    /// Set the module loader
199    #[cfg(feature = "loader")]
200    #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "loader")))]
201    pub async fn set_loader<R, L>(&self, resolver: R, loader: L)
202    where
203        R: Resolver + 'static,
204        L: Loader + 'static,
205    {
206        unsafe {
207            self.inner.lock().await.runtime.set_loader(resolver, loader);
208        }
209    }
210
211    /// Set the info of the runtime
212    pub async fn set_info<S: Into<Vec<u8>>>(&self, info: S) -> Result<()> {
213        let string = CString::new(info)?;
214        unsafe {
215            self.inner.lock().await.runtime.set_info(string);
216        }
217        Ok(())
218    }
219
220    /// Set a limit on the max amount of memory the runtime will use.
221    ///
222    /// Setting the limit to 0 is equivalent to unlimited memory.
223    ///
224    /// Note that is a Noop when a custom allocator is being used,
225    /// as is the case for the "rust-alloc" or "allocator" features.
226    pub async fn set_memory_limit(&self, limit: usize) {
227        unsafe {
228            self.inner.lock().await.runtime.set_memory_limit(limit);
229        }
230    }
231
232    /// Set a limit on the max size of stack the runtime will use.
233    ///
234    /// The default values is 256x1024 bytes.
235    pub async fn set_max_stack_size(&self, limit: usize) {
236        unsafe {
237            self.inner.lock().await.runtime.set_max_stack_size(limit);
238        }
239    }
240
241    /// Set a memory threshold for garbage collection.
242    pub async fn set_gc_threshold(&self, threshold: usize) {
243        unsafe {
244            self.inner.lock().await.runtime.set_gc_threshold(threshold);
245        }
246    }
247
248    /// Manually run the garbage collection.
249    ///
250    /// Most QuickJS values are reference counted and
251    /// will automatically free themselves when they have no more
252    /// references. The garbage collector is only for collecting
253    /// cyclic references.
254    pub async fn run_gc(&self) {
255        unsafe {
256            let mut lock = self.inner.lock().await;
257            lock.drop_pending();
258            lock.runtime.run_gc();
259        }
260    }
261
262    /// Get memory usage stats
263    pub async fn memory_usage(&self) -> MemoryUsage {
264        unsafe { self.inner.lock().await.runtime.memory_usage() }
265    }
266
267    /// Test for pending jobs
268    ///
269    /// Returns true when at least one job is pending.
270    #[inline]
271    pub async fn is_job_pending(&self) -> bool {
272        let lock = self.inner.lock().await;
273
274        lock.runtime.is_job_pending() || !lock.runtime.get_opaque().spawner_is_empty()
275    }
276
277    /// Execute first pending job
278    ///
279    /// Returns true when job was executed or false when queue is empty or error when exception thrown under execution.
280    #[inline]
281    pub async fn execute_pending_job(&self) -> StdResult<bool, AsyncJobException> {
282        let mut lock = self.inner.lock().await;
283        lock.runtime.update_stack_top();
284        lock.drop_pending();
285
286        let f = ManualPoll::new(|cx| {
287            let job_res = lock.runtime.execute_pending_job().map_err(|e| {
288                let ptr = NonNull::new(e)
289                    .expect("executing pending job returned a null context on error");
290                AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
291            })?;
292
293            if job_res {
294                return Poll::Ready(Ok(true));
295            }
296
297            match lock.runtime.get_opaque().poll(cx) {
298                SchedularPoll::ShouldYield => Poll::Pending,
299                SchedularPoll::Empty => Poll::Ready(Ok(false)),
300                SchedularPoll::Pending => Poll::Ready(Ok(false)),
301                SchedularPoll::PendingProgress => Poll::Ready(Ok(true)),
302            }
303        });
304
305        #[cfg(feature = "parallel")]
306        let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };
307
308        f.await
309    }
310
311    /// Run all futures and jobs in the runtime until all are finished.
312    #[inline]
313    pub async fn idle(&self) {
314        let mut lock = self.inner.lock().await;
315        lock.runtime.update_stack_top();
316        lock.drop_pending();
317
318        let f = ManualPoll::new(|cx| {
319            loop {
320                let pending = lock.runtime.execute_pending_job().map_err(|e| {
321                    let ptr = NonNull::new(e)
322                        .expect("executing pending job returned a null context on error");
323                    AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
324                });
325                match pending {
326                    Err(e) => {
327                        // SAFETY: Runtime is already locked so creating a context is safe.
328                        let ctx = unsafe { Ctx::from_ptr(e.0 .0.ctx().as_ptr()) };
329                        let err = ctx.catch();
330                        if let Some(_x) = err.clone().into_object().and_then(Exception::from_object)
331                        {
332                            // TODO do something better with errors.
333                            #[cfg(feature = "std")]
334                            println!("error executing job: {}", _x);
335                        } else {
336                            #[cfg(feature = "std")]
337                            println!("error executing job: {:?}", err);
338                        }
339                    }
340                    Ok(true) => continue,
341                    Ok(false) => {}
342                }
343
344                match lock.runtime.get_opaque().poll(cx) {
345                    SchedularPoll::ShouldYield => return Poll::Pending,
346                    SchedularPoll::Empty => return Poll::Ready(()),
347                    SchedularPoll::Pending => return Poll::Pending,
348                    SchedularPoll::PendingProgress => {}
349                }
350            }
351        });
352
353        #[cfg(feature = "parallel")]
354        let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };
355
356        f.await
357    }
358
359    /// Returns a future that completes when the runtime is dropped.
360    /// If the future is polled it will drive futures spawned inside the runtime completing them
361    /// even if runtime is currently not in use.
362    pub fn drive(&self) -> DriveFuture {
363        DriveFuture::new(self.weak())
364    }
365}
366
367#[cfg(test)]
368macro_rules! async_test_case {
369    ($name:ident => ($rt:ident,$ctx:ident) { $($t:tt)* }) => {
370    #[test]
371    fn $name() {
372        #[cfg(feature = "parallel")]
373        let mut new_thread = tokio::runtime::Builder::new_multi_thread();
374
375        #[cfg(not(feature = "parallel"))]
376        let mut new_thread = tokio::runtime::Builder::new_current_thread();
377
378        let rt = new_thread
379            .enable_all()
380            .build()
381            .unwrap();
382
383        #[cfg(feature = "parallel")]
384        {
385            rt.block_on(async {
386                let $rt = crate::AsyncRuntime::new().unwrap();
387                let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();
388
389                $($t)*
390
391            })
392        }
393        #[cfg(not(feature = "parallel"))]
394        {
395            let set = tokio::task::LocalSet::new();
396            set.block_on(&rt, async {
397                let $rt = crate::AsyncRuntime::new().unwrap();
398                let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();
399
400                $($t)*
401            })
402        }
403    }
404    };
405}
406
407#[cfg(test)]
408mod test {
409    use std::time::Duration;
410
411    use crate::*;
412
413    use self::context::EvalOptions;
414
415    async_test_case!(basic => (_rt,ctx){
416        async_with!(&ctx => |ctx|{
417            let res: i32 = ctx.eval("1 + 1").unwrap();
418            assert_eq!(res,2i32);
419        }).await;
420    });
421
422    async_test_case!(sleep_closure => (_rt,ctx){
423
424        let mut a = 1;
425        let a_ref = &mut a;
426
427
428        async_with!(&ctx => |ctx|{
429            tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
430            ctx.globals().set("foo","bar").unwrap();
431            *a_ref += 1;
432        }).await;
433        assert_eq!(a,2);
434    });
435
436    async_test_case!(drive => (rt,ctx){
437        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};
438
439        #[cfg(feature = "parallel")]
440        tokio::spawn(rt.drive());
441        #[cfg(not(feature = "parallel"))]
442        tokio::task::spawn_local(rt.drive());
443
444        // Give drive time to start.
445        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
446
447        let number = Arc::new(AtomicUsize::new(0));
448        let number_clone = number.clone();
449
450        async_with!(&ctx => |ctx|{
451            ctx.spawn(async move {
452                tokio::task::yield_now().await;
453                number_clone.store(1,Ordering::SeqCst);
454            });
455        }).await;
456        assert_eq!(number.load(Ordering::SeqCst),0);
457        // Give drive time to finish the task.
458        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
459        assert_eq!(number.load(Ordering::SeqCst),1);
460
461    });
462
463    async_test_case!(no_drive => (rt,ctx){
464        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};
465
466        let number = Arc::new(AtomicUsize::new(0));
467        let number_clone = number.clone();
468
469        async_with!(&ctx => |ctx|{
470            ctx.spawn(async move {
471                tokio::task::yield_now().await;
472                number_clone.store(1,Ordering::SeqCst);
473            });
474        }).await;
475        assert_eq!(number.load(Ordering::SeqCst),0);
476        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
477        assert_eq!(number.load(Ordering::SeqCst),0);
478
479    });
480
481    async_test_case!(idle => (rt,ctx){
482        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};
483
484        let number = Arc::new(AtomicUsize::new(0));
485        let number_clone = number.clone();
486
487        async_with!(&ctx => |ctx|{
488            ctx.spawn(async move {
489                tokio::task::yield_now().await;
490                number_clone.store(1,Ordering::SeqCst);
491            });
492        }).await;
493        assert_eq!(number.load(Ordering::SeqCst),0);
494        rt.idle().await;
495        assert_eq!(number.load(Ordering::SeqCst),1);
496
497    });
498
499    async_test_case!(recursive_spawn => (rt,ctx){
500        use tokio::sync::oneshot;
501
502        async_with!(&ctx => |ctx|{
503            let ctx_clone = ctx.clone();
504            let (tx,rx) = oneshot::channel::<()>();
505            let (tx2,rx2) = oneshot::channel::<()>();
506            ctx.spawn(async move {
507                tokio::task::yield_now().await;
508
509                let ctx = ctx_clone.clone();
510
511                ctx_clone.spawn(async move {
512                    tokio::task::yield_now().await;
513                    ctx.spawn(async move {
514                        tokio::task::yield_now().await;
515                        tx2.send(()).unwrap();
516                        tokio::task::yield_now().await;
517                    });
518                    tokio::task::yield_now().await;
519                    tx.send(()).unwrap();
520                });
521
522                // Add a bunch of futures just to make sure possible segfaults are more likely to
523                // happen
524                for _ in 0..32{
525                    ctx_clone.spawn(async move {})
526                }
527
528            });
529            tokio::time::timeout(Duration::from_millis(500), rx).await.unwrap().unwrap();
530            tokio::time::timeout(Duration::from_millis(500), rx2).await.unwrap().unwrap();
531        }).await;
532
533    });
534
535    async_test_case!(recursive_spawn_from_script => (rt,ctx) {
536        use std::sync::atomic::{Ordering, AtomicUsize};
537        use crate::prelude::Func;
538
539        static COUNT: AtomicUsize = AtomicUsize::new(0);
540        static SCRIPT: &str = r#"
541
542        async function main() {
543
544          setTimeout(() => {
545            inc_count()
546            setTimeout(async () => {
547                inc_count()
548            }, 100);
549          }, 100);
550        }
551
552        main().catch(print);
553
554
555        "#;
556
557        fn inc_count(){
558            COUNT.fetch_add(1,Ordering::Relaxed);
559        }
560
561        fn set_timeout_spawn<'js>(ctx: Ctx<'js>, callback: Function<'js>, millis: usize) -> Result<()> {
562            ctx.spawn(async move {
563                tokio::time::sleep(Duration::from_millis(millis as u64)).await;
564                callback.call::<_, ()>(()).unwrap();
565            });
566
567            Ok(())
568        }
569
570
571        async_with!(ctx => |ctx|{
572
573            let res: Result<Promise> = (|| {
574                let globals = ctx.globals();
575
576                globals.set("inc_count", Func::from(inc_count))?;
577
578                globals.set("setTimeout", Func::from(set_timeout_spawn))?;
579                let options = EvalOptions{
580                    promise: true,
581                    strict: false,
582                    ..EvalOptions::default()
583                };
584
585                ctx.eval_with_options(SCRIPT, options)?
586            })();
587
588            match res.catch(&ctx){
589                Ok(promise) => {
590                    if let Err(err) = promise.into_future::<Value>().await.catch(&ctx){
591                        eprintln!("{}", err)
592                    }
593                },
594                Err(err) => {
595                    eprintln!("{}", err)
596                },
597            };
598
599        })
600        .await;
601
602        rt.idle().await;
603
604        assert_eq!(COUNT.load(Ordering::Relaxed),2);
605    });
606
607    #[cfg(feature = "parallel")]
608    fn assert_is_send<T: Send>(t: T) -> T {
609        t
610    }
611
612    #[cfg(feature = "parallel")]
613    fn assert_is_sync<T: Send>(t: T) -> T {
614        t
615    }
616
617    #[cfg(feature = "parallel")]
618    #[tokio::test]
619    async fn ensure_types_are_send_sync() {
620        let rt = AsyncRuntime::new().unwrap();
621
622        std::mem::drop(assert_is_sync(rt.idle()));
623        std::mem::drop(assert_is_sync(rt.execute_pending_job()));
624        std::mem::drop(assert_is_sync(rt.drive()));
625
626        std::mem::drop(assert_is_send(rt.idle()));
627        std::mem::drop(assert_is_send(rt.execute_pending_job()));
628        std::mem::drop(assert_is_send(rt.drive()));
629    }
630}