microasync_rt/
queued_runtime.rs1extern crate alloc;
2#[cfg(not(feature = "no_std"))]
3extern crate std;
4
5use core::mem;
6use core::time::Duration;
7use core::{cell::RefCell, future::Future, pin::Pin, task::Poll};
8
9use alloc::boxed::Box;
10use alloc::collections::VecDeque;
11
12use async_core::*;
13
14pub struct QueuedRuntime {
17 queue: RefCell<VecDeque<(BoxFuture<'static, ()>, u64)>>,
18 counter: u64,
19}
20
21impl QueuedRuntime {
22 pub fn new() -> Self {
25 Self {
26 queue: RefCell::new(VecDeque::new()),
27 counter: 0,
28 }
29 }
30
31 pub fn new_with_boxed(future: BoxFuture<'static, ()>) -> Self {
34 let mut r = Self::new();
35 Runtime::push_boxed(&mut r, future);
36 r
37 }
38 pub fn new_with(future: impl Future<Output = ()> + 'static) -> Self {
41 let mut r = Self::new();
42 r.push(future);
43 r
44 }
45}
46
47impl Default for QueuedRuntime {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl Future for QueuedRuntime {
54 type Output = ();
55
56 fn poll(
57 self: Pin<&mut Self>,
58 cx: &mut core::task::Context<'_>,
59 ) -> core::task::Poll<Self::Output> {
60 let me = self.get_mut();
61 if me.counter == u64::MAX {
62 return Poll::Ready(());
63 }
64 set_current_runtime(me);
65 let mut all_pending = true;
66 let mut i = 0;
67 let r = loop {
71 let mut q = me.queue.borrow_mut();
72 let Some(mut future) = q.pop_front() else { break Poll::Ready(()) };
73 mem::drop(q);
74 if future.0.as_mut().poll(cx).is_pending() {
75 me.queue.borrow_mut().push_back(future);
76 } else {
77 all_pending = false;
78 }
79 if me.counter == u64::MAX {
80 break Poll::Ready(());
81 }
82 i += 1;
83 if i >= me.queue.borrow().len() {
85 if all_pending {
86 break Poll::Pending;
87 }
88 all_pending = true;
89 i = 0;
90 }
91 };
92 clear_current_runtime();
93 r
94 }
95}
96
97impl InternalRuntime for QueuedRuntime {
98 fn push_boxed(&mut self, future: BoxFuture<'static, ()>) -> u64 {
99 if self.counter == u64::MAX {
100 return self.counter;
101 }
102 self.counter += 1;
103 self.queue.borrow_mut().push_back((future, self.counter));
104 self.counter
105 }
106
107 fn contains(&mut self, id: u64) -> bool {
108 self.queue.borrow().iter().any(|x| x.1 == id)
109 }
110
111 fn sleep<'b>(&self, duration: Duration) -> BoxFuture<'b, ()> {
112 Box::pin(crate::wait(duration))
113 }
114
115 fn stop(&mut self) -> Stop {
116 self.counter = u64::MAX;
117 Stop
118 }
119}