dropbear_future_queue/
lib.rs1use std::any::Any;
45use std::cell::RefCell;
46use std::collections::VecDeque;
47use std::pin::Pin;
48use std::sync::Arc;
49use ahash::{HashMap, HashMapExt};
50use parking_lot::Mutex;
51use tokio::sync::oneshot;
52use std::future::Future;
53use std::rc::Rc;
54use std::time::Instant;
55use tokio::runtime::Runtime;
56
/// A pinned, heap-allocated future that is `Send` and resolves to `T`.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
/// A type-erased, reference-counted result value; callers recover the concrete
/// type with `Arc::downcast`.
pub type AnyResult = Arc<dyn Any + Send + Sync>;
/// Receiving half of the one-shot channel that carries a future's [`AnyResult`].
type ResultReceiver = oneshot::Receiver<AnyResult>;
/// Shared, mutex-guarded FIFO of `(handle, future)` pairs that have been pushed
/// but not yet handed to the runtime.
pub type FutureStorage = Arc<Mutex<VecDeque<(FutureHandle, BoxFuture<()>)>>>;
/// Single-threaded shared mutable cell (`Rc<RefCell<T>>`).
///
/// Not referenced by the rest of this part of the file.
pub type Throwable<T> = Rc<RefCell<T>>;
69
70#[derive(Clone)]
72pub enum FutureStatus {
73 NotPolled,
74 CurrentlyPolling,
75 Completed(AnyResult),
76}
77
/// Cheap, copyable ticket identifying one future pushed onto a [`FutureQueue`].
///
/// Used as the key of the queue's handle registry.
#[derive(Default, Copy, Clone, Eq, Hash, PartialEq, Debug)]
pub struct FutureHandle {
    /// Numeric id taken from the queue's counter when the future was pushed.
    pub id: u64,
}
83
/// Per-handle bookkeeping stored in the queue's registry.
struct HandleEntry {
    /// Channel end on which the wrapped future delivers its result.
    receiver: ResultReceiver,
    /// Last known lifecycle state of the future.
    status: FutureStatus,
}
89
/// A queue of futures that are pushed from synchronous code, spawned onto
/// Tokio in batches, and later queried for their results by handle.
pub struct FutureQueue {
    /// Futures that have been pushed but not yet spawned.
    queued: FutureStorage,
    /// Status and result channel for every handle that has not been cleaned up.
    handle_registry: Arc<Mutex<HashMap<FutureHandle, HandleEntry>>>,
    /// Id that the next pushed future will receive.
    next_id: Arc<Mutex<u64>>,
}
99
100impl FutureQueue {
101 pub fn new() -> Self {
103 Self {
104 queued: Arc::new(Mutex::new(VecDeque::new())),
105 handle_registry: Arc::new(Mutex::new(HashMap::new())),
106 next_id: Arc::new(Mutex::new(0)),
107 }
108 }
109
110 pub fn push<F, T>(&self, future: F) -> FutureHandle
113 where
114 F: Future<Output = T> + Send + 'static,
115 T: Send + Sync + 'static,
116 {
117 let mut next_id = self.next_id.lock();
118 let id = *next_id;
119 *next_id += 1;
120
121 let id = FutureHandle { id };
122
123 let (sender, receiver) = oneshot::channel::<AnyResult>();
124
125 let entry = HandleEntry {
126 receiver,
127 status: FutureStatus::NotPolled,
128 };
129
130 self.handle_registry.lock().insert(id, entry);
131
132 let registry_clone = self.handle_registry.clone();
133
134 let wrapped_future: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(async move {
135 log("Starting future execution");
136 let result = future.await;
137 let boxed_result: AnyResult = Arc::new(result);
138 log("Future completed, sending result");
139
140 let _ = sender.send(boxed_result.clone());
141
142 let mut registry = registry_clone.lock();
143 if let Some(entry) = registry.get_mut(&id) {
144 entry.status = FutureStatus::Completed(boxed_result);
145 log("Updated status to completed");
146 }
147 });
148
149 self.queued.lock().push_back((id, wrapped_future));
150
151 id
152 }
153
154 pub fn poll(&self) {
159 let mut queue = self.queued.lock();
160 log("Locked queue for polling");
161
162 if queue.is_empty() {
163 log("Queue is empty, nothing to poll");
164 return;
165 }
166
167 let mut futures_to_spawn = Vec::new();
168
169 while let Some((id, future)) = queue.pop_front() {
170 log(format!("Processing future with id: {:?}", id));
171
172 {
173 let mut registry = self.handle_registry.lock();
174 if let Some(entry) = registry.get_mut(&id) {
175 entry.status = FutureStatus::CurrentlyPolling;
176 log("Updated status to CurrentlyPolling");
177 }
178 }
179
180 futures_to_spawn.push(future);
181 }
182
183 drop(queue);
184
185 for future in futures_to_spawn {
186 log("Spawning future with tokio");
187 tokio::spawn(future);
188 }
189 }
190
191 pub fn exchange(&self, handle: &FutureHandle) -> Option<AnyResult> {
196 let mut registry = self.handle_registry.lock();
197 if let Some(entry) = registry.get_mut(handle) {
198 return match &entry.status {
199 FutureStatus::Completed(result) => {
200 log("FutureStatus::Completed - returning cached result");
201 Some(result.clone())
202 }
203 _ => {
204 log("Future not completed yet, checking receiver");
205 match entry.receiver.try_recv() {
206 Ok(result) => {
207 log("Received result from channel");
208 entry.status = FutureStatus::Completed(result.clone());
209 Some(result)
210 }
211 Err(oneshot::error::TryRecvError::Empty) => {
212 log("Channel is empty - future still running");
213 None
214 },
215 Err(oneshot::error::TryRecvError::Closed) => {
216 log("Channel is closed - future may have panicked");
217 None
218 },
219 }
220 }
221 }
222 } else {
223 log("Handle not found in registry");
224 }
225 None
226 }
227
228 pub fn exchange_as<T: Any + Send + Sync + 'static>(&self, handle: &FutureHandle) -> Option<Arc<T>> {
230 self.exchange(handle)?
231 .downcast()
232 .ok()
233 }
234
235 pub fn get_status(&self, handle: &FutureHandle) -> Option<FutureStatus> {
237 let registry = self.handle_registry.lock();
238 registry.get(handle).map(|entry| entry.status.clone())
239 }
240
241 pub fn cleanup(&self) {
245 let mut registry = self.handle_registry.lock();
246 let completed_ids: Vec<FutureHandle> = registry
247 .iter()
248 .filter_map(|(&id, entry)| {
249 matches!(entry.status, FutureStatus::Completed(_)).then_some(id)
250 })
251 .collect();
252
253 for id in completed_ids {
254 registry.remove(&id);
255 }
256 }
257}
258
259
/// Test-only logger: appends `msg` followed by a newline to `test.log` in the
/// current working directory, creating the file if necessary.
///
/// Panics if the file cannot be opened or written.
#[cfg(test)]
fn log(msg: impl ToString) {
    use std::io::Write;

    let mut options = std::fs::OpenOptions::new();
    options.append(true).create(true);
    let mut sink = options.open("test.log").unwrap();

    let mut line = msg.to_string();
    line.push('\n');
    sink.write_all(line.as_bytes()).unwrap();
}
270
/// Logging stub for non-test builds: accepts the message and discards it.
#[cfg(not(test))]
fn log(_msg: impl ToString) {}
275
276impl Default for FutureQueue {
277 fn default() -> Self {
278 Self::new()
279 }
280}
281
/// End-to-end check: push a future, spawn it with `poll`, then retrieve its
/// result through `exchange` and downcast it back to `i32`.
#[test]
fn test_future_queue() {
    use tokio::time::{sleep, Duration};

    let rt = Runtime::new().unwrap();
    // `poll` calls `tokio::spawn`, which needs an entered runtime context.
    let _guard = rt.enter();

    let queue = FutureQueue::new();
    log("Created new queue");

    let handle = queue.push(async move {
        log("Inside the pushed future - starting work");
        sleep(Duration::from_millis(10)).await;
        log("Inside the pushed future - work completed");
        67 + 41
    });
    log("Created new handle");

    queue.poll();
    log("Initial poll completed");

    let max_attempts = 100;
    let mut attempts = 0;
    let mut last_attempt = Instant::now();

    loop {
        attempts += 1;
        log(format!("Attempt {}: Checking for result", attempts));
        log(format!("Time since last attempt: {} ms", last_attempt.elapsed().as_millis()));
        last_attempt = Instant::now();

        if let Some(result) = queue.exchange(&handle) {
            let result = result.downcast::<i32>().unwrap();
            log(format!("Success! 67 + 41 = {}", result));
            assert_eq!(*result, 108);
            break;
        }

        if attempts >= max_attempts {
            log("Max attempts reached - test failed");
            panic!("Future never completed");
        }

        // Back off between attempts. Without a pause the 100 attempts can be
        // exhausted before the future's 10 ms sleep has elapsed, which makes
        // the test flaky. The task runs on the runtime's worker threads, so
        // blocking this test thread does not stall it.
        std::thread::sleep(Duration::from_millis(5));
    }

    log("Test completed successfully");
}