Skip to main content

vortex_io/runtime/
handle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Weak;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::FutureExt;
12use vortex_error::vortex_panic;
13
14use crate::runtime::AbortHandleRef;
15use crate::runtime::Executor;
16
17/// A handle to an active Vortex runtime.
18///
19/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
20/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
21///
22/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
23/// runtime has been dropped, then any requests to spawn new tasks will panic.
24#[derive(Clone)]
25pub struct Handle {
26    runtime: Weak<dyn Executor>,
27}
28
29impl Handle {
30    pub(crate) fn new(runtime: Weak<dyn Executor>) -> Self {
31        Self { runtime }
32    }
33
34    fn runtime(&self) -> Arc<dyn Executor> {
35        self.runtime.upgrade().unwrap_or_else(|| {
36            vortex_panic!("Attempted to use a Handle after its runtime was dropped")
37        })
38    }
39
40    /// Returns a handle to the current runtime, if such a reasonable choice exists.
41    ///
42    /// For example, if called from within a Tokio context this will return a
43    /// `TokioRuntime` handle.
44    pub fn find() -> Option<Self> {
45        #[cfg(feature = "tokio")]
46        {
47            use tokio::runtime::Handle as TokioHandle;
48            if TokioHandle::try_current().is_ok() {
49                return Some(crate::runtime::tokio::TokioRuntime::current());
50            }
51        }
52
53        None
54    }
55
56    /// Spawn a new future onto the runtime.
57    ///
58    /// These futures are expected to not perform expensive CPU work and instead simply schedule
59    /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
60    ///
61    /// See [`Task`] for details on cancelling or detaching the spawned task.
62    pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
63    where
64        Fut: Future<Output = R> + Send + 'static,
65        R: Send + 'static,
66    {
67        let (send, recv) = oneshot::channel();
68        let abort_handle = self.runtime().spawn(
69            async move {
70                // Task::detach allows the receiver to be dropped, so we ignore send errors.
71                drop(send.send(f.await));
72            }
73            .boxed(),
74        );
75        Task {
76            recv,
77            abort_handle: Some(abort_handle),
78        }
79    }
80
81    /// A helper function to avoid manually cloning the handle when spawning nested tasks.
82    pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
83    where
84        F: FnOnce(Handle) -> Fut,
85        Fut: Future<Output = R> + Send + 'static,
86        R: Send + 'static,
87    {
88        self.spawn(f(Handle::new(self.runtime.clone())))
89    }
90
91    /// Spawn a CPU-bound task for execution on the runtime.
92    ///
93    /// Note that many runtimes will interleave this work on the same async runtime. See the
94    /// documentation for each runtime implementation for details.
95    ///
96    /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
97    /// once started, CPU work cannot be cancelled.
98    pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
99    where
100        // Unlike scheduling futures, the CPU task should have a static lifetime because it
101        // doesn't need to access to handle to spawn more work.
102        F: FnOnce() -> R + Send + 'static,
103        R: Send + 'static,
104    {
105        let (send, recv) = oneshot::channel();
106        let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
107            // Optimistically avoid the work if the result won't be used.
108            if !send.is_closed() {
109                // Task::detach allows the receiver to be dropped, so we ignore send errors.
110                drop(send.send(f()));
111            }
112        }));
113        Task {
114            recv,
115            abort_handle: Some(abort_handle),
116        }
117    }
118
119    /// Spawn a blocking I/O task for execution on the runtime.
120    pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
121    where
122        F: FnOnce() -> R + Send + 'static,
123        R: Send + 'static,
124    {
125        let (send, recv) = oneshot::channel();
126        let abort_handle = self.runtime().spawn_blocking(Box::new(move || {
127            // Optimistically avoid the work if the result won't be used.
128            if !send.is_closed() {
129                // Task::detach allows the receiver to be dropped, so we ignore send errors.
130                drop(send.send(f()));
131            }
132        }));
133        Task {
134            recv,
135            abort_handle: Some(abort_handle),
136        }
137    }
138}
139
140/// A handle to a spawned Task.
141///
142/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
143/// continue running in the background, call [`Task::detach`].
144#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
145pub struct Task<T> {
146    recv: oneshot::Receiver<T>,
147    abort_handle: Option<AbortHandleRef>,
148}
149
150impl<T> Task<T> {
151    /// Detach the task, allowing it to continue running in the background after being dropped.
152    /// This is only possible if the underlying runtime has a 'static lifetime.
153    pub fn detach(mut self) {
154        drop(self.abort_handle.take());
155    }
156}
157
158impl<T> Future for Task<T> {
159    type Output = T;
160
161    #[allow(clippy::panic)]
162    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
163        match ready!(self.recv.poll_unpin(cx)) {
164            Ok(result) => Poll::Ready(result),
165            Err(_recv_err) => {
166                // If the other end of the channel was dropped, it means the runtime dropped
167                // the future without ever completing it. If the caller aborted this task by
168                // dropping it, then they wouldn't be able to poll it anymore.
169                // So we consider a closed channel to be a Runtime programming error and therefore
170                // we panic.
171
172                // NOTE(ngates): we don't use vortex_panic to avoid printing a useless backtrace.
173                panic!("Runtime dropped task without completing it, likely it panicked")
174            }
175        }
176    }
177}
178
179impl<T> Drop for Task<T> {
180    fn drop(&mut self) {
181        // Optimistically abort the task if it's still running.
182        if let Some(handle) = self.abort_handle.take() {
183            handle.abort();
184        }
185    }
186}