vortex_layout/
executor.rs1use std::sync::Arc;
5
6use futures::FutureExt;
7use futures::channel::oneshot;
8use futures::future::BoxFuture;
9use vortex_error::{VortexResult, vortex_err};
10
11pub trait TaskExecutor: 'static + Send + Sync {
12 fn do_spawn(
13 &self,
14 fut: BoxFuture<'static, VortexResult<()>>,
15 ) -> BoxFuture<'static, VortexResult<()>>;
16}
17
18impl<T: TaskExecutor> TaskExecutor for Arc<T> {
19 fn do_spawn(
20 &self,
21 fut: BoxFuture<'static, VortexResult<()>>,
22 ) -> BoxFuture<'static, VortexResult<()>> {
23 self.as_ref().do_spawn(fut)
24 }
25}
26
27pub trait TaskExecutorExt: TaskExecutor {
28 fn spawn<T: 'static + Send>(
29 &self,
30 fut: BoxFuture<'static, VortexResult<T>>,
31 ) -> BoxFuture<'static, VortexResult<T>>;
32}
33
34impl<E: TaskExecutor + ?Sized> TaskExecutorExt for E {
35 fn spawn<T: 'static + Send>(
36 &self,
37 fut: BoxFuture<'static, VortexResult<T>>,
38 ) -> BoxFuture<'static, VortexResult<T>> {
39 let (send, recv) = oneshot::channel::<VortexResult<T>>();
40 let fut = self.do_spawn(
41 async move {
42 let result = fut.await;
43 send.send(result)
44 .map_err(|_| vortex_err!("Failed to send result"))
45 }
46 .boxed(),
47 );
48
49 Box::pin(async move {
50 fut.await?;
51 recv.await
52 .map_err(|canceled| vortex_err!("Spawned task canceled {}", canceled))
53 .flatten()
54 })
55 }
56}
57
/// Runs tasks on the Tokio runtime this [`tokio::runtime::Handle`] refers to.
#[cfg(feature = "tokio")]
impl TaskExecutor for tokio::runtime::Handle {
    /// Spawns `f` onto the runtime, instrumented with the caller's current
    /// tracing span, and returns a future that waits for the spawned task.
    ///
    /// A join failure is converted into a `VortexError`; otherwise the task's
    /// own `VortexResult<()>` is returned as is.
    fn do_spawn(
        &self,
        f: BoxFuture<'static, VortexResult<()>>,
    ) -> BoxFuture<'static, VortexResult<()>> {
        use tracing::Instrument;

        // Fully qualified so this is unmistakably the runtime's own `spawn`,
        // not `TaskExecutorExt::spawn`. The task is submitted right here; the
        // future built below only awaits its join handle.
        let join_handle = tokio::runtime::Handle::spawn(self, f.in_current_span());

        async move {
            match join_handle.await {
                Ok(task_result) => task_result,
                Err(join_error) => Err(vortex_error::VortexError::from(join_error)),
            }
        }
        .boxed()
    }
}
73
/// Executor that does no spawning of its own: its [`TaskExecutor`]
/// implementation hands the submitted future straight back to the caller.
///
/// It is a stateless unit type, so it is cheap to copy and has a trivial default.
#[derive(Debug, Clone, Copy, Default)]
pub struct LocalExecutor;
75
76impl TaskExecutor for LocalExecutor {
77 fn do_spawn(
78 &self,
79 fut: BoxFuture<'static, VortexResult<()>>,
80 ) -> BoxFuture<'static, VortexResult<()>> {
81 fut
82 }
83}