vortex_io/runtime/handle.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::{Arc, Weak};
6use std::task::{Context, Poll, ready};
7
8use futures::channel::mpsc;
9use futures::{FutureExt, StreamExt};
10use vortex_error::{VortexResult, vortex_panic};
11use vortex_metrics::VortexMetrics;
12
13use crate::file::{FileRead, IntoReadSource, IoRequestStream};
14use crate::runtime::{AbortHandleRef, Executor, IoTask};
15
16/// A handle to an active Vortex runtime.
17///
18/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
19/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
20///
21/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
22/// runtime has been dropped, then any requests to spawn new tasks will panic.
23#[derive(Clone)]
24pub struct Handle {
25 runtime: Weak<dyn Executor>,
26}
27
28impl Handle {
29 pub(crate) fn new(runtime: Weak<dyn Executor>) -> Self {
30 Self { runtime }
31 }
32
33 fn runtime(&self) -> Arc<dyn Executor> {
34 self.runtime.upgrade().unwrap_or_else(|| {
35 vortex_panic!("Attempted to use a Handle after its runtime was dropped")
36 })
37 }
38
39 /// Returns a handle to the current runtime, if such a reasonable choice exists.
40 ///
41 /// For example, if called from within a Tokio context this will return a
42 /// `TokioRuntime` handle.
43 pub fn find() -> Option<Self> {
44 #[cfg(feature = "tokio")]
45 {
46 use tokio::runtime::Handle as TokioHandle;
47 if TokioHandle::try_current().is_ok() {
48 return Some(crate::runtime::tokio::TokioRuntime::current());
49 }
50 }
51
52 None
53 }
54
55 /// Spawn a new future onto the runtime.
56 ///
57 /// These futures are expected to not perform expensive CPU work and instead simply schedule
58 /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
59 ///
60 /// See [`Task`] for details on cancelling or detaching the spawned task.
61 pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
62 where
63 Fut: Future<Output = R> + Send + 'static,
64 R: Send + 'static,
65 {
66 let (send, recv) = oneshot::channel();
67 let abort_handle = self.runtime().spawn(
68 async move {
69 // Task::detach allows the receiver to be dropped, so we ignore send errors.
70 drop(send.send(f.await));
71 }
72 .boxed(),
73 );
74 Task {
75 recv,
76 abort_handle: Some(abort_handle),
77 }
78 }
79
80 /// A helper function to avoid manually cloning the handle when spawning nested tasks.
81 pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
82 where
83 F: FnOnce(Handle) -> Fut,
84 Fut: Future<Output = R> + Send + 'static,
85 R: Send + 'static,
86 {
87 self.spawn(f(Handle::new(self.runtime.clone())))
88 }
89
90 /// Spawn a CPU-bound task for execution on the runtime.
91 ///
92 /// Note that many runtimes will interleave this work on the same async runtime. See the
93 /// documentation for each runtime implementation for details.
94 ///
95 /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
96 /// once started, CPU work cannot be cancelled.
97 pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
98 where
99 // Unlike scheduling futures, the CPU task should have a static lifetime because it
100 // doesn't need to access to handle to spawn more work.
101 F: FnOnce() -> R + Send + 'static,
102 R: Send + 'static,
103 {
104 let (send, recv) = oneshot::channel();
105 let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
106 // Optimistically avoid the work if the result won't be used.
107 if !send.is_closed() {
108 // Task::detach allows the receiver to be dropped, so we ignore send errors.
109 drop(send.send(f()));
110 }
111 }));
112 Task {
113 recv,
114 abort_handle: Some(abort_handle),
115 }
116 }
117
118 /// Spawn a blocking I/O task for execution on the runtime.
119 pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
120 where
121 F: FnOnce() -> R + Send + 'static,
122 R: Send + 'static,
123 {
124 let (send, recv) = oneshot::channel();
125 let abort_handle = self.runtime().spawn_blocking(Box::new(move || {
126 // Optimistically avoid the work if the result won't be used.
127 if !send.is_closed() {
128 // Task::detach allows the receiver to be dropped, so we ignore send errors.
129 drop(send.send(f()));
130 }
131 }));
132 Task {
133 recv,
134 abort_handle: Some(abort_handle),
135 }
136 }
137
138 /// Open a file for I/O on this runtime.
139 pub fn open_read<S: IntoReadSource>(
140 &self,
141 source: S,
142 metrics: VortexMetrics,
143 ) -> VortexResult<FileRead> {
144 let source = source.into_read_source(self.clone())?;
145
146 let (send, recv) = mpsc::unbounded();
147
148 let read = FileRead::new(source.uri().clone(), source.size(), send);
149
150 let stream =
151 IoRequestStream::new(StreamExt::boxed(recv), source.coalesce_window(), metrics).boxed();
152
153 self.runtime().spawn_io(IoTask::new(source, stream));
154
155 Ok(read)
156 }
157}
158
159/// A handle to a spawned Task.
160///
161/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
162/// continue running in the background, call [`Task::detach`].
163#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
164pub struct Task<T> {
165 recv: oneshot::Receiver<T>,
166 abort_handle: Option<AbortHandleRef>,
167}
168
169impl<T> Task<T> {
170 /// Detach the task, allowing it to continue running in the background after being dropped.
171 /// This is only possible if the underlying runtime has a 'static lifetime.
172 pub fn detach(mut self) {
173 drop(self.abort_handle.take());
174 }
175}
176
177impl<T> Future for Task<T> {
178 type Output = T;
179
180 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
181 match ready!(self.recv.poll_unpin(cx)) {
182 Ok(result) => Poll::Ready(result),
183 Err(recv_err) => {
184 // If the other end of the channel was dropped, it means the runtime dropped
185 // the future without ever completing it. If the caller aborted this task by
186 // dropping it, then they wouldn't be able to poll it anymore.
187 // So we consider a closed channel to be a Runtime programming error and therefore
188 // we panic.
189 vortex_panic!(
190 "Runtime dropped task without completing it, likely it panicked: {recv_err}"
191 )
192 }
193 }
194 }
195}
196
197impl<T> Drop for Task<T> {
198 fn drop(&mut self) {
199 // Optimistically abort the task if it's still running.
200 if let Some(handle) = self.abort_handle.take() {
201 handle.abort();
202 }
203 }
204}