par_stream/rt/
rt_custom.rs

1use crate::common::*;
2use super::{get_global_runtime, BoxAny};
3
4pub fn spawn<Fut>(fut: Fut) -> JoinHandle<Fut::Output>
5where
6    Fut: 'static + Future + Send,
7    Fut::Output: 'static + Send,
8{
9    let future = async move {
10        let output = get_global_runtime()
11            .spawn(
12                async move {
13                    let output: BoxAny<'static> = Box::new(fut.await);
14                    output
15                }
16                .boxed(),
17            )
18            .await;
19
20        let output =
21            BoxAny::<'static>::downcast::<Fut::Output>(output).expect("interal error: unable downcast Box");
22        *output
23    }
24    .boxed();
25
26    JoinHandle {
27        future,
28        _phantom: PhantomData,
29    }
30}
31
32pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
33where
34    F: 'static + Send + FnOnce() -> R,
35    R: 'static + Send,
36{
37    let future = async move {
38        let output = get_global_runtime()
39            .spawn_blocking({
40                let f: Box<dyn FnOnce() -> BoxAny<'static> + Send> = Box::new(move || {
41                    let output: BoxAny<'static> = Box::new(f());
42                    output
43                });
44                f
45            })
46            .await;
47
48        let output = BoxAny::<'static>::downcast::<R>(output).expect("interal error: unable downcast Box");
49        *output
50    }
51    .boxed();
52
53    JoinHandle {
54        future,
55        _phantom: PhantomData,
56    }
57}
58
59pub async fn sleep(dur: Duration) {
60    get_global_runtime().sleep(dur).await;
61}
62
63pub fn block_on<F>(future: F) -> F::Output
64where
65    F: Future + Send,
66    F::Output: 'static + Send,
67{
68    let output = get_global_runtime().block_on(
69        async move {
70            let output: BoxAny<'static> = Box::new(future.await);
71            output
72        }
73        .boxed(),
74    );
75
76    let output = BoxAny::<'static>::downcast::<F::Output>(output).expect("interal error: unable downcast Box");
77    *output
78}
79
80pub fn block_on_executor<F>(future: F) -> F::Output
81where
82    F: 'static + Future + Send,
83    F::Output: 'static + Send,
84{
85    let output = get_global_runtime().block_on(
86        async {
87            let output: BoxAny<'static> = Box::new(future.await);
88            output
89        }
90        .boxed(),
91    );
92
93    let output = BoxAny::<'static>::downcast::<F::Output>(output).expect("interal error: unable downcast Box");
94    *output
95}
96
97#[repr(transparent)]
98#[pin_project]
99pub struct JoinHandle<T> {
100    _phantom: PhantomData<T>,
101    #[pin]
102    future: BoxFuture<'static, T>,
103}
104
105impl<T> Future for JoinHandle<T> {
106    type Output = T;
107
108    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
109        self.project().future.poll_unpin(cx)
110    }
111}