rquickjs_core/runtime/
spawner.rs

1use super::{
2    schedular::{Schedular, SchedularPoll},
3    AsyncWeakRuntime, InnerRuntime,
4};
5use crate::AsyncRuntime;
6use alloc::vec::Vec;
7use core::{
8    future::Future,
9    pin::Pin,
10    task::{ready, Context, Poll, Waker},
11};
12
13use async_lock::futures::LockArc;
14
15/// A structure to hold futures spawned inside the runtime.
16pub struct Spawner {
17    schedular: Schedular,
18    wakeup: Vec<Waker>,
19}
20
21impl Spawner {
22    pub fn new() -> Self {
23        Spawner {
24            schedular: Schedular::new(),
25            wakeup: Vec::new(),
26        }
27    }
28
29    pub unsafe fn push<F>(&mut self, f: F)
30    where
31        F: Future<Output = ()>,
32    {
33        unsafe { self.schedular.push(f) };
34        self.wakeup.drain(..).for_each(Waker::wake);
35    }
36
37    pub fn listen(&mut self, wake: Waker) {
38        self.wakeup.push(wake);
39    }
40
41    pub fn is_empty(&mut self) -> bool {
42        self.schedular.is_empty()
43    }
44
45    pub fn poll(&mut self, cx: &mut Context) -> SchedularPoll {
46        unsafe { self.schedular.poll(cx) }
47    }
48}
49
50enum DriveFutureState {
51    Initial,
52    Lock {
53        lock_future: Option<LockArc<InnerRuntime>>,
54        // Here to ensure the lock remains valid.
55        _runtime: AsyncRuntime,
56    },
57}
58
59pub struct DriveFuture {
60    rt: AsyncWeakRuntime,
61    state: DriveFutureState,
62}
63
// SAFETY: NOTE(review) — these impls are only enabled with the `parallel`
// feature. Presumably the runtime is then only ever touched while its lock is
// held, which is what would make sharing and sending `DriveFuture` sound;
// confirm against the `AsyncRuntime` / `InnerRuntime` definitions.
#[cfg(feature = "parallel")]
unsafe impl Send for DriveFuture {}
#[cfg(feature = "parallel")]
unsafe impl Sync for DriveFuture {}
68
69impl DriveFuture {
70    pub(crate) fn new(rt: AsyncWeakRuntime) -> Self {
71        Self {
72            rt,
73            state: DriveFutureState::Initial,
74        }
75    }
76}
77
78impl Future for DriveFuture {
79    type Output = ();
80
81    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
82        // Safety: We manually ensure that pinned values remained properly pinned.
83        let this = unsafe { self.get_unchecked_mut() };
84        loop {
85            let mut lock = match this.state {
86                DriveFutureState::Initial => {
87                    let Some(_runtime) = this.rt.try_ref() else {
88                        return Poll::Ready(());
89                    };
90
91                    let lock_future = _runtime.inner.lock_arc();
92                    this.state = DriveFutureState::Lock {
93                        lock_future: Some(lock_future),
94                        _runtime,
95                    };
96                    continue;
97                }
98                DriveFutureState::Lock {
99                    ref mut lock_future,
100                    ..
101                } => {
102                    // Safety: The future will not be moved until it is ready and then dropped.
103                    let res = unsafe {
104                        ready!(Pin::new_unchecked(lock_future.as_mut().unwrap()).poll(cx))
105                    };
106                    // Assign none explicitly so it we don't move out of the future.
107                    *lock_future = None;
108                    res
109                }
110            };
111
112            lock.runtime.update_stack_top();
113
114            lock.runtime.get_opaque().listen(cx.waker().clone());
115
116            loop {
117                // TODO: Handle error.
118                if let Ok(true) = lock.runtime.execute_pending_job() {
119                    continue;
120                }
121
122                // TODO: Handle error.
123                match lock.runtime.get_opaque().poll(cx) {
124                    SchedularPoll::ShouldYield | SchedularPoll::Empty | SchedularPoll::Pending => {
125                        break
126                    }
127                    SchedularPoll::PendingProgress => {}
128                }
129            }
130
131            this.state = DriveFutureState::Initial;
132            return Poll::Pending;
133        }
134    }
135}