dropbear_future_queue/
lib.rs

1//! Enabling multithreading for functions and apps that are purely single threaded.
2//!
3//! This was originally a module in my [dropbear](https://github.com/4tkbytes/dropbear) game engine,
4//! however I thought there were barely any libraries that had future queuing. It takes inspiration
5//! from Unity and how they handle their events.
6//!
7//! # Example
8//! ```rust
9//! use tokio::runtime::Runtime;
10//! use tokio::time::sleep;
11//! use dropbear_future_queue::FutureQueue;
12//!
13//! // `poll` spawns onto Tokio, so a runtime context must be entered first
14//! let rt = Runtime::new().unwrap();
15//! let _guard = rt.enter();
16//!
17//! // create new queue
18//! let queue = FutureQueue::new();
19//!
20//! // create a new handle to keep for reference
21//! let handle = queue.push(async move {
22//!     sleep(std::time::Duration::from_millis(1000)).await;
23//!     67 + 41
24//! });
25//!
26//! // assume this is the event loop
27//! loop {
28//!     // spawns every future currently waiting in the queue
29//!     queue.poll();
30//!
31//!     println!("Current status of computation: {:?}", queue.get_status(&handle));
32//!
33//!     // check if it is ready to be taken
34//!     if let Some(result) = queue.exchange_as::<i32>(&handle) {
35//!         println!("67 + 41 = {}", result);
36//!         break;
37//!     }
38//!
39//!     // cleans up any ids not needed anymore.
40//!     queue.cleanup()
41//! }
42//! ```
43
44use std::any::Any;
45use std::cell::RefCell;
46use std::collections::VecDeque;
47use std::pin::Pin;
48use std::sync::Arc;
49use ahash::{HashMap, HashMapExt};
50use parking_lot::Mutex;
51use tokio::sync::oneshot;
52use std::future::Future;
53use std::rc::Rc;
54
55/// A pinned, heap-allocated future that resolves to `T`.
56///
57/// The [`Send`] bound is part of the type so the future can be handed to the runtime by [`FutureQueue::poll`].
58pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
59/// A cheaply clonable, type-erased result. Downcast it to the concrete output type, e.g. with [`FutureQueue::exchange_as`].
60pub type AnyResult = Arc<dyn Any + Send + Sync>;
61/// Internal type: the receiving half of the oneshot channel that carries a future's [`AnyResult`].
62type ResultReceiver = oneshot::Receiver<AnyResult>;
63/// Storage for the queue: a shared, mutex-guarded [`VecDeque`] of ([`FutureHandle`], [`BoxFuture`]) pairs.
64pub type FutureStorage = Arc<Mutex<VecDeque<(FutureHandle, BoxFuture<()>)>>>;
65/// A shared, interior-mutable wrapper ([`Rc`] + [`RefCell`]) suggested for passing a [`FutureQueue`] around your app.
66pub type Throwable<T> = Rc<RefCell<T>>;
67
68/// A status showing the future, used by the [`ResultReceiver`] and [`ResultSender`]
69#[derive(Clone)]
70pub enum FutureStatus {
71    NotPolled,
72    CurrentlyPolling,
73    Completed(AnyResult),
74}
75
/// A small, copyable ticket that identifies one future pushed onto the queue.
///
/// Two handles are equal exactly when their ids are equal, so a handle can be
/// used directly as a map or set key.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FutureHandle {
    /// Numeric identifier of the future this handle refers to.
    pub id: u64,
}
81
82/// Internal per-handle bookkeeping kept in the registry, stored apart from the public [`FutureHandle`] (which only carries the id).
83struct HandleEntry {
84    receiver: ResultReceiver, // receiving half of the oneshot channel the wrapped future sends its result on
85    status: FutureStatus, // last recorded lifecycle state; holds the cached result once `Completed`
86}
87
88/// A queue of futures. Pushed futures are held here until [`FutureQueue::poll`] spawns them.
89pub struct FutureQueue {
90    /// Futures that have been pushed but not yet spawned, in push order.
91    queued: FutureStorage,
92    /// Receiver and status for every live handle, keyed by [`FutureHandle`].
93    handle_registry: Arc<Mutex<HashMap<FutureHandle, HandleEntry>>>,
94    /// The id that the next pushed future will receive.
95    next_id: Arc<Mutex<u64>>,
96}
97
98impl FutureQueue {
99    /// Creates a new [`Arc<FutureQueue>`].
100    pub fn new() -> Self {
101        Self {
102            queued: Arc::new(Mutex::new(VecDeque::new())),
103            handle_registry: Arc::new(Mutex::new(HashMap::new())),
104            next_id: Arc::new(Mutex::new(0)),
105        }
106    }
107
108    /// Pushes a future to the FutureQueue. It will sit and wait
109    /// to be processed until [`FutureQueue::poll`] is called.
110    pub fn push<F, T>(&self, future: F) -> FutureHandle
111    where
112        F: Future<Output = T> + Send + 'static,
113        T: Send + Sync + 'static,
114    {
115        let mut next_id = self.next_id.lock();
116        let id = *next_id;
117        *next_id += 1;
118
119        let id = FutureHandle { id };
120
121        let (sender, receiver) = oneshot::channel::<AnyResult>();
122
123        let entry = HandleEntry {
124            receiver,
125            status: FutureStatus::NotPolled,
126        };
127
128        self.handle_registry.lock().insert(id, entry);
129
130        let registry_clone = self.handle_registry.clone();
131
132        let wrapped_future: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(async move {
133            log("Starting future execution");
134            let result = future.await;
135            let boxed_result: AnyResult = Arc::new(result);
136            log("Future completed, sending result");
137
138            let _ = sender.send(boxed_result.clone());
139
140            let mut registry = registry_clone.lock();
141            if let Some(entry) = registry.get_mut(&id) {
142                entry.status = FutureStatus::Completed(boxed_result);
143                log("Updated status to completed");
144            }
145        });
146
147        self.queued.lock().push_back((id, wrapped_future));
148
149        id
150    }
151
152    /// Polls all the futures in the future queue and resolves the handles.
153    ///
154    /// This function spawns a new async thread for each item inside the thread and
155    /// sends updates to the Handle's receiver.
156    pub fn poll(&self) {
157        let mut queue = self.queued.lock();
158        log("Locked queue for polling");
159
160        if queue.is_empty() {
161            log("Queue is empty, nothing to poll");
162            return;
163        }
164
165        let mut futures_to_spawn = Vec::new();
166
167        while let Some((id, future)) = queue.pop_front() {
168            log(format!("Processing future with id: {:?}", id));
169
170            {
171                let mut registry = self.handle_registry.lock();
172                if let Some(entry) = registry.get_mut(&id) {
173                    entry.status = FutureStatus::CurrentlyPolling;
174                    log("Updated status to CurrentlyPolling");
175                }
176            }
177
178            futures_to_spawn.push(future);
179        }
180
181        drop(queue);
182
183        for future in futures_to_spawn {
184            log("Spawning future with tokio");
185            tokio::spawn(future);
186        }
187    }
188
189    /// Exchanges the future for the result.
190    ///
191    /// When the handle is not successful, it will return nothing. When the handle is successful,
192    /// it will return the result and drop the handle, removing the usage of it.
193    pub fn exchange(&self, handle: &FutureHandle) -> Option<AnyResult> {
194        let mut registry = self.handle_registry.lock();
195        if let Some(entry) = registry.get_mut(handle) {
196            return match &entry.status {
197                FutureStatus::Completed(result) => {
198                    log("FutureStatus::Completed - returning cached result");
199                    Some(result.clone())
200                }
201                _ => {
202                    log("Future not completed yet, checking receiver");
203                    match entry.receiver.try_recv() {
204                        Ok(result) => {
205                            log("Received result from channel");
206                            entry.status = FutureStatus::Completed(result.clone());
207                            Some(result)
208                        }
209                        Err(oneshot::error::TryRecvError::Empty) => {
210                            log("Channel is empty - future still running");
211                            None
212                        },
213                        Err(oneshot::error::TryRecvError::Closed) => {
214                            log("Channel is closed - future may have panicked");
215                            None
216                        },
217                    }
218                }
219            }
220        } else {
221            log("Handle not found in registry");
222        }
223        None
224    }
225
226    /// Exchanges the handle and safely downcasts it into a specific type.
227    pub fn exchange_as<T: Any + Send + Sync + 'static>(&self, handle: &FutureHandle) -> Option<Arc<T>> {
228        self.exchange(handle)?
229            .downcast()
230            .ok()
231    }
232
233    /// Get status of a handle
234    pub fn get_status(&self, handle: &FutureHandle) -> Option<FutureStatus> {
235        let registry = self.handle_registry.lock();
236        registry.get(handle).map(|entry| entry.status.clone())
237    }
238
239    /// Cleans up any completed handles and removes them from the registry.
240    ///
241    /// You can do this manually, however this is typically done at the end of the frame.
242    pub fn cleanup(&self) {
243        let mut registry = self.handle_registry.lock();
244        let completed_ids: Vec<FutureHandle> = registry
245            .iter()
246            .filter_map(|(&id, entry)| {
247                matches!(entry.status, FutureStatus::Completed(_)).then_some(id)
248            })
249            .collect();
250
251        for id in completed_ids {
252            registry.remove(&id);
253        }
254    }
255}
256
257
/// Internal helper that appends `msg` as one line to `test.log` (used in tests,
/// when stdout is not available).
///
/// This variant is only compiled for test builds. Logging is best-effort: if
/// the file cannot be opened or written, the message is dropped instead of
/// panicking, because this helper is called from inside spawned futures and
/// while mutex guards are held.
#[cfg(test)]
fn log(msg: impl ToString) {
    use std::io::Write;

    let opened = std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open("test.log");

    if let Ok(mut file) = opened {
        // Build the whole line first and hand it over in one `write_all` call.
        let _ = file.write_all(format!("{}\n", msg.to_string()).as_bytes());
    }
}

/// No-op variant of the logging helper, compiled for non-test builds.
#[cfg(not(test))]
fn log(_msg: impl ToString) {}
273
274impl Default for FutureQueue {
275    fn default() -> Self {
276        Self::new()
277    }
278}
279
/// End-to-end check: push a future, poll the queue, then retry `exchange`
/// until the result arrives or the attempt budget runs out.
#[test]
fn test_future_queue() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();

    let queue = FutureQueue::new();
    log("Created new queue");

    let handle = queue.push(async move {
        log("Inside the pushed future - starting work");
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        log("Inside the pushed future - work completed");
        67 + 41
    });
    log("Created new handle");

    queue.poll();
    log("Initial poll completed");

    let max_attempts = 100;
    // Pause between attempts so the attempt budget covers real time
    // (up to roughly half a second) instead of being exhausted by a tight
    // spin before the 10 ms sleep inside the future has elapsed.
    let retry_delay = std::time::Duration::from_millis(5);
    let mut attempts = 0;
    let mut last_attempt = std::time::Instant::now();

    loop {
        attempts += 1;
        log(format!("Attempt {}: Checking for result", attempts));
        log(format!(
            "Time since last attempt: {} ms",
            last_attempt.elapsed().as_millis()
        ));
        last_attempt = std::time::Instant::now();

        if let Some(result) = queue.exchange(&handle) {
            let result = result.downcast::<i32>().unwrap();
            log(format!("Success! 67 + 41 = {}", result));
            assert_eq!(*result, 108);
            break;
        }

        if attempts >= max_attempts {
            log("Max attempts reached - test failed");
            panic!("Future never completed");
        }

        // This test thread has only entered the runtime context; the task runs
        // on the runtime's own threads, so blocking here does not stall it.
        std::thread::sleep(retry_delay);
    }

    log("Test completed successfully");
}