1use std::{
2 ffi::CString,
3 ptr::NonNull,
4 result::Result as StdResult,
5 sync::{Arc, Weak},
6 task::Poll,
7};
8
9#[cfg(feature = "parallel")]
10use std::sync::mpsc::{self, Receiver, Sender};
11
12use async_lock::Mutex;
13
14use super::{
15 opaque::Opaque, raw::RawRuntime, schedular::SchedularPoll, spawner::DriveFuture,
16 InterruptHandler, MemoryUsage,
17};
18use crate::allocator::Allocator;
19#[cfg(feature = "loader")]
20use crate::loader::{Loader, Resolver};
21use crate::{
22 context::AsyncContext, result::AsyncJobException, util::ManualPoll, Ctx, Exception, Result,
23};
24#[cfg(feature = "parallel")]
25use crate::{
26 qjs,
27 util::{AssertSendFuture, AssertSyncFuture},
28};
29
/// The state that `AsyncRuntime` keeps behind its mutex.
///
/// Holds the raw runtime and, when the `parallel` feature is enabled, the
/// receiving half of the channel used to queue contexts for freeing.
#[derive(Debug)]
pub(crate) struct InnerRuntime {
    /// The wrapped raw runtime.
    pub runtime: RawRuntime,
    /// Receives context pointers which `InnerRuntime::drop_pending` later
    /// releases with `JS_FreeContext`.
    #[cfg(feature = "parallel")]
    pub drop_recv: Receiver<NonNull<qjs::JSContext>>,
}
36
37impl InnerRuntime {
38 pub fn drop_pending(&self) {
39 #[cfg(feature = "parallel")]
40 while let Ok(x) = self.drop_recv.try_recv() {
41 unsafe { qjs::JS_FreeContext(x.as_ptr()) }
42 }
43 }
44}
45
46impl Drop for InnerRuntime {
47 fn drop(&mut self) {
48 self.drop_pending();
49 }
50}
51
// Manual `Send` claim, only made when the `parallel` feature is enabled; the
// compiler cannot check it.
// NOTE(review): presumably justified because `InnerRuntime` is only ever
// accessed through the `Mutex` held by `AsyncRuntime` — confirm.
#[cfg(feature = "parallel")]
unsafe impl Send for InnerRuntime {}
54
/// A weak handle to an [`AsyncRuntime`].
///
/// Stores a [`Weak`] reference to the shared runtime state. Use
/// [`AsyncWeakRuntime::try_ref`] to try to turn it back into an
/// [`AsyncRuntime`].
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
#[derive(Clone)]
pub struct AsyncWeakRuntime {
    /// Non-owning reference to the mutex-guarded runtime state.
    inner: Weak<Mutex<InnerRuntime>>,
    /// Sending half of the context-drop channel, copied into every
    /// `AsyncRuntime` produced by `try_ref`.
    #[cfg(feature = "parallel")]
    drop_send: Sender<NonNull<qjs::JSContext>>,
}
65
66impl AsyncWeakRuntime {
67 pub fn try_ref(&self) -> Option<AsyncRuntime> {
68 self.inner.upgrade().map(|inner| AsyncRuntime {
69 inner,
70 #[cfg(feature = "parallel")]
71 drop_send: self.drop_send.clone(),
72 })
73 }
74}
75
/// Asynchronous handle to a runtime.
///
/// The handle is cheap to clone: every clone shares the same mutex-guarded
/// runtime state through an [`Arc`].
#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "futures")))]
#[derive(Clone)]
pub struct AsyncRuntime {
    /// Shared runtime state; every method of this type locks it before
    /// touching the raw runtime.
    pub(crate) inner: Arc<Mutex<InnerRuntime>>,
    /// Sending half of the channel whose receiver is stored in
    /// `InnerRuntime::drop_recv`.
    #[cfg(feature = "parallel")]
    pub(crate) drop_send: Sender<NonNull<qjs::JSContext>>,
}
85
// Both handle types contain a `Sender<NonNull<_>>`, and `NonNull` is not
// `Send`, so `Send` is not derived automatically and is asserted by hand here.
// NOTE(review): presumably sound because the raw runtime is only reachable
// through the mutex — confirm.
#[cfg(feature = "parallel")]
unsafe impl Send for AsyncRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Send for AsyncWeakRuntime {}
92
// Manual `Sync` claims for the two handle types, made only with the
// `parallel` feature.
// NOTE(review): presumably sound because all access to the runtime state is
// serialized by the mutex in `inner` — confirm.
#[cfg(feature = "parallel")]
unsafe impl Sync for AsyncRuntime {}
#[cfg(feature = "parallel")]
unsafe impl Sync for AsyncWeakRuntime {}
100
101impl AsyncRuntime {
102 #[allow(clippy::arc_with_non_send_sync)]
110 pub fn new() -> Result<Self> {
111 let opaque = Opaque::with_spawner();
112 let runtime = unsafe { RawRuntime::new(opaque) }?;
113
114 #[cfg(feature = "parallel")]
115 let (drop_send, drop_recv) = mpsc::channel();
116
117 Ok(Self {
118 inner: Arc::new(Mutex::new(InnerRuntime {
119 runtime,
120 #[cfg(feature = "parallel")]
121 drop_recv,
122 })),
123 #[cfg(feature = "parallel")]
124 drop_send,
125 })
126 }
127
128 #[allow(clippy::arc_with_non_send_sync)]
133 pub fn new_with_alloc<A>(allocator: A) -> Result<Self>
134 where
135 A: Allocator + 'static,
136 {
137 let opaque = Opaque::with_spawner();
138 let runtime = unsafe { RawRuntime::new_with_allocator(opaque, allocator) }?;
139
140 #[cfg(feature = "parallel")]
141 let (drop_send, drop_recv) = mpsc::channel();
142
143 Ok(Self {
144 inner: Arc::new(Mutex::new(InnerRuntime {
145 runtime,
146 #[cfg(feature = "parallel")]
147 drop_recv,
148 })),
149 #[cfg(feature = "parallel")]
150 drop_send,
151 })
152 }
153
154 pub fn weak(&self) -> AsyncWeakRuntime {
156 AsyncWeakRuntime {
157 inner: Arc::downgrade(&self.inner),
158 #[cfg(feature = "parallel")]
159 drop_send: self.drop_send.clone(),
160 }
161 }
162
163 #[inline]
167 pub async fn set_interrupt_handler(&self, handler: Option<InterruptHandler>) {
168 unsafe {
169 self.inner
170 .lock()
171 .await
172 .runtime
173 .set_interrupt_handler(handler);
174 }
175 }
176
177 #[cfg(feature = "loader")]
179 #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "loader")))]
180 pub async fn set_loader<R, L>(&self, resolver: R, loader: L)
181 where
182 R: Resolver + 'static,
183 L: Loader + 'static,
184 {
185 unsafe {
186 self.inner.lock().await.runtime.set_loader(resolver, loader);
187 }
188 }
189
190 pub async fn set_info<S: Into<Vec<u8>>>(&self, info: S) -> Result<()> {
192 let string = CString::new(info)?;
193 unsafe {
194 self.inner.lock().await.runtime.set_info(string);
195 }
196 Ok(())
197 }
198
199 pub async fn set_memory_limit(&self, limit: usize) {
206 unsafe {
207 self.inner.lock().await.runtime.set_memory_limit(limit);
208 }
209 }
210
211 pub async fn set_max_stack_size(&self, limit: usize) {
215 unsafe {
216 self.inner.lock().await.runtime.set_max_stack_size(limit);
217 }
218 }
219
220 pub async fn set_gc_threshold(&self, threshold: usize) {
222 unsafe {
223 self.inner.lock().await.runtime.set_gc_threshold(threshold);
224 }
225 }
226
227 pub async fn run_gc(&self) {
234 unsafe {
235 let mut lock = self.inner.lock().await;
236 lock.drop_pending();
237 lock.runtime.run_gc();
238 }
239 }
240
241 pub async fn memory_usage(&self) -> MemoryUsage {
243 unsafe { self.inner.lock().await.runtime.memory_usage() }
244 }
245
246 #[inline]
250 pub async fn is_job_pending(&self) -> bool {
251 let lock = self.inner.lock().await;
252
253 lock.runtime.is_job_pending() || !lock.runtime.get_opaque().spawner_is_empty()
254 }
255
256 #[inline]
260 pub async fn execute_pending_job(&self) -> StdResult<bool, AsyncJobException> {
261 let mut lock = self.inner.lock().await;
262 lock.runtime.update_stack_top();
263 lock.drop_pending();
264
265 let f = ManualPoll::new(|cx| {
266 let job_res = lock.runtime.execute_pending_job().map_err(|e| {
267 let ptr = NonNull::new(e)
268 .expect("executing pending job returned a null context on error");
269 AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
270 })?;
271
272 if job_res {
273 return Poll::Ready(Ok(true));
274 }
275
276 match lock.runtime.get_opaque().poll(cx) {
277 SchedularPoll::ShouldYield => Poll::Pending,
278 SchedularPoll::Empty => Poll::Ready(Ok(false)),
279 SchedularPoll::Pending => Poll::Ready(Ok(false)),
280 SchedularPoll::PendingProgress => Poll::Ready(Ok(true)),
281 }
282 });
283
284 #[cfg(feature = "parallel")]
285 let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };
286
287 f.await
288 }
289
290 #[inline]
292 pub async fn idle(&self) {
293 let mut lock = self.inner.lock().await;
294 lock.runtime.update_stack_top();
295 lock.drop_pending();
296
297 let f = ManualPoll::new(|cx| {
298 loop {
299 let pending = lock.runtime.execute_pending_job().map_err(|e| {
300 let ptr = NonNull::new(e)
301 .expect("executing pending job returned a null context on error");
302 AsyncJobException(unsafe { AsyncContext::from_raw(ptr, self.clone()) })
303 });
304 match pending {
305 Err(e) => {
306 let ctx = unsafe { Ctx::from_ptr(e.0 .0.ctx.as_ptr()) };
308 let err = ctx.catch();
309 if let Some(x) = err.clone().into_object().and_then(Exception::from_object)
310 {
311 println!("error executing job: {}", x);
313 } else {
314 println!("error executing job: {:?}", err);
315 }
316 }
317 Ok(true) => continue,
318 Ok(false) => {}
319 }
320
321 match lock.runtime.get_opaque().poll(cx) {
322 SchedularPoll::ShouldYield => return Poll::Pending,
323 SchedularPoll::Empty => return Poll::Ready(()),
324 SchedularPoll::Pending => return Poll::Pending,
325 SchedularPoll::PendingProgress => {}
326 }
327 }
328 });
329
330 #[cfg(feature = "parallel")]
331 let f = unsafe { AssertSendFuture::assert(AssertSyncFuture::assert(f)) };
332
333 f.await
334 }
335
336 pub fn drive(&self) -> DriveFuture {
340 DriveFuture::new(self.weak())
341 }
342}
343
// Declares a `#[test]` function named `$name` whose body runs inside a tokio
// runtime with a fresh `AsyncRuntime` bound to `$rt` and a full `AsyncContext`
// bound to `$ctx`.
//
// With the `parallel` feature the body runs on a multi-threaded tokio runtime;
// otherwise it runs on a current-thread runtime inside a `LocalSet`.
#[cfg(test)]
macro_rules! async_test_case {
    ($name:ident => ($rt:ident,$ctx:ident) { $($t:tt)* }) => {
        #[test]
        fn $name() {
            // Pick the tokio flavour that matches the enabled feature set.
            let rt = if cfg!(feature = "parallel") {
                tokio::runtime::Builder::new_multi_thread()
            } else {
                tokio::runtime::Builder::new_current_thread()
            }
            .enable_all()
            .build()
            .unwrap();

            #[cfg(feature = "parallel")]
            {
                rt.block_on(async {
                    let $rt = crate::AsyncRuntime::new().unwrap();
                    let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();

                    $($t)*

                })
            }
            #[cfg(not(feature = "parallel"))]
            {
                // The `LocalSet` is what lets test bodies call
                // `tokio::task::spawn_local`.
                let set = tokio::task::LocalSet::new();
                set.block_on(&rt, async {
                    let $rt = crate::AsyncRuntime::new().unwrap();
                    let $ctx = crate::AsyncContext::full(&$rt).await.unwrap();

                    $($t)*
                })
            }
        }
    };
}
381
382#[cfg(test)]
383mod test {
384 use std::time::Duration;
385
386 use crate::*;
387
388 use self::context::EvalOptions;
389
    // Evaluates a trivial expression inside `async_with!` and checks the result.
    async_test_case!(basic => (_rt,ctx){
        async_with!(&ctx => |ctx|{
            let res: i32 = ctx.eval("1 + 1").unwrap();
            assert_eq!(res,2i32);
        }).await;
    });
396
    // Checks that an `async_with!` body can await a timer and mutate a local
    // variable through a borrowed `&mut` reference.
    async_test_case!(sleep_closure => (_rt,ctx){

        let mut a = 1;
        let a_ref = &mut a;


        async_with!(&ctx => |ctx|{
            tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
            ctx.globals().set("foo","bar").unwrap();
            *a_ref += 1;
        }).await;
        assert_eq!(a,2);
    });
410
    // With the drive future spawned on the executor, a future spawned into the
    // context has not completed right after `async_with!` returns, but has
    // completed after a short sleep.
    async_test_case!(drive => (rt,ctx){
        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};

        #[cfg(feature = "parallel")]
        tokio::spawn(rt.drive());
        #[cfg(not(feature = "parallel"))]
        tokio::task::spawn_local(rt.drive());

        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;

        let number = Arc::new(AtomicUsize::new(0));
        let number_clone = number.clone();

        async_with!(&ctx => |ctx|{
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                number_clone.store(1,Ordering::SeqCst);
            });
        }).await;
        assert_eq!(number.load(Ordering::SeqCst),0);
        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
        assert_eq!(number.load(Ordering::SeqCst),1);

    });
437
    // Counterpart of `drive`: without a drive future, the spawned future is
    // still not completed after the same short sleep.
    async_test_case!(no_drive => (rt,ctx){
        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};

        let number = Arc::new(AtomicUsize::new(0));
        let number_clone = number.clone();

        async_with!(&ctx => |ctx|{
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                number_clone.store(1,Ordering::SeqCst);
            });
        }).await;
        assert_eq!(number.load(Ordering::SeqCst),0);
        tokio::time::sleep(Duration::from_secs_f64(0.01)).await;
        assert_eq!(number.load(Ordering::SeqCst),0);

    });
455
    // Awaiting `rt.idle()` runs the spawned future to completion even though
    // no drive future was spawned.
    async_test_case!(idle => (rt,ctx){
        use std::sync::{Arc, atomic::{Ordering,AtomicUsize}};

        let number = Arc::new(AtomicUsize::new(0));
        let number_clone = number.clone();

        async_with!(&ctx => |ctx|{
            ctx.spawn(async move {
                tokio::task::yield_now().await;
                number_clone.store(1,Ordering::SeqCst);
            });
        }).await;
        assert_eq!(number.load(Ordering::SeqCst),0);
        rt.idle().await;
        assert_eq!(number.load(Ordering::SeqCst),1);

    });
473
    // Spawns futures from inside spawned futures (three levels deep, plus 32
    // empty ones) and requires both oneshot signals to arrive within 500 ms
    // while still inside the `async_with!` body.
    async_test_case!(recursive_spawn => (rt,ctx){
        use tokio::sync::oneshot;

        async_with!(&ctx => |ctx|{
            let ctx_clone = ctx.clone();
            let (tx,rx) = oneshot::channel::<()>();
            let (tx2,rx2) = oneshot::channel::<()>();
            ctx.spawn(async move {
                tokio::task::yield_now().await;

                let ctx = ctx_clone.clone();

                ctx_clone.spawn(async move {
                    tokio::task::yield_now().await;
                    ctx.spawn(async move {
                        tokio::task::yield_now().await;
                        tx2.send(()).unwrap();
                        tokio::task::yield_now().await;
                    });
                    tokio::task::yield_now().await;
                    tx.send(()).unwrap();
                });

                for _ in 0..32{
                    ctx_clone.spawn(async move {})
                }

            });
            tokio::time::timeout(Duration::from_millis(500), rx).await.unwrap().unwrap();
            tokio::time::timeout(Duration::from_millis(500), rx2).await.unwrap().unwrap();
        }).await;

    });
509
    // Drives nested `setTimeout` callbacks from a script: `setTimeout` is
    // implemented in Rust on top of `ctx.spawn`, and the script schedules a
    // second timeout from inside the first. After `rt.idle()` both callbacks
    // must have incremented `COUNT`.
    async_test_case!(recursive_spawn_from_script => (rt,ctx) {
        use std::sync::atomic::{Ordering, AtomicUsize};
        use crate::prelude::Func;

        static COUNT: AtomicUsize = AtomicUsize::new(0);
        static SCRIPT: &str = r#"

        async function main() {

          setTimeout(() => {
            inc_count()
            setTimeout(async () => {
                inc_count()
            }, 100);
          }, 100);
        }

        main().catch(print);


        "#;

        // Exposed to the script as `inc_count`.
        fn inc_count(){
            COUNT.fetch_add(1,Ordering::Relaxed);
        }

        // Exposed to the script as `setTimeout`: spawns a future that sleeps
        // for `millis` milliseconds and then calls `callback`.
        fn set_timeout_spawn<'js>(ctx: Ctx<'js>, callback: Function<'js>, millis: usize) -> Result<()> {
            ctx.spawn(async move {
                tokio::time::sleep(Duration::from_millis(millis as u64)).await;
                callback.call::<_, ()>(()).unwrap();
            });

            Ok(())
        }


        async_with!(ctx => |ctx|{

            // Closure so that `?` can be used while registering the globals.
            let res: Result<Promise> = (|| {
                let globals = ctx.globals();

                globals.set("inc_count", Func::from(inc_count))?;

                globals.set("setTimeout", Func::from(set_timeout_spawn))?;
                let options = EvalOptions{
                    promise: true,
                    strict: false,
                    ..EvalOptions::default()
                };

                ctx.eval_with_options(SCRIPT, options)?
            })();

            // Errors are only printed; the final assertion on `COUNT` is what
            // decides whether the test passes.
            match res.catch(&ctx){
                Ok(promise) => {
                    if let Err(err) = promise.into_future::<Value>().await.catch(&ctx){
                        eprintln!("{}", err)
                    }
                },
                Err(err) => {
                    eprintln!("{}", err)
                },
            };

        })
        .await;

        rt.idle().await;

        assert_eq!(COUNT.load(Ordering::Relaxed),2);
    });
581
582 #[cfg(feature = "parallel")]
583 fn assert_is_send<T: Send>(t: T) -> T {
584 t
585 }
586
    /// Identity function used by `ensure_types_are_send_sync`.
    // NOTE(review): despite the name, the bound below is `Send`, so this
    // helper does not check `Sync`. Confirm whether `T: Sync` was intended,
    // and whether the checked futures would actually satisfy it.
    #[cfg(feature = "parallel")]
    fn assert_is_sync<T: Send>(t: T) -> T {
        t
    }
591
    // Compile-time check: the futures returned by `idle`, `execute_pending_job`
    // and `drive` must satisfy the bounds of the two helpers. The futures are
    // dropped without being polled.
    #[cfg(feature = "parallel")]
    #[tokio::test]
    async fn ensure_types_are_send_sync() {
        let rt = AsyncRuntime::new().unwrap();

        std::mem::drop(assert_is_sync(rt.idle()));
        std::mem::drop(assert_is_sync(rt.execute_pending_job()));
        std::mem::drop(assert_is_sync(rt.drive()));

        std::mem::drop(assert_is_send(rt.idle()));
        std::mem::drop(assert_is_send(rt.execute_pending_job()));
        std::mem::drop(assert_is_send(rt.drive()));
    }
605}