fluvio_future/
task.rs

1use std::future::Future;
2
3/// run future and wait forever
4/// this is typically used in the server
5pub fn run<F>(spawn_closure: F)
6where
7    F: Future<Output = ()> + Send + 'static,
8{
9    cfg_if::cfg_if! {
10        if #[cfg(target_arch = "wasm32")] {
11            wasm_bindgen_futures::spawn_local(spawn_closure);
12        } else {
13            async_global_executor::block_on(spawn_closure);
14        }
15    }
16}
17
18// preserve async-std spawn behavior which is always detach task.
19// to fully control task life cycle, use spawn_task
20cfg_if::cfg_if! {
21    if #[cfg(target_arch = "wasm32")] {
22        pub fn spawn<F: Future<Output = T> + 'static, T: 'static>(future: F) {
23            wasm_bindgen_futures::spawn_local(async move {
24                let _ = future.await;
25            });
26        }
27    } else {
28        pub fn spawn<F: Future<Output = T> + Send + 'static, T: Send + 'static>(future: F) {
29            async_global_executor::spawn(future).detach();
30        }
31        pub fn spawn_task<F: Future<Output = T> + Send + 'static, T: Send + 'static>(future: F) -> async_task::Task<T> {
32            async_global_executor::spawn(future)
33        }
34    }
35}
36
37#[cfg(not(target_arch = "wasm32"))]
38pub use async_global_executor::spawn_blocking;
39
40cfg_if::cfg_if! {
41    if #[cfg(target_arch = "wasm32")] {
42        pub fn run_block_on<F, T>(f: F)
43            where
44                F: Future<Output = T> + 'static,
45                T: 'static,
46            {
47                wasm_bindgen_futures::spawn_local(async move {
48                    let _ = f.await;
49                });
50            }
51    } else {
52        pub use async_global_executor::block_on as run_block_on;
53    }
54}
55
56#[cfg(not(target_arch = "wasm32"))]
57pub type Task<T> = async_task::Task<T>;
58
#[cfg(test)]
mod basic_test {

    use std::io::{Error, ErrorKind};
    use std::sync::mpsc;
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;

    use futures_lite::future::zip;
    use tracing::debug;

    use crate::task::spawn;
    use crate::test_async;

    /// Upper bound on how long `future_spawn` waits for each detached task
    /// to report completion before the test fails.
    const SPAWN_TIMEOUT: Duration = Duration::from_secs(5);

    /// Two futures combined with `zip` are polled from the same task, so a
    /// blocking sleep in the first one delays the second one.
    #[test_async]
    async fn future_join() -> Result<(), Error> {
        // Ordered log of what each future did; checked after both complete.
        let events: Mutex<Vec<&'static str>> = Mutex::new(Vec::new());
        let record = |event: &'static str| {
            events.lock().expect("event log poisoned").push(event);
        };

        let ft1 = async {
            debug!("ft1: starting sleeping for 1000ms");
            record("ft1 start");
            // Deliberately blocking: ft1 and ft2 share the same thread, so
            // this keeps ft2 from starting until ft1 is done.
            thread::sleep(Duration::from_millis(1000));
            record("ft1 end");
            debug!("ft1: woke from sleep");
        };

        let ft2 = async {
            debug!("ft2: starting sleeping for 500ms");
            record("ft2 start");
            thread::sleep(Duration::from_millis(500));
            record("ft2 end");
            debug!("ft2: woke up");
        };

        zip(ft1, ft2).await;

        // ft1 is polled first and never yields, so it runs to completion
        // before ft2 gets its first poll.
        let observed = events.lock().expect("event log poisoned");
        assert_eq!(*observed, ["ft1 start", "ft1 end", "ft2 start", "ft2 end"]);
        Ok(())
    }

    /// Futures passed to `spawn` are detached; both must still run to
    /// completion even though nobody awaits them.
    #[test_async]
    async fn future_spawn() -> Result<(), Error> {
        // Each detached task reports its name here once it has finished.
        let (done_tx, done_rx) = mpsc::channel();

        let ft1 = {
            let done_tx = done_tx.clone();
            async move {
                debug!("ft1: starting sleeping for 1000ms");
                thread::sleep(Duration::from_millis(1000));
                debug!("ft1: woke from sleep");
                // A send error only means the test already gave up waiting.
                let _ = done_tx.send("ft1");
            }
        };

        let ft2 = async move {
            debug!("ft2: starting sleeping for 500ms");
            thread::sleep(Duration::from_millis(500));
            debug!("ft2: woke up");
            // A send error only means the test already gave up waiting.
            let _ = done_tx.send("ft2");
        };

        let core_threads = num_cpus::get().max(1);
        debug!("num threads: {}", core_threads);

        spawn(ft1);
        spawn(ft2);

        // Wait for both tasks to report in instead of sleeping a fixed time.
        // Completion order depends on how many executor threads are
        // available, so only the set of finished tasks is checked.
        let mut finished = (0..2)
            .map(|_| {
                done_rx
                    .recv_timeout(SPAWN_TIMEOUT)
                    .map_err(|err| Error::new(ErrorKind::TimedOut, err))
            })
            .collect::<Result<Vec<_>, _>>()?;
        finished.sort_unstable();
        assert_eq!(finished, ["ft1", "ft2"]);

        Ok(())
    }
}