vortex_io/runtime/
handle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Weak;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::FutureExt;
12use futures::StreamExt;
13use futures::channel::mpsc;
14use vortex_error::VortexResult;
15use vortex_error::vortex_panic;
16use vortex_metrics::VortexMetrics;
17
18use crate::file::FileRead;
19use crate::file::IntoReadSource;
20use crate::file::IoRequestStream;
21use crate::runtime::AbortHandleRef;
22use crate::runtime::Executor;
23use crate::runtime::IoTask;
24
25/// A handle to an active Vortex runtime.
26///
27/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
28/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
29///
30/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
31/// runtime has been dropped, then any requests to spawn new tasks will panic.
32#[derive(Clone)]
33pub struct Handle {
34    runtime: Weak<dyn Executor>,
35}
36
37impl Handle {
38    pub(crate) fn new(runtime: Weak<dyn Executor>) -> Self {
39        Self { runtime }
40    }
41
42    fn runtime(&self) -> Arc<dyn Executor> {
43        self.runtime.upgrade().unwrap_or_else(|| {
44            vortex_panic!("Attempted to use a Handle after its runtime was dropped")
45        })
46    }
47
48    /// Returns a handle to the current runtime, if such a reasonable choice exists.
49    ///
50    /// For example, if called from within a Tokio context this will return a
51    /// `TokioRuntime` handle.
52    pub fn find() -> Option<Self> {
53        #[cfg(feature = "tokio")]
54        {
55            use tokio::runtime::Handle as TokioHandle;
56            if TokioHandle::try_current().is_ok() {
57                return Some(crate::runtime::tokio::TokioRuntime::current());
58            }
59        }
60
61        None
62    }
63
64    /// Spawn a new future onto the runtime.
65    ///
66    /// These futures are expected to not perform expensive CPU work and instead simply schedule
67    /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
68    ///
69    /// See [`Task`] for details on cancelling or detaching the spawned task.
70    pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
71    where
72        Fut: Future<Output = R> + Send + 'static,
73        R: Send + 'static,
74    {
75        let (send, recv) = oneshot::channel();
76        let abort_handle = self.runtime().spawn(
77            async move {
78                // Task::detach allows the receiver to be dropped, so we ignore send errors.
79                drop(send.send(f.await));
80            }
81            .boxed(),
82        );
83        Task {
84            recv,
85            abort_handle: Some(abort_handle),
86        }
87    }
88
89    /// A helper function to avoid manually cloning the handle when spawning nested tasks.
90    pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
91    where
92        F: FnOnce(Handle) -> Fut,
93        Fut: Future<Output = R> + Send + 'static,
94        R: Send + 'static,
95    {
96        self.spawn(f(Handle::new(self.runtime.clone())))
97    }
98
99    /// Spawn a CPU-bound task for execution on the runtime.
100    ///
101    /// Note that many runtimes will interleave this work on the same async runtime. See the
102    /// documentation for each runtime implementation for details.
103    ///
104    /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
105    /// once started, CPU work cannot be cancelled.
106    pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
107    where
108        // Unlike scheduling futures, the CPU task should have a static lifetime because it
109        // doesn't need to access to handle to spawn more work.
110        F: FnOnce() -> R + Send + 'static,
111        R: Send + 'static,
112    {
113        let (send, recv) = oneshot::channel();
114        let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
115            // Optimistically avoid the work if the result won't be used.
116            if !send.is_closed() {
117                // Task::detach allows the receiver to be dropped, so we ignore send errors.
118                drop(send.send(f()));
119            }
120        }));
121        Task {
122            recv,
123            abort_handle: Some(abort_handle),
124        }
125    }
126
127    /// Spawn a blocking I/O task for execution on the runtime.
128    pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
129    where
130        F: FnOnce() -> R + Send + 'static,
131        R: Send + 'static,
132    {
133        let (send, recv) = oneshot::channel();
134        let abort_handle = self.runtime().spawn_blocking(Box::new(move || {
135            // Optimistically avoid the work if the result won't be used.
136            if !send.is_closed() {
137                // Task::detach allows the receiver to be dropped, so we ignore send errors.
138                drop(send.send(f()));
139            }
140        }));
141        Task {
142            recv,
143            abort_handle: Some(abort_handle),
144        }
145    }
146
147    /// Open a file for I/O on this runtime.
148    pub fn open_read<S: IntoReadSource>(
149        &self,
150        source: S,
151        metrics: VortexMetrics,
152    ) -> VortexResult<FileRead> {
153        let source = source.into_read_source(self.clone())?;
154
155        let (send, recv) = mpsc::unbounded();
156
157        let read = FileRead::new(source.uri().clone(), source.size(), send);
158
159        let stream =
160            IoRequestStream::new(StreamExt::boxed(recv), source.coalesce_window(), metrics).boxed();
161
162        self.runtime().spawn_io(IoTask::new(source, stream));
163
164        Ok(read)
165    }
166}
167
168/// A handle to a spawned Task.
169///
170/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
171/// continue running in the background, call [`Task::detach`].
172#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
173pub struct Task<T> {
174    recv: oneshot::Receiver<T>,
175    abort_handle: Option<AbortHandleRef>,
176}
177
178impl<T> Task<T> {
179    /// Detach the task, allowing it to continue running in the background after being dropped.
180    /// This is only possible if the underlying runtime has a 'static lifetime.
181    pub fn detach(mut self) {
182        drop(self.abort_handle.take());
183    }
184}
185
186impl<T> Future for Task<T> {
187    type Output = T;
188
189    #[allow(clippy::panic)]
190    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
191        match ready!(self.recv.poll_unpin(cx)) {
192            Ok(result) => Poll::Ready(result),
193            Err(_recv_err) => {
194                // If the other end of the channel was dropped, it means the runtime dropped
195                // the future without ever completing it. If the caller aborted this task by
196                // dropping it, then they wouldn't be able to poll it anymore.
197                // So we consider a closed channel to be a Runtime programming error and therefore
198                // we panic.
199
200                // NOTE(ngates): we don't use vortex_panic to avoid printing a useless backtrace.
201                panic!("Runtime dropped task without completing it, likely it panicked")
202            }
203        }
204    }
205}
206
207impl<T> Drop for Task<T> {
208    fn drop(&mut self) {
209        // Optimistically abort the task if it's still running.
210        if let Some(handle) = self.abort_handle.take() {
211            handle.abort();
212        }
213    }
214}