Skip to main content

vortex_io/runtime/
handle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Weak;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::FutureExt;
12use tracing::Instrument;
13use vortex_error::vortex_panic;
14
15use crate::runtime::AbortHandleRef;
16use crate::runtime::Executor;
17
18/// A handle to an active Vortex runtime.
19///
20/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
21/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
22///
23/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
24/// runtime has been dropped, then any requests to spawn new tasks will panic.
25#[derive(Clone)]
26pub struct Handle {
27    runtime: Weak<dyn Executor>,
28}
29
30impl Handle {
31    pub fn new(runtime: Weak<dyn Executor>) -> Self {
32        Self { runtime }
33    }
34
35    fn runtime(&self) -> Arc<dyn Executor> {
36        self.runtime.upgrade().unwrap_or_else(|| {
37            vortex_panic!("Attempted to use a Handle after its runtime was dropped")
38        })
39    }
40
41    /// Returns a handle to the current runtime, if such a reasonable choice exists.
42    ///
43    /// For example, if called from within a Tokio context this will return a
44    /// `TokioRuntime` handle.
45    pub fn find() -> Option<Self> {
46        #[cfg(feature = "tokio")]
47        {
48            use tokio::runtime::Handle as TokioHandle;
49
50            use crate::runtime::tokio::TokioRuntime;
51            if TokioHandle::try_current().is_ok() {
52                return Some(TokioRuntime::current());
53            }
54        }
55
56        None
57    }
58
59    /// Spawn a new future onto the runtime.
60    ///
61    /// These futures are expected to not perform expensive CPU work and instead simply schedule
62    /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
63    ///
64    /// See [`Task`] for details on cancelling or detaching the spawned task.
65    pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
66    where
67        Fut: Future<Output = R> + Send + 'static,
68        R: Send + 'static,
69    {
70        let (send, recv) = oneshot::channel();
71        // Instrument with a dedicated, named span on its own target rather than
72        // re-entering `Span::current()`. `Instrumented::poll` enters and exits the
73        // span on every poll, so re-entering the caller's span makes its cost scale
74        // with poll count. A distinct target lets subscribers opt these spans in or
75        // out via filtering; when filtered out the span is disabled and enter/exit
76        // are no-ops, which keeps frequently-polled spawned futures cheap.
77        let span = tracing::trace_span!(target: "vortex_io::spawn", "spawn");
78        let abort_handle = self.runtime().spawn(
79            async move {
80                // Task::detach allows the receiver to be dropped, so we ignore send errors.
81                drop(send.send(f.await));
82            }
83            .instrument(span)
84            .boxed(),
85        );
86        Task {
87            recv: recv.into_future(),
88            abort_handle: Some(abort_handle),
89        }
90    }
91
92    /// A helper function to avoid manually cloning the handle when spawning nested tasks.
93    pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
94    where
95        F: FnOnce(Handle) -> Fut,
96        Fut: Future<Output = R> + Send + 'static,
97        R: Send + 'static,
98    {
99        self.spawn(f(Handle::new(Weak::clone(&self.runtime))))
100    }
101
102    /// Spawn a new I/O future onto the runtime.
103    ///
104    /// See [`Executor::spawn_io`] for more details about how this future is expected to run.
105    ///
106    // See [`Task`] for details on cancelling or detaching the spawned task.
107    pub fn spawn_io<Fut, R>(&self, f: Fut) -> Task<R>
108    where
109        Fut: Future<Output = R> + Send + 'static,
110        R: Send + 'static,
111    {
112        let (send, recv) = oneshot::channel();
113        // See `spawn` above: a dedicated target rather than `Span::current()` so
114        // subscribers can filter these spans in or out. I/O futures are polled
115        // frequently, so disabling the span (the default for an unconfigured
116        // target) avoids per-poll enter/exit cost.
117        let span = tracing::trace_span!(target: "vortex_io::spawn_io", "spawn_io");
118        let abort_handle = self.runtime().spawn_io(
119            async move {
120                // Task::detach allows the receiver to be dropped, so we ignore send errors.
121                drop(send.send(f.await));
122            }
123            .instrument(span)
124            .boxed(),
125        );
126        Task {
127            recv: recv.into_future(),
128            abort_handle: Some(abort_handle),
129        }
130    }
131
132    /// Spawn a CPU-bound task for execution on the runtime.
133    ///
134    /// Note that many runtimes will interleave this work on the same async runtime. See the
135    /// documentation for each runtime implementation for details.
136    ///
137    /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
138    /// once started, CPU work cannot be cancelled.
139    pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
140    where
141        // Unlike scheduling futures, the CPU task should have a static lifetime because it
142        // doesn't need to access to handle to spawn more work.
143        F: FnOnce() -> R + Send + 'static,
144        R: Send + 'static,
145    {
146        let (send, recv) = oneshot::channel();
147        let span = tracing::Span::current();
148        let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
149            let _guard = span.enter();
150            // Optimistically avoid the work if the result won't be used.
151            if !send.is_closed() {
152                // Task::detach allows the receiver to be dropped, so we ignore send errors.
153                drop(send.send(f()));
154            }
155        }));
156        Task {
157            recv: recv.into_future(),
158            abort_handle: Some(abort_handle),
159        }
160    }
161
162    /// Spawn a blocking I/O task for execution on the runtime.
163    pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
164    where
165        F: FnOnce() -> R + Send + 'static,
166        R: Send + 'static,
167    {
168        let (send, recv) = oneshot::channel();
169        let span = tracing::Span::current();
170        let abort_handle = self.runtime().spawn_blocking_io(Box::new(move || {
171            let _guard = span.enter();
172            // Optimistically avoid the work if the result won't be used.
173            if !send.is_closed() {
174                // Task::detach allows the receiver to be dropped, so we ignore send errors.
175                drop(send.send(f()));
176            }
177        }));
178        Task {
179            recv: recv.into_future(),
180            abort_handle: Some(abort_handle),
181        }
182    }
183}
184
185/// A handle to a spawned Task.
186///
187/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
188/// continue running in the background, call [`Task::detach`].
189#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
190pub struct Task<T> {
191    recv: oneshot::AsyncReceiver<T>,
192    abort_handle: Option<AbortHandleRef>,
193}
194
195impl<T> Task<T> {
196    /// Detach the task, allowing it to continue running in the background after being dropped.
197    /// This is only possible if the underlying runtime has a 'static lifetime.
198    pub fn detach(mut self) {
199        drop(self.abort_handle.take());
200    }
201}
202
203impl<T> Future for Task<T> {
204    type Output = T;
205
206    #[expect(clippy::panic)]
207    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
208        match ready!(self.recv.poll_unpin(cx)) {
209            Ok(result) => Poll::Ready(result),
210            Err(_recv_err) => {
211                // If the other end of the channel was dropped, it means the runtime dropped
212                // the future without ever completing it. If the caller aborted this task by
213                // dropping it, then they wouldn't be able to poll it anymore.
214                // So we consider a closed channel to be a Runtime programming error and therefore
215                // we panic.
216
217                // NOTE(ngates): we don't use vortex_panic to avoid printing a useless backtrace.
218                panic!("Runtime dropped task without completing it, likely it panicked")
219            }
220        }
221    }
222}
223
224impl<T> Drop for Task<T> {
225    fn drop(&mut self) {
226        // Optimistically abort the task if it's still running.
227        if let Some(handle) = self.abort_handle.take() {
228            handle.abort();
229        }
230    }
231}