1mod filesystem;
14mod read_at;
15mod write;
16
17use std::pin::Pin;
18use std::sync::LazyLock;
19use std::task::Context;
20use std::task::Poll;
21use std::thread;
22
23use futures::Stream;
24use pin_project_lite::pin_project;
25use vortex_error::VortexExpect;
26
27fn runtime_handle() -> tokio::runtime::Handle {
32 static TOKIO: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
33 thread::Builder::new()
34 .name("vortex-async-compat".into())
35 .spawn(|| TOKIO.block_on(Pending))
36 .vortex_expect("cannot start tokio runtime thread");
37 tokio::runtime::Builder::new_current_thread()
38 .enable_all()
39 .build()
40 .vortex_expect("cannot start tokio runtime")
41 });
42
43 tokio::runtime::Handle::try_current().unwrap_or_else(|_| TOKIO.handle().clone())
44}
45
/// A future that never completes.
///
/// Every call to `poll` reports `Poll::Pending`, and the waker is never registered, so a task
/// awaiting this future is never woken by it.
struct Pending;

impl Future for Pending {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        // The context is deliberately ignored: there is nothing to wait for.
        Poll::Pending
    }
}
54
55pin_project! {
56 #[derive(Clone, Debug)]
58 pub struct Compat<T> {
59 #[pin]
60 inner: Option<T>,
61 }
62
63 impl<T> PinnedDrop for Compat<T> {
64 fn drop(this: Pin<&mut Self>) {
65 if this.inner.is_some() {
66 let _guard = runtime_handle().enter();
69 this.project().inner.set(None);
70 }
71 }
72 }
73}
74
75impl<T> Compat<T> {
76 pub fn new(inner: T) -> Self {
78 Self { inner: Some(inner) }
79 }
80
81 #[inline]
82 fn inner(&self) -> &T {
83 self.inner
84 .as_ref()
85 .vortex_expect("inner is only None when Compat is about to drop")
86 }
87
88 #[inline]
89 fn inner_mut(&mut self) -> &mut T {
90 self.inner
91 .as_mut()
92 .vortex_expect("inner is only None when Compat is about to drop")
93 }
94
95 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
96 self.project()
97 .inner
98 .as_pin_mut()
99 .vortex_expect("inner is only None when Compat is about to drop")
100 }
101}
102
103#[deny(clippy::missing_trait_methods)]
104impl<T: Future> Future for Compat<T> {
105 type Output = T::Output;
106
107 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
108 let _guard = runtime_handle().enter();
109 self.get_pin_mut().poll(cx)
110 }
111}
112
113#[deny(clippy::missing_trait_methods)]
114impl<S: Stream> Stream for Compat<S> {
115 type Item = S::Item;
116
117 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
118 let _guard = runtime_handle().enter();
119 self.get_pin_mut().poll_next(cx)
120 }
121
122 fn size_hint(&self) -> (usize, Option<usize>) {
123 self.inner().size_hint()
124 }
125}