futures_concurrency/stream/zip/
vec.rs

1use super::Zip as ZipTrait;
2use crate::stream::IntoStream;
3use crate::utils::{self, PollVec, WakerVec};
4#[cfg(all(feature = "alloc", not(feature = "std")))]
5use alloc::vec::Vec;
6
7use core::fmt;
8use core::mem;
9use core::mem::MaybeUninit;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12
13use futures_core::Stream;
14use pin_project::{pin_project, pinned_drop};
15
16/// A stream that ‘zips up’ multiple streams into a single stream of pairs.
17///
18/// This `struct` is created by the [`zip`] method on the [`Zip`] trait. See its
19/// documentation for more.
20///
21/// [`zip`]: trait.Zip.html#method.zip
22/// [`Zip`]: trait.Zip.html
23#[pin_project(PinnedDrop)]
24pub struct Zip<S>
25where
26    S: Stream,
27{
28    #[pin]
29    streams: Vec<S>,
30    output: Vec<MaybeUninit<<S as Stream>::Item>>,
31    wakers: WakerVec,
32    state: PollVec,
33    done: bool,
34    len: usize,
35}
36
37impl<S> Zip<S>
38where
39    S: Stream,
40{
41    pub(crate) fn new(streams: Vec<S>) -> Self {
42        let len = streams.len();
43        Self {
44            len,
45            streams,
46            wakers: WakerVec::new(len),
47            output: (0..len).map(|_| MaybeUninit::uninit()).collect(),
48            state: PollVec::new_pending(len),
49            done: false,
50        }
51    }
52}
53
54impl<S> fmt::Debug for Zip<S>
55where
56    S: Stream + fmt::Debug,
57{
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.debug_list().entries(self.streams.iter()).finish()
60    }
61}
62
63impl<S> Stream for Zip<S>
64where
65    S: Stream,
66{
67    type Item = Vec<S::Item>;
68
69    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70        let mut this = self.project();
71
72        assert!(!*this.done, "Stream should not be polled after completion");
73
74        let mut readiness = this.wakers.readiness();
75        readiness.set_waker(cx.waker());
76        for index in 0..*this.len {
77            if !readiness.any_ready() {
78                // Nothing is ready yet
79                return Poll::Pending;
80            } else if this.state[index].is_ready() || !readiness.clear_ready(index) {
81                // We already have data stored for this stream,
82                // Or this waker isn't ready yet
83                continue;
84            }
85
86            // unlock readiness so we don't deadlock when polling
87            #[allow(clippy::drop_non_drop)]
88            drop(readiness);
89
90            // Obtain the intermediate waker.
91            let mut cx = Context::from_waker(this.wakers.get(index).unwrap());
92
93            let stream = utils::get_pin_mut_from_vec(this.streams.as_mut(), index).unwrap();
94            match stream.poll_next(&mut cx) {
95                Poll::Ready(Some(item)) => {
96                    this.output[index] = MaybeUninit::new(item);
97                    this.state[index].set_ready();
98
99                    let all_ready = this.state.iter().all(|state| state.is_ready());
100                    if all_ready {
101                        // Reset the future's state.
102                        readiness = this.wakers.readiness();
103                        readiness.set_all_ready();
104                        this.state.set_all_pending();
105
106                        // Take the output
107                        //
108                        // SAFETY: we just validated all our data is populated, meaning
109                        // we can assume this is initialized.
110                        let mut output = (0..*this.len).map(|_| MaybeUninit::uninit()).collect();
111                        mem::swap(this.output, &mut output);
112                        let output = unsafe { vec_assume_init(output) };
113                        return Poll::Ready(Some(output));
114                    }
115                }
116                Poll::Ready(None) => {
117                    // If one stream returns `None`, we can no longer return
118                    // pairs - meaning the stream is over.
119                    *this.done = true;
120                    return Poll::Ready(None);
121                }
122                Poll::Pending => {}
123            }
124
125            // Lock readiness so we can use it again
126            readiness = this.wakers.readiness();
127        }
128        Poll::Pending
129    }
130}
131
132/// Drop the already initialized values on cancellation.
133#[pinned_drop]
134impl<S> PinnedDrop for Zip<S>
135where
136    S: Stream,
137{
138    fn drop(self: Pin<&mut Self>) {
139        let this = self.project();
140
141        for (state, output) in this.state.iter_mut().zip(this.output.iter_mut()) {
142            if state.is_ready() {
143                // SAFETY: we've just filtered down to *only* the initialized values.
144                // We can assume they're initialized, and this is where we drop them.
145                unsafe { output.assume_init_drop() };
146            }
147        }
148    }
149}
150
151impl<S> ZipTrait for Vec<S>
152where
153    S: IntoStream,
154{
155    type Item = <Zip<S::IntoStream> as Stream>::Item;
156    type Stream = Zip<S::IntoStream>;
157
158    fn zip(self) -> Self::Stream {
159        Zip::new(self.into_iter().map(|i| i.into_stream()).collect())
160    }
161}
162
163// Inlined version of the unstable `MaybeUninit::array_assume_init` feature.
164// FIXME: replace with `utils::array_assume_init`
165unsafe fn vec_assume_init<T>(vec: Vec<MaybeUninit<T>>) -> Vec<T> {
166    // SAFETY:
167    // * The caller guarantees that all elements of the vec are initialized
168    // * `MaybeUninit<T>` and T are guaranteed to have the same layout
169    // * `MaybeUninit` does not drop, so there are no double-frees
170    // And thus the conversion is safe
171    let ret = unsafe { (&vec as *const _ as *const Vec<T>).read() };
172    mem::forget(vec);
173    ret
174}
175
176#[cfg(test)]
177mod tests {
178    use alloc::vec;
179
180    use crate::stream::Zip;
181    use futures_lite::future::block_on;
182    use futures_lite::prelude::*;
183    use futures_lite::stream;
184
185    #[test]
186    fn zip_array_3() {
187        block_on(async {
188            let a = stream::repeat(1).take(2);
189            let b = stream::repeat(2).take(2);
190            let c = stream::repeat(3).take(2);
191            let mut s = vec![a, b, c].zip();
192
193            assert_eq!(s.next().await, Some(vec![1, 2, 3]));
194            assert_eq!(s.next().await, Some(vec![1, 2, 3]));
195            assert_eq!(s.next().await, None);
196        })
197    }
198}