dropbear_future_queue/
lib.rs

1//! Enabling multithreading for functions and apps that are purely single threaded.
2//!
3//! This was originally a module in my [dropbear](https://github.com/4tkbytes/dropbear) game engine,
4//! however I thought there were barely any libraries that had future queuing. It takes inspiration
5//! from Unity and how they handle their events.
6//!
7//! # Example
8//! ```rust
9//! use tokio::runtime::Runtime;
10//! use tokio::time::sleep;
11//! use dropbear_future_queue::FutureQueue;
12//!
//! // requires a tokio runtime context (poll() spawns tasks onto it)
14//! let rt = Runtime::new().unwrap();
15//! let _guard = rt.enter();
16//!
17//! // create new queue
18//! let queue = FutureQueue::new();
19//!
20//! // create a new handle to keep for reference
21//! let handle = queue.push(async move {
//!     sleep(std::time::Duration::from_millis(1000)).await;
23//!     67 + 41
24//! });
25//!
26//! // assume this is the event loop
27//! loop {
//!     // spawns every future currently waiting in the queue
29//!     queue.poll();
30//!
//!     println!("Current status of computation: {:?}", queue.get_status(&handle));
32//!
33//!     // check if it is ready to be taken
34//!     if let Some(result) = queue.exchange_as::<i32>(&handle) {
35//!         println!("67 + 41 = {}", result);
36//!         break;
37//!     }
38//!
39//!     // cleans up any ids not needed anymore.
40//!     queue.cleanup()
41//! }
42//! ```
43
44use std::any::Any;
45use std::cell::RefCell;
46use std::collections::VecDeque;
47use std::pin::Pin;
48use std::sync::Arc;
49use ahash::{HashMap, HashMapExt};
50use parking_lot::Mutex;
51use tokio::sync::oneshot;
52use std::future::Future;
53use std::rc::Rc;
54use std::time::Instant;
55use tokio::runtime::Runtime;
56
57/// A type used for a future.
58///
59/// It must include a [`Send`] trait to be usable for the [`FutureQueue`]
60pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
/// A cloneable, type-erased result. Downcast it to your concrete type to read the value.
62pub type AnyResult = Arc<dyn Any + Send + Sync>;
/// Internal type: the receiving half of the oneshot channel that carries a future's [`AnyResult`].
64type ResultReceiver = oneshot::Receiver<AnyResult>;
65/// A type for storing the queue. It uses a [`VecDeque`] to store [`FutureHandle`]'s and [`BoxFuture`]
66pub type FutureStorage = Arc<Mutex<VecDeque<(FutureHandle, BoxFuture<()>)>>>;
67/// A type recommended to be used by [`FutureQueue`] to allow being thrown around in your app
68pub type Throwable<T> = Rc<RefCell<T>>;
69
70/// A status showing the future, used by the [`ResultReceiver`] and [`ResultSender`]
71#[derive(Clone)]
72pub enum FutureStatus {
73    NotPolled,
74    CurrentlyPolling,
75    Completed(AnyResult),
76}
77
/// A small, copyable ticket that identifies one future pushed onto a [`FutureQueue`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FutureHandle {
    /// Numeric identifier of the future this handle refers to.
    pub id: u64,
}
83
84/// Internal storage per handle — separate from FutureHandle
85struct HandleEntry {
86    receiver: ResultReceiver,
87    status: FutureStatus,
88}
89
90/// A queue for polling futures. It is stored in here until [`FutureQueue::poll`] is run.
91pub struct FutureQueue {
92    /// The queue for the futures.
93    queued: FutureStorage,
94    /// A place to store all handle data
95    handle_registry: Arc<Mutex<HashMap<FutureHandle, HandleEntry>>>,
96    /// Next id to be processed
97    next_id: Arc<Mutex<u64>>,
98}
99
100impl FutureQueue {
101    /// Creates a new [`Arc<FutureQueue>`].
102    pub fn new() -> Self {
103        Self {
104            queued: Arc::new(Mutex::new(VecDeque::new())),
105            handle_registry: Arc::new(Mutex::new(HashMap::new())),
106            next_id: Arc::new(Mutex::new(0)),
107        }
108    }
109
110    /// Pushes a future to the FutureQueue. It will sit and wait
111    /// to be processed until [`FutureQueue::poll`] is called.
112    pub fn push<F, T>(&self, future: F) -> FutureHandle
113    where
114        F: Future<Output = T> + Send + 'static,
115        T: Send + Sync + 'static,
116    {
117        let mut next_id = self.next_id.lock();
118        let id = *next_id;
119        *next_id += 1;
120
121        let id = FutureHandle { id };
122
123        let (sender, receiver) = oneshot::channel::<AnyResult>();
124
125        let entry = HandleEntry {
126            receiver,
127            status: FutureStatus::NotPolled,
128        };
129
130        self.handle_registry.lock().insert(id, entry);
131
132        let registry_clone = self.handle_registry.clone();
133
134        let wrapped_future: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(async move {
135            log("Starting future execution");
136            let result = future.await;
137            let boxed_result: AnyResult = Arc::new(result);
138            log("Future completed, sending result");
139
140            let _ = sender.send(boxed_result.clone());
141
142            let mut registry = registry_clone.lock();
143            if let Some(entry) = registry.get_mut(&id) {
144                entry.status = FutureStatus::Completed(boxed_result);
145                log("Updated status to completed");
146            }
147        });
148
149        self.queued.lock().push_back((id, wrapped_future));
150
151        id
152    }
153
154    /// Polls all the futures in the future queue and resolves the handles.
155    ///
156    /// This function spawns a new async thread for each item inside the thread and
157    /// sends updates to the Handle's receiver.
158    pub fn poll(&self) {
159        let mut queue = self.queued.lock();
160        log("Locked queue for polling");
161
162        if queue.is_empty() {
163            log("Queue is empty, nothing to poll");
164            return;
165        }
166
167        let mut futures_to_spawn = Vec::new();
168
169        while let Some((id, future)) = queue.pop_front() {
170            log(format!("Processing future with id: {:?}", id));
171
172            {
173                let mut registry = self.handle_registry.lock();
174                if let Some(entry) = registry.get_mut(&id) {
175                    entry.status = FutureStatus::CurrentlyPolling;
176                    log("Updated status to CurrentlyPolling");
177                }
178            }
179
180            futures_to_spawn.push(future);
181        }
182
183        drop(queue);
184
185        for future in futures_to_spawn {
186            log("Spawning future with tokio");
187            tokio::spawn(future);
188        }
189    }
190
191    /// Exchanges the future for the result.
192    ///
193    /// When the handle is not successful, it will return nothing. When the handle is successful,
194    /// it will return the result and drop the handle, removing the usage of it.
195    pub fn exchange(&self, handle: &FutureHandle) -> Option<AnyResult> {
196        let mut registry = self.handle_registry.lock();
197        if let Some(entry) = registry.get_mut(handle) {
198            return match &entry.status {
199                FutureStatus::Completed(result) => {
200                    log("FutureStatus::Completed - returning cached result");
201                    Some(result.clone())
202                }
203                _ => {
204                    log("Future not completed yet, checking receiver");
205                    match entry.receiver.try_recv() {
206                        Ok(result) => {
207                            log("Received result from channel");
208                            entry.status = FutureStatus::Completed(result.clone());
209                            Some(result)
210                        }
211                        Err(oneshot::error::TryRecvError::Empty) => {
212                            log("Channel is empty - future still running");
213                            None
214                        },
215                        Err(oneshot::error::TryRecvError::Closed) => {
216                            log("Channel is closed - future may have panicked");
217                            None
218                        },
219                    }
220                }
221            }
222        } else {
223            log("Handle not found in registry");
224        }
225        None
226    }
227
228    /// Exchanges the handle and safely downcasts it into a specific type.
229    pub fn exchange_as<T: Any + Send + Sync + 'static>(&self, handle: &FutureHandle) -> Option<Arc<T>> {
230        self.exchange(handle)?
231            .downcast()
232            .ok()
233    }
234
235    /// Get status of a handle
236    pub fn get_status(&self, handle: &FutureHandle) -> Option<FutureStatus> {
237        let registry = self.handle_registry.lock();
238        registry.get(handle).map(|entry| entry.status.clone())
239    }
240
241    /// Cleans up any completed handles and removes them from the registry.
242    ///
243    /// You can do this manually, however this is typically done at the end of the frame.
244    pub fn cleanup(&self) {
245        let mut registry = self.handle_registry.lock();
246        let completed_ids: Vec<FutureHandle> = registry
247            .iter()
248            .filter_map(|(&id, entry)| {
249                matches!(entry.status, FutureStatus::Completed(_)).then_some(id)
250            })
251            .collect();
252
253        for id in completed_ids {
254            registry.remove(&id);
255        }
256    }
257}
258
259
/// Test-build logger: appends `msg` as a single line to `test.log` in the current
/// working directory, for situations where stdout is not available.
///
/// # Panics
/// Panics if the file cannot be opened or written to.
#[cfg(test)]
fn log(msg: impl ToString) {
    use std::fs::OpenOptions;
    use std::io::Write;

    let mut sink = OpenOptions::new()
        .append(true)
        .create(true)
        .open("test.log")
        .unwrap();

    // Build the full line first so it is written with one `write_all` call.
    let mut line = msg.to_string();
    line.push('\n');
    sink.write_all(line.as_bytes()).unwrap();
}

/// Non-test builds: logging is a no-op.
#[cfg(not(test))]
fn log(_msg: impl ToString) {}
275
276impl Default for FutureQueue {
277    fn default() -> Self {
278        Self::new()
279    }
280}
281
/// End-to-end check: push a future, poll the queue once, then wait for the result to
/// become available through `exchange`.
#[test]
fn test_future_queue() {
    use tokio::time::{sleep, Duration};

    let rt = Runtime::new().unwrap();
    let _guard = rt.enter();

    let queue = FutureQueue::new();
    log("Created new queue");

    let handle = queue.push(async move {
        log("Inside the pushed future - starting work");
        sleep(Duration::from_millis(10)).await;
        log("Inside the pushed future - work completed");
        67 + 41
    });
    log("Created new handle");

    queue.poll();
    log("Initial poll completed");

    let max_attempts = 100;
    let mut last_attempt = Instant::now();

    for attempt in 1..=max_attempts {
        log(format!("Attempt {}: Checking for result", attempt));
        log(format!("Time since last attempt: {} ms", last_attempt.elapsed().as_millis()));
        last_attempt = Instant::now();

        if let Some(result) = queue.exchange(&handle) {
            let result = result.downcast::<i32>().unwrap();
            log(format!("Success! 67 + 41 = {}", result));
            assert_eq!(*result, 108);
            log("Test completed successfully");
            return;
        }

        // Without a pause, all attempts can be used up before the future's 10 ms sleep
        // finishes. This is a blocking sleep on the test thread; the future runs on the
        // runtime's worker threads.
        std::thread::sleep(Duration::from_millis(5));
    }

    log("Max attempts reached - test failed");
    panic!("Future never completed");
}
325
326    log("Test completed successfully");
327}