vortex_layout/scan/unified.rs
1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::{Stream, TryFutureExt, TryStreamExt};
6use pin_project_lite::pin_project;
7use vortex_error::VortexResult;
8
9pin_project! {
10 /// A [`Stream`] that drives the both the I/O stream and the execution stream concurrently.
11 ///
12 /// This is sort of like a `select!` implementation, but not quite.
13 ///
14 /// We can't use `futures::stream::select` because it requires both streams to terminate, and
15 /// our I/O stream will never terminate.
16 ///
17 /// We can't use `futures::stream::zip` because it waits for boths streams to emit an item,
18 /// but our execution stream may require multiple I/O operations to complete before it can
19 /// return an item.
20 pub struct UnifiedDriverStream<R, S> {
21 #[pin]
22 pub exec_stream: R,
23 #[pin]
24 pub io_stream: S,
25 }
26}
27
28impl<T, R, S> Stream for UnifiedDriverStream<R, S>
29where
30 R: Stream<Item = VortexResult<T>>,
31 S: Stream<Item = VortexResult<()>>,
32{
33 type Item = VortexResult<T>;
34
35 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36 let mut this = self.project();
37 loop {
38 // If the exec stream is ready, then we can return the result.
39 // If it's pending, then we try polling the I/O stream.
40 if let Poll::Ready(r) = this.exec_stream.try_poll_next_unpin(cx) {
41 return Poll::Ready(r);
42 }
43
44 match this.io_stream.as_mut().try_poll_next_unpin(cx) {
45 // If the I/O stream made progress, it returns Ok.
46 Poll::Ready(Some(Ok(()))) => {}
47 // If the I/O stream failed, then propagate the error.
48 Poll::Ready(Some(Err(result))) => {
49 return Poll::Ready(Some(Err(result)));
50 }
51 // Unexpected end of stream.
52 Poll::Ready(None) => {
53 continue;
54 }
55 // If the I/O stream is not ready, then we return Pending and wait for the next wakeup.
56 Poll::Pending => return Poll::Pending,
57 }
58 }
59 }
60}
61
62pin_project! {
63 pub struct UnifiedDriverFuture<R, S> {
64 #[pin]
65 pub exec_future: R,
66 #[pin]
67 pub io_stream: S,
68 }
69}
70
71impl<T, R, S> Future for UnifiedDriverFuture<R, S>
72where
73 R: Future<Output = VortexResult<T>>,
74 S: Stream<Item = VortexResult<()>>,
75{
76 type Output = VortexResult<T>;
77
78 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79 let mut this = self.project();
80 loop {
81 // If the exec stream is ready, then we can return the result.
82 // If it's pending, then we try polling the I/O stream.
83 if let Poll::Ready(r) = this.exec_future.try_poll_unpin(cx) {
84 return Poll::Ready(r);
85 }
86
87 match this.io_stream.as_mut().try_poll_next_unpin(cx) {
88 // If the I/O stream made progress, it returns Ok.
89 Poll::Ready(Some(Ok(()))) => {}
90 // If the I/O stream failed, then propagate the error.
91 Poll::Ready(Some(Err(result))) => {
92 return Poll::Ready(Err(result));
93 }
94 // Unexpected end of stream.
95 Poll::Ready(None) => {
96 continue;
97 }
98 // If the I/O stream is not ready, then we return Pending and wait for the next wakeup.
99 Poll::Pending => return Poll::Pending,
100 }
101 }
102 }
103}