fluvio_future/
task.rs

1use std::future::Future;
2
3use crate::timer::sleep;
4
5/// run future and return immediately
6pub fn run<F>(spawn_closure: F)
7where
8    F: Future<Output = ()> + Send + 'static,
9{
10    cfg_if::cfg_if! {
11        if #[cfg(target_arch = "wasm32")] {
12            wasm_bindgen_futures::spawn_local(spawn_closure);
13        } else {
14            async_global_executor::block_on(spawn_closure);
15        }
16    }
17}
18
19/// run future and wait forever
20/// this is typically used in the server
21pub fn main<F>(spawn_closure: F)
22where
23    F: Future<Output = ()> + Send + 'static,
24{
25    use std::time::Duration;
26
27    run(async {
28        spawn_closure.await;
29        // do infinite loop for now
30        loop {
31            cfg_if::cfg_if! {
32                if #[cfg(target_arch = "wasm32")] {
33                    sleep(Duration::from_secs(3600)).await.unwrap();
34                } else {
35                    sleep(Duration::from_secs(3600)).await;
36                }
37            }
38        }
39    });
40}
41
42// preserve async-std spawn behavior which is always detach task.
43// to fully control task life cycle, use spawn_task
44cfg_if::cfg_if! {
45    if #[cfg(target_arch = "wasm32")] {
46        pub fn spawn<F: Future<Output = T> + 'static, T: 'static>(future: F) {
47            wasm_bindgen_futures::spawn_local(async move {
48                let _ = future.await;
49            });
50        }
51    } else {
52        pub fn spawn<F: Future<Output = T> + Send + 'static, T: Send + 'static>(future: F) {
53            async_global_executor::spawn(future).detach();
54        }
55        pub fn spawn_task<F: Future<Output = T> + Send + 'static, T: Send + 'static>(future: F) -> async_task::Task<T> {
56            async_global_executor::spawn(future)
57        }
58    }
59}
60
61#[cfg(not(target_arch = "wasm32"))]
62pub use async_global_executor::spawn_blocking;
63
64cfg_if::cfg_if! {
65    if #[cfg(target_arch = "wasm32")] {
66        pub fn run_block_on<F, T>(f: F)
67            where
68                F: Future<Output = T> + 'static,
69                T: 'static,
70            {
71                wasm_bindgen_futures::spawn_local(async move {
72                    let _ = f.await;
73                });
74            }
75    } else {
76        pub use async_global_executor::block_on as run_block_on;
77    }
78}
79
80#[cfg(not(target_arch = "wasm32"))]
81pub type Task<T> = async_task::Task<T>;
82
83#[cfg(test)]
84mod basic_test {
85
86    use std::io::Error;
87    use std::thread;
88    use std::time;
89
90    use futures_lite::future::zip;
91    use tracing::debug;
92
93    use crate::task::spawn;
94    use crate::test_async;
95
96    #[test_async]
97    async fn future_join() -> Result<(), Error> {
98        // with join, futures are dispatched on same thread
99        // since ft1 starts first and
100        // blocks on thread, it will block future2
101        // should see ft1,ft1,ft2,ft2
102
103        //let mut ft_id = 0;
104
105        let ft1 = async {
106            debug!("ft1: starting sleeping for 1000ms");
107            // this will block ft2.  both ft1 and ft2 share same thread
108            thread::sleep(time::Duration::from_millis(1000));
109            debug!("ft1: woke from sleep");
110            //  ft_id = 1;
111            Ok(()) as Result<(), ()>
112        };
113
114        let ft2 = async {
115            debug!("ft2: starting sleeping for 500ms");
116            thread::sleep(time::Duration::from_millis(500));
117            debug!("ft2: woke up");
118            //   ft_id = 2;
119            Ok(()) as Result<(), ()>
120        };
121
122        let core_threads = num_cpus::get().max(1);
123        debug!("num threads: {}", core_threads);
124        let _ = zip(ft1, ft2).await;
125        Ok(())
126    }
127
128    #[test_async]
129    async fn future_spawn() -> Result<(), Error> {
130        // with spawn, futures are dispatched on separate thread
131        // in this case, thread sleep on ft1 won't block
132        // should see  ft1, ft2, ft2, ft1
133
134        let ft1 = async {
135            debug!("ft1: starting sleeping for 1000ms");
136            thread::sleep(time::Duration::from_millis(1000)); // give time for server to come up
137            debug!("ft1: woke from sleep");
138        };
139
140        let ft2 = async {
141            debug!("ft2: starting sleeping for 500ms");
142            thread::sleep(time::Duration::from_millis(500));
143            debug!("ft2: woke up");
144        };
145
146        let core_threads = num_cpus::get().max(1);
147        debug!("num threads: {}", core_threads);
148
149        spawn(ft1);
150        spawn(ft2);
151        // wait for all futures complete
152        thread::sleep(time::Duration::from_millis(2000));
153
154        Ok(())
155    }
156}