amadeus_core/par_sink/
tuple.rs

1#![allow(
2	non_snake_case,
3	clippy::type_complexity,
4	irrefutable_let_patterns,
5	clippy::new_without_default,
6	unused_mut,
7	unreachable_code,
8	clippy::too_many_arguments
9)]
10
11use derive_new::new;
12use futures::{pin_mut, ready, stream, Stream, StreamExt};
13use pin_project::pin_project;
14use serde::{Deserialize, Serialize};
15use std::{
16	pin::Pin, task::{Context, Poll}
17};
18use sum::{Sum0, Sum1, Sum2, Sum3, Sum4, Sum5, Sum6, Sum7, Sum8};
19
20use super::{
21	DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask, Reducer, ReducerProcessSend, ReducerSend
22};
23use crate::{
24	pipe::{Pipe, Sink}, pool::ProcessSend
25};
26
27fn substream<'a, 'b, 'c, 'd, 'e, S, F1, F2, O>(
28	cx: &'d Context<'c>, mut stream: Pin<&'a mut Peekable<'b, S>>, mut is: F1, mut unwrap: F2,
29) -> impl Stream<Item = F2::Output> + 'a
30where
31	S: Stream,
32	F1: FnMut(&S::Item) -> bool + 'a,
33	F2: FnMut(S::Item) -> O + 'a,
34	'a: 'b,
35{
36	let waker = cx.waker().clone();
37	stream::poll_fn(move |cx| match ready!(stream.as_mut().poll_peek(cx)) {
38		Some(enum_) if is(enum_) => Poll::Ready(Some(
39			if let Poll::Ready(Some(enum_)) = stream.as_mut().poll_next(cx) {
40				unwrap(enum_)
41			} else {
42				unreachable!()
43			},
44		)),
45		Some(_) => {
46			let waker_ = cx.waker();
47			if !waker.will_wake(waker_) {
48				waker_.wake_by_ref();
49			}
50			Poll::Pending
51		}
52		None => Poll::Ready(None),
53	})
54	.fuse()
55}
56
// Generates, for one tuple arity, the tuple implementations of `ParallelSink`,
// `DistributedSink`, `ParallelPipe`, `DistributedPipe` and `PipeTask`, together
// with the supporting `Pipe`, `Reducer` and `Sink` types.
//
// Arguments before the `:`
//   $reducea, $reduceaasync   names of the generated reducer over `$enum` and of its `Sink` state
//   $reduceb, $reducebasync   names of the generated reducer over tuples and of its `Sink` state
//   $async                    name of the generated `Pipe` state
//   $enum                     sum type whose variants are named by the per-element `$t`
//   $join                     matched but never used in the expansion
//   $copy                     optional bound applied to the `Item` / `Input` type parameter
//
// Arguments per tuple element (repeated)
//   $num                      tuple index, used for `.N` field access
//   $t                        used as field name, type-parameter name and `$enum` variant name
//   $s $i $r $o $c            type-parameter names
//   $iterator $reducera $reducerb   local variable names used inside `reducers`
//   $copyb                    optional marker selecting the copying path in `Pipe::poll_next`
//   $comma                    token emitted after each element of the `given` tuple
57macro_rules! impl_tuple {
58	($reducea:ident $reduceaasync:ident $reduceb:ident $reducebasync:ident $async:ident $enum:ident $join:ident $($copy:ident)? : $($num:tt $t:ident $s:ident $i:ident $r:ident $o:ident $c:ident $iterator:ident $reducera:ident $reducerb:ident $($copyb:ident)? , $comma:tt)*) => (
		// A tuple of `ParallelSink`s is a `ParallelSink` whose `Done` is the
		// tuple of the elements' `Done` values.
59		impl<
60				Item,
61				$($r: ParallelSink<Item, Done = $o>,)*
62				$($o,)*
63			> ParallelSink<Item> for ($($r,)*)
64				where Item: $($copy)*,
65		{
66			type Done = ($($o,)*);
67			type Pipe = ($($r::Pipe,)*);
68			type ReduceA = $reducea<$($r::ReduceA,)*>;
69			type ReduceC = $reduceb<$($r::ReduceC,)*>;
70
			// Splits every element into its three parts and regroups them by
			// role. `$t` doubles as the local that holds each element's third
			// part, which is why the last struct literal uses field shorthand.
71			fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
72				$(let ($iterator, $reducera, $t) = self.$num.reducers();)*
73				(
74					($($iterator,)*),
75					$reducea{$($t: $reducera,)*},
76					$reduceb{$($t,)*},
77				)
78			}
79		}
		// Same construction for `DistributedSink`, which has an additional
		// `ReduceB` stage; both `ReduceB` and `ReduceC` use the `$reduceb` type.
80		impl<
81				Item,
82				$($r: DistributedSink<Item, Done = $o>,)*
83				$($o,)*
84			> DistributedSink<Item> for ($($r,)*)
85				where Item: $($copy)*,
86		{
87			type Done = ($($o,)*);
88			type Pipe = ($($r::Pipe,)*);
89			type ReduceA = $reducea<$($r::ReduceA,)*>;
90			type ReduceB = $reduceb<$($r::ReduceB,)*>;
91			type ReduceC = $reduceb<$($r::ReduceC,)*>;
92
93			fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
94				$(let ($iterator, $reducera, $reducerb, $t) = self.$num.reducers();)*
95				(
96					($($iterator,)*),
97					$reducea{$($t: $reducera,)*},
98					$reduceb{$($t: $reducerb,)*},
99					$reduceb{$($t,)*},
100				)
101			}
102		}
103
		// A tuple of pipes is a pipe whose output is the sum type `$enum`,
		// with one variant per element's output.
104		impl<Input, $($i: ParallelPipe<Input>,)*>
105			ParallelPipe<Input> for ($($i,)*)
106				where Input: $($copy)*,
107		{
108			type Output = $enum<$($i::Output,)*>;
109			type Task = ($($i::Task,)*);
110
111			#[allow(clippy::unused_unit)]
112			fn task(&self) -> Self::Task {
113				($(self.$num.task(),)*)
114			}
115		}
116		impl<Input, $($i: DistributedPipe<Input>,)*>
117			DistributedPipe<Input> for ($($i,)*)
118				where Input: $($copy)*,
119		{
120			type Output = $enum<$($i::Output,)*>;
121			type Task = ($($i::Task,)*);
122
123			#[allow(clippy::unused_unit)]
124			fn task(&self) -> Self::Task {
125				($(self.$num.task(),)*)
126			}
127		}
128
		// A tuple of tasks turns into a single `$async` state that drives all
		// of the elements' async pipes.
129		impl<Input, $($c: PipeTask<Input>,)*> PipeTask<Input> for ($($c,)*)
130		where
131			Input: $($copy)*,
132		{
133			type Output = $enum<$($c::Output,)*>;
134			type Async = $async<Input, $($c::Async,)*>;
135
			// Starts with every element present, nothing buffered and no
			// element marked as having received the current input.
136			fn into_async(self) -> Self::Async {
137				$async{
138					$($t: Some(self.$num.into_async()),)*
139					pending: None,
140					given: ($(false $comma)*),
141				}
142			}
143		}
144
		// State of the tuple pipe.
		//   $t       one slot per element; set to `None` once that element has finished
		//   pending  `None` when no input is buffered; `Some(x)` holds the last
		//            result `x` of polling the input stream (`Some(None)` = input ended)
		//   given    per element: whether the buffered input has been handed to it
145		#[pin_project]
146		pub struct $async<Input, $($c,)*> {
147			$(#[pin] $t: Option<$c>,)*
148			pending: Option<Option<Input>>,
149			given: ($(bool $comma)*),
150		}
151
152		#[allow(unused_variables)]
153		impl<Input, $($c: Pipe<Input>,)*> Pipe<Input> for $async<Input, $($c,)*>
154		where
155			Input: $($copy)*,
156		{
157			type Output = $enum<$($c::Output,)*>;
158
			// Feeds each input item to every element's pipe and yields the
			// elements' outputs wrapped in the matching `$enum` variant.
159			#[allow(non_snake_case)]
160			fn poll_next(
161				self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Input>>,
162			) -> Poll<Option<Self::Output>> {
163				let mut self_ = self.project();
164				// buffer, copy to each
165				loop {
					// Pull the next input (or end-of-input) into the buffer
					// when nothing is buffered; returns early while the input
					// stream is pending.
166					if self_.pending.is_none() {
167						*self_.pending = Some(ready!(stream.as_mut().poll_next(cx)));
168					}
					// One pass over the elements, in tuple order.
169					$({
170						let pending: &mut Option<Input> = self_.pending.as_mut().unwrap();
171						let given = &mut self_.given.$num;
172						let waker = cx.waker();
						// Per-element input stream: hands over the buffered
						// input exactly once, afterwards reports `Pending`
						// until the buffer is refilled by the outer loop.
173						let stream_ = stream::poll_fn(|cx| {
174							if !*given {
175								*given = true;
								// Emitted only when `$copyb` is supplied: hand
								// out a copy and leave the buffer intact for
								// the other elements. `let $copyb = ();` exists
								// solely so the optional repetition mentions
								// `$copyb`; it and the line after the
								// repetition are then unreachable.
176								$(
177									return Poll::Ready(*pending);
178									let $copyb = ();
179								)?
								// Without `$copyb`: move the input out of the buffer.
180								Poll::Ready(pending.take())
181							} else {
								// Already delivered: stay pending, waking the
								// poller's waker if it differs from ours.
182								let waker_ = cx.waker();
183								if !waker.will_wake(waker_) {
184									waker_.wake_by_ref();
185								}
186								Poll::Pending
187							}
188						}).fuse();
189						pin_mut!(stream_);
190						match self_.$t.as_mut().as_pin_mut().map(|pipe|pipe.poll_next(cx, stream_)) {
							// The element produced an output: return it tagged with its variant.
191							Some(Poll::Ready(Some(item))) => break Poll::Ready(Some($enum::$t(item))),
							// The element finished (or was already gone): drop it
							// and count the input as delivered to it.
192							Some(Poll::Ready(None)) | None => { self_.$t.set(None); *given = true },
193							Some(Poll::Pending) => (),
194						}
195					})*
					// Every element has finished: the tuple pipe is finished.
196					if $(self_.$t.is_none() &&)* true {
197						break Poll::Ready(None);
198					}
					// Every element has received the buffered input: reset the
					// flags, clear the buffer and go around again.
199					if $(self_.given.$num &&)* true {
200						$(self_.given.$num = false;)*
201						*self_.pending = None;
202					} else {
						// Some element has not taken the input yet; the
						// assertion checks that an actual item (not
						// end-of-input) is still buffered.
203						assert!(self_.pending.as_ref().unwrap().is_some());
204						break Poll::Pending;
205					}
206				}
207			}
208		}
209
		// Reducer over the sum type `$enum`: one inner reducer per variant.
210		#[derive(Clone, Serialize, Deserialize, new)]
211		pub struct $reducea<$($t,)*> {
212			$($t: $t,)*
213		}
214		impl<$($t: Reducer<$s>,)* $($s,)*> Reducer<$enum<$($s,)*>> for $reducea<$($t,)*> {
215			type Done = ($($t::Done,)*);
216			type Async = $reduceaasync<$($t::Async,)* $($s,)*>;
217
218			fn into_async(self) -> Self::Async {
219				$reduceaasync{
220					$($t: self.$t.into_async(),)*
221					peeked: None,
222					ready: ($(None::<$t::Done>,)*),
223				}
224			}
225		}
226		impl<$($t: Reducer<$s>,)* $($s,)*> ReducerProcessSend<$enum<$($s,)*>> for $reducea<$($t,)*> where $($t::Done: ProcessSend + 'static,)* {
227			type Done = ($($t::Done,)*);
228		}
229		impl<$($t: Reducer<$s>,)* $($s,)*> ReducerSend<$enum<$($s,)*>> for $reducea<$($t,)*> where $($t::Done: Send + 'static,)* {
230			type Done = ($($t::Done,)*);
231		}
		// Async state of `$reducea`.
		//   $t      the inner sinks
		//   peeked  look-ahead slot lent to the per-element `Peekable` views
		//   ready   results of the inner sinks that have already completed
232		#[pin_project]
233		pub struct $reduceaasync<$($t,)* $($s,)*> where $($t: Sink<$s>,)* {
234			$(#[pin] $t: $t,)*
235			peeked: Option<$enum<$($s,)*>>,
236			ready: ($(Option<$t::Done>,)*),
237		}
238		#[allow(unused_variables)]
239		impl<$($t: Sink<$s>,)* $($s,)*> Sink<$enum<$($s,)*>> for $reduceaasync<$($t,)* $($s,)*> {
240			type Done = ($($t::Done,)*);
241
			// Routes each incoming `$enum` item to the inner sink of the
			// matching variant and completes once all inner sinks have completed.
242			fn poll_forward(self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = $enum<$($s,)*>>>) -> Poll<Self::Done> {
243				let mut self_ = self.project();
244				loop {
					// Set whenever some inner sink consumed an item during this pass.
245					let mut progress = false;
246					$({
						// View of the input that only yields items of this
						// element's variant, unwrapped to the payload.
247						let stream = Peekable{stream:stream.as_mut(),peeked:&mut *self_.peeked};
248						pin_mut!(stream);
249						let stream_ = substream(cx, stream, |item| matches!(item, $enum::$t(_)), |item| { progress = true; if let $enum::$t(item) = item { item } else { unreachable!() } });
250						pin_mut!(stream_);
						// Only poll inner sinks that have not completed yet.
251						if self_.ready.$num.is_none() {
252							if let Poll::Ready(done) = self_.$t.as_mut().poll_forward(cx, stream_) {
253								self_.ready.$num = Some(done);
254							}
255						}
256					})*
					// All inner sinks are done: hand out the collected results.
257					if $(self_.ready.$num.is_some() &&)* true {
258						break Poll::Ready(($(self_.ready.$num.take().unwrap(),)*));
259					}
					// Nothing was consumed in this pass, so another pass would not help.
260					if !progress {
261						break Poll::Pending;
262					}
263				}
264			}
265		}
266
		// Reducer over tuples `($s, ...)`: component N goes to inner reducer N.
267		#[derive(Clone, Serialize, Deserialize, new)]
268		pub struct $reduceb<$($t,)*> {
269			$($t: $t,)*
270		}
271		impl<$($t: Reducer<$s>,)* $($s,)*> Reducer<($($s,)*)> for $reduceb<$($t,)*> {
272			type Done = ($($t::Done,)*);
273			type Async = $reducebasync<$($t::Async,)* $($s,)*>;
274
275			fn into_async(self) -> Self::Async {
276				$reducebasync{
277					$($t: self.$t.into_async(),)*
278					peeked: None,
279					ready: ($(None::<$t::Done>,)*),
280				}
281			}
282		}
283		impl<$($t: ReducerProcessSend<$s>,)* $($s,)*> ReducerProcessSend<($($s,)*)> for $reduceb<$($t,)*> {
284			type Done = ($(<$t as ReducerProcessSend<$s>>::Done,)*);
285		}
286		impl<$($t: ReducerSend<$s>,)* $($s,)*> ReducerSend<($($s,)*)> for $reduceb<$($t,)*> {
287			type Done = ($(<$t as ReducerSend<$s>>::Done,)*);
288		}
		// Async state of `$reduceb`.
		//   $t      the inner sinks
		//   peeked  the tuple currently being distributed; a component becomes
		//           `None` once its inner sink has taken it
		//   ready   results of the inner sinks that have already completed
289		#[pin_project]
290		pub struct $reducebasync<$($t,)* $($s,)*> where $($t: Sink<$s>,)* {
291			$(#[pin] $t: $t,)*
292			peeked: Option<($(Option<$s>,)*)>,
293			ready: ($(Option<$t::Done>,)*),
294		}
295		#[allow(unused_variables)]
296		impl<$($t: Sink<$s>,)* $($s,)*> Sink<($($s,)*)> for $reducebasync<$($t,)* $($s,)*> {
297			type Done = ($($t::Done,)*);
298
			// Splits each incoming tuple into its components, feeds component
			// N to inner sink N and completes once all inner sinks have completed.
299			fn poll_forward(self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = ($($s,)*)>>) -> Poll<Self::Done> {
300				let mut self_ = self.project();
				// Wrap every component in `Some` so that each inner sink can
				// `take` its own component out of the peeked tuple.
301				let stream = stream.map(|item| ($(Some(item.$num),)*));
302				pin_mut!(stream);
303				loop {
					// Set whenever a component was consumed or the peeked tuple was retired.
304					let mut progress = false;
305					$({
306						let stream = Peekable{stream:stream.as_mut(),peeked:&mut *self_.peeked};
307						pin_mut!(stream);
308						let waker = cx.waker();
						// Per-element input: yields this element's component
						// of the peeked tuple once, then stays pending until
						// a new tuple has been peeked.
309						let stream = stream::poll_fn(|cx| match ready!(stream.as_mut().poll_peek(cx)) {
310							Some(enum_) if enum_.$num.is_some() => {
311								let ret = enum_.$num.take().unwrap();
312								progress = true;
313								Poll::Ready(Some(ret))
314							}
							// Component already taken: stay pending, waking the
							// poller's waker if it differs from ours.
315							Some(_) => {
316								let waker_ = cx.waker();
317								if !waker.will_wake(waker_) {
318									waker_.wake_by_ref();
319								}
320								Poll::Pending
321							},
322							None => Poll::Ready(None),
323						}).fuse();
324						pin_mut!(stream);
						// Only poll inner sinks that have not completed yet.
325						if self_.ready.$num.is_none() {
326							if let Poll::Ready(done) = self_.$t.as_mut().poll_forward(cx, stream) {
327								self_.ready.$num = Some(done);
328							}
329						}
330					})*
					// All inner sinks are done: hand out the collected results.
331					if $(self_.ready.$num.is_some() &&)* true {
332						break Poll::Ready(($(self_.ready.$num.take().unwrap(),)*));
333					}
					// Once every component of the peeked tuple has been taken,
					// discard it so the next tuple can be peeked; this counts
					// as progress.
334					if let Some(peeked) = self_.peeked {
335						if $(peeked.$num.is_none() &&)* true {
336							*self_.peeked = None;
337							progress = true;
338						}
339					}
340					if !progress {
341						break Poll::Pending;
342					}
343				}
344			}
345		}
346	);
347}
// Instantiate the tuple implementations for arities 0 through 8.
//
// Arities 0 and 1 pass no `Copy` marker, so their input item type carries no
// extra bound and the single element receives the buffered input by move.
// Arities 2 through 8 pass `Copy` both before the `:` (bounding the input item
// type) and per element (selecting the copying path in `Pipe::poll_next`).
// The doubled comma after each element supplies the literal separator followed
// by the `$comma` token.
348impl_tuple!(ReduceA0 ReduceA0Async ReduceC0 ReduceC0Async AsyncTuple0 Sum0 Join0:);
349impl_tuple!(ReduceA1 ReduceA1Async ReduceC1 ReduceC1Async AsyncTuple1 Sum1 Join1: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0,,);
350impl_tuple!(ReduceA2 ReduceA2Async ReduceC2 ReduceC2Async AsyncTuple2 Sum2 Join2 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,,);
351impl_tuple!(ReduceA3 ReduceA3Async ReduceC3 ReduceC3Async AsyncTuple3 Sum3 Join3 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,,);
352impl_tuple!(ReduceA4 ReduceA4Async ReduceC4 ReduceC4Async AsyncTuple4 Sum4 Join4 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,, 3 D S3 I3 R3 O3 C3 iterator_3 reducer_a_3 reducer_b_3 Copy,,);
353impl_tuple!(ReduceA5 ReduceA5Async ReduceC5 ReduceC5Async AsyncTuple5 Sum5 Join5 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,, 3 D S3 I3 R3 O3 C3 iterator_3 reducer_a_3 reducer_b_3 Copy,, 4 E S4 I4 R4 O4 C4 iterator_4 reducer_a_4 reducer_b_4 Copy,,);
354impl_tuple!(ReduceA6 ReduceA6Async ReduceC6 ReduceC6Async AsyncTuple6 Sum6 Join6 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,, 3 D S3 I3 R3 O3 C3 iterator_3 reducer_a_3 reducer_b_3 Copy,, 4 E S4 I4 R4 O4 C4 iterator_4 reducer_a_4 reducer_b_4 Copy,, 5 F S5 I5 R5 O5 C5 iterator_5 reducer_a_5 reducer_b_5 Copy,,);
355impl_tuple!(ReduceA7 ReduceA7Async ReduceC7 ReduceC7Async AsyncTuple7 Sum7 Join7 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,, 3 D S3 I3 R3 O3 C3 iterator_3 reducer_a_3 reducer_b_3 Copy,, 4 E S4 I4 R4 O4 C4 iterator_4 reducer_a_4 reducer_b_4 Copy,, 5 F S5 I5 R5 O5 C5 iterator_5 reducer_a_5 reducer_b_5 Copy,, 6 G S6 I6 R6 O6 C6 iterator_6 reducer_a_6 reducer_b_6 Copy,,);
356impl_tuple!(ReduceA8 ReduceA8Async ReduceC8 ReduceC8Async AsyncTuple8 Sum8 Join8 Copy: 0 A S0 I0 R0 O0 C0 iterator_0 reducer_a_0 reducer_b_0 Copy,, 1 B S1 I1 R1 O1 C1 iterator_1 reducer_a_1 reducer_b_1 Copy,, 2 C S2 I2 R2 O2 C2 iterator_2 reducer_a_2 reducer_b_2 Copy,, 3 D S3 I3 R3 O3 C3 iterator_3 reducer_a_3 reducer_b_3 Copy,, 4 E S4 I4 R4 O4 C4 iterator_4 reducer_a_4 reducer_b_4 Copy,, 5 F S5 I5 R5 O5 C5 iterator_5 reducer_a_5 reducer_b_5 Copy,, 6 G S6 I6 R6 O6 C6 iterator_6 reducer_a_6 reducer_b_6 Copy,, 7 H S7 I7 R7 O7 C7 iterator_7 reducer_a_7 reducer_b_7 Copy,,);
357
358#[pin_project(project = PeekableProj)]
359#[derive(Debug)]
360#[must_use = "streams do nothing unless polled"]
361pub struct Peekable<'a, St: Stream> {
362	#[pin]
363	stream: St,
364	peeked: &'a mut Option<St::Item>,
365}
366
367impl<'a, St: Stream> Peekable<'a, St> {
368	pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&mut St::Item>> {
369		let PeekableProj { mut stream, peeked } = self.project();
370
371		Poll::Ready(loop {
372			if peeked.is_some() {
373				break peeked.as_mut();
374			} else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
375				**peeked = Some(item);
376			} else {
377				break None;
378			}
379		})
380	}
381}
382
383impl<'a, S: Stream> Stream for Peekable<'a, S> {
384	type Item = S::Item;
385
386	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
387		let PeekableProj { stream, peeked } = self.project();
388		if let Some(item) = peeked.take() {
389			return Poll::Ready(Some(item));
390		}
391		stream.poll_next(cx)
392	}
393
394	fn size_hint(&self) -> (usize, Option<usize>) {
395		let peek_len = if self.peeked.is_some() { 1 } else { 0 };
396		let (lower, upper) = self.stream.size_hint();
397		let lower = lower.saturating_add(peek_len);
398		let upper = match upper {
399			Some(x) => x.checked_add(peek_len),
400			None => None,
401		};
402		(lower, upper)
403	}
404}