vortex_layout/scan/
executor.rs1use std::sync::Arc;
2
3use futures::FutureExt;
4use futures::channel::oneshot;
5use futures::future::BoxFuture;
6use vortex_error::{ResultExt, VortexResult, vortex_err};
7
8pub trait TaskExecutor: 'static + Send + Sync {
9 fn do_spawn(
10 &self,
11 fut: BoxFuture<'static, VortexResult<()>>,
12 ) -> BoxFuture<'static, VortexResult<()>>;
13}
14
15impl<T: TaskExecutor> TaskExecutor for Arc<T> {
16 fn do_spawn(
17 &self,
18 fut: BoxFuture<'static, VortexResult<()>>,
19 ) -> BoxFuture<'static, VortexResult<()>> {
20 self.as_ref().do_spawn(fut)
21 }
22}
23
24pub trait TaskExecutorExt: TaskExecutor {
25 fn spawn<T: 'static + Send>(
26 &self,
27 fut: BoxFuture<'static, VortexResult<T>>,
28 ) -> BoxFuture<'static, VortexResult<T>>;
29}
30
31impl<E: TaskExecutor + ?Sized> TaskExecutorExt for E {
32 fn spawn<T: 'static + Send>(
33 &self,
34 fut: BoxFuture<'static, VortexResult<T>>,
35 ) -> BoxFuture<'static, VortexResult<T>> {
36 let (send, recv) = oneshot::channel::<VortexResult<T>>();
37 let fut = self.do_spawn(
38 async move {
39 let result = fut.await;
40 send.send(result)
41 .map_err(|_| vortex_err!("Failed to send result"))
42 }
43 .boxed(),
44 );
45
46 Box::pin(async move {
47 fut.await?;
48 recv.await
49 .map_err(|canceled| vortex_err!("Spawned task canceled {}", canceled))
50 .unnest()
51 })
52 }
53}
54
/// [`TaskExecutor`] backed by a Tokio runtime handle.
#[cfg(feature = "tokio")]
impl TaskExecutor for tokio::runtime::Handle {
    /// Spawns `f` on the runtime behind this handle.
    ///
    /// The task is spawned as soon as this method is called, not when the
    /// returned future is first polled. The returned future awaits the join
    /// handle, converts a join error with `VortexError::from`, and collapses
    /// the nested result with `unnest`.
    fn do_spawn(
        &self,
        f: BoxFuture<'static, VortexResult<()>>,
    ) -> BoxFuture<'static, VortexResult<()>> {
        // Fully qualified so this resolves to Tokio's inherent `spawn`
        // and cannot be confused with `TaskExecutorExt::spawn`.
        let join_handle = tokio::runtime::Handle::spawn(self, f);

        async move {
            let joined = join_handle.await;
            joined.map_err(vortex_error::VortexError::from).unnest()
        }
        .boxed()
    }
}
68}