par_stream/rt/
rt_custom.rs1use crate::common::*;
2use super::{get_global_runtime, BoxAny};
3
4pub fn spawn<Fut>(fut: Fut) -> JoinHandle<Fut::Output>
5where
6 Fut: 'static + Future + Send,
7 Fut::Output: 'static + Send,
8{
9 let future = async move {
10 let output = get_global_runtime()
11 .spawn(
12 async move {
13 let output: BoxAny<'static> = Box::new(fut.await);
14 output
15 }
16 .boxed(),
17 )
18 .await;
19
20 let output =
21 BoxAny::<'static>::downcast::<Fut::Output>(output).expect("interal error: unable downcast Box");
22 *output
23 }
24 .boxed();
25
26 JoinHandle {
27 future,
28 _phantom: PhantomData,
29 }
30}
31
32pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
33where
34 F: 'static + Send + FnOnce() -> R,
35 R: 'static + Send,
36{
37 let future = async move {
38 let output = get_global_runtime()
39 .spawn_blocking({
40 let f: Box<dyn FnOnce() -> BoxAny<'static> + Send> = Box::new(move || {
41 let output: BoxAny<'static> = Box::new(f());
42 output
43 });
44 f
45 })
46 .await;
47
48 let output = BoxAny::<'static>::downcast::<R>(output).expect("interal error: unable downcast Box");
49 *output
50 }
51 .boxed();
52
53 JoinHandle {
54 future,
55 _phantom: PhantomData,
56 }
57}
58
59pub async fn sleep(dur: Duration) {
60 get_global_runtime().sleep(dur).await;
61}
62
63pub fn block_on<F>(future: F) -> F::Output
64where
65 F: Future + Send,
66 F::Output: 'static + Send,
67{
68 let output = get_global_runtime().block_on(
69 async move {
70 let output: BoxAny<'static> = Box::new(future.await);
71 output
72 }
73 .boxed(),
74 );
75
76 let output = BoxAny::<'static>::downcast::<F::Output>(output).expect("interal error: unable downcast Box");
77 *output
78}
79
80pub fn block_on_executor<F>(future: F) -> F::Output
81where
82 F: 'static + Future + Send,
83 F::Output: 'static + Send,
84{
85 let output = get_global_runtime().block_on(
86 async {
87 let output: BoxAny<'static> = Box::new(future.await);
88 output
89 }
90 .boxed(),
91 );
92
93 let output = BoxAny::<'static>::downcast::<F::Output>(output).expect("interal error: unable downcast Box");
94 *output
95}
96
97#[repr(transparent)]
98#[pin_project]
99pub struct JoinHandle<T> {
100 _phantom: PhantomData<T>,
101 #[pin]
102 future: BoxFuture<'static, T>,
103}
104
105impl<T> Future for JoinHandle<T> {
106 type Output = T;
107
108 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
109 self.project().future.poll_unpin(cx)
110 }
111}