rquickjs_core/runtime/
async.rs

use std::{
    ffi::CString,
    ptr::NonNull,
    result::Result as StdResult,
    sync::{Arc, Weak},
    task::Poll,
};

#[cfg(feature = "parallel")]
use std::sync::mpsc::{self, Receiver, Sender};

use async_lock::Mutex;

use super::{
    opaque::Opaque, raw::RawRuntime, schedular::SchedularPoll, spawner::DriveFuture,
    InterruptHandler, MemoryUsage,
};
use crate::allocator::Allocator;
#[cfg(feature = "loader")]
use crate::loader::{Loader, Resolver};
use crate::{
    context::AsyncContext, result::AsyncJobException, util::ManualPoll, Ctx, Exception, Result,
};
#[cfg(feature = "parallel")]
use crate::{
    qjs,
    util::{AssertSendFuture, AssertSyncFuture},
};

#[derive(Debug)]
pub(crate) struct InnerRuntime {
    pub runtime: RawRuntime,
    #[cfg(feature = "parallel")]
    pub drop_recv: Receiver<NonNull<qjs::JSContext>>,
}

impl InnerRuntime {
    pub fn drop_pending(&self) {
        #[cfg(feature = "parallel")]
        while let Ok(x) = self.drop_recv.try_recv() {
            unsafe { qjs::JS_FreeContext(x.as_ptr()) }
        }
    }
}

impl Drop for InnerRuntime {
    fn drop(&mut self) {
        self.drop_pending();
    }
}

#[cfg(feature = "parallel")]
unsafe impl Send for InnerRuntime {}
/// A weak handle to the async runtime.
///
/// Holding onto this struct does not prevent the runtime from being dropped.
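///
/// # Example
///
/// A minimal sketch of upgrading a weak handle back into a strong one:
///
/// ```ignore
/// let rt = AsyncRuntime::new().unwrap();
/// let weak = rt.weak();
///
/// // The weak handle can be upgraded for as long as the runtime is alive.
/// assert!(weak.try_ref().is_some());
///
/// drop(rt);
/// // Once every strong handle is gone the upgrade fails.
/// assert!(weak.try_ref().is_none());
/// ```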
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
#[derive(Clone)]
pub struct AsyncWeakRuntime {
    inner: Weak<Mutex<InnerRuntime>>,
    #[cfg(feature = "parallel")]
    drop_send: Sender<NonNull<qjs::JSContext>>,
}

impl AsyncWeakRuntime {
    pub fn try_ref(&self) -> Option<AsyncRuntime> {
        self.inner.upgrade().map(|inner| AsyncRuntime {
            inner,
            #[cfg(feature = "parallel")]
            drop_send: self.drop_send.clone(),
        })
    }
}

/// Asynchronous QuickJS runtime, entry point of the library.
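///
/// # Example
///
/// A minimal sketch, mirroring the tests in this module; assumes the crate's usual
/// re-exports (`AsyncContext`, the `async_with!` macro) and a Tokio executor driving the
/// futures:
///
/// ```ignore
/// let rt = AsyncRuntime::new().unwrap();
/// let ctx = AsyncContext::full(&rt).await.unwrap();
///
/// async_with!(&ctx => |ctx| {
///     // Evaluate a small expression inside the context.
///     let res: i32 = ctx.eval("1 + 1").unwrap();
///     assert_eq!(res, 2);
/// })
/// .await;
/// ```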
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
#[derive(Clone)]
pub struct AsyncRuntime {
    // use Arc instead of Ref so we can use OwnedLock
    pub(crate) inner: Arc<Mutex<InnerRuntime>>,
    #[cfg(feature = "parallel")]
    pub(crate) drop_send: Sender<NonNull<qjs::JSContext>>,
}

// Since all functions which use the runtime are behind a mutex,
// sending the runtime to other threads should be fine.
#[cfg(feature = "parallel")]
unsafe impl Send for AsyncRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Send for AsyncWeakRuntime {}

// Since a global lock needs to be locked for safe use,
// using the runtime in a sync way should be safe as
// simultaneous accesses are synchronized behind a lock.
#[cfg(feature = "parallel")]
unsafe impl Sync for AsyncRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Sync for AsyncWeakRuntime {}

impl AsyncRuntime {
    /// Create a new runtime.
    ///
    /// Will generally only fail if not enough memory was available.
    ///
    /// # Features
    /// *If the `"rust-alloc"` feature is enabled, Rust's global allocator will be used instead of libc's.*
    // Annoying false positive clippy lint
    #[allow(clippy::arc_with_non_send_sync)]
    pub fn new() -> Result<Self> {
        let opaque = Opaque::with_spawner();
        let runtime = unsafe { RawRuntime::new(opaque) }?;

        #[cfg(feature = "parallel")]
        let (drop_send, drop_recv) = mpsc::channel();

        Ok(Self {
            inner: Arc::new(Mutex::new(InnerRuntime {
                runtime,
                #[cfg(feature = "parallel")]
                drop_recv,
            })),
            #[cfg(feature = "parallel")]
            drop_send,
        })
    }

    /// Create a new runtime using the specified allocator.
    ///
    /// Will generally only fail if not enough memory was available.
    // Annoying false positive clippy lint
    #[allow(clippy::arc_with_non_send_sync)]
    pub fn new_with_alloc<A>(allocator: A) -> Result<Self>
    where
        A: Allocator + 'static,
    {
        let opaque = Opaque::with_spawner();
        let runtime = unsafe { RawRuntime::new_with_allocator(opaque, allocator) }?;

        #[cfg(feature = "parallel")]
        let (drop_send, drop_recv) = mpsc::channel();

        Ok(Self {
            inner: Arc::new(Mutex::new(InnerRuntime {
                runtime,
                #[cfg(feature = "parallel")]
                drop_recv,
            })),
            #[cfg(feature = "parallel")]
            drop_send,
        })
    }

    /// Get a weak reference to the runtime.
    pub fn weak(&self) -> AsyncWeakRuntime {
        AsyncWeakRuntime {
            inner: Arc::downgrade(&self.inner),
            #[cfg(feature = "parallel")]
            drop_send: self.drop_send.clone(),
        }
    }

    /// Set a closure which is regularly called by the engine when it is executing code.
    /// If the provided closure returns `true` the interpreter will raise an uncatchable
    /// exception and return control flow to the caller.
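    ///
    /// # Example
    ///
    /// A minimal sketch; assumes an `AsyncRuntime` handle `rt` and that `InterruptHandler`
    /// is a boxed closure returning `bool` (returning `true` interrupts execution):
    ///
    /// ```ignore
    /// // Interrupt any evaluation that runs for longer than one second.
    /// let start = std::time::Instant::now();
    /// rt.set_interrupt_handler(Some(Box::new(move || {
    ///     start.elapsed() > std::time::Duration::from_secs(1)
    /// })))
    /// .await;
    /// ```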
    #[inline]
    pub async fn set_interrupt_handler(&self, handler: Option<InterruptHandler>) {
        unsafe {
            self.inner
                .lock()
                .await
                .runtime
                .set_interrupt_handler(handler);
        }
    }

    /// Set the module loader
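    ///
    /// # Example
    ///
    /// A minimal sketch; assumes an `AsyncRuntime` handle `rt` and that the built-in
    /// `FileResolver` and `ScriptLoader` from the `loader` module are enabled (their exact
    /// names and feature gates may differ):
    ///
    /// ```ignore
    /// use rquickjs::loader::{FileResolver, ScriptLoader};
    ///
    /// let resolver = FileResolver::default().with_path("./scripts");
    /// let loader = ScriptLoader::default();
    /// rt.set_loader(resolver, loader).await;
    /// ```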
    #[cfg(feature = "loader")]
    #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "loader")))]
    pub async fn set_loader<R, L>(&self, resolver: R, loader: L)
    where
        R: Resolver + 'static,
        L: Loader + 'static,
    {
        unsafe {
            self.inner.lock().await.runtime.set_loader(resolver, loader);
        }
    }

    /// Set the info of the runtime
    pub async fn set_info<S: Into<Vec<u8>>>(&self, info: S) -> Result<()> {
        let string = CString::new(info)?;
        unsafe {
            self.inner.lock().await.runtime.set_info(string);
        }
        Ok(())
    }

    /// Set a limit on the max amount of memory the runtime will use.
    ///
    /// Setting the limit to 0 is equivalent to unlimited memory.
    ///
    /// Note that this is a no-op when a custom allocator is being used,
    /// as is the case for the `"rust-alloc"` or `"allocator"` features.
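    ///
    /// # Example
    ///
    /// A minimal sketch; assumes an `AsyncRuntime` handle `rt`, the exact limit is
    /// arbitrary:
    ///
    /// ```ignore
    /// // Cap the runtime at 2 MiB of memory.
    /// rt.set_memory_limit(2 * 1024 * 1024).await;
    /// ```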
    pub async fn set_memory_limit(&self, limit: usize) {
        unsafe {
            self.inner.lock().await.runtime.set_memory_limit(limit);
        }
    }

    /// Set a limit on the max size of stack the runtime will use.
    ///
    /// The default value is 256 * 1024 bytes.
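    ///
    /// # Example
    ///
    /// A minimal sketch; assumes an `AsyncRuntime` handle `rt` and doubles the default
    /// stack limit:
    ///
    /// ```ignore
    /// rt.set_max_stack_size(512 * 1024).await;
    /// ```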
    pub async fn set_max_stack_size(&self, limit: usize) {
        unsafe {
            self.inner.lock().await.runtime.set_max_stack_size(limit);
        }
    }

    /// Set a memory threshold for garbage collection.
    pub async fn set_gc_threshold(&self, threshold: usize) {
        unsafe {
            self.inner.lock().await.runtime.set_gc_threshold(threshold);
        }
    }

    /// Manually run the garbage collection.
    ///
    /// Most QuickJS values are reference counted and
    /// will automatically free themselves when they have no more
    /// references. The garbage collector is only for collecting
    /// cyclic references.
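    ///
    /// # Example
    ///
    /// A minimal sketch; assumes an `AsyncRuntime` handle `rt`:
    ///
    /// ```ignore
    /// // Force a collection cycle, e.g. after dropping values that may form cycles.
    /// rt.run_gc().await;
    /// ```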
    pub async fn run_gc(&self) {
        unsafe {
            let mut lock = self.inner.lock().await;
            lock.drop_pending();
            lock.runtime.run_gc();
        }
    }

    /// Get memory usage stats
    pub async fn memory_usage(&self) -> MemoryUsage {
        unsafe { self.inner.lock().await.runtime.memory_usage() }
    }

    /// Test for pending jobs
    ///
    /// Returns true when at least one job is pending.
    #[inline]
    pub async fn is_job_pending(&self) -> bool {
        let lock = self.inner.lock().await;

        lock.runtime.is_job_pending() || !lock.runtime.get_opaque().spawner_is_empty()
    }

    /// Execute the first pending job.
    ///
    /// Returns `true` when a job was executed, `false` when the queue is empty, or an error when an exception was thrown during execution.
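    ///
    /// # Example
    ///
    /// A minimal sketch of draining the job queue one job at a time; assumes an
    /// `AsyncRuntime` handle `rt`:
    ///
    /// ```ignore
    /// loop {
    ///     match rt.execute_pending_job().await {
    ///         Ok(true) => continue, // a job ran, check for more
    ///         Ok(false) => break,   // the queue is empty
    ///         Err(e) => {
    ///             // `e` wraps the context in which the job threw; report or handle it here.
    ///             break;
    ///         }
    ///     }
    /// }
    /// ```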
    #[inline]
    pub async fn execute_pending_job(&self) -> StdResult<bool, AsyncJobException> {
        let mut lock = self.inner.lock().await;
        lock.runtime.update_stack_top();
        lock.drop_pending();

        let f = ManualPoll::new(|cx| {
            let job_res = lock.runtime.execute_pending_job().map_err(|e| {
                let ptr = NonNull::new(e)
                    .expect("executing pending job returned a null context on error");
                AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
            })?;

            if job_res {
                return Poll::Ready(Ok(true));
            }

            match lock.runtime.get_opaque().poll(cx) {
                SchedularPoll::ShouldYield => Poll::Pending,
                SchedularPoll::Empty => Poll::Ready(Ok(false)),
                SchedularPoll::Pending => Poll::Ready(Ok(false)),
                SchedularPoll::PendingProgress => Poll::Ready(Ok(true)),
            }
        });

        #[cfg(feature = "parallel")]
        let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };

        f.await
    }

    /// Run all futures and jobs in the runtime until all are finished.
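    ///
    /// # Example
    ///
    /// A minimal sketch, mirroring the tests in this module; assumes an `AsyncRuntime`
    /// handle `rt` and an `AsyncContext` `ctx`:
    ///
    /// ```ignore
    /// async_with!(&ctx => |ctx| {
    ///     ctx.spawn(async move {
    ///         // Some asynchronous work driven by the runtime.
    ///     });
    /// })
    /// .await;
    ///
    /// // Drive the runtime until every spawned future and pending job has finished.
    /// rt.idle().await;
    /// ```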
    #[inline]
    pub async fn idle(&self) {
        let mut lock = self.inner.lock().await;
        lock.runtime.update_stack_top();
        lock.drop_pending();

        let f = ManualPoll::new(|cx| {
            loop {
                let pending = lock.runtime.execute_pending_job().map_err(|e| {
                    let ptr = NonNull::new(e)
                        .expect("executing pending job returned a null context on error");
                    AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
                });
                match pending {
                    Err(e) => {
                        // SAFETY: Runtime is already locked so creating a context is safe.
                        let ctx = unsafe { Ctx::from_ptr(e.0 .0.ctx.as_ptr()) };
                        let err = ctx.catch();
                        if let Some(x) = err.clone().into_object().and_then(Exception::from_object)
                        {
                            // TODO do something better with errors.
                            println!("error executing job: {}", x);
                        } else {
                            println!("error executing job: {:?}", err);
                        }
                    }
                    Ok(true) => continue,
                    Ok(false) => {}
                }

                match lock.runtime.get_opaque().poll(cx) {
                    SchedularPoll::ShouldYield => return Poll::Pending,
                    SchedularPoll::Empty => return Poll::Ready(()),
                    SchedularPoll::Pending => return Poll::Pending,
                    SchedularPoll::PendingProgress => {}
                }
            }
        });

        #[cfg(feature = "parallel")]
        let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };

        f.await
    }

    /// Returns a future that completes when the runtime is dropped.
    /// If the future is polled it will drive the futures spawned inside the runtime,
    /// completing them even if the runtime is currently not in use.
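    ///
    /// # Example
    ///
    /// A minimal sketch, mirroring the tests in this module; assumes an `AsyncRuntime`
    /// handle `rt` and a Tokio executor:
    ///
    /// ```ignore
    /// // Keep driving spawned futures in the background until the runtime is dropped.
    /// tokio::spawn(rt.drive());
    /// ```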
    pub fn drive(&self) -> DriveFuture {
        DriveFuture::new(self.weak())
    }
}

#[cfg(test)]
macro_rules! async_test_case {
    ($name:ident => ($rt:ident,$ctx:ident) { $($t:tt)* }) => {
    #[test]
    fn $name() {
        let rt = if cfg!(feature = "parallel") {
            tokio::runtime::Builder::new_multi_thread()
        } else {
            tokio::runtime::Builder::new_current_thread()
        }
        .enable_all()
        .build()
        .unwrap();

        #[cfg(feature = "parallel")]
        {
            rt.block_on(async {
                let $rt = crate::AsyncRuntime::new().unwrap();
                let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();

                $($t)*

            })
        }
        #[cfg(not(feature = "parallel"))]
        {
            let set = tokio::task::LocalSet::new();
            set.block_on(&rt, async {
                let $rt = crate::AsyncRuntime::new().unwrap();
                let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();

                $($t)*
            })
        }
    }
    };
}

#[cfg(test)]
mod test {
    use std::time::Duration;

    use crate::*;

    use self::context::EvalOptions;

    async_test_case!(basic => (_rt,ctx){
        async_with!(&ctx => |ctx|{
            let res: i32 = ctx.eval("1 + 1").unwrap();
            assert_eq!(res,2i32);
        }).await;
    });

    async_test_case!(sleep_closure => (_rt,ctx){

        let mut a = 1;
        let a_ref = &mut a;


        async_with!(&ctx => |ctx|{
            tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
            ctx.globals().set("foo","bar").unwrap();
            *a_ref += 1;
        }).await;
        assert_eq!(a,2);
    });

    async_test_case!(drive => (rt,ctx){
        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};

        #[cfg(feature = "parallel")]
        tokio::spawn(rt.drive());
        #[cfg(not(feature = "parallel"))]
        tokio::task::spawn_local(rt.drive());

        // Give drive time to start.
        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;

        let number = Arc::new(AtomicUsize::new(0));
        let number_clone = number.clone();

        async_with!(&ctx => |ctx|{
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                number_clone.store(1,Ordering::SeqCst);
            });
        }).await;
        assert_eq!(number.load(Ordering::SeqCst),0);
        // Give drive time to finish the task.
        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
        assert_eq!(number.load(Ordering::SeqCst),1);

    });

    async_test_case!(no_drive => (rt,ctx){
        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};

        let number = Arc::new(AtomicUsize::new(0));
        let number_clone = number.clone();

        async_with!(&ctx => |ctx|{
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                number_clone.store(1,Ordering::SeqCst);
            });
        }).await;
        assert_eq!(number.load(Ordering::SeqCst),0);
        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
        assert_eq!(number.load(Ordering::SeqCst),0);

    });

    async_test_case!(idle => (rt,ctx){
        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};

        let number = Arc::new(AtomicUsize::new(0));
        let number_clone = number.clone();

        async_with!(&ctx => |ctx|{
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                number_clone.store(1,Ordering::SeqCst);
            });
        }).await;
        assert_eq!(number.load(Ordering::SeqCst),0);
        rt.idle().await;
        assert_eq!(number.load(Ordering::SeqCst),1);

    });

    async_test_case!(recursive_spawn => (rt,ctx){
        use tokio::sync::oneshot;

        async_with!(&ctx => |ctx|{
            let ctx_clone = ctx.clone();
            let (tx,rx) = oneshot::channel::<()>();
            let (tx2,rx2) = oneshot::channel::<()>();
            ctx.spawn(async move {
                tokio::task::yield_now().await;

                let ctx = ctx_clone.clone();

                ctx_clone.spawn(async move {
                    tokio::task::yield_now().await;
                    ctx.spawn(async move {
                        tokio::task::yield_now().await;
                        tx2.send(()).unwrap();
                        tokio::task::yield_now().await;
                    });
                    tokio::task::yield_now().await;
                    tx.send(()).unwrap();
                });

                // Add a bunch of futures just to make sure possible segfaults are more likely to
                // happen
                for _ in 0..32{
                    ctx_clone.spawn(async move {})
                }

            });
            tokio::time::timeout(Duration::from_millis(500), rx).await.unwrap().unwrap();
            tokio::time::timeout(Duration::from_millis(500), rx2).await.unwrap().unwrap();
        }).await;

    });

    async_test_case!(recursive_spawn_from_script => (rt,ctx) {
        use std::sync::atomic::{Ordering, AtomicUsize};
        use crate::prelude::Func;

        static COUNT: AtomicUsize = AtomicUsize::new(0);
        static SCRIPT: &str = r#"

        async function main() {

          setTimeout(() => {
            inc_count()
            setTimeout(async () => {
                inc_count()
            }, 100);
          }, 100);
        }

        main().catch(print);


        "#;

        fn inc_count(){
            COUNT.fetch_add(1,Ordering::Relaxed);
        }

        fn set_timeout_spawn<'js>(ctx: Ctx<'js>, callback: Function<'js>, millis: usize) -> Result<()> {
            ctx.spawn(async move {
                tokio::time::sleep(Duration::from_millis(millis as u64)).await;
                callback.call::<_, ()>(()).unwrap();
            });

            Ok(())
        }


        async_with!(ctx => |ctx|{

            let res: Result<Promise> = (|| {
                let globals = ctx.globals();

                globals.set("inc_count", Func::from(inc_count))?;

                globals.set("setTimeout", Func::from(set_timeout_spawn))?;
                let options = EvalOptions{
                    promise: true,
                    strict: false,
                    ..EvalOptions::default()
                };

                ctx.eval_with_options(SCRIPT, options)
            })();

            match res.catch(&ctx){
                Ok(promise) => {
                    if let Err(err) = promise.into_future::<Value>().await.catch(&ctx){
                        eprintln!("{}", err)
                    }
                },
                Err(err) => {
                    eprintln!("{}", err)
                },
            };

        })
        .await;

        rt.idle().await;

        assert_eq!(COUNT.load(Ordering::Relaxed),2);
    });

    #[cfg(feature = "parallel")]
    fn assert_is_send<T: Send>(t: T) -> T {
        t
    }

    #[cfg(feature = "parallel")]
    fn assert_is_sync<T: Sync>(t: T) -> T {
        t
    }

    #[cfg(feature = "parallel")]
    #[tokio::test]
    async fn ensure_types_are_send_sync() {
        let rt = AsyncRuntime::new().unwrap();

        std::mem::drop(assert_is_sync(rt.idle()));
        std::mem::drop(assert_is_sync(rt.execute_pending_job()));
        std::mem::drop(assert_is_sync(rt.drive()));

        std::mem::drop(assert_is_send(rt.idle()));
        std::mem::drop(assert_is_send(rt.execute_pending_job()));
        std::mem::drop(assert_is_send(rt.drive()));
    }
}