microasync_util/
queued_runtime.rs

1extern crate alloc;
2#[cfg(not(feature = "no_std"))]
3extern crate std;
4
5use core::mem;
6use core::ops::Deref;
7use core::ptr::null_mut;
8use core::{cell::RefCell, future::Future, pin::Pin, task::Poll};
9
10use alloc::collections::VecDeque;
11use microasync::{prep, BoxFuture};
12
/// Wrapper that unconditionally marks its contents as both `Send` and `Sync`.
///
/// No synchronisation is added; the wrapper only silences the compiler. Whoever constructs a
/// `ForceSync` is responsible for making sure the wrapped value is never actually touched from
/// more than one thread at a time.
struct ForceSync<T>(T);

// SAFETY: not provable from the type itself. Each use site has to justify why sharing the
// wrapped value between threads cannot happen (or cannot go wrong).
unsafe impl<T> Sync for ForceSync<T> {}
unsafe impl<T> Send for ForceSync<T> {}

impl<T> Deref for ForceSync<T> {
    type Target = T;

    /// Gives shared access to the wrapped value.
    fn deref(&self) -> &T {
        let Self(inner) = self;
        inner
    }
}
24
// Pointer to the `QueuedRuntime` whose `poll` is currently executing; null whenever no poll is
// in progress. `QueuedRuntime::poll` stores the pointer on entry and resets it before
// returning, and `get_current_runtime` reads it.
#[cfg(feature = "no_std")]
// SAFETY: We can ForceSync this because we assume no_std means we won't do threading.
static CURRENT_RUNTIME: ForceSync<RefCell<*mut QueuedRuntime>> =
    ForceSync(RefCell::new(null_mut()));

// Same slot as above, but kept per thread when `std` is available, so no `ForceSync` wrapper is
// needed.
#[cfg(not(feature = "no_std"))]
std::thread_local! {
    static CURRENT_RUNTIME: RefCell<*mut QueuedRuntime> = RefCell::new(null_mut());
}
34
35/// A very small async runtime, with support for adding more tasks as it runs. This uses a VecDeque
36/// internally.
37pub struct QueuedRuntime {
38    queue: RefCell<VecDeque<BoxFuture<'static, ()>>>,
39}
40
41impl QueuedRuntime {
42    /// Creates a new, empty QueuedRuntime. Awaiting this does nothing unless futures are pushed to
43    /// it.
44    pub fn new() -> Self {
45        Self {
46            queue: RefCell::new(VecDeque::new()),
47        }
48    }
49
50    /// Creates a new QueuedRuntime. Unlike new(), this adds a single future immediately, so
51    /// awaiting this will have an effect.
52    pub fn new_with_boxed(future: BoxFuture<'static, ()>) -> Self {
53        let mut r = Self::new();
54        r.push_boxed(future);
55        r
56    }
57    /// Creates a new QueuedRuntime. Unlike new(), this adds a single future immediately, so
58    /// awaiting this will have an effect.
59    pub fn new_with(future: impl Future<Output = ()> + 'static) -> Self {
60        let mut r = Self::new();
61        r.push(future);
62        r
63    }
64
65    /// Adds a new future to the queue to be completed.
66    pub fn push_boxed(&mut self, future: BoxFuture<'static, ()>) -> &mut Self {
67        self.queue.borrow_mut().push_back(future);
68        self
69    }
70
71    /// Adds a new future to the queue to be completed.
72    pub fn push(&mut self, future: impl Future<Output = ()> + 'static) -> &mut Self {
73        self.queue.borrow_mut().push_back(prep(future));
74        self
75    }
76}
77
78impl Default for QueuedRuntime {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84impl Future for QueuedRuntime {
85    type Output = ();
86
87    fn poll(
88        self: Pin<&mut Self>,
89        cx: &mut core::task::Context<'_>,
90    ) -> core::task::Poll<Self::Output> {
91        let me = self.get_mut();
92        #[cfg(feature = "no_std")]
93        {
94            *CURRENT_RUNTIME.borrow_mut() = me as *mut _;
95        }
96        #[cfg(not(feature = "no_std"))]
97        {
98            CURRENT_RUNTIME.with(|x| *x.borrow_mut() = me as *mut _);
99        }
100        let mut all_pending = true;
101        let mut i = 0;
102        // SAFETY: The queue *must* only be accessed from the thread that is executing the runtime,
103        // because the runtime does not implement Send. This makes these borrows necessarily
104        // exclusive.
105        let r = loop {
106            let mut q = me.queue.borrow_mut();
107            let Some(mut future) = q.pop_front() else { break Poll::Ready(()) };
108            mem::drop(q);
109            if future.as_mut().poll(cx).is_pending() {
110                me.queue.borrow_mut().push_back(future);
111            }
112            else {
113                all_pending = false;
114            }
115            i += 1;
116            // if queue was traversed with no progress made, stop
117            if i >= me.queue.borrow().len() {
118                if all_pending {
119                    break Poll::Pending;
120                }
121                all_pending = true;
122            }
123        };
124        #[cfg(feature = "no_std")]
125        {
126            *CURRENT_RUNTIME.borrow_mut() = null_mut();
127        }
128        #[cfg(not(feature = "no_std"))]
129        {
130            CURRENT_RUNTIME.with(|x| *x.borrow_mut() = null_mut());
131        }
132        r
133    }
134}
135
#[cfg(feature = "no_std")]
/// Returns the runtime recorded in `CURRENT_RUNTIME`, i.e. the one polling the calling future.
///
/// This assumes a single-threaded environment. Attempting to use this in a multi-threaded
/// environment is highly unsafe and should NEVER be done.
///
/// # Panics
///
/// Panics if `CURRENT_RUNTIME` is null, which is the case outside of a `QueuedRuntime` poll.
pub async fn get_current_runtime<'a>() -> &'a mut QueuedRuntime {
    // Copy the raw pointer out so the `RefCell` borrow ends right here.
    let current: *mut QueuedRuntime = *CURRENT_RUNTIME.borrow();
    // SAFETY: CURRENT_RUNTIME *MUST* be set to null when QueuedRuntime finishes a poll, so it
    // *cannot* be freed while this is non-null
    // NOTE(review): the lifetime `'a` is chosen by the caller, so nothing stops the returned
    // reference from being kept beyond the current poll - confirm callers never do that.
    let found = unsafe { current.as_mut() };
    let Some(runtime) = found else {
        panic!("get_current_runtime MUST only be called from a future running within a QueuedRuntime!")
    };
    runtime
}
151
152#[cfg(not(feature = "no_std"))]
153/// This gets the currently running runtime. PANICS IF IT IS CALLED FROM OUTSIDE THE RUNTIME.
154pub async fn get_current_runtime<'a>() -> &'a mut QueuedRuntime {
155    let it = CURRENT_RUNTIME.with(|x| *x.borrow());
156    // SAFETY: CURRENT_RUNTIME *MUST* be set to null when QueuedRuntime finishes a poll, so it
157    // *cannot* be freed while this is non-null
158    unsafe {
159        if let Some(x) = it.as_mut() {
160            x
161        } else {
162            panic!("get_current_runtime MUST only be called from a future running within a QueuedRuntime!")
163        }
164    }
165}