rquickjs_core/runtime/
spawner.rs1use super::{
2 schedular::{Schedular, SchedularPoll},
3 AsyncWeakRuntime, InnerRuntime,
4};
5use crate::AsyncRuntime;
6use alloc::vec::Vec;
7use core::{
8 future::Future,
9 pin::Pin,
10 task::{ready, Context, Poll, Waker},
11};
12
13use async_lock::futures::LockArc;
14
/// Pairs a [`Schedular`] with the wakers that should be notified when a new
/// future is pushed onto it.
pub struct Spawner {
    /// Scheduler that receives every pushed future and is polled through
    /// `Spawner::poll`.
    schedular: Schedular,
    /// Wakers registered via `Spawner::listen`; `Spawner::push` drains this
    /// list and wakes each entry.
    wakeup: Vec<Waker>,
}
20
21impl Spawner {
22 pub fn new() -> Self {
23 Spawner {
24 schedular: Schedular::new(),
25 wakeup: Vec::new(),
26 }
27 }
28
29 pub unsafe fn push<F>(&mut self, f: F)
30 where
31 F: Future<Output = ()>,
32 {
33 unsafe { self.schedular.push(f) };
34 self.wakeup.drain(..).for_each(Waker::wake);
35 }
36
37 pub fn listen(&mut self, wake: Waker) {
38 self.wakeup.push(wake);
39 }
40
41 pub fn is_empty(&mut self) -> bool {
42 self.schedular.is_empty()
43 }
44
45 pub fn poll(&mut self, cx: &mut Context) -> SchedularPoll {
46 unsafe { self.schedular.poll(cx) }
47 }
48}
49
/// Progress of a [`DriveFuture`] between polls.
enum DriveFutureState {
    /// No lock acquisition is in flight; the next poll starts one.
    Initial,
    /// A lock on the inner runtime has been requested.
    Lock {
        /// The in-flight lock acquisition. `DriveFuture::poll` resets it to
        /// `None` once it has resolved, dropping the future in place.
        lock_future: Option<LockArc<InnerRuntime>>,
        /// Strong runtime handle obtained from the weak reference when the
        /// lock was requested; never read.
        // NOTE(review): presumably held so the runtime stays alive while the
        // lock future exists — confirm.
        _runtime: AsyncRuntime,
    },
}
58
/// Future that, on each poll, locks the runtime, executes pending jobs and
/// polls the spawned futures.
///
/// It resolves only once `AsyncWeakRuntime::try_ref` returns `None`; every
/// other poll ends in `Poll::Pending`.
pub struct DriveFuture {
    /// Weak handle to the runtime being driven.
    rt: AsyncWeakRuntime,
    /// Where the future is in its lock-then-drive cycle.
    state: DriveFutureState,
}
63
// SAFETY: NOTE(review): nothing in this file shows why `DriveFuture` may be
// sent to another thread — confirm that the `parallel` feature makes the
// runtime handles it holds thread-safe.
#[cfg(feature = "parallel")]
unsafe impl Send for DriveFuture {}
// SAFETY: NOTE(review): same open question as for `Send` above; the
// justification lives outside this file.
#[cfg(feature = "parallel")]
unsafe impl Sync for DriveFuture {}
68
69impl DriveFuture {
70 pub(crate) fn new(rt: AsyncWeakRuntime) -> Self {
71 Self {
72 rt,
73 state: DriveFutureState::Initial,
74 }
75 }
76}
77
78impl Future for DriveFuture {
79 type Output = ();
80
81 fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
82 let this = unsafe { self.get_unchecked_mut() };
84 loop {
85 let mut lock = match this.state {
86 DriveFutureState::Initial => {
87 let Some(_runtime) = this.rt.try_ref() else {
88 return Poll::Ready(());
89 };
90
91 let lock_future = _runtime.inner.lock_arc();
92 this.state = DriveFutureState::Lock {
93 lock_future: Some(lock_future),
94 _runtime,
95 };
96 continue;
97 }
98 DriveFutureState::Lock {
99 ref mut lock_future,
100 ..
101 } => {
102 let res = unsafe {
104 ready!(Pin::new_unchecked(lock_future.as_mut().unwrap()).poll(cx))
105 };
106 *lock_future = None;
108 res
109 }
110 };
111
112 lock.runtime.update_stack_top();
113
114 lock.runtime.get_opaque().listen(cx.waker().clone());
115
116 loop {
117 if let Ok(true) = lock.runtime.execute_pending_job() {
119 continue;
120 }
121
122 match lock.runtime.get_opaque().poll(cx) {
124 SchedularPoll::ShouldYield | SchedularPoll::Empty | SchedularPoll::Pending => {
125 break
126 }
127 SchedularPoll::PendingProgress => {}
128 }
129 }
130
131 this.state = DriveFutureState::Initial;
132 return Poll::Pending;
133 }
134 }
135}