Skip to main content

vortex_io/compat/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Compatibility layer similar to Rust async-compat that allows non-Tokio runtimes to use
5//! Tokio-based implementations of the core traits, such as the ObjectStore implementations.
6//!
7//! This works in the same way as async-compat, by either pulling a Tokio runtime from the
8//! current context, or by creating a new global single-thread Tokio runtime if one is not found.
9//!
10//! We cannot use async-compat directly because we need to wrap Streams as well as Futures,
11//! and async-compat only supports the latter.
12
13mod filesystem;
14#[cfg(feature = "object_store")]
15mod obj_store;
16mod read_at;
17mod write;
18
19use std::pin::Pin;
20use std::sync::LazyLock;
21use std::task::Context;
22use std::task::Poll;
23use std::thread;
24
25use futures::Stream;
26use pin_project_lite::pin_project;
27use vortex_error::VortexExpect;
28
29/// Get a handle to the current Tokio runtime, or create a new global single-thread runtime if one
30/// is not found.
31///
32/// From
33fn runtime_handle() -> tokio::runtime::Handle {
34    static TOKIO: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
35        thread::Builder::new()
36            .name("vortex-async-compat".into())
37            .spawn(|| TOKIO.block_on(Pending))
38            .vortex_expect("cannot start tokio runtime thread");
39        tokio::runtime::Builder::new_current_thread()
40            .enable_all()
41            .build()
42            .vortex_expect("cannot start tokio runtime")
43    });
44
45    tokio::runtime::Handle::try_current().unwrap_or_else(|_| TOKIO.handle().clone())
46}
47
48struct Pending;
49impl Future for Pending {
50    type Output = ();
51
52    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
53        Poll::Pending
54    }
55}
56
57pin_project! {
58    /// Compatibility adapter for futures and I/O types.
59    #[derive(Clone, Debug)]
60    pub struct Compat<T> {
61        #[pin]
62        inner: Option<T>,
63    }
64
65    impl<T> PinnedDrop for Compat<T> {
66        fn drop(this: Pin<&mut Self>) {
67            if this.inner.is_some() {
68                // If the inner future wasn't moved out using into_inner,
69                // enter the tokio context while the inner value is dropped.
70                let _guard = runtime_handle().enter();
71                this.project().inner.set(None);
72            }
73        }
74    }
75}
76
77impl<T> Compat<T> {
78    /// Create a new Compat wrapper around the given value.
79    pub fn new(inner: T) -> Self {
80        Self { inner: Some(inner) }
81    }
82
83    #[inline]
84    fn inner(&self) -> &T {
85        self.inner
86            .as_ref()
87            .vortex_expect("inner is only None when Compat is about to drop")
88    }
89
90    #[inline]
91    fn inner_mut(&mut self) -> &mut T {
92        self.inner
93            .as_mut()
94            .vortex_expect("inner is only None when Compat is about to drop")
95    }
96
97    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
98        self.project()
99            .inner
100            .as_pin_mut()
101            .vortex_expect("inner is only None when Compat is about to drop")
102    }
103}
104
105#[deny(clippy::missing_trait_methods)]
106impl<T: Future> Future for Compat<T> {
107    type Output = T::Output;
108
109    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110        let _guard = runtime_handle().enter();
111        self.get_pin_mut().poll(cx)
112    }
113}
114
115#[deny(clippy::missing_trait_methods)]
116impl<S: Stream> Stream for Compat<S> {
117    type Item = S::Item;
118
119    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
120        let _guard = runtime_handle().enter();
121        self.get_pin_mut().poll_next(cx)
122    }
123
124    fn size_hint(&self) -> (usize, Option<usize>) {
125        self.inner().size_hint()
126    }
127}