dropbear_future_queue/
lib.rs1use std::any::Any;
38use std::cell::RefCell;
39use std::collections::VecDeque;
40use std::pin::Pin;
41use std::sync::Arc;
42use ahash::{HashMap, HashMapExt};
43use parking_lot::Mutex;
44use tokio::sync::oneshot;
45use std::future::Future;
46use std::rc::Rc;
47
48pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
52pub type AnyResult = Arc<dyn Any + Send + Sync>;
54type ResultReceiver = oneshot::Receiver<AnyResult>;
56pub type FutureStorage = Arc<Mutex<VecDeque<(FutureHandle, BoxFuture<()>)>>>;
58pub type Throwable<T> = Rc<RefCell<T>>;
60
61#[derive(Clone, Debug)]
63pub enum FutureStatus {
64 NotPolled,
65 CurrentlyPolling,
66 Completed,
67}
68
69#[derive(Default, Copy, Clone, Eq, Hash, PartialEq, Debug)]
71pub struct FutureHandle {
72 pub id: u64,
73}
74
75struct HandleEntry {
77 receiver: Option<ResultReceiver>,
78 status: FutureStatus,
79 cached_result: Option<AnyResult>,
80}
81
82pub struct FutureQueue {
84 queued: FutureStorage,
86 handle_registry: Arc<Mutex<HashMap<FutureHandle, HandleEntry>>>,
88 next_id: Arc<Mutex<u64>>,
90}
91
92impl FutureQueue {
93 pub fn new() -> Self {
95 Self {
96 queued: Arc::new(Mutex::new(VecDeque::new())),
97 handle_registry: Arc::new(Mutex::new(HashMap::new())),
98 next_id: Arc::new(Mutex::new(0)),
99 }
100 }
101
102 pub fn push<F, T>(&self, future: F) -> FutureHandle
105 where
106 F: Future<Output = T> + Send + 'static,
107 T: Send + Sync + 'static,
108 {
109 let mut next_id = self.next_id.lock();
110 let id = *next_id;
111 *next_id += 1;
112
113 let id = FutureHandle { id };
114
115 let (sender, receiver) = oneshot::channel::<AnyResult>();
116
117 let entry = HandleEntry {
118 receiver: Some(receiver),
119 status: FutureStatus::NotPolled,
120 cached_result: None,
121 };
122
123 self.handle_registry.lock().insert(id, entry);
124
125 let wrapped_future: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(async move {
126 log("Starting future execution");
127 let result = future.await;
128 let boxed_result: AnyResult = Arc::new(result);
129 log("Future completed, sending result");
130
131 let _ = sender.send(boxed_result);
132
133 log("Result sent via channel");
135 });
136
137 self.queued.lock().push_back((id, wrapped_future));
138
139 id
140 }
141
142 pub fn poll(&self) {
147 let mut queue = self.queued.lock();
148 log("Locked queue for polling");
149
150 if queue.is_empty() {
151 log("Queue is empty, nothing to poll");
152 return;
153 }
154
155 let mut futures_to_spawn = Vec::new();
156
157 while let Some((id, future)) = queue.pop_front() {
158 log(format!("Processing future with id: {:?}", id));
159
160 {
161 let mut registry = self.handle_registry.lock();
162 if let Some(entry) = registry.get_mut(&id) {
163 entry.status = FutureStatus::CurrentlyPolling;
164 log("Updated status to CurrentlyPolling");
165 }
166 }
167
168 futures_to_spawn.push(future);
169 }
170
171 drop(queue);
172
173 for future in futures_to_spawn {
174 log("Spawning future with tokio");
175 tokio::spawn(future);
176 }
177 }
178
179 pub fn exchange(&self, handle: &FutureHandle) -> Option<AnyResult> {
184 let mut registry = self.handle_registry.lock();
185 if let Some(entry) = registry.get_mut(handle) {
186 match &entry.status {
187 FutureStatus::Completed => {
188 log("FutureStatus::Completed - returning cached result");
189 entry.cached_result.clone()
190 }
191 _ => {
192 log("Future not completed yet, checking receiver");
193 if let Some(receiver) = entry.receiver.as_mut() {
194 match receiver.try_recv() {
195 Ok(result) => {
196 log("Received result from channel");
197 entry.status = FutureStatus::Completed;
198 entry.cached_result = Some(result.clone());
199 entry.receiver = None; Some(result)
201 }
202 Err(oneshot::error::TryRecvError::Empty) => {
203 log("Channel is empty - future still running");
204 None
205 }
206 Err(oneshot::error::TryRecvError::Closed) => {
207 log("Channel is closed - future may have panicked");
208 None
209 }
210 }
211 } else {
212 log("No receiver available");
213 None
214 }
215 }
216 }
217 } else {
218 log("Handle not found in registry");
219 None
220 }
221 }
222
223 pub fn exchange_owned(&self, handle: &FutureHandle) -> Option<AnyResult> {
229 let mut registry = self.handle_registry.lock();
230 if let Some(entry) = registry.get_mut(handle) {
231 match &entry.status {
232 FutureStatus::Completed => {
233 log("FutureStatus::Completed - taking ownership of cached result");
234 entry.cached_result.take()
235 }
236 _ => {
237 log("Future not completed yet, checking receiver");
238 if let Some(receiver) = entry.receiver.as_mut() {
239 match receiver.try_recv() {
240 Ok(result) => {
241 log("Received result from channel");
242 entry.status = FutureStatus::Completed;
243 entry.receiver = None; Some(result)
245 }
246 Err(oneshot::error::TryRecvError::Empty) => {
247 log("Channel is empty - future still running");
248 None
249 }
250 Err(oneshot::error::TryRecvError::Closed) => {
251 log("Channel is closed - future may have panicked");
252 None
253 }
254 }
255 } else {
256 log("No receiver available");
257 None
258 }
259 }
260 }
261 } else {
262 log("Handle not found in registry");
263 None
264 }
265 }
266
267 pub fn exchange_as<T: Any + Send + Sync + 'static>(&self, handle: &FutureHandle) -> Option<T> {
269 self.exchange(handle)?
270 .downcast::<T>()
271 .ok()
272 .and_then(|arc| Arc::try_unwrap(arc).ok())
273 }
274
275 pub fn exchange_owned_as<T: Any + Send + Sync + 'static>(&self, handle: &FutureHandle) -> Option<T> {
278 self.exchange_owned(handle)?
279 .downcast::<T>()
280 .ok()
281 .and_then(|arc| Arc::try_unwrap(arc).ok())
282 }
283
284 pub fn get_status(&self, handle: &FutureHandle) -> Option<FutureStatus> {
286 let registry = self.handle_registry.lock();
287 registry.get(handle).map(|entry| entry.status.clone())
288 }
289
290 pub fn cleanup(&self) {
294 let mut registry = self.handle_registry.lock();
295 let completed_ids: Vec<FutureHandle> = registry
296 .iter()
297 .filter_map(|(&id, entry)| {
298 matches!(entry.status, FutureStatus::Completed).then_some(id)
299 })
300 .collect();
301
302 for id in completed_ids {
303 registry.remove(&id);
304 }
305 }
306}
307
308
309#[cfg(test)]
313fn log(msg: impl ToString) {
314 use std::io::Write;
315
316 let mut file = std::fs::OpenOptions::new().append(true).create(true).open("test.log").unwrap();
317 file.write_all(format!("{}\n", msg.to_string()).as_bytes()).unwrap();
318}
319
320#[cfg(not(test))]
321fn log(_msg: impl ToString) {
322
323}
324
325impl Default for FutureQueue {
326 fn default() -> Self {
327 Self::new()
328 }
329}
330
331#[test]
332fn test_future_queue() {
333 tokio::runtime::Builder::new_multi_thread()
334 .enable_all()
335 .build()
336 .unwrap()
337 .block_on(async {
338 let queue = FutureQueue::new();
339 log("Created new queue");
340
341 let handle = queue.push(async move {
342 log("Inside the pushed future - starting work");
343 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
344 log("Inside the pushed future - work completed");
345 67 + 41
346 });
347 log("Created new handle");
348
349 queue.poll();
350 log("Initial poll completed");
351
352 let mut attempts = 0;
353 let max_attempts = 100;
354 let start_time = std::time::Instant::now();
355
356 loop {
357 attempts += 1;
358 log(format!("Attempt {}: Checking for result", attempts));
359 log(format!("Time since start: {} ms", start_time.elapsed().as_millis()));
360
361 if let Some(result) = queue.exchange(&handle) {
362 let result = result.downcast::<i32>().unwrap();
363 log(format!("Success! 67 + 41 = {}", result));
364 assert_eq!(*result, 108);
365 break;
366 }
367
368 if attempts >= max_attempts {
369 log("Max attempts reached - test failed");
370 panic!("Future never completed");
371 }
372
373 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
374 }
375
376 log("Test completed successfully");
377 });
378}
379
380#[test]
382fn test_exchange_owned_as() {
383 tokio::runtime::Builder::new_multi_thread()
384 .enable_all()
385 .build()
386 .unwrap()
387 .block_on(async {
388 let queue = FutureQueue::new();
389
390 let handle = queue.push(async move {
391 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
392 67 + 41
393 });
394
395 queue.poll();
396
397 let mut attempts = 0;
398 let max_attempts = 100;
399
400 loop {
401 attempts += 1;
402
403 if let Some(result) = queue.exchange_owned_as::<i32>(&handle) {
404 assert_eq!(result, 108);
405 break;
406 }
407
408 if attempts >= max_attempts {
409 panic!("Future never completed");
410 }
411
412 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
413 }
414
415 assert!(queue.exchange_owned_as::<i32>(&handle).is_none());
416 });
417}