1mod filesystem;
14#[cfg(feature = "object_store")]
15mod obj_store;
16mod read_at;
17mod write;
18
19use std::pin::Pin;
20use std::sync::LazyLock;
21use std::task::Context;
22use std::task::Poll;
23use std::thread;
24
25use futures::Stream;
26use pin_project_lite::pin_project;
27use vortex_error::VortexExpect;
28
29fn runtime_handle() -> tokio::runtime::Handle {
34 static TOKIO: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
35 thread::Builder::new()
36 .name("vortex-async-compat".into())
37 .spawn(|| TOKIO.block_on(Pending))
38 .vortex_expect("cannot start tokio runtime thread");
39 tokio::runtime::Builder::new_current_thread()
40 .enable_all()
41 .build()
42 .vortex_expect("cannot start tokio runtime")
43 });
44
45 tokio::runtime::Handle::try_current().unwrap_or_else(|_| TOKIO.handle().clone())
46}
47
48struct Pending;
49impl Future for Pending {
50 type Output = ();
51
52 fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
53 Poll::Pending
54 }
55}
56
57pin_project! {
58 #[derive(Clone, Debug)]
60 pub struct Compat<T> {
61 #[pin]
62 inner: Option<T>,
63 }
64
65 impl<T> PinnedDrop for Compat<T> {
66 fn drop(this: Pin<&mut Self>) {
67 if this.inner.is_some() {
68 let _guard = runtime_handle().enter();
71 this.project().inner.set(None);
72 }
73 }
74 }
75}
76
77impl<T> Compat<T> {
78 pub fn new(inner: T) -> Self {
80 Self { inner: Some(inner) }
81 }
82
83 #[inline]
84 fn inner(&self) -> &T {
85 self.inner
86 .as_ref()
87 .vortex_expect("inner is only None when Compat is about to drop")
88 }
89
90 #[inline]
91 fn inner_mut(&mut self) -> &mut T {
92 self.inner
93 .as_mut()
94 .vortex_expect("inner is only None when Compat is about to drop")
95 }
96
97 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
98 self.project()
99 .inner
100 .as_pin_mut()
101 .vortex_expect("inner is only None when Compat is about to drop")
102 }
103}
104
105#[deny(clippy::missing_trait_methods)]
106impl<T: Future> Future for Compat<T> {
107 type Output = T::Output;
108
109 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110 let _guard = runtime_handle().enter();
111 self.get_pin_mut().poll(cx)
112 }
113}
114
115#[deny(clippy::missing_trait_methods)]
116impl<S: Stream> Stream for Compat<S> {
117 type Item = S::Item;
118
119 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
120 let _guard = runtime_handle().enter();
121 self.get_pin_mut().poll_next(cx)
122 }
123
124 fn size_hint(&self) -> (usize, Option<usize>) {
125 self.inner().size_hint()
126 }
127}