1#![allow(
2 non_snake_case,
3 clippy::type_complexity,
4 irrefutable_let_patterns,
5 clippy::new_without_default,
6 unused_mut,
7 unreachable_code,
8 clippy::too_many_arguments
9)]
10
11use derive_new::new;
12use futures::{pin_mut, ready, stream, Stream, StreamExt};
13use pin_project::pin_project;
14use serde::{Deserialize, Serialize};
15use std::{
16 pin::Pin, task::{Context, Poll}
17};
18use sum::{Sum0, Sum1, Sum2, Sum3, Sum4, Sum5, Sum6, Sum7, Sum8};
19
20use super::{
21 DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask, Reducer, ReducerProcessSend, ReducerSend
22};
23use crate::{
24 pipe::{Pipe, Sink}, pool::ProcessSend
25};
26
27fn substream<'a, 'b, 'c, 'd, 'e, S, F1, F2, O>(
28 cx: &'d Context<'c>, mut stream: Pin<&'a mut Peekable<'b, S>>, mut is: F1, mut unwrap: F2,
29) -> impl Stream<Item = F2::Output> + 'a
30where
31 S: Stream,
32 F1: FnMut(&S::Item) -> bool + 'a,
33 F2: FnMut(S::Item) -> O + 'a,
34 'a: 'b,
35{
36 let waker = cx.waker().clone();
37 stream::poll_fn(move |cx| match ready!(stream.as_mut().poll_peek(cx)) {
38 Some(enum_) if is(enum_) => Poll::Ready(Some(
39 if let Poll::Ready(Some(enum_)) = stream.as_mut().poll_next(cx) {
40 unwrap(enum_)
41 } else {
42 unreachable!()
43 },
44 )),
45 Some(_) => {
46 let waker_ = cx.waker();
47 if !waker.will_wake(waker_) {
48 waker_.wake_by_ref();
49 }
50 Poll::Pending
51 }
52 None => Poll::Ready(None),
53 })
54 .fuse()
55}
56
57macro_rules! impl_tuple {
58 ($reducea:ident $reduceaasync:ident $reduceb:ident $reducebasync:ident $async:ident $enum:ident $join:ident $($copy:ident)? : $($num:tt $t:ident $s:ident $i:ident $r:ident $o:ident $c:ident $iterator:ident $reducera:ident $reducerb:ident $($copyb:ident)? , $comma:tt)*) => (
59 impl<
60 Item,
61 $($r: ParallelSink<Item, Done = $o>,)*
62 $($o,)*
63 > ParallelSink<Item> for ($($r,)*)
64 where Item: $($copy)*,
65 {
66 type Done = ($($o,)*);
67 type Pipe = ($($r::Pipe,)*);
68 type ReduceA = $reducea<$($r::ReduceA,)*>;
69 type ReduceC = $reduceb<$($r::ReduceC,)*>;
70
71 fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
72 $(let ($iterator, $reducera, $t) = self.$num.reducers();)*
73 (
74 ($($iterator,)*),
75 $reducea{$($t: $reducera,)*},
76 $reduceb{$($t,)*},
77 )
78 }
79 }
80 impl<
81 Item,
82 $($r: DistributedSink<Item, Done = $o>,)*
83 $($o,)*
84 > DistributedSink<Item> for ($($r,)*)
85 where Item: $($copy)*,
86 {
87 type Done = ($($o,)*);
88 type Pipe = ($($r::Pipe,)*);
89 type ReduceA = $reducea<$($r::ReduceA,)*>;
90 type ReduceB = $reduceb<$($r::ReduceB,)*>;
91 type ReduceC = $reduceb<$($r::ReduceC,)*>;
92
93 fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
94 $(let ($iterator, $reducera, $reducerb, $t) = self.$num.reducers();)*
95 (
96 ($($iterator,)*),
97 $reducea{$($t: $reducera,)*},
98 $reduceb{$($t: $reducerb,)*},
99 $reduceb{$($t,)*},
100 )
101 }
102 }
103
104 impl<Input, $($i: ParallelPipe<Input>,)*>
105 ParallelPipe<Input> for ($($i,)*)
106 where Input: $($copy)*,
107 {
108 type Output = $enum<$($i::Output,)*>;
109 type Task = ($($i::Task,)*);
110
111 #[allow(clippy::unused_unit)]
112 fn task(&self) -> Self::Task {
113 ($(self.$num.task(),)*)
114 }
115 }
116 impl<Input, $($i: DistributedPipe<Input>,)*>
117 DistributedPipe<Input> for ($($i,)*)
118 where Input: $($copy)*,
119 {
120 type Output = $enum<$($i::Output,)*>;
121 type Task = ($($i::Task,)*);
122
123 #[allow(clippy::unused_unit)]
124 fn task(&self) -> Self::Task {
125 ($(self.$num.task(),)*)
126 }
127 }
128
129 impl<Input, $($c: PipeTask<Input>,)*> PipeTask<Input> for ($($c,)*)
130 where
131 Input: $($copy)*,
132 {
133 type Output = $enum<$($c::Output,)*>;
134 type Async = $async<Input, $($c::Async,)*>;
135
136 fn into_async(self) -> Self::Async {
137 $async{
138 $($t: Some(self.$num.into_async()),)*
139 pending: None,
140 given: ($(false $comma)*),
141 }
142 }
143 }
144
145 #[pin_project]
146 pub struct $async<Input, $($c,)*> {
147 $(#[pin] $t: Option<$c>,)*
148 pending: Option<Option<Input>>,
149 given: ($(bool $comma)*),
150 }
151
152 #[allow(unused_variables)]
153 impl<Input, $($c: Pipe<Input>,)*> Pipe<Input> for $async<Input, $($c,)*>
154 where
155 Input: $($copy)*,
156 {
157 type Output = $enum<$($c::Output,)*>;
158
159 #[allow(non_snake_case)]
160 fn poll_next(
161 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Input>>,
162 ) -> Poll<Option<Self::Output>> {
163 let mut self_ = self.project();
164 loop {
166 if self_.pending.is_none() {
167 *self_.pending = Some(ready!(stream.as_mut().poll_next(cx)));
168 }
169 $({
170 let pending: &mut Option<Input> = self_.pending.as_mut().unwrap();
171 let given = &mut self_.given.$num;
172 let waker = cx.waker();
173 let stream_ = stream::poll_fn(|cx| {
174 if !*given {
175 *given = true;
176 $(
177 return Poll::Ready(*pending);
178 let $copyb = ();
179 )?
180 Poll::Ready(pending.take())
181 } else {
182 let waker_ = cx.waker();
183 if !waker.will_wake(waker_) {
184 waker_.wake_by_ref();
185 }
186 Poll::Pending
187 }
188 }).fuse();
189 pin_mut!(stream_);
190 match self_.$t.as_mut().as_pin_mut().map(|pipe|pipe.poll_next(cx, stream_)) {
191 Some(Poll::Ready(Some(item))) => break Poll::Ready(Some($enum::$t(item))),
192 Some(Poll::Ready(None)) | None => { self_.$t.set(None); *given = true },
193 Some(Poll::Pending) => (),
194 }
195 })*
196 if $(self_.$t.is_none() &&)* true {
197 break Poll::Ready(None);
198 }
199 if $(self_.given.$num &&)* true {
200 $(self_.given.$num = false;)*
201 *self_.pending = None;
202 } else {
203 assert!(self_.pending.as_ref().unwrap().is_some());
204 break Poll::Pending;
205 }
206 }
207 }
208 }
209
210 #[derive(Clone, Serialize, Deserialize, new)]
211 pub struct $reducea<$($t,)*> {
212 $($t: $t,)*
213 }
214 impl<$($t: Reducer<$s>,)* $($s,)*> Reducer<$enum<$($s,)*>> for $reducea<$($t,)*> {
215 type Done = ($($t::Done,)*);
216 type Async = $reduceaasync<$($t::Async,)* $($s,)*>;
217
218 fn into_async(self) -> Self::Async {
219 $reduceaasync{
220 $($t: self.$t.into_async(),)*
221 peeked: None,
222 ready: ($(None::<$t::Done>,)*),
223 }
224 }
225 }
226 impl<$($t: Reducer<$s>,)* $($s,)*> ReducerProcessSend<$enum<$($s,)*>> for $reducea<$($t,)*> where $($t::Done: ProcessSend + 'static,)* {
227 type Done = ($($t::Done,)*);
228 }
229 impl<$($t: Reducer<$s>,)* $($s,)*> ReducerSend<$enum<$($s,)*>> for $reducea<$($t,)*> where $($t::Done: Send + 'static,)* {
230 type Done = ($($t::Done,)*);
231 }
232 #[pin_project]
233 pub struct $reduceaasync<$($t,)* $($s,)*> where $($t: Sink<$s>,)* {
234 $(#[pin] $t: $t,)*
235 peeked: Option<$enum<$($s,)*>>,
236 ready: ($(Option<$t::Done>,)*),
237 }
238 #[allow(unused_variables)]
239 impl<$($t: Sink<$s>,)* $($s,)*> Sink<$enum<$($s,)*>> for $reduceaasync<$($t,)* $($s,)*> {
240 type Done = ($($t::Done,)*);
241
242 fn poll_forward(self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = $enum<$($s,)*>>>) -> Poll<Self::Done> {
243 let mut self_ = self.project();
244 loop {
245 let mut progress = false;
246 $({
247 let stream = Peekable{stream:stream.as_mut(),peeked:&mut *self_.peeked};
248 pin_mut!(stream);
249 let stream_ = substream(cx, stream, |item| matches!(item, $enum::$t(_)), |item| { progress = true; if let $enum::$t(item) = item { item } else { unreachable!() } });
250 pin_mut!(stream_);
251 if self_.ready.$num.is_none() {
252 if let Poll::Ready(done) = self_.$t.as_mut().poll_forward(cx, stream_) {
253 self_.ready.$num = Some(done);
254 }
255 }
256 })*
257 if $(self_.ready.$num.is_some() &&)* true {
258 break Poll::Ready(($(self_.ready.$num.take().unwrap(),)*));
259 }
260 if !progress {
261 break Poll::Pending;
262 }
263 }
264 }
265 }
266
267 #[derive(Clone, Serialize, Deserialize, new)]
268 pub struct $reduceb<$($t,)*> {
269 $($t: $t,)*
270 }
271 impl<$($t: Reducer<$s>,)* $($s,)*> Reducer<($($s,)*)> for $reduceb<$($t,)*> {
272 type Done = ($($t::Done,)*);
273 type Async = $reducebasync<$($t::Async,)* $($s,)*>;
274
275 fn into_async(self) -> Self::Async {
276 $reducebasync{
277 $($t: self.$t.into_async(),)*
278 peeked: None,
279 ready: ($(None::<$t::Done>,)*),
280 }
281 }
282 }
283 impl<$($t: ReducerProcessSend<$s>,)* $($s,)*> ReducerProcessSend<($($s,)*)> for $reduceb<$($t,)*> {
284 type Done = ($(<$t as ReducerProcessSend<$s>>::Done,)*);
285 }
286 impl<$($t: ReducerSend<$s>,)* $($s,)*> ReducerSend<($($s,)*)> for $reduceb<$($t,)*> {
287 type Done = ($(<$t as ReducerSend<$s>>::Done,)*);
288 }
289 #[pin_project]
290 pub struct $reducebasync<$($t,)* $($s,)*> where $($t: Sink<$s>,)* {
291 $(#[pin] $t: $t,)*
292 peeked: Option<($(Option<$s>,)*)>,
293 ready: ($(Option<$t::Done>,)*),
294 }
295 #[allow(unused_variables)]
296 impl<$($t: Sink<$s>,)* $($s,)*> Sink<($($s,)*)> for $reducebasync<$($t,)* $($s,)*> {
297 type Done = ($($t::Done,)*);
298
299 fn poll_forward(self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = ($($s,)*)>>) -> Poll<Self::Done> {
300 let mut self_ = self.project();
301 let stream = stream.map(|item| ($(Some(item.$num),)*));
302 pin_mut!(stream);
303 loop {
304 let mut progress = false;
305 $({
306 let stream = Peekable{stream:stream.as_mut(),peeked:&mut *self_.peeked};
307 pin_mut!(stream);
308 let waker = cx.waker();
309 let stream = stream::poll_fn(|cx| match ready!(stream.as_mut().poll_peek(cx)) {
310 Some(enum_) if enum_.$num.is_some() => {
311 let ret = enum_.$num.take().unwrap();
312 progress = true;
313 Poll::Ready(Some(ret))
314 }
315 Some(_) => {
316 let waker_ = cx.waker();
317 if !waker.will_wake(waker_) {
318 waker_.wake_by_ref();
319 }
320 Poll::Pending
321 },
322 None => Poll::Ready(None),
323 }).fuse();
324 pin_mut!(stream);
325 if self_.ready.$num.is_none() {
326 if let Poll::Ready(done) = self_.$t.as_mut().poll_forward(cx, stream) {
327 self_.ready.$num = Some(done);
328 }
329 }
330 })*
331 if $(self_.ready.$num.is_some() &&)* true {
332 break Poll::Ready(($(self_.ready.$num.take().unwrap(),)*));
333 }
334 if let Some(peeked) = self_.peeked {
335 if $(peeked.$num.is_none() &&)* true {
336 *self_.peeked = None;
337 progress = true;
338 }
339 }
340 if !progress {
341 break Poll::Pending;
342 }
343 }
344 }
345 }
346 );
347}
348impl_tuple!(ReduceA0 ReduceA0Async ReduceC0 ReduceC0Async AsyncTuple0 Sum0 Join0:);
349impl_tuple!(ReduceA1 ReduceA1Async ReduceC1 ReduceC1Async AsyncTuple1 Sum1 Join1: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0,,);
350impl_tuple!(ReduceA2 ReduceA2Async ReduceC2 ReduceC2Async AsyncTuple2 Sum2 Join2 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,,);
351impl_tuple!(ReduceA3 ReduceA3Async ReduceC3 ReduceC3Async AsyncTuple3 Sum3 Join3 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,,);
352impl_tuple!(ReduceA4 ReduceA4Async ReduceC4 ReduceC4Async AsyncTuple4 Sum4 Join4 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,, 3 D S3 I3 R3 O3 C3 iterator_3 reducer_a_3 reducer_b_3 Copy,,);
353impl_tuple!(ReduceA5 ReduceA5Async ReduceC5 ReduceC5Async AsyncTuple5 Sum5 Join5 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,, 3 D S3 I3 R3 O3 C3 iterator_3 reducer_a_3 reducer_b_3 Copy,, 4 E S4 I4 R4 O4 C4 iterator_4 reducer_a_4 reducer_b_4 Copy,,);
354impl_tuple!(ReduceA6 ReduceA6Async ReduceC6 ReduceC6Async AsyncTuple6 Sum6 Join6 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,, 3 D S3 I3 R3 O3 C3 iterator_3 reducer_a_3 reducer_b_3 Copy,, 4 E S4 I4 R4 O4 C4 iterator_4 reducer_a_4 reducer_b_4 Copy,, 5 F S5 I5 R5 O5 C5 iterator_5 reducer_a_5 reducer_b_5 Copy,,);
355impl_tuple!(ReduceA7 ReduceA7Async ReduceC7 ReduceC7Async AsyncTuple7 Sum7 Join7 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,, 3 D S3 I3 R3 O3 C3 iterator_3 reducer_a_3 reducer_b_3 Copy,, 4 E S4 I4 R4 O4 C4 iterator_4 reducer_a_4 reducer_b_4 Copy,, 5 F S5 I5 R5 O5 C5 iterator_5 reducer_a_5 reducer_b_5 Copy,, 6 G S6 I6 R6 O6 C6 iterator_6 reducer_a_6 reducer_b_6 Copy,,);
356impl_tuple!(ReduceA8 ReduceA8Async ReduceC8 ReduceC8Async AsyncTuple8 Sum8 Join8 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,, 3 D S3 I3 R3 O3 C3 iterator_3 reducer_a_3 reducer_b_3 Copy,, 4 E S4 I4 R4 O4 C4 iterator_4 reducer_a_4 reducer_b_4 Copy,, 5 F S5 I5 R5 O5 C5 iterator_5 reducer_a_5 reducer_b_5 Copy,, 6 G S6 I6 R6 O6 C6 iterator_6 reducer_a_6 reducer_b_6 Copy,, 7 H S7 I7 R7 O7 C7 iterator_7 reducer_a_7 reducer_b_7 Copy,,);
357
358#[pin_project(project = PeekableProj)]
359#[derive(Debug)]
360#[must_use = "streams do nothing unless polled"]
361pub struct Peekable<'a, St: Stream> {
362 #[pin]
363 stream: St,
364 peeked: &'a mut Option<St::Item>,
365}
366
367impl<'a, St: Stream> Peekable<'a, St> {
368 pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&mut St::Item>> {
369 let PeekableProj { mut stream, peeked } = self.project();
370
371 Poll::Ready(loop {
372 if peeked.is_some() {
373 break peeked.as_mut();
374 } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
375 **peeked = Some(item);
376 } else {
377 break None;
378 }
379 })
380 }
381}
382
383impl<'a, S: Stream> Stream for Peekable<'a, S> {
384 type Item = S::Item;
385
386 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
387 let PeekableProj { stream, peeked } = self.project();
388 if let Some(item) = peeked.take() {
389 return Poll::Ready(Some(item));
390 }
391 stream.poll_next(cx)
392 }
393
394 fn size_hint(&self) -> (usize, Option<usize>) {
395 let peek_len = if self.peeked.is_some() { 1 } else { 0 };
396 let (lower, upper) = self.stream.size_hint();
397 let lower = lower.saturating_add(peek_len);
398 let upper = match upper {
399 Some(x) => x.checked_add(peek_len),
400 None => None,
401 };
402 (lower, upper)
403 }
404}