use std::{pin::Pin, thread};
use futures::Future;
use tokio::runtime::Runtime;
use super::wait;
/// Run a future on a separate thread.
///
/// Useful when a future contains blocking code. Normally such code also
/// blocks every other asynchronous task scheduled on the same thread.
/// This helper mitigates the issue by running the blocking future on a
/// separate thread with its own Tokio runtime.
///
/// ### Examples
///
/// ```
/// use std::{thread, pin::Pin, time::Duration};
///
/// use futures::{future, Future};
/// use cs_utils::futures::{wait, with_thread};
///
/// #[tokio::main(worker_threads = 1)]
/// async fn main() {
///     // counts how many iterations the normal future has run
///     static mut RUN_CNT: u64 = 0;
///
///     // create a future that blocks the current thread
///     let blocking_future = async move {
///         thread::sleep(Duration::from_secs(1));
///     };
///
///     // create a future that is intended to run in the background
///     let normal_future = async move {
///         loop {
///             unsafe { RUN_CNT += 1 }
///             wait(100).await;
///         }
///     };
///
///     // create the list of futures
///     let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
///         Box::pin(with_thread(blocking_future)), // <-- wrap the blocking future here
///         Box::pin(normal_future),
///     ];
///
///     // race the futures until the first one completes
///     future::select_all(futures).await;
///
///     // the normal future must have gone through multiple iterations
///     assert!(
///         unsafe { RUN_CNT >= 7 } && unsafe { RUN_CNT <= 10 },
///         "Normal future must run iterations multiple times.",
///     );
/// }
/// ```
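///
/// Under the hood the helper spawns a thread and polls it until it has
/// finished. A simplified, synchronous sketch of that idea using only the
/// standard library; the `run_on_thread` name is hypothetical and not part
/// of this crate, and `thread::sleep` stands in for the asynchronous `wait`:
///
/// ```rust
/// use std::{thread, time::Duration};
///
/// // hypothetical helper, for illustration only
/// fn run_on_thread<T, F>(work: F) -> T
/// where
///     T: Send + 'static,
///     F: FnOnce() -> T + Send + 'static,
/// {
///     let other_thread = thread::spawn(work);
///
///     // poll the thread until it reports completion
///     while !other_thread.is_finished() {
///         thread::sleep(Duration::from_millis(5));
///     }
///
///     // hand back whatever the closure returned
///     other_thread.join().expect("failed to join other thread")
/// }
///
/// fn main() {
///     let answer = run_on_thread(|| {
///         thread::sleep(Duration::from_millis(50)); // blocking work
///         40 + 2
///     });
///     assert_eq!(answer, 42);
/// }
/// ```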
pub fn with_thread<
    T: Send + 'static,
    TFuture: Future<Output = T> + Send + 'static,
>(
    original_future: TFuture,
) -> Pin<Box<dyn Future<Output = T> + Send + 'static>> {
    Box::pin(async move {
        // dedicated runtime that drives the future on the other thread
        let runtime = Runtime::new()
            .expect("Cannot create Tokio runtime.");

        let other_thread = thread::spawn(move || {
            runtime.block_on(original_future)
        });

        // poll the thread, yielding while it is not finished yet
        while !other_thread.is_finished() {
            // a `5ms` delay keeps polling cheap while adding
            // little latency once the thread finishes
            wait(5).await;
        }

        other_thread.join().expect("failed to join other thread")
    })
}

#[cfg(test)]
mod tests {
    use std::{thread, pin::Pin, time::Duration};

    use futures::{future, Future};
    use cs_utils::futures::{wait, with_thread};

    #[tokio::test]
    async fn run_blocking_future_on_separate_thread() {
        static mut NORMAL_FUTURE_RUN_COUNTER: u64 = 0;

        let block_for_ms: u64 = 1000;
        let run_each_ms: u64 = 100;

        // future that blocks its thread for `block_for_ms`
        let blocking_future = async move {
            thread::sleep(Duration::from_millis(block_for_ms));
        };

        // future that counts one run every `run_each_ms`
        let normal_future = async move {
            loop {
                wait(run_each_ms).await;

                unsafe {
                    NORMAL_FUTURE_RUN_COUNTER += 1;
                }
            }
        };

        let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
            Box::pin(with_thread(blocking_future)),
            Box::pin(normal_future),
        ];

        // race the futures until the first one completes
        future::select_all(futures).await;

        // the normal future can run at most `block_for_ms / run_each_ms` times
        let expected_run_count = block_for_ms / run_each_ms;
        let actual_run_count = unsafe { NORMAL_FUTURE_RUN_COUNTER };

        // assert that the normal future ran multiple times (close to
        // `expected_run_count`); `saturating_sub` avoids a `u64` underflow
        // if the counter ever exceeds the expected count
        let run_delta = expected_run_count.saturating_sub(actual_run_count);
        assert!(
            run_delta <= 3,
            "Must run normal future multiple times.",
        );
    }
}