vortex_layout/scan/
unified.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::{Stream, TryFutureExt, TryStreamExt};
6use pin_project_lite::pin_project;
7use vortex_error::VortexResult;
8
9pin_project! {
10    /// A [`Stream`] that drives the both the I/O stream and the execution stream concurrently.
11    ///
12    /// This is sort of like a `select!` implementation, but not quite.
13    ///
14    /// We can't use `futures::stream::select` because it requires both streams to terminate, and
15    /// our I/O stream will never terminate.
16    ///
17    /// We can't use `futures::stream::zip` because it waits for boths streams to emit an item,
18    /// but our execution stream may require multiple I/O operations to complete before it can
19    /// return an item.
20    pub struct UnifiedDriverStream<R, S> {
21        #[pin]
22        pub exec_stream: R,
23        #[pin]
24        pub io_stream: S,
25    }
26}
27
28impl<T, R, S> Stream for UnifiedDriverStream<R, S>
29where
30    R: Stream<Item = VortexResult<T>>,
31    S: Stream<Item = VortexResult<()>>,
32{
33    type Item = VortexResult<T>;
34
35    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36        let mut this = self.project();
37        loop {
38            // If the exec stream is ready, then we can return the result.
39            // If it's pending, then we try polling the I/O stream.
40            if let Poll::Ready(r) = this.exec_stream.try_poll_next_unpin(cx) {
41                return Poll::Ready(r);
42            }
43
44            match this.io_stream.as_mut().try_poll_next_unpin(cx) {
45                // If the I/O stream made progress, it returns Ok.
46                Poll::Ready(Some(Ok(()))) => {}
47                // If the I/O stream failed, then propagate the error.
48                Poll::Ready(Some(Err(result))) => {
49                    return Poll::Ready(Some(Err(result)));
50                }
51                // Unexpected end of stream.
52                Poll::Ready(None) => {
53                    continue;
54                }
55                // If the I/O stream is not ready, then we return Pending and wait for the next wakeup.
56                Poll::Pending => return Poll::Pending,
57            }
58        }
59    }
60}
61
62pin_project! {
63    pub struct UnifiedDriverFuture<R, S> {
64        #[pin]
65        pub exec_future: R,
66        #[pin]
67        pub io_stream: S,
68    }
69}
70
71impl<T, R, S> Future for UnifiedDriverFuture<R, S>
72where
73    R: Future<Output = VortexResult<T>>,
74    S: Stream<Item = VortexResult<()>>,
75{
76    type Output = VortexResult<T>;
77
78    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79        let mut this = self.project();
80        loop {
81            // If the exec stream is ready, then we can return the result.
82            // If it's pending, then we try polling the I/O stream.
83            if let Poll::Ready(r) = this.exec_future.try_poll_unpin(cx) {
84                return Poll::Ready(r);
85            }
86
87            match this.io_stream.as_mut().try_poll_next_unpin(cx) {
88                // If the I/O stream made progress, it returns Ok.
89                Poll::Ready(Some(Ok(()))) => {}
90                // If the I/O stream failed, then propagate the error.
91                Poll::Ready(Some(Err(result))) => {
92                    return Poll::Ready(Err(result));
93                }
94                // Unexpected end of stream.
95                Poll::Ready(None) => {
96                    continue;
97                }
98                // If the I/O stream is not ready, then we return Pending and wait for the next wakeup.
99                Poll::Pending => return Poll::Pending,
100            }
101        }
102    }
103}