vortex_io/runtime/handle.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Weak;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::FutureExt;
12use tracing::Instrument;
13use vortex_error::vortex_panic;
14
15use crate::runtime::AbortHandleRef;
16use crate::runtime::Executor;
17
18/// A handle to an active Vortex runtime.
19///
20/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
21/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
22///
23/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
24/// runtime has been dropped, then any requests to spawn new tasks will panic.
25#[derive(Clone)]
26pub struct Handle {
27 runtime: Weak<dyn Executor>,
28}
29
30impl Handle {
31 pub fn new(runtime: Weak<dyn Executor>) -> Self {
32 Self { runtime }
33 }
34
35 fn runtime(&self) -> Arc<dyn Executor> {
36 self.runtime.upgrade().unwrap_or_else(|| {
37 vortex_panic!("Attempted to use a Handle after its runtime was dropped")
38 })
39 }
40
41 /// Returns a handle to the current runtime, if such a reasonable choice exists.
42 ///
43 /// For example, if called from within a Tokio context this will return a
44 /// `TokioRuntime` handle.
45 pub fn find() -> Option<Self> {
46 #[cfg(feature = "tokio")]
47 {
48 use tokio::runtime::Handle as TokioHandle;
49
50 use crate::runtime::tokio::TokioRuntime;
51 if TokioHandle::try_current().is_ok() {
52 return Some(TokioRuntime::current());
53 }
54 }
55
56 None
57 }
58
59 /// Spawn a new future onto the runtime.
60 ///
61 /// These futures are expected to not perform expensive CPU work and instead simply schedule
62 /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
63 ///
64 /// See [`Task`] for details on cancelling or detaching the spawned task.
65 pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
66 where
67 Fut: Future<Output = R> + Send + 'static,
68 R: Send + 'static,
69 {
70 let (send, recv) = oneshot::channel();
71 // Instrument with a dedicated, named span on its own target rather than
72 // re-entering `Span::current()`. `Instrumented::poll` enters and exits the
73 // span on every poll, so re-entering the caller's span makes its cost scale
74 // with poll count. A distinct target lets subscribers opt these spans in or
75 // out via filtering; when filtered out the span is disabled and enter/exit
76 // are no-ops, which keeps frequently-polled spawned futures cheap.
77 let span = tracing::trace_span!(target: "vortex_io::spawn", "spawn");
78 let abort_handle = self.runtime().spawn(
79 async move {
80 // Task::detach allows the receiver to be dropped, so we ignore send errors.
81 drop(send.send(f.await));
82 }
83 .instrument(span)
84 .boxed(),
85 );
86 Task {
87 recv: recv.into_future(),
88 abort_handle: Some(abort_handle),
89 }
90 }
91
92 /// A helper function to avoid manually cloning the handle when spawning nested tasks.
93 pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
94 where
95 F: FnOnce(Handle) -> Fut,
96 Fut: Future<Output = R> + Send + 'static,
97 R: Send + 'static,
98 {
99 self.spawn(f(Handle::new(Weak::clone(&self.runtime))))
100 }
101
102 /// Spawn a new I/O future onto the runtime.
103 ///
104 /// See [`Executor::spawn_io`] for more details about how this future is expected to run.
105 ///
106 // See [`Task`] for details on cancelling or detaching the spawned task.
107 pub fn spawn_io<Fut, R>(&self, f: Fut) -> Task<R>
108 where
109 Fut: Future<Output = R> + Send + 'static,
110 R: Send + 'static,
111 {
112 let (send, recv) = oneshot::channel();
113 // See `spawn` above: a dedicated target rather than `Span::current()` so
114 // subscribers can filter these spans in or out. I/O futures are polled
115 // frequently, so disabling the span (the default for an unconfigured
116 // target) avoids per-poll enter/exit cost.
117 let span = tracing::trace_span!(target: "vortex_io::spawn_io", "spawn_io");
118 let abort_handle = self.runtime().spawn_io(
119 async move {
120 // Task::detach allows the receiver to be dropped, so we ignore send errors.
121 drop(send.send(f.await));
122 }
123 .instrument(span)
124 .boxed(),
125 );
126 Task {
127 recv: recv.into_future(),
128 abort_handle: Some(abort_handle),
129 }
130 }
131
132 /// Spawn a CPU-bound task for execution on the runtime.
133 ///
134 /// Note that many runtimes will interleave this work on the same async runtime. See the
135 /// documentation for each runtime implementation for details.
136 ///
137 /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
138 /// once started, CPU work cannot be cancelled.
139 pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
140 where
141 // Unlike scheduling futures, the CPU task should have a static lifetime because it
142 // doesn't need to access to handle to spawn more work.
143 F: FnOnce() -> R + Send + 'static,
144 R: Send + 'static,
145 {
146 let (send, recv) = oneshot::channel();
147 let span = tracing::Span::current();
148 let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
149 let _guard = span.enter();
150 // Optimistically avoid the work if the result won't be used.
151 if !send.is_closed() {
152 // Task::detach allows the receiver to be dropped, so we ignore send errors.
153 drop(send.send(f()));
154 }
155 }));
156 Task {
157 recv: recv.into_future(),
158 abort_handle: Some(abort_handle),
159 }
160 }
161
162 /// Spawn a blocking I/O task for execution on the runtime.
163 pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
164 where
165 F: FnOnce() -> R + Send + 'static,
166 R: Send + 'static,
167 {
168 let (send, recv) = oneshot::channel();
169 let span = tracing::Span::current();
170 let abort_handle = self.runtime().spawn_blocking_io(Box::new(move || {
171 let _guard = span.enter();
172 // Optimistically avoid the work if the result won't be used.
173 if !send.is_closed() {
174 // Task::detach allows the receiver to be dropped, so we ignore send errors.
175 drop(send.send(f()));
176 }
177 }));
178 Task {
179 recv: recv.into_future(),
180 abort_handle: Some(abort_handle),
181 }
182 }
183}
184
185/// A handle to a spawned Task.
186///
187/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
188/// continue running in the background, call [`Task::detach`].
189#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
190pub struct Task<T> {
191 recv: oneshot::AsyncReceiver<T>,
192 abort_handle: Option<AbortHandleRef>,
193}
194
195impl<T> Task<T> {
196 /// Detach the task, allowing it to continue running in the background after being dropped.
197 /// This is only possible if the underlying runtime has a 'static lifetime.
198 pub fn detach(mut self) {
199 drop(self.abort_handle.take());
200 }
201}
202
203impl<T> Future for Task<T> {
204 type Output = T;
205
206 #[expect(clippy::panic)]
207 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
208 match ready!(self.recv.poll_unpin(cx)) {
209 Ok(result) => Poll::Ready(result),
210 Err(_recv_err) => {
211 // If the other end of the channel was dropped, it means the runtime dropped
212 // the future without ever completing it. If the caller aborted this task by
213 // dropping it, then they wouldn't be able to poll it anymore.
214 // So we consider a closed channel to be a Runtime programming error and therefore
215 // we panic.
216
217 // NOTE(ngates): we don't use vortex_panic to avoid printing a useless backtrace.
218 panic!("Runtime dropped task without completing it, likely it panicked")
219 }
220 }
221 }
222}
223
224impl<T> Drop for Task<T> {
225 fn drop(&mut self) {
226 // Optimistically abort the task if it's still running.
227 if let Some(handle) = self.abort_handle.take() {
228 handle.abort();
229 }
230 }
231}