microasync_util/
queued_runtime.rs1extern crate alloc;
2#[cfg(not(feature = "no_std"))]
3extern crate std;
4
5use core::mem;
6use core::ops::Deref;
7use core::ptr::null_mut;
8use core::{cell::RefCell, future::Future, pin::Pin, task::Poll};
9
10use alloc::collections::VecDeque;
11use microasync::{prep, BoxFuture};
12
/// Transparent wrapper that unconditionally marks its contents as `Send` and
/// `Sync`, so that a value which is neither can be stored in a `static`.
struct ForceSync<T>(T);

// SAFETY: NOTE(review): nothing in these impls enforces thread safety. They
// are only sound if every user of a `ForceSync` value guarantees that the
// wrapped data is never accessed from more than one thread at a time —
// confirm this holds for every target the crate supports.
unsafe impl<T> Sync for ForceSync<T> {}
unsafe impl<T> Send for ForceSync<T> {}

impl<T> Deref for ForceSync<T> {
    type Target = T;

    /// Gives shared access to the wrapped value.
    fn deref(&self) -> &T {
        let ForceSync(inner) = self;
        inner
    }
}
24
25#[cfg(feature = "no_std")]
26static CURRENT_RUNTIME: ForceSync<RefCell<*mut QueuedRuntime>> =
28 ForceSync(RefCell::new(null_mut()));
29
30#[cfg(not(feature = "no_std"))]
31std::thread_local! {
32 static CURRENT_RUNTIME: RefCell<*mut QueuedRuntime> = RefCell::new(null_mut());
33}
34
35pub struct QueuedRuntime {
38 queue: RefCell<VecDeque<BoxFuture<'static, ()>>>,
39}
40
41impl QueuedRuntime {
42 pub fn new() -> Self {
45 Self {
46 queue: RefCell::new(VecDeque::new()),
47 }
48 }
49
50 pub fn new_with_boxed(future: BoxFuture<'static, ()>) -> Self {
53 let mut r = Self::new();
54 r.push_boxed(future);
55 r
56 }
57 pub fn new_with(future: impl Future<Output = ()> + 'static) -> Self {
60 let mut r = Self::new();
61 r.push(future);
62 r
63 }
64
65 pub fn push_boxed(&mut self, future: BoxFuture<'static, ()>) -> &mut Self {
67 self.queue.borrow_mut().push_back(future);
68 self
69 }
70
71 pub fn push(&mut self, future: impl Future<Output = ()> + 'static) -> &mut Self {
73 self.queue.borrow_mut().push_back(prep(future));
74 self
75 }
76}
77
78impl Default for QueuedRuntime {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl Future for QueuedRuntime {
85 type Output = ();
86
87 fn poll(
88 self: Pin<&mut Self>,
89 cx: &mut core::task::Context<'_>,
90 ) -> core::task::Poll<Self::Output> {
91 let me = self.get_mut();
92 #[cfg(feature = "no_std")]
93 {
94 *CURRENT_RUNTIME.borrow_mut() = me as *mut _;
95 }
96 #[cfg(not(feature = "no_std"))]
97 {
98 CURRENT_RUNTIME.with(|x| *x.borrow_mut() = me as *mut _);
99 }
100 let mut all_pending = true;
101 let mut i = 0;
102 let r = loop {
106 let mut q = me.queue.borrow_mut();
107 let Some(mut future) = q.pop_front() else { break Poll::Ready(()) };
108 mem::drop(q);
109 if future.as_mut().poll(cx).is_pending() {
110 me.queue.borrow_mut().push_back(future);
111 }
112 else {
113 all_pending = false;
114 }
115 i += 1;
116 if i >= me.queue.borrow().len() {
118 if all_pending {
119 break Poll::Pending;
120 }
121 all_pending = true;
122 }
123 };
124 #[cfg(feature = "no_std")]
125 {
126 *CURRENT_RUNTIME.borrow_mut() = null_mut();
127 }
128 #[cfg(not(feature = "no_std"))]
129 {
130 CURRENT_RUNTIME.with(|x| *x.borrow_mut() = null_mut());
131 }
132 r
133 }
134}
135
/// Returns the runtime that is currently polling the calling future.
///
/// # Panics
///
/// Panics if no runtime is registered in `CURRENT_RUNTIME`, i.e. when called
/// from outside a future that is being polled by a [`QueuedRuntime`].
#[cfg(feature = "no_std")]
pub async fn get_current_runtime<'a>() -> &'a mut QueuedRuntime {
    let slot = CURRENT_RUNTIME.borrow();
    // SAFETY: a non-null pointer in `CURRENT_RUNTIME` is written by
    // `QueuedRuntime::poll` from its own `&mut self` for the duration of that
    // poll. NOTE(review): the returned reference has a caller-chosen lifetime
    // and aliases the `&mut` held by `poll`; callers must not keep it beyond
    // the current poll — confirm the aliasing is acceptable.
    match unsafe { slot.as_mut() } {
        Some(runtime) => runtime,
        None => {
            panic!("get_current_runtime MUST only be called from a future running within a QueuedRuntime!")
        }
    }
}
151
152#[cfg(not(feature = "no_std"))]
153pub async fn get_current_runtime<'a>() -> &'a mut QueuedRuntime {
155 let it = CURRENT_RUNTIME.with(|x| *x.borrow());
156 unsafe {
159 if let Some(x) = it.as_mut() {
160 x
161 } else {
162 panic!("get_current_runtime MUST only be called from a future running within a QueuedRuntime!")
163 }
164 }
165}