1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use std::{pin::Pin, thread};

use futures::Future;
use tokio::runtime::Runtime;

use super::wait;

/// Run a future on a separate thread.
///
/// Useful when future has a blocking code. Normally such blocking code
/// will also block current all asynchronous tasks. This helper mitigates
/// such issue by running blocking future on a separate thread with its
/// own Tokio runtime.
/// 
/// ### Examples
/// 
/// ```
/// use std::{thread, pin::Pin, time::Duration};
///
/// use futures::{future, Future};
/// use cs_utils::futures::{wait, with_thread};
/// 
/// #[tokio::main(worker_threads = 1)]
/// async fn main() {
///     // variable to count how many iterations
///     // the normal future has run
///     static mut RUN_CNT: u64 = 0;
///
///     // create a future that blocks current thread
///     let blocking_future = async move {
///         thread::sleep(Duration::from_secs(1));    
///     };
///     
///     // create a future that intended to run in background
///     let normal_future = async move {
///         loop {
///             unsafe { RUN_CNT += 1 }
///             wait(100).await;
///         }
///     };
///     
///     // create futures list
///     let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
///         Box::pin(with_thread(blocking_future)), // <-- wrap the blocking future here
///         Box::pin(normal_future),
///     ];
///     
///     // race the futures to completion 
///     future::select_all(futures).await;
/// 
///     // must go thru multiple iterations in the normal future
///     assert!(
///         unsafe { RUN_CNT >= 7 } && unsafe { RUN_CNT <= 10 },
///         "Normal future must run iterations multiple times.",
///     );
/// }
/// ```
pub fn with_thread<
    T: Send + 'static,
    TFuture: Future<Output = T> + Send + 'static,
>(
    original_future: TFuture,
) -> Pin<Box<dyn Future<Output = T> + Send + 'static>> {
    let result_future: Pin<Box<dyn Future<Output = T> + Send + 'static>> = Box::pin(async move {
        let tokio_handle = Runtime::new()
            .expect("Cannot create Tokio runtime.");

        let other_thread = thread::spawn(move || {
            return tokio_handle.block_on(original_future);
        });

        // poll the thread, yielding if it is not finished yet
        while !other_thread.is_finished() {
            // common thread time slice is `~100ms`, so
            // `5ms` delay should be granular enough here 
            wait(5).await;

            continue;
        }

        return other_thread.join().expect("failed to join other thread");
    });

    return result_future;
}

#[cfg(test)]
mod tests {
    use std::{thread, pin::Pin, time::Duration};
    use futures::{future, Future};
    use cs_utils::futures::{wait, with_thread};

    #[tokio::test]
    async fn run_blocking_future_on_separate_thread() {
        static mut NORMAL_FUTURE_RUN_COUNTER: u64 = 0;
        let block_for_ms: u64 = 1000;
        let run_each_ms: u64 = 100;

        let blocking_future = async move {
            thread::sleep(Duration::from_millis(block_for_ms));    
        };
        let normal_future = async move {
            loop {
                wait(run_each_ms).await;

                unsafe {
                    NORMAL_FUTURE_RUN_COUNTER += 1;
                }
            }
        };
    
        let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![
            Box::pin(with_thread(blocking_future)),
            Box::pin(normal_future),
        ];
        
        // race futures to first completion
        future::select_all(futures).await;

        // normal future can run at most `BLOCK_FOR_MS` / `RUN_EACH_MS` times
        let expected_run_count = block_for_ms / run_each_ms;
        // assert that normal future run multiple time (close to `expected_run_count` times)
        let run_delta = expected_run_count - unsafe { NORMAL_FUTURE_RUN_COUNTER };
        assert!(
            run_delta <= 3,
            "Must run normal future multiple times.",
        );
    }
}