Skip to main content

vortex_io/compat/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Compatibility layer similar to Rust async-compat that allows non-Tokio runtimes to use
5//! Tokio-based implementations of the core traits, such as the ObjectStore implementations.
6//!
7//! This works in the same way as async-compat, by either pulling a Tokio runtime from the
8//! current context, or by creating a new global single-thread Tokio runtime if one is not found.
9//!
10//! We cannot use async-compat directly because we need to wrap Streams as well as Futures,
11//! and async-compat only supports the latter.
12
13mod filesystem;
14mod read_at;
15mod write;
16
17use std::pin::Pin;
18use std::sync::LazyLock;
19use std::task::Context;
20use std::task::Poll;
21use std::thread;
22
23use futures::Stream;
24use pin_project_lite::pin_project;
25use vortex_error::VortexExpect;
26
27/// Get a handle to the current Tokio runtime, or create a new global single-thread runtime if one
28/// is not found.
29///
30/// From
31fn runtime_handle() -> tokio::runtime::Handle {
32    static TOKIO: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
33        thread::Builder::new()
34            .name("vortex-async-compat".into())
35            .spawn(|| TOKIO.block_on(Pending))
36            .vortex_expect("cannot start tokio runtime thread");
37        tokio::runtime::Builder::new_current_thread()
38            .enable_all()
39            .build()
40            .vortex_expect("cannot start tokio runtime")
41    });
42
43    tokio::runtime::Handle::try_current().unwrap_or_else(|_| TOKIO.handle().clone())
44}
45
46struct Pending;
47impl Future for Pending {
48    type Output = ();
49
50    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
51        Poll::Pending
52    }
53}
54
55pin_project! {
56    /// Compatibility adapter for futures and I/O types.
57    #[derive(Clone, Debug)]
58    pub struct Compat<T> {
59        #[pin]
60        inner: Option<T>,
61    }
62
63    impl<T> PinnedDrop for Compat<T> {
64        fn drop(this: Pin<&mut Self>) {
65            if this.inner.is_some() {
66                // If the inner future wasn't moved out using into_inner,
67                // enter the tokio context while the inner value is dropped.
68                let _guard = runtime_handle().enter();
69                this.project().inner.set(None);
70            }
71        }
72    }
73}
74
75impl<T> Compat<T> {
76    /// Create a new Compat wrapper around the given value.
77    pub fn new(inner: T) -> Self {
78        Self { inner: Some(inner) }
79    }
80
81    #[inline]
82    fn inner(&self) -> &T {
83        self.inner
84            .as_ref()
85            .vortex_expect("inner is only None when Compat is about to drop")
86    }
87
88    #[inline]
89    fn inner_mut(&mut self) -> &mut T {
90        self.inner
91            .as_mut()
92            .vortex_expect("inner is only None when Compat is about to drop")
93    }
94
95    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
96        self.project()
97            .inner
98            .as_pin_mut()
99            .vortex_expect("inner is only None when Compat is about to drop")
100    }
101}
102
103#[deny(clippy::missing_trait_methods)]
104impl<T: Future> Future for Compat<T> {
105    type Output = T::Output;
106
107    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108        let _guard = runtime_handle().enter();
109        self.get_pin_mut().poll(cx)
110    }
111}
112
113#[deny(clippy::missing_trait_methods)]
114impl<S: Stream> Stream for Compat<S> {
115    type Item = S::Item;
116
117    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
118        let _guard = runtime_handle().enter();
119        self.get_pin_mut().poll_next(cx)
120    }
121
122    fn size_hint(&self) -> (usize, Option<usize>) {
123        self.inner().size_hint()
124    }
125}