vortex_io/runtime/handle.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Weak;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::FutureExt;
12use futures::StreamExt;
13use futures::channel::mpsc;
14use vortex_error::VortexResult;
15use vortex_error::vortex_panic;
16use vortex_metrics::VortexMetrics;
17
18use crate::file::FileRead;
19use crate::file::IntoReadSource;
20use crate::file::IoRequestStream;
21use crate::runtime::AbortHandleRef;
22use crate::runtime::Executor;
23use crate::runtime::IoTask;
24
25/// A handle to an active Vortex runtime.
26///
27/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
28/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
29///
30/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
31/// runtime has been dropped, then any requests to spawn new tasks will panic.
32#[derive(Clone)]
33pub struct Handle {
34 runtime: Weak<dyn Executor>,
35}
36
37impl Handle {
38 pub(crate) fn new(runtime: Weak<dyn Executor>) -> Self {
39 Self { runtime }
40 }
41
42 fn runtime(&self) -> Arc<dyn Executor> {
43 self.runtime.upgrade().unwrap_or_else(|| {
44 vortex_panic!("Attempted to use a Handle after its runtime was dropped")
45 })
46 }
47
48 /// Returns a handle to the current runtime, if such a reasonable choice exists.
49 ///
50 /// For example, if called from within a Tokio context this will return a
51 /// `TokioRuntime` handle.
52 pub fn find() -> Option<Self> {
53 #[cfg(feature = "tokio")]
54 {
55 use tokio::runtime::Handle as TokioHandle;
56 if TokioHandle::try_current().is_ok() {
57 return Some(crate::runtime::tokio::TokioRuntime::current());
58 }
59 }
60
61 None
62 }
63
64 /// Spawn a new future onto the runtime.
65 ///
66 /// These futures are expected to not perform expensive CPU work and instead simply schedule
67 /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
68 ///
69 /// See [`Task`] for details on cancelling or detaching the spawned task.
70 pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
71 where
72 Fut: Future<Output = R> + Send + 'static,
73 R: Send + 'static,
74 {
75 let (send, recv) = oneshot::channel();
76 let abort_handle = self.runtime().spawn(
77 async move {
78 // Task::detach allows the receiver to be dropped, so we ignore send errors.
79 drop(send.send(f.await));
80 }
81 .boxed(),
82 );
83 Task {
84 recv,
85 abort_handle: Some(abort_handle),
86 }
87 }
88
89 /// A helper function to avoid manually cloning the handle when spawning nested tasks.
90 pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
91 where
92 F: FnOnce(Handle) -> Fut,
93 Fut: Future<Output = R> + Send + 'static,
94 R: Send + 'static,
95 {
96 self.spawn(f(Handle::new(self.runtime.clone())))
97 }
98
99 /// Spawn a CPU-bound task for execution on the runtime.
100 ///
101 /// Note that many runtimes will interleave this work on the same async runtime. See the
102 /// documentation for each runtime implementation for details.
103 ///
104 /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
105 /// once started, CPU work cannot be cancelled.
106 pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
107 where
108 // Unlike scheduling futures, the CPU task should have a static lifetime because it
109 // doesn't need to access to handle to spawn more work.
110 F: FnOnce() -> R + Send + 'static,
111 R: Send + 'static,
112 {
113 let (send, recv) = oneshot::channel();
114 let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
115 // Optimistically avoid the work if the result won't be used.
116 if !send.is_closed() {
117 // Task::detach allows the receiver to be dropped, so we ignore send errors.
118 drop(send.send(f()));
119 }
120 }));
121 Task {
122 recv,
123 abort_handle: Some(abort_handle),
124 }
125 }
126
127 /// Spawn a blocking I/O task for execution on the runtime.
128 pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
129 where
130 F: FnOnce() -> R + Send + 'static,
131 R: Send + 'static,
132 {
133 let (send, recv) = oneshot::channel();
134 let abort_handle = self.runtime().spawn_blocking(Box::new(move || {
135 // Optimistically avoid the work if the result won't be used.
136 if !send.is_closed() {
137 // Task::detach allows the receiver to be dropped, so we ignore send errors.
138 drop(send.send(f()));
139 }
140 }));
141 Task {
142 recv,
143 abort_handle: Some(abort_handle),
144 }
145 }
146
147 /// Open a file for I/O on this runtime.
148 pub fn open_read<S: IntoReadSource>(
149 &self,
150 source: S,
151 metrics: VortexMetrics,
152 ) -> VortexResult<FileRead> {
153 let source = source.into_read_source(self.clone())?;
154
155 let (send, recv) = mpsc::unbounded();
156
157 let read = FileRead::new(source.uri().clone(), source.size(), send);
158
159 let stream =
160 IoRequestStream::new(StreamExt::boxed(recv), source.coalesce_window(), metrics).boxed();
161
162 self.runtime().spawn_io(IoTask::new(source, stream));
163
164 Ok(read)
165 }
166}
167
168/// A handle to a spawned Task.
169///
170/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
171/// continue running in the background, call [`Task::detach`].
172#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
173pub struct Task<T> {
174 recv: oneshot::Receiver<T>,
175 abort_handle: Option<AbortHandleRef>,
176}
177
178impl<T> Task<T> {
179 /// Detach the task, allowing it to continue running in the background after being dropped.
180 /// This is only possible if the underlying runtime has a 'static lifetime.
181 pub fn detach(mut self) {
182 drop(self.abort_handle.take());
183 }
184}
185
186impl<T> Future for Task<T> {
187 type Output = T;
188
189 #[allow(clippy::panic)]
190 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
191 match ready!(self.recv.poll_unpin(cx)) {
192 Ok(result) => Poll::Ready(result),
193 Err(_recv_err) => {
194 // If the other end of the channel was dropped, it means the runtime dropped
195 // the future without ever completing it. If the caller aborted this task by
196 // dropping it, then they wouldn't be able to poll it anymore.
197 // So we consider a closed channel to be a Runtime programming error and therefore
198 // we panic.
199
200 // NOTE(ngates): we don't use vortex_panic to avoid printing a useless backtrace.
201 panic!("Runtime dropped task without completing it, likely it panicked")
202 }
203 }
204 }
205}
206
207impl<T> Drop for Task<T> {
208 fn drop(&mut self) {
209 // Optimistically abort the task if it's still running.
210 if let Some(handle) = self.abort_handle.take() {
211 handle.abort();
212 }
213 }
214}