amadeus_core/par_sink/
pipe.rs

1use derive_new::new;
2use pin_project::pin_project;
3use serde::{Deserialize, Serialize};
4use std::{
5	pin::Pin, task::{Context, Poll}
6};
7
8use super::{DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask};
9use crate::{
10	par_stream::{ParallelStream, StreamTask}, pipe::{Pipe as _, PipePipe, StreamExt, StreamPipe}
11};
12
13#[pin_project]
14#[derive(new)]
15#[must_use]
16pub struct Pipe<A, B> {
17	#[pin]
18	a: A,
19	b: B,
20}
21
22impl_par_dist! {
23	impl<A: ParallelStream, B: ParallelPipe<A::Item>> ParallelStream for Pipe<A, B> {
24		type Item = B::Output;
25		type Task = JoinTask<A::Task, B::Task>;
26
27		#[inline(always)]
28		fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
29			let self_ = self.project();
30			let b = self_.b;
31			self_.a.next_task(cx).map(|task| {
32				task.map(|a| {
33					let b = b.task();
34					JoinTask { a, b }
35				})
36			})
37		}
38		#[inline(always)]
39		fn size_hint(&self) -> (usize, Option<usize>) {
40			self.a.size_hint()
41		}
42	}
43	impl<A: ParallelPipe<Input>, B: ParallelPipe<A::Output>, Input> ParallelPipe<Input>
44		for Pipe<A, B>
45	{
46		type Output = B::Output;
47		type Task = JoinTask<A::Task, B::Task>;
48
49		#[inline(always)]
50		fn task(&self) -> Self::Task {
51			let a = self.a.task();
52			let b = self.b.task();
53			JoinTask { a, b }
54		}
55	}
56}
57
58impl<A: ParallelPipe<Item>, B: ParallelSink<A::Output>, Item> ParallelSink<Item> for Pipe<A, B> {
59	type Done = B::Done;
60	type Pipe = Pipe<A, B::Pipe>;
61	type ReduceA = B::ReduceA;
62	type ReduceC = B::ReduceC;
63
64	#[inline(always)]
65	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
66		let (a, b, c) = self.b.reducers();
67		(Pipe::new(self.a, a), b, c)
68	}
69}
70impl<A: DistributedPipe<Item>, B: DistributedSink<A::Output>, Item> DistributedSink<Item>
71	for Pipe<A, B>
72{
73	type Done = B::Done;
74	type Pipe = Pipe<A, B::Pipe>;
75	type ReduceA = B::ReduceA;
76	type ReduceB = B::ReduceB;
77	type ReduceC = B::ReduceC;
78
79	#[inline(always)]
80	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
81		let (a, b, c, d) = self.b.reducers();
82		(Pipe::new(self.a, a), b, c, d)
83	}
84}
85
86#[pin_project]
87#[derive(Serialize, Deserialize)]
88pub struct JoinTask<A, B> {
89	#[pin]
90	a: A,
91	#[pin]
92	b: B,
93}
94
95impl<A: StreamTask, B: PipeTask<A::Item>> StreamTask for JoinTask<A, B> {
96	type Item = B::Output;
97	type Async = StreamPipe<A::Async, B::Async>;
98
99	#[inline(always)]
100	fn into_async(self) -> Self::Async {
101		self.a.into_async().pipe(self.b.into_async())
102	}
103}
104
105impl<A: PipeTask<Input>, B: PipeTask<A::Output>, Input> PipeTask<Input> for JoinTask<A, B> {
106	type Output = B::Output;
107	type Async = PipePipe<A::Async, B::Async>;
108
109	#[inline(always)]
110	fn into_async(self) -> Self::Async {
111		self.a.into_async().pipe(self.b.into_async())
112	}
113}