futures_concurrency/stream/zip/
array.rs

1use super::Zip as ZipTrait;
2use crate::stream::IntoStream;
3use crate::utils::{self, PollArray, WakerArray};
4
5use core::array;
6use core::fmt;
7use core::mem::{self, MaybeUninit};
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use futures_core::Stream;
12use pin_project::{pin_project, pinned_drop};
13
14/// A stream that ‘zips up’ multiple streams into a single stream of pairs.
15///
16/// This `struct` is created by the [`zip`] method on the [`Zip`] trait. See its
17/// documentation for more.
18///
19/// [`zip`]: trait.Zip.html#method.zip
20/// [`Zip`]: trait.Zip.html
21#[pin_project(PinnedDrop)]
22pub struct Zip<S, const N: usize>
23where
24    S: Stream,
25{
26    #[pin]
27    streams: [S; N],
28    output: [MaybeUninit<<S as Stream>::Item>; N],
29    wakers: WakerArray<N>,
30    state: PollArray<N>,
31    done: bool,
32}
33
34impl<S, const N: usize> Zip<S, N>
35where
36    S: Stream,
37{
38    pub(crate) fn new(streams: [S; N]) -> Self {
39        Self {
40            streams,
41            output: array::from_fn(|_| MaybeUninit::uninit()),
42            state: PollArray::new_pending(),
43            wakers: WakerArray::new(),
44            done: false,
45        }
46    }
47}
48
49impl<S, const N: usize> fmt::Debug for Zip<S, N>
50where
51    S: Stream + fmt::Debug,
52{
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_list().entries(self.streams.iter()).finish()
55    }
56}
57
58impl<S, const N: usize> Stream for Zip<S, N>
59where
60    S: Stream,
61{
62    type Item = [S::Item; N];
63
64    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65        let mut this = self.project();
66
67        assert!(!*this.done, "Stream should not be polled after completion");
68
69        let mut readiness = this.wakers.readiness();
70        readiness.set_waker(cx.waker());
71        for index in 0..N {
72            if !readiness.any_ready() {
73                // Nothing is ready yet
74                return Poll::Pending;
75            } else if this.state[index].is_ready() || !readiness.clear_ready(index) {
76                // We already have data stored for this stream,
77                // Or this waker isn't ready yet
78                continue;
79            }
80
81            // unlock readiness so we don't deadlock when polling
82            #[allow(clippy::drop_non_drop)]
83            drop(readiness);
84
85            // Obtain the intermediate waker.
86            let mut cx = Context::from_waker(this.wakers.get(index).unwrap());
87
88            let stream = utils::get_pin_mut(this.streams.as_mut(), index).unwrap();
89            match stream.poll_next(&mut cx) {
90                Poll::Ready(Some(item)) => {
91                    this.output[index] = MaybeUninit::new(item);
92                    this.state[index].set_ready();
93
94                    let all_ready = this.state.iter().all(|state| state.is_ready());
95                    if all_ready {
96                        // Reset the future's state.
97                        readiness = this.wakers.readiness();
98                        readiness.set_all_ready();
99                        this.state.set_all_pending();
100
101                        // Take the output
102                        //
103                        // SAFETY: we just validated all our data is populated, meaning
104                        // we can assume this is initialized.
105                        let mut output = array::from_fn(|_| MaybeUninit::uninit());
106                        mem::swap(this.output, &mut output);
107                        let output = unsafe { array_assume_init(output) };
108                        return Poll::Ready(Some(output));
109                    }
110                }
111                Poll::Ready(None) => {
112                    // If one stream returns `None`, we can no longer return
113                    // pairs - meaning the stream is over.
114                    *this.done = true;
115                    return Poll::Ready(None);
116                }
117                Poll::Pending => {}
118            }
119
120            // Lock readiness so we can use it again
121            readiness = this.wakers.readiness();
122        }
123        Poll::Pending
124    }
125}
126
127/// Drop the already initialized values on cancellation.
128#[pinned_drop]
129impl<S, const N: usize> PinnedDrop for Zip<S, N>
130where
131    S: Stream,
132{
133    fn drop(self: Pin<&mut Self>) {
134        let this = self.project();
135
136        for (state, output) in this.state.iter_mut().zip(this.output.iter_mut()) {
137            if state.is_ready() {
138                // SAFETY: we've just filtered down to *only* the initialized values.
139                // We can assume they're initialized, and this is where we drop them.
140                unsafe { output.assume_init_drop() };
141            }
142        }
143    }
144}
145
146impl<S, const N: usize> ZipTrait for [S; N]
147where
148    S: IntoStream,
149{
150    type Item = <Zip<S::IntoStream, N> as Stream>::Item;
151    type Stream = Zip<S::IntoStream, N>;
152
153    fn zip(self) -> Self::Stream {
154        Zip::new(self.map(|i| i.into_stream()))
155    }
156}
157
// Inlined version of the unstable `MaybeUninit::array_assume_init` feature.
// FIXME: replace with `utils::array_assume_init`
//
// Converts an array of `MaybeUninit<T>` into an array of `T`.
//
// # Safety
//
// The caller must guarantee that every element of `array` is initialized.
unsafe fn array_assume_init<T, const N: usize>(array: [MaybeUninit<T>; N]) -> [T; N] {
    let source: *const [MaybeUninit<T>; N] = &array;
    // SAFETY:
    // * The caller guarantees that all elements of the array are initialized
    // * `MaybeUninit<T>` and T are guaranteed to have the same layout
    // * `MaybeUninit` does not drop, so there are no double-frees
    // And thus reading the array back out as `[T; N]` is safe
    let initialized = unsafe { source.cast::<[T; N]>().read() };
    #[allow(clippy::forget_non_drop)]
    mem::forget(array);
    initialized
}
171
#[cfg(test)]
mod tests {
    use crate::stream::Zip;
    use futures_lite::future::block_on;
    use futures_lite::prelude::*;
    use futures_lite::stream;

    /// Zipping three two-item streams yields two full arrays and then ends.
    #[test]
    fn zip_array_3() {
        block_on(async {
            let sources = [1, 2, 3].map(|value| stream::repeat(value).take(2));
            let mut zipped = Zip::zip(sources);

            for _ in 0..2 {
                assert_eq!(zipped.next().await, Some([1, 2, 3]));
            }
            assert_eq!(zipped.next().await, None);
        })
    }
}