vortex_io/runtime/
handle.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::sync::{Arc, Weak};
6use std::task::{Context, Poll, ready};
7
8use futures::channel::mpsc;
9use futures::{FutureExt, StreamExt};
10use vortex_error::{VortexResult, vortex_panic};
11use vortex_metrics::VortexMetrics;
12
13use crate::file::{FileRead, IntoReadSource, IoRequestStream};
14use crate::runtime::{AbortHandleRef, Executor, IoTask};
15
16/// A handle to an active Vortex runtime.
17///
18/// Users should obtain a handle from one of the Vortex runtime's and use it to spawn new async
19/// tasks, blocking I/O tasks, CPU-heavy tasks, or to open files for reading or writing.
20///
21/// Note that a [`Handle`] is a weak reference to the underlying runtime. If the associated
22/// runtime has been dropped, then any requests to spawn new tasks will panic.
23#[derive(Clone)]
24pub struct Handle {
25    runtime: Weak<dyn Executor>,
26}
27
28impl Handle {
29    pub(crate) fn new(runtime: Weak<dyn Executor>) -> Self {
30        Self { runtime }
31    }
32
33    fn runtime(&self) -> Arc<dyn Executor> {
34        self.runtime.upgrade().unwrap_or_else(|| {
35            vortex_panic!("Attempted to use a Handle after its runtime was dropped")
36        })
37    }
38
39    /// Returns a handle to the current runtime, if such a reasonable choice exists.
40    ///
41    /// For example, if called from within a Tokio context this will return a
42    /// `TokioRuntime` handle.
43    pub fn find() -> Option<Self> {
44        #[cfg(feature = "tokio")]
45        {
46            use tokio::runtime::Handle as TokioHandle;
47            if TokioHandle::try_current().is_ok() {
48                return Some(crate::runtime::tokio::TokioRuntime::current());
49            }
50        }
51
52        None
53    }
54
55    /// Spawn a new future onto the runtime.
56    ///
57    /// These futures are expected to not perform expensive CPU work and instead simply schedule
58    /// either CPU tasks or I/O tasks. See [`Handle::spawn_cpu`] for spawning CPU-bound work.
59    ///
60    /// See [`Task`] for details on cancelling or detaching the spawned task.
61    pub fn spawn<Fut, R>(&self, f: Fut) -> Task<R>
62    where
63        Fut: Future<Output = R> + Send + 'static,
64        R: Send + 'static,
65    {
66        let (send, recv) = oneshot::channel();
67        let abort_handle = self.runtime().spawn(
68            async move {
69                // Task::detach allows the receiver to be dropped, so we ignore send errors.
70                drop(send.send(f.await));
71            }
72            .boxed(),
73        );
74        Task {
75            recv,
76            abort_handle: Some(abort_handle),
77        }
78    }
79
80    /// A helper function to avoid manually cloning the handle when spawning nested tasks.
81    pub fn spawn_nested<F, Fut, R>(&self, f: F) -> Task<R>
82    where
83        F: FnOnce(Handle) -> Fut,
84        Fut: Future<Output = R> + Send + 'static,
85        R: Send + 'static,
86    {
87        self.spawn(f(Handle::new(self.runtime.clone())))
88    }
89
90    /// Spawn a CPU-bound task for execution on the runtime.
91    ///
92    /// Note that many runtimes will interleave this work on the same async runtime. See the
93    /// documentation for each runtime implementation for details.
94    ///
95    /// See [`Task`] for details on cancelling or detaching the spawned work, although note that
96    /// once started, CPU work cannot be cancelled.
97    pub fn spawn_cpu<F, R>(&self, f: F) -> Task<R>
98    where
99        // Unlike scheduling futures, the CPU task should have a static lifetime because it
100        // doesn't need to access to handle to spawn more work.
101        F: FnOnce() -> R + Send + 'static,
102        R: Send + 'static,
103    {
104        let (send, recv) = oneshot::channel();
105        let abort_handle = self.runtime().spawn_cpu(Box::new(move || {
106            // Optimistically avoid the work if the result won't be used.
107            if !send.is_closed() {
108                // Task::detach allows the receiver to be dropped, so we ignore send errors.
109                drop(send.send(f()));
110            }
111        }));
112        Task {
113            recv,
114            abort_handle: Some(abort_handle),
115        }
116    }
117
118    /// Spawn a blocking I/O task for execution on the runtime.
119    pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
120    where
121        F: FnOnce() -> R + Send + 'static,
122        R: Send + 'static,
123    {
124        let (send, recv) = oneshot::channel();
125        let abort_handle = self.runtime().spawn_blocking(Box::new(move || {
126            // Optimistically avoid the work if the result won't be used.
127            if !send.is_closed() {
128                // Task::detach allows the receiver to be dropped, so we ignore send errors.
129                drop(send.send(f()));
130            }
131        }));
132        Task {
133            recv,
134            abort_handle: Some(abort_handle),
135        }
136    }
137
138    /// Open a file for I/O on this runtime.
139    pub fn open_read<S: IntoReadSource>(
140        &self,
141        source: S,
142        metrics: VortexMetrics,
143    ) -> VortexResult<FileRead> {
144        let source = source.into_read_source(self.clone())?;
145
146        let (send, recv) = mpsc::unbounded();
147
148        let read = FileRead::new(source.uri().clone(), source.size(), send);
149
150        let stream =
151            IoRequestStream::new(StreamExt::boxed(recv), source.coalesce_window(), metrics).boxed();
152
153        self.runtime().spawn_io(IoTask::new(source, stream));
154
155        Ok(read)
156    }
157}
158
159/// A handle to a spawned Task.
160///
161/// If this handle is dropped, the task is cancelled where possible. In order to allow the task to
162/// continue running in the background, call [`Task::detach`].
163#[must_use = "When a Task is dropped without being awaited, it is cancelled"]
164pub struct Task<T> {
165    recv: oneshot::Receiver<T>,
166    abort_handle: Option<AbortHandleRef>,
167}
168
169impl<T> Task<T> {
170    /// Detach the task, allowing it to continue running in the background after being dropped.
171    /// This is only possible if the underlying runtime has a 'static lifetime.
172    pub fn detach(mut self) {
173        drop(self.abort_handle.take());
174    }
175}
176
177impl<T> Future for Task<T> {
178    type Output = T;
179
180    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
181        match ready!(self.recv.poll_unpin(cx)) {
182            Ok(result) => Poll::Ready(result),
183            Err(recv_err) => {
184                // If the other end of the channel was dropped, it means the runtime dropped
185                // the future without ever completing it. If the caller aborted this task by
186                // dropping it, then they wouldn't be able to poll it anymore.
187                // So we consider a closed channel to be a Runtime programming error and therefore
188                // we panic.
189                vortex_panic!(
190                    "Runtime dropped task without completing it, likely it panicked: {recv_err}"
191                )
192            }
193        }
194    }
195}
196
197impl<T> Drop for Task<T> {
198    fn drop(&mut self) {
199        // Optimistically abort the task if it's still running.
200        if let Some(handle) = self.abort_handle.take() {
201            handle.abort();
202        }
203    }
204}