vortex_io/runtime/handle.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Weak;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::FutureExt;
12use tracing::Instrument;
13use vortex_error::vortex_panic;
14
15use crate::runtime::AbortHandleRef;
16use crate::runtime::Executor;
17
18/// A handle to an active Vortex runtime.
19///
20/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
21/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
22///
23/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
24/// runtime has been dropped, then any requests to spawn new tasks will panic.
25#[derive(Clone)]
26pub struct Handle {
27 runtime: Weak<dyn Executor>,
28}
29
30impl Handle {
31 pub fn new(runtime: Weak<dyn Executor>) -> Self {
32 Self { runtime }
33 }
34
35 fn runtime(&self) -> Arc<dyn Executor> {
36 self.runtime.upgrade().unwrap_or_else(|| {
37 vortex_panic!("Attempted to use a Handle after its runtime was dropped")
38 })
39 }
40
41 /// Returns a handle to the current runtime, if such a reasonable choice exists.
42 ///
43 /// For example, if called from within a Tokio context this will return a
44 /// `TokioRuntime` handle.
45 pub fn find() -> Option<Self> {
46 #[cfg(feature = "tokio")]
47 {
48 use tokio::runtime::Handle as TokioHandle;
49 if TokioHandle::try_current().is_ok() {
50 return Some(crate::runtime::tokio::TokioRuntime::current());
51 }
52 }
53
54 None
55 }
56
57 /// Spawn a new future onto the runtime.
58 ///
59 /// These futures are expected to not perform expensive CPU work and instead simply schedule
60 /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
61 ///
62 /// See [`Task`] for details on cancelling or detaching the spawned task.
63 pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
64 where
65 Fut: Future<Output = R> + Send + 'static,
66 R: Send + 'static,
67 {
68 let (send, recv) = oneshot::channel();
69 let span = tracing::Span::current();
70 let abort_handle = self.runtime().spawn(
71 async move {
72 // Task::detach allows the receiver to be dropped, so we ignore send errors.
73 drop(send.send(f.await));
74 }
75 .instrument(span)
76 .boxed(),
77 );
78 Task {
79 recv: recv.into_future(),
80 abort_handle: Some(abort_handle),
81 }
82 }
83
84 /// A helper function to avoid manually cloning the handle when spawning nested tasks.
85 pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
86 where
87 F: FnOnce(Handle) -> Fut,
88 Fut: Future<Output = R> + Send + 'static,
89 R: Send + 'static,
90 {
91 self.spawn(f(Handle::new(self.runtime.clone())))
92 }
93
94 /// Spawn a CPU-bound task for execution on the runtime.
95 ///
96 /// Note that many runtimes will interleave this work on the same async runtime. See the
97 /// documentation for each runtime implementation for details.
98 ///
99 /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
100 /// once started, CPU work cannot be cancelled.
101 pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
102 where
103 // Unlike scheduling futures, the CPU task should have a static lifetime because it
104 // doesn't need to access to handle to spawn more work.
105 F: FnOnce() -> R + Send + 'static,
106 R: Send + 'static,
107 {
108 let (send, recv) = oneshot::channel();
109 let span = tracing::Span::current();
110 let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
111 let _guard = span.enter();
112 // Optimistically avoid the work if the result won't be used.
113 if !send.is_closed() {
114 // Task::detach allows the receiver to be dropped, so we ignore send errors.
115 drop(send.send(f()));
116 }
117 }));
118 Task {
119 recv: recv.into_future(),
120 abort_handle: Some(abort_handle),
121 }
122 }
123
124 /// Spawn a blocking I/O task for execution on the runtime.
125 pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
126 where
127 F: FnOnce() -> R + Send + 'static,
128 R: Send + 'static,
129 {
130 let (send, recv) = oneshot::channel();
131 let span = tracing::Span::current();
132 let abort_handle = self.runtime().spawn_blocking_io(Box::new(move || {
133 let _guard = span.enter();
134 // Optimistically avoid the work if the result won't be used.
135 if !send.is_closed() {
136 // Task::detach allows the receiver to be dropped, so we ignore send errors.
137 drop(send.send(f()));
138 }
139 }));
140 Task {
141 recv: recv.into_future(),
142 abort_handle: Some(abort_handle),
143 }
144 }
145}
146
147/// A handle to a spawned Task.
148///
149/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
150/// continue running in the background, call [`Task::detach`].
151#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
152pub struct Task<T> {
153 recv: oneshot::AsyncReceiver<T>,
154 abort_handle: Option<AbortHandleRef>,
155}
156
157impl<T> Task<T> {
158 /// Detach the task, allowing it to continue running in the background after being dropped.
159 /// This is only possible if the underlying runtime has a 'static lifetime.
160 pub fn detach(mut self) {
161 drop(self.abort_handle.take());
162 }
163}
164
165impl<T> Future for Task<T> {
166 type Output = T;
167
168 #[allow(clippy::panic)]
169 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
170 match ready!(self.recv.poll_unpin(cx)) {
171 Ok(result) => Poll::Ready(result),
172 Err(_recv_err) => {
173 // If the other end of the channel was dropped, it means the runtime dropped
174 // the future without ever completing it. If the caller aborted this task by
175 // dropping it, then they wouldn't be able to poll it anymore.
176 // So we consider a closed channel to be a Runtime programming error and therefore
177 // we panic.
178
179 // NOTE(ngates): we don't use vortex_panic to avoid printing a useless backtrace.
180 panic!("Runtime dropped task without completing it, likely it panicked")
181 }
182 }
183 }
184}
185
186impl<T> Drop for Task<T> {
187 fn drop(&mut self) {
188 // Optimistically abort the task if it's still running.
189 if let Some(handle) = self.abort_handle.take() {
190 handle.abort();
191 }
192 }
193}