rt_local_core/
base_impl.rs

1use slabmap::SlabMap;
2use std::{
3    cell::RefCell,
4    collections::VecDeque,
5    future::Future,
6    mem::{replace, swap},
7    ops::ControlFlow,
8    pin::{pin, Pin},
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc, Mutex,
12    },
13    task::{Context, Poll, Wake, Waker},
14};
15
/// Placeholder id held by a task until a `Runner` registers it and assigns a real id.
const ID_NULL: usize = usize::MAX;
/// Reserved id under which `run` queues wake-ups for its main future.
const ID_MAIN: usize = usize::MAX - 1;
18
/// Backend handle accepted by [`enter`]; it is kept alive until [`leave`] is called.
pub trait RuntimeInjector: 'static {
    /// Returns the waker this runtime calls when requests are queued for processing.
    fn waker(&self) -> Arc<dyn RuntimeWaker>;
}
/// Backend event loop that [`run`] uses to drive the main future and spawned tasks.
pub trait RuntimeLoop {
    /// Returns the waker this runtime calls when requests are queued for processing.
    fn waker(&self) -> Arc<dyn RuntimeWaker>;
    /// Runs the loop, invoking `on_step` to let the runtime process queued requests.
    ///
    /// Implementations are expected to keep calling `on_step` until it returns
    /// [`ControlFlow::Break`] and then return the carried value (contract inferred from
    /// how [`run`] uses this method; confirm against the backend implementations).
    fn run<T>(&self, on_step: impl FnMut() -> ControlFlow<T>) -> T;
}
26
/// Thread-safe notification hook provided by the backend.
pub trait RuntimeWaker: 'static + Send + Sync {
    /// Called when a request is pushed onto an empty request queue, or when a task is
    /// spawned while no other spawned task is waiting to be registered.
    fn wake(&self);
}
30
31pub fn run<F: Future>(l: &impl RuntimeLoop, future: F) -> F::Output {
32    let mut runner = Runner::new(l.waker(), None);
33    Runtime::enter(&runner.rc);
34    runner.rc.push_wake(ID_MAIN);
35
36    let mut main = pin!(future);
37    let main_wake = TaskWake::new(ID_MAIN, &runner.rc);
38    let value = l.run(|| {
39        while runner.ready_requests() {
40            for id in runner.wakes.drain(..) {
41                if id == ID_MAIN {
42                    match main
43                        .as_mut()
44                        .poll(&mut Context::from_waker(&main_wake.waker()))
45                    {
46                        Poll::Ready(value) => return ControlFlow::Break(value),
47                        Poll::Pending => {}
48                    }
49                } else {
50                    run_item(&mut runner.rs[id]);
51                }
52            }
53            runner.apply_drops();
54        }
55        ControlFlow::Continue(())
56    });
57    Runtime::leave();
58    value
59}
60
thread_local! {
    /// Runner installed by `enter` and removed by `leave`; `None` otherwise.
    static RUNNER: RefCell<Option<Runner>> = RefCell::new(None);
}
64
65pub fn enter(injector: impl RuntimeInjector) {
66    let runner = Runner::new(injector.waker(), Some(Box::new(injector)));
67    Runtime::enter(&runner.rc);
68    RUNNER.with(|r| *r.borrow_mut() = Some(runner));
69}
70pub fn leave() {
71    let runner = RUNNER.with(|r| r.borrow_mut().take().expect("runtime is not exists"));
72    Runtime::leave();
73    drop(runner);
74}
75pub fn on_step() {
76    RUNNER.with(|r| {
77        r.borrow_mut()
78            .as_mut()
79            .expect("runtime is not exists")
80            .step()
81    });
82}
83pub fn on_idle() -> bool {
84    if let Some(on_idle) = Runtime::with(|rt| rt.rc.pop_on_idle()) {
85        on_idle.wake();
86        true
87    } else {
88        false
89    }
90}
91
92/// Spawn a future on the current thread.
93///
94/// # Panics
95///
96/// Panics if the runtime is not running.
97#[must_use]
98#[track_caller]
99pub fn spawn_local<F: Future + 'static>(future: F) -> Task<F::Output> {
100    Runtime::with(|rt| {
101        let need_wake = rt.rs.is_empty();
102        let task = RawTask::new(&rt.rc);
103        rt.rs.push(Box::pin(RawRunnable {
104            task: task.clone(),
105            future,
106        }));
107        if need_wake {
108            rt.rc.0.waker.wake();
109        }
110        Task {
111            task,
112            is_detach: false,
113        }
114    })
115}
116
117/// Wait until there are no more operations to be performed now on the current thread.
118///
119/// The "operations to be performed now" include not only tasks spawned by [`spawn_local`], but also events handled by the runtime backend.
120pub async fn wait_for_idle() {
121    struct WaitForIdle {
122        is_ready: bool,
123    }
124    impl Future for WaitForIdle {
125        type Output = ();
126
127        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128            if self.is_ready {
129                Poll::Ready(())
130            } else {
131                self.is_ready = true;
132                Runtime::with(|rt| rt.rc.push_on_idle(cx.waker().clone()));
133                Poll::Pending
134            }
135        }
136    }
137
138    WaitForIdle { is_ready: false }.await;
139}
140
141#[derive(Clone)]
142struct RequestChannel(Arc<RequestsData>);
143
144impl RequestChannel {
145    fn new(waker: Arc<dyn RuntimeWaker>) -> Self {
146        Self(Arc::new(RequestsData {
147            reqs: Mutex::new(RawRequests::new()),
148            waker,
149        }))
150    }
151    fn swap(&self, wakes: &mut Vec<usize>, drops: &mut Vec<usize>) {
152        let mut d = self.0.reqs.lock().unwrap();
153        swap(wakes, &mut d.wakes);
154        swap(drops, &mut d.drops);
155    }
156    fn push_with(&self, f: impl FnOnce(&mut RawRequests)) {
157        let mut d = self.0.reqs.lock().unwrap();
158        let call_wake = d.is_empty();
159        f(&mut d);
160        if call_wake {
161            self.0.waker.wake();
162        }
163    }
164    fn push_wake(&self, id: usize) {
165        self.push_with(|d| d.wakes.push(id));
166    }
167    fn push_drop(&self, id: usize) {
168        self.push_with(|d| d.drops.push(id));
169    }
170    fn push_on_idle(&self, waker: Waker) {
171        self.push_with(|d| d.on_idle.push_back(waker));
172    }
173    fn pop_on_idle(&self) -> Option<Waker> {
174        self.0.reqs.lock().unwrap().on_idle.pop_front()
175    }
176}
/// Shared state behind a `RequestChannel`.
struct RequestsData {
    /// Backend notification hook, called when a request is pushed onto an empty queue.
    waker: Arc<dyn RuntimeWaker>,
    /// Pending requests, guarded by a mutex so they can be pushed from any thread.
    reqs: Mutex<RawRequests>,
}
181
/// Requests waiting to be picked up by the runner.
struct RawRequests {
    /// Ids of tasks that should be polled.
    wakes: Vec<usize>,
    /// Ids of slots that should be released.
    drops: Vec<usize>,
    /// Wakers waiting for the next idle notification, oldest first.
    on_idle: VecDeque<Waker>,
}

impl RawRequests {
    /// Creates a request set with nothing queued.
    fn new() -> Self {
        let wakes = Vec::new();
        let drops = Vec::new();
        let on_idle = VecDeque::new();
        Self {
            wakes,
            drops,
            on_idle,
        }
    }

    /// Returns `true` when no wake, drop or idle request is queued.
    fn is_empty(&self) -> bool {
        let Self {
            wakes,
            drops,
            on_idle,
        } = self;
        !(!wakes.is_empty() || !drops.is_empty() || !on_idle.is_empty())
    }
}
200
thread_local! {
    /// Runtime registered on this thread by `Runtime::enter`; `None` when no runtime is running.
    static RUNTIME: RefCell<Option<Runtime>> = RefCell::new(None);
}
204
205struct Runtime {
206    rc: RequestChannel,
207    rs: Vec<Pin<Box<dyn DynRunnable>>>,
208}
209
210impl Runtime {
211    fn new(rc: RequestChannel) -> Self {
212        Self { rc, rs: Vec::new() }
213    }
214    fn enter(rc: &RequestChannel) {
215        RUNTIME.with(|rt| {
216            let mut rt = rt.borrow_mut();
217            if rt.is_some() {
218                panic!("runtime is already running");
219            }
220            *rt = Some(Runtime::new(rc.clone()));
221        })
222    }
223    fn leave() {
224        RUNTIME.with(|rt| rt.borrow_mut().take());
225    }
226    #[track_caller]
227    fn with<T>(f: impl FnOnce(&mut Self) -> T) -> T {
228        RUNTIME
229            .with(|rt| rt.borrow_mut().as_mut().map(f))
230            .expect("runtime is not running")
231    }
232}
233
/// A spawned task.
///
/// Awaiting a [`Task`] yields the output of the spawned future.
///
/// When a [`Task`] is dropped, the asynchronous operation is canceled.
///
/// To drop a task without canceling, it is necessary to call [`Task::detach()`].
pub struct Task<T> {
    /// State shared with the runnable that polls the spawned future.
    task: Arc<RawTask<T>>,
    /// Set by `detach`; when `true`, dropping the handle does not cancel the task.
    is_detach: bool,
}
243
/// State shared between a `Task` handle and the runnable that executes its future.
struct RawTask<T> {
    /// Current lifecycle state of the task.
    state: Mutex<TaskState<T>>,
    /// Channel used to request a wake-up of the runnable when the handle is dropped.
    reqs: RequestChannel,
}
248
/// Lifecycle of a spawned task.
enum TaskState<T> {
    /// The future has not finished. `id` is `ID_NULL` until the runner registers the
    /// task; `waker` is the waker of the most recent `Task::poll`, if any.
    Running { id: usize, waker: Option<Waker> },
    /// The `Task` handle was dropped without `detach` while the task was running.
    Cancelled,
    /// The future finished and its output is waiting to be taken by `Task::poll`.
    Completed(T),
    /// The output has already been returned from `Task::poll`.
    Finished,
}
255
256impl<T> Task<T> {
257    /// Drop a task without canceling.
258    pub fn detach(mut self) {
259        self.is_detach = true;
260    }
261}
262
263impl<T> Drop for Task<T> {
264    fn drop(&mut self) {
265        if !self.is_detach {
266            let mut state = self.task.state.lock().unwrap();
267            if let &TaskState::Running { id, .. } = &*state {
268                *state = TaskState::Cancelled;
269                if id != ID_NULL {
270                    self.task.reqs.push_wake(id);
271                }
272            }
273        }
274    }
275}
276impl<T> Future for Task<T> {
277    type Output = T;
278
279    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
280        let mut state = self.task.state.lock().unwrap();
281        match &*state {
282            &TaskState::Running { id, .. } => {
283                *state = TaskState::Running {
284                    id,
285                    waker: Some(cx.waker().clone()),
286                };
287                Poll::Pending
288            }
289            TaskState::Cancelled => Poll::Pending,
290            TaskState::Completed(_) => {
291                if let TaskState::Completed(value) = replace(&mut *state, TaskState::Finished) {
292                    Poll::Ready(value)
293                } else {
294                    unreachable!()
295                }
296            }
297            TaskState::Finished => panic!("`poll` called twice"),
298        }
299    }
300}
301
302impl<T> RawTask<T> {
303    fn new(rc: &RequestChannel) -> Arc<Self> {
304        Arc::new(RawTask {
305            state: Mutex::new(TaskState::Running {
306                id: ID_NULL,
307                waker: None,
308            }),
309            reqs: rc.clone(),
310        })
311    }
312    fn complete(&self, value: T) {
313        if let TaskState::Running {
314            waker: Some(waker), ..
315        } = replace(
316            &mut *self.state.lock().unwrap(),
317            TaskState::Completed(value),
318        ) {
319            waker.wake()
320        }
321    }
322    fn is_cancelled(&self) -> bool {
323        matches!(&*self.state.lock().unwrap(), TaskState::Cancelled)
324    }
325}
326
/// Type-erased interface the runner uses to drive a spawned future.
trait DynRunnable {
    /// Records the id the runner assigned to this task.
    fn set_id(self: Pin<&Self>, id: usize);
    /// Polls the task with `waker`; returns `true` while the task must be kept and
    /// `false` once it has completed or was cancelled.
    fn run(self: Pin<&mut Self>, waker: &Waker) -> bool;
}
331
332struct RawRunnable<F: Future> {
333    task: Arc<RawTask<F::Output>>,
334    future: F,
335}
336impl<Fut: Future> DynRunnable for RawRunnable<Fut> {
337    fn set_id(self: Pin<&Self>, id: usize) {
338        if let TaskState::Running { id: id_, .. } = &mut *self.task.state.lock().unwrap() {
339            *id_ = id;
340        }
341    }
342    fn run(self: Pin<&mut Self>, waker: &Waker) -> bool {
343        if self.task.is_cancelled() {
344            false
345        } else {
346            unsafe {
347                let this = self.get_unchecked_mut();
348                let f = Pin::new_unchecked(&mut this.future);
349                if let Poll::Ready(value) = f.poll(&mut Context::from_waker(waker)) {
350                    this.task.complete(value);
351                    false
352                } else {
353                    true
354                }
355            }
356        }
357    }
358}
359
360struct Runner {
361    rc: RequestChannel,
362    wakes: Vec<usize>,
363    drops: Vec<usize>,
364    rs: SlabMap<Option<Runnable>>,
365    _injector: Option<Box<dyn RuntimeInjector>>,
366}
367
368impl Runner {
369    fn new(waker: Arc<dyn RuntimeWaker>, injector: Option<Box<dyn RuntimeInjector>>) -> Self {
370        Self {
371            rc: RequestChannel::new(waker),
372            wakes: Vec::new(),
373            drops: Vec::new(),
374            rs: SlabMap::new(),
375            _injector: injector,
376        }
377    }
378    fn ready_requests(&mut self) -> bool {
379        self.rc.swap(&mut self.wakes, &mut self.drops);
380        Runtime::with(|rt| {
381            for r in rt.rs.drain(..) {
382                self.wakes.push(
383                    self.rs
384                        .insert_with_key(|id| Some(Runnable::new(r, id, &self.rc))),
385                );
386            }
387        });
388        !self.wakes.is_empty() || !self.drops.is_empty()
389    }
390    fn apply_drops(&mut self) {
391        for id in self.drops.drain(..) {
392            self.rs.remove(id);
393        }
394    }
395
396    fn step(&mut self) {
397        while self.ready_requests() {
398            for id in self.wakes.drain(..) {
399                run_item(&mut self.rs[id]);
400            }
401            self.apply_drops();
402        }
403    }
404}
405
406struct Runnable {
407    wake: Arc<TaskWake>,
408    r: Pin<Box<dyn DynRunnable>>,
409}
410
411impl Runnable {
412    fn new(r: Pin<Box<dyn DynRunnable>>, id: usize, rc: &RequestChannel) -> Self {
413        r.as_ref().set_id(id);
414        Self {
415            wake: TaskWake::new(id, rc),
416            r,
417        }
418    }
419    fn run(&mut self) -> bool {
420        self.r.as_mut().run(&self.wake.waker())
421    }
422}
423fn run_item(r: &mut Option<Runnable>) {
424    if let Some(runnable) = r {
425        if !runnable.run() {
426            r.take();
427        }
428    }
429}
430
431struct TaskWake {
432    id: usize,
433    is_wake: AtomicBool,
434    rc: RequestChannel,
435}
436
437impl TaskWake {
438    fn new(id: usize, rc: &RequestChannel) -> Arc<Self> {
439        Arc::new(TaskWake {
440            id,
441            is_wake: AtomicBool::new(true),
442            rc: rc.clone(),
443        })
444    }
445    fn waker(self: &Arc<Self>) -> Waker {
446        self.is_wake.store(false, Ordering::SeqCst);
447        self.clone().into()
448    }
449}
450
451impl Wake for TaskWake {
452    fn wake(self: Arc<Self>) {
453        if !self.is_wake.swap(true, Ordering::SeqCst) {
454            self.rc.push_wake(self.id)
455        }
456    }
457}
impl Drop for TaskWake {
    /// Queues a drop request for this id once the last reference to the handle
    /// (including every waker created from it) is gone.
    fn drop(&mut self) {
        self.rc.push_drop(self.id);
    }
}