vortex_layout/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use futures::FutureExt;
7use futures::channel::oneshot;
8use futures::future::BoxFuture;
9use vortex_error::{VortexResult, vortex_err};
10
11pub trait TaskExecutor: 'static + Send + Sync {
12    fn do_spawn(
13        &self,
14        fut: BoxFuture<'static, VortexResult<()>>,
15    ) -> BoxFuture<'static, VortexResult<()>>;
16}
17
18impl<T: TaskExecutor> TaskExecutor for Arc<T> {
19    fn do_spawn(
20        &self,
21        fut: BoxFuture<'static, VortexResult<()>>,
22    ) -> BoxFuture<'static, VortexResult<()>> {
23        self.as_ref().do_spawn(fut)
24    }
25}
26
27pub trait TaskExecutorExt: TaskExecutor {
28    fn spawn<T: 'static + Send>(
29        &self,
30        fut: BoxFuture<'static, VortexResult<T>>,
31    ) -> BoxFuture<'static, VortexResult<T>>;
32}
33
34impl<E: TaskExecutor + ?Sized> TaskExecutorExt for E {
35    fn spawn<T: 'static + Send>(
36        &self,
37        fut: BoxFuture<'static, VortexResult<T>>,
38    ) -> BoxFuture<'static, VortexResult<T>> {
39        let (send, recv) = oneshot::channel::<VortexResult<T>>();
40        let fut = self.do_spawn(
41            async move {
42                let result = fut.await;
43                send.send(result)
44                    .map_err(|_| vortex_err!("Failed to send result"))
45            }
46            .boxed(),
47        );
48
49        Box::pin(async move {
50            fut.await?;
51            recv.await
52                .map_err(|canceled| vortex_err!("Spawned task canceled {}", canceled))
53                .flatten()
54        })
55    }
56}
57
/// Spawns submitted futures as tasks on the Tokio runtime behind this handle.
#[cfg(feature = "tokio")]
impl TaskExecutor for tokio::runtime::Handle {
    /// Spawns `f` on the runtime, instrumented with the current `tracing`
    /// span, and returns a future that waits for the spawned task.
    ///
    /// A join failure is converted into a `VortexError`; otherwise the task's
    /// own `VortexResult<()>` is returned as-is.
    fn do_spawn(
        &self,
        f: BoxFuture<'static, VortexResult<()>>,
    ) -> BoxFuture<'static, VortexResult<()>> {
        use tracing::Instrument;

        // The task is spawned eagerly here; the returned future only awaits
        // its join handle.
        let join_handle = tokio::runtime::Handle::spawn(self, f.in_current_span());

        async move {
            match join_handle.await {
                Ok(task_result) => task_result,
                Err(join_err) => Err(vortex_error::VortexError::from(join_err)),
            }
        }
        .boxed()
    }
}
73
74pub struct LocalExecutor;
75
76impl TaskExecutor for LocalExecutor {
77    fn do_spawn(
78        &self,
79        fut: BoxFuture<'static, VortexResult<()>>,
80    ) -> BoxFuture<'static, VortexResult<()>> {
81        fut
82    }
83}