fluvio_future/
task.rs

1use std::future::Future;
2
3use async_std::task;
4
5#[cfg(feature = "task_unstable")]
6pub use async_std::task::spawn_local;
7
8/// run future and wait forever
9/// this is typically used in the server
10pub fn run<F>(spawn_closure: F)
11where
12    F: Future<Output = ()> + Send + 'static,
13{
14    task::block_on(spawn_closure);
15}
16
17cfg_if::cfg_if! {
18    if #[cfg(target_arch = "wasm32")] {
19        pub use async_std::task::spawn_local as spawn;
20    } else {
21        pub use async_std::task::spawn;
22    }
23}
24
25#[cfg(feature = "task_unstable")]
26#[cfg(not(target_arch = "wasm32"))]
27pub use async_std::task::spawn_blocking;
28
29cfg_if::cfg_if! {
30    if #[cfg(target_arch = "wasm32")] {
31        pub fn run_block_on<F, T>(f: F)
32            where
33                F: Future<Output = T> + 'static,
34                T: 'static,
35            {
36                task::block_on(f)
37            }
38    } else {
39        pub use async_std::task::block_on as run_block_on;
40    }
41}
42
43pub use async_std::task::JoinHandle;
44
45#[cfg(test)]
46mod basic_test {
47
48    use std::io::Error;
49    use std::thread;
50    use std::time;
51
52    use futures_lite::future::zip;
53    use tracing::debug;
54
55    use crate::task::spawn;
56    use crate::test_async;
57
58    #[test_async]
59    async fn future_join() -> Result<(), Error> {
60        // with join, futures are dispatched on same thread
61        // since ft1 starts first and
62        // blocks on thread, it will block future2
63        // should see ft1,ft1,ft2,ft2
64
65        //let mut ft_id = 0;
66
67        let ft1 = async {
68            debug!("ft1: starting sleeping for 1000ms");
69            // this will block ft2.  both ft1 and ft2 share same thread
70            thread::sleep(time::Duration::from_millis(1000));
71            debug!("ft1: woke from sleep");
72            //  ft_id = 1;
73            Ok(()) as Result<(), ()>
74        };
75
76        let ft2 = async {
77            debug!("ft2: starting sleeping for 500ms");
78            thread::sleep(time::Duration::from_millis(500));
79            debug!("ft2: woke up");
80            //   ft_id = 2;
81            Ok(()) as Result<(), ()>
82        };
83
84        let core_threads = num_cpus::get().max(1);
85        debug!("num threads: {}", core_threads);
86        let _ = zip(ft1, ft2).await;
87        Ok(())
88    }
89
90    #[test_async]
91    async fn future_spawn() -> Result<(), Error> {
92        // with spawn, futures are dispatched on separate thread
93        // in this case, thread sleep on ft1 won't block
94        // should see  ft1, ft2, ft2, ft1
95
96        let ft1 = async {
97            debug!("ft1: starting sleeping for 1000ms");
98            thread::sleep(time::Duration::from_millis(1000)); // give time for server to come up
99            debug!("ft1: woke from sleep");
100        };
101
102        let ft2 = async {
103            debug!("ft2: starting sleeping for 500ms");
104            thread::sleep(time::Duration::from_millis(500));
105            debug!("ft2: woke up");
106        };
107
108        let core_threads = num_cpus::get().max(1);
109        debug!("num threads: {}", core_threads);
110
111        spawn(ft1);
112        spawn(ft2);
113        // wait for all futures complete
114        thread::sleep(time::Duration::from_millis(2000));
115
116        Ok(())
117    }
118}