Skip to main content

vortex_io/runtime/
handle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Weak;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::FutureExt;
12use tracing::Instrument;
13use vortex_error::vortex_panic;
14
15use crate::runtime::AbortHandleRef;
16use crate::runtime::Executor;
17
18/// A handle to an active Vortex runtime.
19///
20/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
21/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
22///
23/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
24/// runtime has been dropped, then any requests to spawn new tasks will panic.
25#[derive(Clone)]
26pub struct Handle {
27    runtime: Weak<dyn Executor>,
28}
29
30impl Handle {
31    pub fn new(runtime: Weak<dyn Executor>) -> Self {
32        Self { runtime }
33    }
34
35    fn runtime(&self) -> Arc<dyn Executor> {
36        self.runtime.upgrade().unwrap_or_else(|| {
37            vortex_panic!("Attempted to use a Handle after its runtime was dropped")
38        })
39    }
40
41    /// Returns a handle to the current runtime, if such a reasonable choice exists.
42    ///
43    /// For example, if called from within a Tokio context this will return a
44    /// `TokioRuntime` handle.
45    pub fn find() -> Option<Self> {
46        #[cfg(feature = "tokio")]
47        {
48            use tokio::runtime::Handle as TokioHandle;
49            if TokioHandle::try_current().is_ok() {
50                return Some(crate::runtime::tokio::TokioRuntime::current());
51            }
52        }
53
54        None
55    }
56
57    /// Spawn a new future onto the runtime.
58    ///
59    /// These futures are expected to not perform expensive CPU work and instead simply schedule
60    /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
61    ///
62    /// See [`Task`] for details on cancelling or detaching the spawned task.
63    pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
64    where
65        Fut: Future<Output = R> + Send + 'static,
66        R: Send + 'static,
67    {
68        let (send, recv) = oneshot::channel();
69        let span = tracing::Span::current();
70        let abort_handle = self.runtime().spawn(
71            async move {
72                // Task::detach allows the receiver to be dropped, so we ignore send errors.
73                drop(send.send(f.await));
74            }
75            .instrument(span)
76            .boxed(),
77        );
78        Task {
79            recv: recv.into_future(),
80            abort_handle: Some(abort_handle),
81        }
82    }
83
84    /// A helper function to avoid manually cloning the handle when spawning nested tasks.
85    pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
86    where
87        F: FnOnce(Handle) -> Fut,
88        Fut: Future<Output = R> + Send + 'static,
89        R: Send + 'static,
90    {
91        self.spawn(f(Handle::new(self.runtime.clone())))
92    }
93
94    /// Spawn a CPU-bound task for execution on the runtime.
95    ///
96    /// Note that many runtimes will interleave this work on the same async runtime. See the
97    /// documentation for each runtime implementation for details.
98    ///
99    /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
100    /// once started, CPU work cannot be cancelled.
101    pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
102    where
103        // Unlike scheduling futures, the CPU task should have a static lifetime because it
104        // doesn't need to access to handle to spawn more work.
105        F: FnOnce() -> R + Send + 'static,
106        R: Send + 'static,
107    {
108        let (send, recv) = oneshot::channel();
109        let span = tracing::Span::current();
110        let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
111            let _guard = span.enter();
112            // Optimistically avoid the work if the result won't be used.
113            if !send.is_closed() {
114                // Task::detach allows the receiver to be dropped, so we ignore send errors.
115                drop(send.send(f()));
116            }
117        }));
118        Task {
119            recv: recv.into_future(),
120            abort_handle: Some(abort_handle),
121        }
122    }
123
124    /// Spawn a blocking I/O task for execution on the runtime.
125    pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
126    where
127        F: FnOnce() -> R + Send + 'static,
128        R: Send + 'static,
129    {
130        let (send, recv) = oneshot::channel();
131        let span = tracing::Span::current();
132        let abort_handle = self.runtime().spawn_blocking_io(Box::new(move || {
133            let _guard = span.enter();
134            // Optimistically avoid the work if the result won't be used.
135            if !send.is_closed() {
136                // Task::detach allows the receiver to be dropped, so we ignore send errors.
137                drop(send.send(f()));
138            }
139        }));
140        Task {
141            recv: recv.into_future(),
142            abort_handle: Some(abort_handle),
143        }
144    }
145}
146
147/// A handle to a spawned Task.
148///
149/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
150/// continue running in the background, call [`Task::detach`].
151#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
152pub struct Task<T> {
153    recv: oneshot::AsyncReceiver<T>,
154    abort_handle: Option<AbortHandleRef>,
155}
156
157impl<T> Task<T> {
158    /// Detach the task, allowing it to continue running in the background after being dropped.
159    /// This is only possible if the underlying runtime has a 'static lifetime.
160    pub fn detach(mut self) {
161        drop(self.abort_handle.take());
162    }
163}
164
165impl<T> Future for Task<T> {
166    type Output = T;
167
168    #[allow(clippy::panic)]
169    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
170        match ready!(self.recv.poll_unpin(cx)) {
171            Ok(result) => Poll::Ready(result),
172            Err(_recv_err) => {
173                // If the other end of the channel was dropped, it means the runtime dropped
174                // the future without ever completing it. If the caller aborted this task by
175                // dropping it, then they wouldn't be able to poll it anymore.
176                // So we consider a closed channel to be a Runtime programming error and therefore
177                // we panic.
178
179                // NOTE(ngates): we don't use vortex_panic to avoid printing a useless backtrace.
180                panic!("Runtime dropped task without completing it, likely it panicked")
181            }
182        }
183    }
184}
185
186impl<T> Drop for Task<T> {
187    fn drop(&mut self) {
188        // Optimistically abort the task if it's still running.
189        if let Some(handle) = self.abort_handle.take() {
190            handle.abort();
191        }
192    }
193}