vortex_io/runtime/handle.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Weak;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::FutureExt;
12use tracing::Instrument;
13use vortex_error::vortex_panic;
14
15use crate::runtime::AbortHandleRef;
16use crate::runtime::Executor;
17
18/// A handle to an active Vortex runtime.
19///
20/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
21/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
22///
23/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
24/// runtime has been dropped, then any requests to spawn new tasks will panic.
25#[derive(Clone)]
26pub struct Handle {
27 runtime: Weak<dyn Executor>,
28}
29
30impl Handle {
31 pub fn new(runtime: Weak<dyn Executor>) -> Self {
32 Self { runtime }
33 }
34
35 fn runtime(&self) -> Arc<dyn Executor> {
36 self.runtime.upgrade().unwrap_or_else(|| {
37 vortex_panic!("Attempted to use a Handle after its runtime was dropped")
38 })
39 }
40
41 /// Returns a handle to the current runtime, if such a reasonable choice exists.
42 ///
43 /// For example, if called from within a Tokio context this will return a
44 /// `TokioRuntime` handle.
45 pub fn find() -> Option<Self> {
46 #[cfg(feature = "tokio")]
47 {
48 use tokio::runtime::Handle as TokioHandle;
49
50 use crate::runtime::tokio::TokioRuntime;
51 if TokioHandle::try_current().is_ok() {
52 return Some(TokioRuntime::current());
53 }
54 }
55
56 None
57 }
58
59 /// Spawn a new future onto the runtime.
60 ///
61 /// These futures are expected to not perform expensive CPU work and instead simply schedule
62 /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
63 ///
64 /// See [`Task`] for details on cancelling or detaching the spawned task.
65 pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
66 where
67 Fut: Future<Output = R> + Send + 'static,
68 R: Send + 'static,
69 {
70 let (send, recv) = oneshot::channel();
71 let span = tracing::Span::current();
72 let abort_handle = self.runtime().spawn(
73 async move {
74 // Task::detach allows the receiver to be dropped, so we ignore send errors.
75 drop(send.send(f.await));
76 }
77 .instrument(span)
78 .boxed(),
79 );
80 Task {
81 recv: recv.into_future(),
82 abort_handle: Some(abort_handle),
83 }
84 }
85
86 /// A helper function to avoid manually cloning the handle when spawning nested tasks.
87 pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
88 where
89 F: FnOnce(Handle) -> Fut,
90 Fut: Future<Output = R> + Send + 'static,
91 R: Send + 'static,
92 {
93 self.spawn(f(Handle::new(Weak::clone(&self.runtime))))
94 }
95
96 /// Spawn a new I/O future onto the runtime.
97 ///
98 /// See [`Executor::spawn_io`] for more details about how this future is expected to run.
99 ///
100 // See [`Task`] for details on cancelling or detaching the spawned task.
101 pub fn spawn_io<Fut, R>(&self, f: Fut) -> Task<R>
102 where
103 Fut: Future<Output = R> + Send + 'static,
104 R: Send + 'static,
105 {
106 let (send, recv) = oneshot::channel();
107 let span = tracing::Span::current();
108 let abort_handle = self.runtime().spawn_io(
109 async move {
110 // Task::detach allows the receiver to be dropped, so we ignore send errors.
111 drop(send.send(f.await));
112 }
113 .instrument(span)
114 .boxed(),
115 );
116 Task {
117 recv: recv.into_future(),
118 abort_handle: Some(abort_handle),
119 }
120 }
121
122 /// Spawn a CPU-bound task for execution on the runtime.
123 ///
124 /// Note that many runtimes will interleave this work on the same async runtime. See the
125 /// documentation for each runtime implementation for details.
126 ///
127 /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
128 /// once started, CPU work cannot be cancelled.
129 pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
130 where
131 // Unlike scheduling futures, the CPU task should have a static lifetime because it
132 // doesn't need to access to handle to spawn more work.
133 F: FnOnce() -> R + Send + 'static,
134 R: Send + 'static,
135 {
136 let (send, recv) = oneshot::channel();
137 let span = tracing::Span::current();
138 let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
139 let _guard = span.enter();
140 // Optimistically avoid the work if the result won't be used.
141 if !send.is_closed() {
142 // Task::detach allows the receiver to be dropped, so we ignore send errors.
143 drop(send.send(f()));
144 }
145 }));
146 Task {
147 recv: recv.into_future(),
148 abort_handle: Some(abort_handle),
149 }
150 }
151
152 /// Spawn a blocking I/O task for execution on the runtime.
153 pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
154 where
155 F: FnOnce() -> R + Send + 'static,
156 R: Send + 'static,
157 {
158 let (send, recv) = oneshot::channel();
159 let span = tracing::Span::current();
160 let abort_handle = self.runtime().spawn_blocking_io(Box::new(move || {
161 let _guard = span.enter();
162 // Optimistically avoid the work if the result won't be used.
163 if !send.is_closed() {
164 // Task::detach allows the receiver to be dropped, so we ignore send errors.
165 drop(send.send(f()));
166 }
167 }));
168 Task {
169 recv: recv.into_future(),
170 abort_handle: Some(abort_handle),
171 }
172 }
173}
174
175/// A handle to a spawned Task.
176///
177/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
178/// continue running in the background, call [`Task::detach`].
179#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
180pub struct Task<T> {
181 recv: oneshot::AsyncReceiver<T>,
182 abort_handle: Option<AbortHandleRef>,
183}
184
185impl<T> Task<T> {
186 /// Detach the task, allowing it to continue running in the background after being dropped.
187 /// This is only possible if the underlying runtime has a 'static lifetime.
188 pub fn detach(mut self) {
189 drop(self.abort_handle.take());
190 }
191}
192
193impl<T> Future for Task<T> {
194 type Output = T;
195
196 #[expect(clippy::panic)]
197 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
198 match ready!(self.recv.poll_unpin(cx)) {
199 Ok(result) => Poll::Ready(result),
200 Err(_recv_err) => {
201 // If the other end of the channel was dropped, it means the runtime dropped
202 // the future without ever completing it. If the caller aborted this task by
203 // dropping it, then they wouldn't be able to poll it anymore.
204 // So we consider a closed channel to be a Runtime programming error and therefore
205 // we panic.
206
207 // NOTE(ngates): we don't use vortex_panic to avoid printing a useless backtrace.
208 panic!("Runtime dropped task without completing it, likely it panicked")
209 }
210 }
211 }
212}
213
214impl<T> Drop for Task<T> {
215 fn drop(&mut self) {
216 // Optimistically abort the task if it's still running.
217 if let Some(handle) = self.abort_handle.take() {
218 handle.abort();
219 }
220 }
221}