dropbear_future_queue/
lib.rs

1//! Enabling multithreading for functions and apps that are purely single threaded.
2//!
3//! This was originally a module in my [dropbear](https://github.com/4tkbytes/dropbear) game engine,
4//! however I thought there were barely any libraries that had future queuing. It takes inspiration
5//! from Unity and how they handle their events.
6//!
7//! # Example
8//! ```rust
9//! use dropbear_future_queue::{FutureQueue, FutureStatus};
10//!
11//! # tokio_test::block_on(async {
12//! // create new queue
13//! let queue = FutureQueue::new();
14//!
15//! // create a new handle to keep for reference
16//! let handle = queue.push(async move {
17//!     tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
18//!     67 + 41
19//! });
20//!
21//! // Check initial status
22//! assert!(matches!(queue.get_status(&handle), Some(FutureStatus::NotPolled)));
23//!
24//! // execute the futures
25//! queue.poll();
26//!
27//! // Wait a bit for completion and check result
28//! tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
29//!
30//! if let Some(result) = queue.exchange_as::<i32>(&handle) {
31//!     println!("67 + 41 = {}", result);
32//!     assert_eq!(result, 108);
33//! }
34//! # });
35//! ```
36
37use std::any::Any;
38use std::cell::RefCell;
39use std::collections::VecDeque;
40use std::pin::Pin;
41use std::sync::Arc;
42use ahash::{HashMap, HashMapExt};
43use parking_lot::Mutex;
44use tokio::sync::oneshot;
45use std::future::Future;
46use std::rc::Rc;
47
48/// A type used for a future.
49///
50/// It must include a [`Send`] trait to be usable for the [`FutureQueue`]
51pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
52/// A clonable generic result. It can/preferred to be downcasted to your specific type.
53pub type AnyResult = Arc<dyn Any + Send + Sync>;
54/// Internal function: A result receiver
55type ResultReceiver = oneshot::Receiver<AnyResult>;
56/// A type for storing the queue. It uses a [`VecDeque`] to store [`FutureHandle`]'s and [`BoxFuture`]
57pub type FutureStorage = Arc<Mutex<VecDeque<(FutureHandle, BoxFuture<()>)>>>;
58/// A type recommended to be used by [`FutureQueue`] to allow being thrown around in your app
59pub type Throwable<T> = Rc<RefCell<T>>;
60
/// The lifecycle state of a future tracked by a [`FutureQueue`].
///
/// The status is reported by [`FutureQueue::get_status`]. Variants can be
/// compared directly with `==` / `!=`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FutureStatus {
    /// The future has been pushed but [`FutureQueue::poll`] has not picked it up yet.
    NotPolled,
    /// [`FutureQueue::poll`] has handed the future to the runtime and no result
    /// has been collected yet.
    CurrentlyPolling,
    /// The result has been received from the future by one of the
    /// `exchange*` methods.
    Completed,
}
68
/// A lightweight, copyable ticket identifying one future pushed onto a queue.
///
/// Handles compare and hash by their numeric `id`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FutureHandle {
    /// Numeric identifier of the future this handle refers to.
    pub id: u64,
}
74
/// Internal storage per handle — separate from FutureHandle
///
/// One entry is created by `FutureQueue::push` for every handle and lives in
/// the queue's registry until `FutureQueue::cleanup` removes it.
struct HandleEntry {
    /// Receiving end of the result channel. `Some` from creation until the
    /// result has been received, then cleared.
    receiver: Option<ResultReceiver>,
    /// Current lifecycle state of the future behind this handle.
    status: FutureStatus,
    /// The received result, kept so `exchange` can hand it out repeatedly;
    /// `exchange_owned` takes it out, leaving `None`.
    cached_result: Option<AnyResult>,
}
81
/// A queue for polling futures. It is stored in here until [`FutureQueue::poll`] is run.
///
/// Futures are added with [`FutureQueue::push`], which returns a
/// [`FutureHandle`]; the handle is later traded for the result through the
/// `exchange*` methods.
pub struct FutureQueue {
    /// The queue for the futures: work pushed but not yet handed to the runtime.
    queued: FutureStorage,
    /// A place to store all handle data (receiver, status and cached result), keyed by handle.
    handle_registry: Arc<Mutex<HashMap<FutureHandle, HandleEntry>>>,
    /// The id that the next call to `push` will assign; incremented on every push.
    next_id: Arc<Mutex<u64>>,
}
91
92impl FutureQueue {
93    /// Creates a new [`Arc<FutureQueue>`].
94    pub fn new() -> Self {
95        Self {
96            queued: Arc::new(Mutex::new(VecDeque::new())),
97            handle_registry: Arc::new(Mutex::new(HashMap::new())),
98            next_id: Arc::new(Mutex::new(0)),
99        }
100    }
101
102    /// Pushes a future to the FutureQueue. It will sit and wait
103    /// to be processed until [`FutureQueue::poll`] is called.
104    pub fn push<F, T>(&self, future: F) -> FutureHandle
105    where
106        F: Future<Output = T> + Send + 'static,
107        T: Send + Sync + 'static,
108    {
109        let mut next_id = self.next_id.lock();
110        let id = *next_id;
111        *next_id += 1;
112
113        let id = FutureHandle { id };
114
115        let (sender, receiver) = oneshot::channel::<AnyResult>();
116
117        let entry = HandleEntry {
118            receiver: Some(receiver),
119            status: FutureStatus::NotPolled,
120            cached_result: None,
121        };
122
123        self.handle_registry.lock().insert(id, entry);
124
125        let wrapped_future: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(async move {
126            log("Starting future execution");
127            let result = future.await;
128            let boxed_result: AnyResult = Arc::new(result);
129            log("Future completed, sending result");
130
131            let _ = sender.send(boxed_result);
132
133            // Don't update status here - let the exchange method handle it
134            log("Result sent via channel");
135        });
136
137        self.queued.lock().push_back((id, wrapped_future));
138
139        id
140    }
141
142    /// Polls all the futures in the future queue and resolves the handles.
143    ///
144    /// This function spawns a new async thread for each item inside the thread and
145    /// sends updates to the Handle's receiver.
146    pub fn poll(&self) {
147        let mut queue = self.queued.lock();
148        log("Locked queue for polling");
149
150        if queue.is_empty() {
151            log("Queue is empty, nothing to poll");
152            return;
153        }
154
155        let mut futures_to_spawn = Vec::new();
156
157        while let Some((id, future)) = queue.pop_front() {
158            log(format!("Processing future with id: {:?}", id));
159
160            {
161                let mut registry = self.handle_registry.lock();
162                if let Some(entry) = registry.get_mut(&id) {
163                    entry.status = FutureStatus::CurrentlyPolling;
164                    log("Updated status to CurrentlyPolling");
165                }
166            }
167
168            futures_to_spawn.push(future);
169        }
170
171        drop(queue);
172
173        for future in futures_to_spawn {
174            log("Spawning future with tokio");
175            tokio::spawn(future);
176        }
177    }
178
179    /// Exchanges the future for the result.
180    ///
181    /// When the handle is not successful, it will return nothing. When the handle is successful,
182    /// it will return the result. The result is cached and can be retrieved multiple times.
183    pub fn exchange(&self, handle: &FutureHandle) -> Option<AnyResult> {
184        let mut registry = self.handle_registry.lock();
185        if let Some(entry) = registry.get_mut(handle) {
186            match &entry.status {
187                FutureStatus::Completed => {
188                    log("FutureStatus::Completed - returning cached result");
189                    entry.cached_result.clone()
190                }
191                _ => {
192                    log("Future not completed yet, checking receiver");
193                    if let Some(receiver) = entry.receiver.as_mut() {
194                        match receiver.try_recv() {
195                            Ok(result) => {
196                                log("Received result from channel");
197                                entry.status = FutureStatus::Completed;
198                                entry.cached_result = Some(result.clone());
199                                entry.receiver = None; // Remove receiver as it's no longer needed
200                                Some(result)
201                            }
202                            Err(oneshot::error::TryRecvError::Empty) => {
203                                log("Channel is empty - future still running");
204                                None
205                            }
206                            Err(oneshot::error::TryRecvError::Closed) => {
207                                log("Channel is closed - future may have panicked");
208                                None
209                            }
210                        }
211                    } else {
212                        log("No receiver available");
213                        None
214                    }
215                }
216            }
217        } else {
218            log("Handle not found in registry");
219            None
220        }
221    }
222
223    /// Exchanges the future for the result, taking ownership and consuming the cached result.
224    ///
225    /// When the handle is not successful, it will return nothing. When the handle is successful,
226    /// it will return the result and remove it from the cache, allowing Arc::try_unwrap to succeed.
227    /// This method can only be called once per completed future.
228    pub fn exchange_owned(&self, handle: &FutureHandle) -> Option<AnyResult> {
229        let mut registry = self.handle_registry.lock();
230        if let Some(entry) = registry.get_mut(handle) {
231            match &entry.status {
232                FutureStatus::Completed => {
233                    log("FutureStatus::Completed - taking ownership of cached result");
234                    entry.cached_result.take()
235                }
236                _ => {
237                    log("Future not completed yet, checking receiver");
238                    if let Some(receiver) = entry.receiver.as_mut() {
239                        match receiver.try_recv() {
240                            Ok(result) => {
241                                log("Received result from channel");
242                                entry.status = FutureStatus::Completed;
243                                entry.receiver = None; // Remove receiver as it's no longer needed
244                                Some(result)
245                            }
246                            Err(oneshot::error::TryRecvError::Empty) => {
247                                log("Channel is empty - future still running");
248                                None
249                            }
250                            Err(oneshot::error::TryRecvError::Closed) => {
251                                log("Channel is closed - future may have panicked");
252                                None
253                            }
254                        }
255                    } else {
256                        log("No receiver available");
257                        None
258                    }
259                }
260            }
261        } else {
262            log("Handle not found in registry");
263            None
264        }
265    }
266
267    /// Exchanges the handle and safely downcasts it into a specific type.
268    pub fn exchange_as<T: Any + Send + Sync + 'static>(&self, handle: &FutureHandle) -> Option<T> {
269        self.exchange(handle)?
270            .downcast::<T>()
271            .ok()
272            .and_then(|arc| Arc::try_unwrap(arc).ok())
273    }
274
275    /// Exchanges the handle taking ownership and safely downcasts it into a specific type.
276    /// This method consumes the cached result, allowing Arc::try_unwrap to succeed.
277    pub fn exchange_owned_as<T: Any + Send + Sync + 'static>(&self, handle: &FutureHandle) -> Option<T> {
278        self.exchange_owned(handle)?
279            .downcast::<T>()
280            .ok()
281            .and_then(|arc| Arc::try_unwrap(arc).ok())
282    }
283
284    /// Get status of a handle
285    pub fn get_status(&self, handle: &FutureHandle) -> Option<FutureStatus> {
286        let registry = self.handle_registry.lock();
287        registry.get(handle).map(|entry| entry.status.clone())
288    }
289
290    /// Cleans up any completed handles and removes them from the registry.
291    ///
292    /// You can do this manually, however this is typically done at the end of the frame.
293    pub fn cleanup(&self) {
294        let mut registry = self.handle_registry.lock();
295        let completed_ids: Vec<FutureHandle> = registry
296            .iter()
297            .filter_map(|(&id, entry)| {
298                matches!(entry.status, FutureStatus::Completed).then_some(id)
299            })
300            .collect();
301
302        for id in completed_ids {
303            registry.remove(&id);
304        }
305    }
306}
307
308
/// Test-build logger: appends `msg` as a single line to the file `test.log`.
///
/// Used because stdout is not always available while tests run. The file is
/// created if missing.
///
/// # Panics
///
/// Panics if the file cannot be opened or written.
#[cfg(test)]
fn log(msg: impl ToString) {
    use std::fs::OpenOptions;
    use std::io::Write;

    let mut sink = OpenOptions::new()
        .append(true)
        .create(true)
        .open("test.log")
        .unwrap();

    // Build the full line up front so it goes out in a single write call.
    let mut line = msg.to_string();
    line.push('\n');
    sink.write_all(line.as_bytes()).unwrap();
}

/// Non-test builds: logging does nothing and the message is discarded.
#[cfg(not(test))]
fn log(_msg: impl ToString) {}
324
325impl Default for FutureQueue {
326    fn default() -> Self {
327        Self::new()
328    }
329}
330
/// Pushes one future, polls the queue and retries `exchange` (up to 100
/// times, 1 ms apart) until the result arrives, then checks it downcasts to 108.
#[test]
fn test_future_queue() {
    use tokio::time::{sleep, Duration};

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async {
        let queue = FutureQueue::new();
        log("Created new queue");

        let handle = queue.push(async move {
            log("Inside the pushed future - starting work");
            sleep(Duration::from_millis(10)).await;
            log("Inside the pushed future - work completed");
            67 + 41
        });
        log("Created new handle");

        queue.poll();
        log("Initial poll completed");

        let max_attempts = 100;
        let start_time = std::time::Instant::now();

        for attempt in 1..=max_attempts {
            log(format!("Attempt {}: Checking for result", attempt));
            log(format!("Time since start: {} ms", start_time.elapsed().as_millis()));

            if let Some(shared) = queue.exchange(&handle) {
                let value = shared.downcast::<i32>().unwrap();
                log(format!("Success! 67 + 41 = {}", value));
                assert_eq!(*value, 108);
                break;
            }

            // Give up on the final attempt instead of sleeping once more.
            if attempt >= max_attempts {
                log("Max attempts reached - test failed");
                panic!("Future never completed");
            }

            sleep(Duration::from_millis(1)).await;
        }

        log("Test completed successfully");
    });
}
379
/// Checks that `exchange_owned_as` yields the typed result exactly once: the
/// first successful call returns 108 and a following call returns `None`.
#[test]
fn test_exchange_owned_as() {
    use tokio::time::{sleep, Duration};

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async {
        let queue = FutureQueue::new();

        let handle = queue.push(async move {
            sleep(Duration::from_millis(10)).await;
            67 + 41
        });

        queue.poll();

        let max_attempts = 100;

        for attempt in 1..=max_attempts {
            if let Some(value) = queue.exchange_owned_as::<i32>(&handle) {
                assert_eq!(value, 108);
                break;
            }

            // Give up on the final attempt instead of sleeping once more.
            if attempt >= max_attempts {
                panic!("Future never completed");
            }

            sleep(Duration::from_millis(1)).await;
        }

        // The owning exchange consumed the result, so nothing is left.
        assert!(queue.exchange_owned_as::<i32>(&handle).is_none());
    });
}
415        assert!(queue.exchange_owned_as::<i32>(&handle).is_none());
416    });
417}