dropbear_future_queue/
lib.rs1use std::any::Any;
45use std::cell::RefCell;
46use std::collections::VecDeque;
47use std::pin::Pin;
48use std::sync::Arc;
49use ahash::{HashMap, HashMapExt};
50use parking_lot::Mutex;
51use tokio::sync::oneshot;
52use std::future::Future;
53use std::rc::Rc;
54
/// A heap-allocated, pinned future that can be sent across threads and resolves to `T`.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
/// A type-erased, reference-counted result; callers recover the concrete type with `downcast`.
pub type AnyResult = Arc<dyn Any + Send + Sync>;
/// Receiving half of the oneshot channel on which a wrapped future sends its [`AnyResult`].
type ResultReceiver = oneshot::Receiver<AnyResult>;
/// Shared, mutex-guarded FIFO of `(handle, future)` pairs.
pub type FutureStorage = Arc<Mutex<VecDeque<(FutureHandle, BoxFuture<()>)>>>;
/// Reference-counted, interior-mutable wrapper around `T`.
// NOTE(review): not referenced anywhere else in the visible source; presumably a
// convenience alias for callers — confirm before relying on it.
pub type Throwable<T> = Rc<RefCell<T>>;
67
68#[derive(Clone)]
70pub enum FutureStatus {
71 NotPolled,
72 CurrentlyPolling,
73 Completed(AnyResult),
74}
75
/// Identifier returned by `FutureQueue::push`, used afterwards to look up the
/// status or result of the pushed future.
#[derive(Default, Copy, Clone, Eq, Hash, PartialEq, Debug)]
pub struct FutureHandle {
    /// Numeric id; `FutureQueue::push` assigns these from a counter that starts at 0.
    pub id: u64,
}
81
/// Registry record kept for every pushed future.
struct HandleEntry {
    /// Receiving half of the oneshot channel the wrapped future sends its result on.
    receiver: ResultReceiver,
    /// Most recently recorded lifecycle state.
    status: FutureStatus,
}
87
/// A queue of futures that are registered with `push` and later spawned onto
/// Tokio by `poll`; results are fetched by handle with `exchange`.
pub struct FutureQueue {
    /// Futures that have been pushed but not yet spawned, in insertion order.
    queued: FutureStorage,
    /// Per-handle receiver and status; also shared with each wrapped future so it
    /// can mark its own entry as completed.
    handle_registry: Arc<Mutex<HashMap<FutureHandle, HandleEntry>>>,
    /// Id that will be assigned to the next pushed future.
    next_id: Arc<Mutex<u64>>,
}
97
98impl FutureQueue {
99 pub fn new() -> Self {
101 Self {
102 queued: Arc::new(Mutex::new(VecDeque::new())),
103 handle_registry: Arc::new(Mutex::new(HashMap::new())),
104 next_id: Arc::new(Mutex::new(0)),
105 }
106 }
107
108 pub fn push<F, T>(&self, future: F) -> FutureHandle
111 where
112 F: Future<Output = T> + Send + 'static,
113 T: Send + Sync + 'static,
114 {
115 let mut next_id = self.next_id.lock();
116 let id = *next_id;
117 *next_id += 1;
118
119 let id = FutureHandle { id };
120
121 let (sender, receiver) = oneshot::channel::<AnyResult>();
122
123 let entry = HandleEntry {
124 receiver,
125 status: FutureStatus::NotPolled,
126 };
127
128 self.handle_registry.lock().insert(id, entry);
129
130 let registry_clone = self.handle_registry.clone();
131
132 let wrapped_future: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(async move {
133 log("Starting future execution");
134 let result = future.await;
135 let boxed_result: AnyResult = Arc::new(result);
136 log("Future completed, sending result");
137
138 let _ = sender.send(boxed_result.clone());
139
140 let mut registry = registry_clone.lock();
141 if let Some(entry) = registry.get_mut(&id) {
142 entry.status = FutureStatus::Completed(boxed_result);
143 log("Updated status to completed");
144 }
145 });
146
147 self.queued.lock().push_back((id, wrapped_future));
148
149 id
150 }
151
152 pub fn poll(&self) {
157 let mut queue = self.queued.lock();
158 log("Locked queue for polling");
159
160 if queue.is_empty() {
161 log("Queue is empty, nothing to poll");
162 return;
163 }
164
165 let mut futures_to_spawn = Vec::new();
166
167 while let Some((id, future)) = queue.pop_front() {
168 log(format!("Processing future with id: {:?}", id));
169
170 {
171 let mut registry = self.handle_registry.lock();
172 if let Some(entry) = registry.get_mut(&id) {
173 entry.status = FutureStatus::CurrentlyPolling;
174 log("Updated status to CurrentlyPolling");
175 }
176 }
177
178 futures_to_spawn.push(future);
179 }
180
181 drop(queue);
182
183 for future in futures_to_spawn {
184 log("Spawning future with tokio");
185 tokio::spawn(future);
186 }
187 }
188
189 pub fn exchange(&self, handle: &FutureHandle) -> Option<AnyResult> {
194 let mut registry = self.handle_registry.lock();
195 if let Some(entry) = registry.get_mut(handle) {
196 return match &entry.status {
197 FutureStatus::Completed(result) => {
198 log("FutureStatus::Completed - returning cached result");
199 Some(result.clone())
200 }
201 _ => {
202 log("Future not completed yet, checking receiver");
203 match entry.receiver.try_recv() {
204 Ok(result) => {
205 log("Received result from channel");
206 entry.status = FutureStatus::Completed(result.clone());
207 Some(result)
208 }
209 Err(oneshot::error::TryRecvError::Empty) => {
210 log("Channel is empty - future still running");
211 None
212 },
213 Err(oneshot::error::TryRecvError::Closed) => {
214 log("Channel is closed - future may have panicked");
215 None
216 },
217 }
218 }
219 }
220 } else {
221 log("Handle not found in registry");
222 }
223 None
224 }
225
226 pub fn exchange_as<T: Any + Send + Sync + 'static>(&self, handle: &FutureHandle) -> Option<Arc<T>> {
228 self.exchange(handle)?
229 .downcast()
230 .ok()
231 }
232
233 pub fn get_status(&self, handle: &FutureHandle) -> Option<FutureStatus> {
235 let registry = self.handle_registry.lock();
236 registry.get(handle).map(|entry| entry.status.clone())
237 }
238
239 pub fn cleanup(&self) {
243 let mut registry = self.handle_registry.lock();
244 let completed_ids: Vec<FutureHandle> = registry
245 .iter()
246 .filter_map(|(&id, entry)| {
247 matches!(entry.status, FutureStatus::Completed(_)).then_some(id)
248 })
249 .collect();
250
251 for id in completed_ids {
252 registry.remove(&id);
253 }
254 }
255}
256
257
/// Test-build logger: appends `msg` followed by a newline to `test.log` in the
/// current working directory, creating the file if it does not exist.
///
/// Panics if the file cannot be opened or written.
#[cfg(test)]
fn log(msg: impl ToString) {
    use std::io::Write;

    // Build the full line first so it reaches the file in one `write_all` call.
    let mut line = msg.to_string();
    line.push('\n');

    let mut options = std::fs::OpenOptions::new();
    options.append(true).create(true);

    let mut sink = options.open("test.log").unwrap();
    sink.write_all(line.as_bytes()).unwrap();
}
268
/// Non-test counterpart of the logger: accepts the same argument and does
/// nothing, so `log(...)` calls are no-ops outside test builds.
#[cfg(not(test))]
fn log(_msg: impl ToString) {

}
273
274impl Default for FutureQueue {
275 fn default() -> Self {
276 Self::new()
277 }
278}
279
/// End-to-end check: push a future, spawn it with `poll`, then fetch its
/// result through `exchange`.
#[test]
fn test_future_queue() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // `poll` spawns onto Tokio, so the runtime context has to be entered first.
    let _guard = rt.enter();

    let queue = FutureQueue::new();
    log("Created new queue");

    let handle = queue.push(async move {
        log("Inside the pushed future - starting work");
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        log("Inside the pushed future - work completed");
        67 + 41
    });
    log("Created new handle");

    queue.poll();
    log("Initial poll completed");

    let max_attempts = 100;
    // Pause between checks so the attempt budget covers far more wall-clock
    // time than the 10 ms the future sleeps; a tight loop could burn through
    // every attempt before the future finishes.
    let retry_delay = std::time::Duration::from_millis(5);
    let mut last_attempt = std::time::Instant::now();
    let mut outcome = None;

    for attempt in 1..=max_attempts {
        log(format!("Attempt {}: Checking for result", attempt));
        log(format!(
            "Time since last attempt: {} ms",
            last_attempt.elapsed().as_millis()
        ));
        last_attempt = std::time::Instant::now();

        outcome = queue.exchange(&handle);
        if outcome.is_some() {
            break;
        }

        std::thread::sleep(retry_delay);
    }

    let result = match outcome {
        Some(result) => result,
        None => {
            log("Max attempts reached - test failed");
            panic!("Future never completed");
        }
    };

    let result = result.downcast::<i32>().unwrap();
    log(format!("Success! 67 + 41 = {}", result));
    assert_eq!(*result, 108);

    log("Test completed successfully");
}