futures_concurrency/stream/zip/
array.rs1use super::Zip as ZipTrait;
2use crate::stream::IntoStream;
3use crate::utils::{self, PollArray, WakerArray};
4
5use core::array;
6use core::fmt;
7use core::mem::{self, MaybeUninit};
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use futures_core::Stream;
12use pin_project::{pin_project, pinned_drop};
13
14#[pin_project(PinnedDrop)]
22pub struct Zip<S, const N: usize>
23where
24 S: Stream,
25{
26 #[pin]
27 streams: [S; N],
28 output: [MaybeUninit<<S as Stream>::Item>; N],
29 wakers: WakerArray<N>,
30 state: PollArray<N>,
31 done: bool,
32}
33
34impl<S, const N: usize> Zip<S, N>
35where
36 S: Stream,
37{
38 pub(crate) fn new(streams: [S; N]) -> Self {
39 Self {
40 streams,
41 output: array::from_fn(|_| MaybeUninit::uninit()),
42 state: PollArray::new_pending(),
43 wakers: WakerArray::new(),
44 done: false,
45 }
46 }
47}
48
49impl<S, const N: usize> fmt::Debug for Zip<S, N>
50where
51 S: Stream + fmt::Debug,
52{
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_list().entries(self.streams.iter()).finish()
55 }
56}
57
58impl<S, const N: usize> Stream for Zip<S, N>
59where
60 S: Stream,
61{
62 type Item = [S::Item; N];
63
64 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 let mut this = self.project();
66
67 assert!(!*this.done, "Stream should not be polled after completion");
68
69 let mut readiness = this.wakers.readiness();
70 readiness.set_waker(cx.waker());
71 for index in 0..N {
72 if !readiness.any_ready() {
73 return Poll::Pending;
75 } else if this.state[index].is_ready() || !readiness.clear_ready(index) {
76 continue;
79 }
80
81 #[allow(clippy::drop_non_drop)]
83 drop(readiness);
84
85 let mut cx = Context::from_waker(this.wakers.get(index).unwrap());
87
88 let stream = utils::get_pin_mut(this.streams.as_mut(), index).unwrap();
89 match stream.poll_next(&mut cx) {
90 Poll::Ready(Some(item)) => {
91 this.output[index] = MaybeUninit::new(item);
92 this.state[index].set_ready();
93
94 let all_ready = this.state.iter().all(|state| state.is_ready());
95 if all_ready {
96 readiness = this.wakers.readiness();
98 readiness.set_all_ready();
99 this.state.set_all_pending();
100
101 let mut output = array::from_fn(|_| MaybeUninit::uninit());
106 mem::swap(this.output, &mut output);
107 let output = unsafe { array_assume_init(output) };
108 return Poll::Ready(Some(output));
109 }
110 }
111 Poll::Ready(None) => {
112 *this.done = true;
115 return Poll::Ready(None);
116 }
117 Poll::Pending => {}
118 }
119
120 readiness = this.wakers.readiness();
122 }
123 Poll::Pending
124 }
125}
126
127#[pinned_drop]
129impl<S, const N: usize> PinnedDrop for Zip<S, N>
130where
131 S: Stream,
132{
133 fn drop(self: Pin<&mut Self>) {
134 let this = self.project();
135
136 for (state, output) in this.state.iter_mut().zip(this.output.iter_mut()) {
137 if state.is_ready() {
138 unsafe { output.assume_init_drop() };
141 }
142 }
143 }
144}
145
146impl<S, const N: usize> ZipTrait for [S; N]
147where
148 S: IntoStream,
149{
150 type Item = <Zip<S::IntoStream, N> as Stream>::Item;
151 type Stream = Zip<S::IntoStream, N>;
152
153 fn zip(self) -> Self::Stream {
154 Zip::new(self.map(|i| i.into_stream()))
155 }
156}
157
/// Converts an array of `MaybeUninit<T>` slots into an array of `T`.
///
/// # Safety
///
/// Every element of `array` must hold an initialized `T`. Calling this with
/// any uninitialized slot is undefined behavior.
unsafe fn array_assume_init<T, const N: usize>(array: [MaybeUninit<T>; N]) -> [T; N] {
    // Unwrapping slot by slot avoids the raw pointer cast and the no-op
    // `mem::forget` of a value that has no drop glue.
    //
    // SAFETY: the caller guarantees that every slot is initialized.
    array.map(|slot| unsafe { slot.assume_init() })
}
171
#[cfg(test)]
mod tests {
    use crate::stream::Zip;
    use futures_lite::future::block_on;
    use futures_lite::prelude::*;
    use futures_lite::stream;

    /// Zipping three two-item streams yields two full arrays and then ends.
    #[test]
    fn zip_array_3() {
        block_on(async {
            // One stream per value, each repeating its value twice.
            let sources = [1, 2, 3].map(|value| stream::repeat(value).take(2));
            let mut zipped = Zip::zip(sources);

            for _ in 0..2 {
                assert_eq!(zipped.next().await, Some([1, 2, 3]));
            }
            assert_eq!(zipped.next().await, None);
        })
    }
}