Skip to main content

vortex_io/runtime/
handle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Weak;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::FutureExt;
12use tracing::Instrument;
13use vortex_error::vortex_panic;
14
15use crate::runtime::AbortHandleRef;
16use crate::runtime::Executor;
17
18/// A handle to an active Vortex runtime.
19///
20/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
21/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
22///
23/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
24/// runtime has been dropped, then any requests to spawn new tasks will panic.
25#[derive(Clone)]
26pub struct Handle {
27    runtime: Weak<dyn Executor>,
28}
29
30impl Handle {
31    pub fn new(runtime: Weak<dyn Executor>) -> Self {
32        Self { runtime }
33    }
34
35    fn runtime(&self) -> Arc<dyn Executor> {
36        self.runtime.upgrade().unwrap_or_else(|| {
37            vortex_panic!("Attempted to use a Handle after its runtime was dropped")
38        })
39    }
40
41    /// Returns a handle to the current runtime, if such a reasonable choice exists.
42    ///
43    /// For example, if called from within a Tokio context this will return a
44    /// `TokioRuntime` handle.
45    pub fn find() -> Option<Self> {
46        #[cfg(feature = "tokio")]
47        {
48            use tokio::runtime::Handle as TokioHandle;
49
50            use crate::runtime::tokio::TokioRuntime;
51            if TokioHandle::try_current().is_ok() {
52                return Some(TokioRuntime::current());
53            }
54        }
55
56        None
57    }
58
59    /// Spawn a new future onto the runtime.
60    ///
61    /// These futures are expected to not perform expensive CPU work and instead simply schedule
62    /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
63    ///
64    /// See [`Task`] for details on cancelling or detaching the spawned task.
65    pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
66    where
67        Fut: Future<Output = R> + Send + 'static,
68        R: Send + 'static,
69    {
70        let (send, recv) = oneshot::channel();
71        let span = tracing::Span::current();
72        let abort_handle = self.runtime().spawn(
73            async move {
74                // Task::detach allows the receiver to be dropped, so we ignore send errors.
75                drop(send.send(f.await));
76            }
77            .instrument(span)
78            .boxed(),
79        );
80        Task {
81            recv: recv.into_future(),
82            abort_handle: Some(abort_handle),
83        }
84    }
85
86    /// A helper function to avoid manually cloning the handle when spawning nested tasks.
87    pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
88    where
89        F: FnOnce(Handle) -> Fut,
90        Fut: Future<Output = R> + Send + 'static,
91        R: Send + 'static,
92    {
93        self.spawn(f(Handle::new(Weak::clone(&self.runtime))))
94    }
95
96    /// Spawn a new I/O future onto the runtime.
97    ///
98    /// See [`Executor::spawn_io`] for more details about how this future is expected to run.
99    ///
100    // See [`Task`] for details on cancelling or detaching the spawned task.
101    pub fn spawn_io<Fut, R>(&self, f: Fut) -> Task<R>
102    where
103        Fut: Future<Output = R> + Send + 'static,
104        R: Send + 'static,
105    {
106        let (send, recv) = oneshot::channel();
107        let span = tracing::Span::current();
108        let abort_handle = self.runtime().spawn_io(
109            async move {
110                // Task::detach allows the receiver to be dropped, so we ignore send errors.
111                drop(send.send(f.await));
112            }
113            .instrument(span)
114            .boxed(),
115        );
116        Task {
117            recv: recv.into_future(),
118            abort_handle: Some(abort_handle),
119        }
120    }
121
122    /// Spawn a CPU-bound task for execution on the runtime.
123    ///
124    /// Note that many runtimes will interleave this work on the same async runtime. See the
125    /// documentation for each runtime implementation for details.
126    ///
127    /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
128    /// once started, CPU work cannot be cancelled.
129    pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
130    where
131        // Unlike scheduling futures, the CPU task should have a static lifetime because it
132        // doesn't need to access to handle to spawn more work.
133        F: FnOnce() -> R + Send + 'static,
134        R: Send + 'static,
135    {
136        let (send, recv) = oneshot::channel();
137        let span = tracing::Span::current();
138        let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
139            let _guard = span.enter();
140            // Optimistically avoid the work if the result won't be used.
141            if !send.is_closed() {
142                // Task::detach allows the receiver to be dropped, so we ignore send errors.
143                drop(send.send(f()));
144            }
145        }));
146        Task {
147            recv: recv.into_future(),
148            abort_handle: Some(abort_handle),
149        }
150    }
151
152    /// Spawn a blocking I/O task for execution on the runtime.
153    pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
154    where
155        F: FnOnce() -> R + Send + 'static,
156        R: Send + 'static,
157    {
158        let (send, recv) = oneshot::channel();
159        let span = tracing::Span::current();
160        let abort_handle = self.runtime().spawn_blocking_io(Box::new(move || {
161            let _guard = span.enter();
162            // Optimistically avoid the work if the result won't be used.
163            if !send.is_closed() {
164                // Task::detach allows the receiver to be dropped, so we ignore send errors.
165                drop(send.send(f()));
166            }
167        }));
168        Task {
169            recv: recv.into_future(),
170            abort_handle: Some(abort_handle),
171        }
172    }
173}
174
175/// A handle to a spawned Task.
176///
177/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
178/// continue running in the background, call [`Task::detach`].
179#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
180pub struct Task<T> {
181    recv: oneshot::AsyncReceiver<T>,
182    abort_handle: Option<AbortHandleRef>,
183}
184
185impl<T> Task<T> {
186    /// Detach the task, allowing it to continue running in the background after being dropped.
187    /// This is only possible if the underlying runtime has a 'static lifetime.
188    pub fn detach(mut self) {
189        drop(self.abort_handle.take());
190    }
191}
192
193impl<T> Future for Task<T> {
194    type Output = T;
195
196    #[expect(clippy::panic)]
197    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
198        match ready!(self.recv.poll_unpin(cx)) {
199            Ok(result) => Poll::Ready(result),
200            Err(_recv_err) => {
201                // If the other end of the channel was dropped, it means the runtime dropped
202                // the future without ever completing it. If the caller aborted this task by
203                // dropping it, then they wouldn't be able to poll it anymore.
204                // So we consider a closed channel to be a Runtime programming error and therefore
205                // we panic.
206
207                // NOTE(ngates): we don't use vortex_panic to avoid printing a useless backtrace.
208                panic!("Runtime dropped task without completing it, likely it panicked")
209            }
210        }
211    }
212}
213
214impl<T> Drop for Task<T> {
215    fn drop(&mut self) {
216        // Optimistically abort the task if it's still running.
217        if let Some(handle) = self.abort_handle.take() {
218            handle.abort();
219        }
220    }
221}