vortex_io/runtime/handle.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Weak;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::FutureExt;
12use vortex_error::vortex_panic;
13
14use crate::runtime::AbortHandleRef;
15use crate::runtime::Executor;
16
17/// A handle to an active Vortex runtime.
18///
19/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
20/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
21///
22/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
23/// runtime has been dropped, then any requests to spawn new tasks will panic.
24#[derive(Clone)]
25pub struct Handle {
26 runtime: Weak<dyn Executor>,
27}
28
29impl Handle {
30 pub(crate) fn new(runtime: Weak<dyn Executor>) -> Self {
31 Self { runtime }
32 }
33
34 fn runtime(&self) -> Arc<dyn Executor> {
35 self.runtime.upgrade().unwrap_or_else(|| {
36 vortex_panic!("Attempted to use a Handle after its runtime was dropped")
37 })
38 }
39
40 /// Returns a handle to the current runtime, if such a reasonable choice exists.
41 ///
42 /// For example, if called from within a Tokio context this will return a
43 /// `TokioRuntime` handle.
44 pub fn find() -> Option<Self> {
45 #[cfg(feature = "tokio")]
46 {
47 use tokio::runtime::Handle as TokioHandle;
48 if TokioHandle::try_current().is_ok() {
49 return Some(crate::runtime::tokio::TokioRuntime::current());
50 }
51 }
52
53 None
54 }
55
56 /// Spawn a new future onto the runtime.
57 ///
58 /// These futures are expected to not perform expensive CPU work and instead simply schedule
59 /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
60 ///
61 /// See [`Task`] for details on cancelling or detaching the spawned task.
62 pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
63 where
64 Fut: Future<Output = R> + Send + 'static,
65 R: Send + 'static,
66 {
67 let (send, recv) = oneshot::channel();
68 let abort_handle = self.runtime().spawn(
69 async move {
70 // Task::detach allows the receiver to be dropped, so we ignore send errors.
71 drop(send.send(f.await));
72 }
73 .boxed(),
74 );
75 Task {
76 recv,
77 abort_handle: Some(abort_handle),
78 }
79 }
80
81 /// A helper function to avoid manually cloning the handle when spawning nested tasks.
82 pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
83 where
84 F: FnOnce(Handle) -> Fut,
85 Fut: Future<Output = R> + Send + 'static,
86 R: Send + 'static,
87 {
88 self.spawn(f(Handle::new(self.runtime.clone())))
89 }
90
91 /// Spawn a CPU-bound task for execution on the runtime.
92 ///
93 /// Note that many runtimes will interleave this work on the same async runtime. See the
94 /// documentation for each runtime implementation for details.
95 ///
96 /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
97 /// once started, CPU work cannot be cancelled.
98 pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
99 where
100 // Unlike scheduling futures, the CPU task should have a static lifetime because it
101 // doesn't need to access to handle to spawn more work.
102 F: FnOnce() -> R + Send + 'static,
103 R: Send + 'static,
104 {
105 let (send, recv) = oneshot::channel();
106 let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
107 // Optimistically avoid the work if the result won't be used.
108 if !send.is_closed() {
109 // Task::detach allows the receiver to be dropped, so we ignore send errors.
110 drop(send.send(f()));
111 }
112 }));
113 Task {
114 recv,
115 abort_handle: Some(abort_handle),
116 }
117 }
118
119 /// Spawn a blocking I/O task for execution on the runtime.
120 pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
121 where
122 F: FnOnce() -> R + Send + 'static,
123 R: Send + 'static,
124 {
125 let (send, recv) = oneshot::channel();
126 let abort_handle = self.runtime().spawn_blocking(Box::new(move || {
127 // Optimistically avoid the work if the result won't be used.
128 if !send.is_closed() {
129 // Task::detach allows the receiver to be dropped, so we ignore send errors.
130 drop(send.send(f()));
131 }
132 }));
133 Task {
134 recv,
135 abort_handle: Some(abort_handle),
136 }
137 }
138}
139
140/// A handle to a spawned Task.
141///
142/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
143/// continue running in the background, call [`Task::detach`].
144#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
145pub struct Task<T> {
146 recv: oneshot::Receiver<T>,
147 abort_handle: Option<AbortHandleRef>,
148}
149
150impl<T> Task<T> {
151 /// Detach the task, allowing it to continue running in the background after being dropped.
152 /// This is only possible if the underlying runtime has a 'static lifetime.
153 pub fn detach(mut self) {
154 drop(self.abort_handle.take());
155 }
156}
157
158impl<T> Future for Task<T> {
159 type Output = T;
160
161 #[allow(clippy::panic)]
162 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
163 match ready!(self.recv.poll_unpin(cx)) {
164 Ok(result) => Poll::Ready(result),
165 Err(_recv_err) => {
166 // If the other end of the channel was dropped, it means the runtime dropped
167 // the future without ever completing it. If the caller aborted this task by
168 // dropping it, then they wouldn't be able to poll it anymore.
169 // So we consider a closed channel to be a Runtime programming error and therefore
170 // we panic.
171
172 // NOTE(ngates): we don't use vortex_panic to avoid printing a useless backtrace.
173 panic!("Runtime dropped task without completing it, likely it panicked")
174 }
175 }
176 }
177}
178
179impl<T> Drop for Task<T> {
180 fn drop(&mut self) {
181 // Optimistically abort the task if it's still running.
182 if let Some(handle) = self.abort_handle.take() {
183 handle.abort();
184 }
185 }
186}