microasync_rt/
queued_runtime.rs

1extern crate alloc;
2#[cfg(not(feature = "no_std"))]
3extern crate std;
4
5use core::mem;
6use core::time::Duration;
7use core::{cell::RefCell, future::Future, pin::Pin, task::Poll};
8
9use alloc::boxed::Box;
10use alloc::collections::VecDeque;
11
12use async_core::*;
13
14/// A very small async runtime, with support for adding more tasks as it runs. This uses a VecDeque
15/// internally.
16pub struct QueuedRuntime {
17    queue: RefCell<VecDeque<(BoxFuture<'static, ()>, u64)>>,
18    counter: u64,
19}
20
21impl QueuedRuntime {
22    /// Creates a new, empty QueuedRuntime. Awaiting this does nothing unless futures are pushed to
23    /// it.
24    pub fn new() -> Self {
25        Self {
26            queue: RefCell::new(VecDeque::new()),
27            counter: 0,
28        }
29    }
30
31    /// Creates a new QueuedRuntime. Unlike new(), this adds a single future immediately, so
32    /// awaiting this will have an effect.
33    pub fn new_with_boxed(future: BoxFuture<'static, ()>) -> Self {
34        let mut r = Self::new();
35        Runtime::push_boxed(&mut r, future);
36        r
37    }
38    /// Creates a new QueuedRuntime. Unlike new(), this adds a single future immediately, so
39    /// awaiting this will have an effect.
40    pub fn new_with(future: impl Future<Output = ()> + 'static) -> Self {
41        let mut r = Self::new();
42        r.push(future);
43        r
44    }
45}
46
47impl Default for QueuedRuntime {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl Future for QueuedRuntime {
54    type Output = ();
55
56    fn poll(
57        self: Pin<&mut Self>,
58        cx: &mut core::task::Context<'_>,
59    ) -> core::task::Poll<Self::Output> {
60        let me = self.get_mut();
61        if me.counter == u64::MAX {
62            return Poll::Ready(());
63        }
64        set_current_runtime(me);
65        let mut all_pending = true;
66        let mut i = 0;
67        // SAFETY: The queue *must* only be accessed from the thread that is executing the runtime,
68        // because the runtime does not implement Send. This makes these borrows necessarily
69        // exclusive.
70        let r = loop {
71            let mut q = me.queue.borrow_mut();
72            let Some(mut future) = q.pop_front() else { break Poll::Ready(()) };
73            mem::drop(q);
74            if future.0.as_mut().poll(cx).is_pending() {
75                me.queue.borrow_mut().push_back(future);
76            } else {
77                all_pending = false;
78            }
79            if me.counter == u64::MAX {
80                break Poll::Ready(());
81            }
82            i += 1;
83            // if queue was traversed with no progress made, stop
84            if i >= me.queue.borrow().len() {
85                if all_pending {
86                    break Poll::Pending;
87                }
88                all_pending = true;
89                i = 0;
90            }
91        };
92        clear_current_runtime();
93        r
94    }
95}
96
97impl InternalRuntime for QueuedRuntime {
98    fn push_boxed(&mut self, future: BoxFuture<'static, ()>) -> u64 {
99        if self.counter == u64::MAX {
100            return self.counter;
101        }
102        self.counter += 1;
103        self.queue.borrow_mut().push_back((future, self.counter));
104        self.counter
105    }
106
107    fn contains(&mut self, id: u64) -> bool {
108        self.queue.borrow().iter().any(|x| x.1 == id)
109    }
110
111    fn sleep<'b>(&self, duration: Duration) -> BoxFuture<'b, ()> {
112        Box::pin(crate::wait(duration))
113    }
114
115    fn stop(&mut self) -> Stop {
116        self.counter = u64::MAX;
117        Stop
118    }
119}