1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use derive_new::new;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};

use super::{DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask};
use crate::{
	par_stream::{ParallelStream, StreamTask}, pipe::{Pipe as _, PipePipe, StreamExt, StreamPipe}
};

/// Two stages composed in sequence: whatever `a` produces is consumed by `b`.
///
/// The trait impls in this file let a `Pipe` act as a stream (when `a` is a
/// stream and `b` a pipe), as a pipe (when both are pipes), or as a sink
/// (when `a` is a pipe and `b` a sink). The `new(a, b)` constructor is
/// generated by `#[derive(new)]`.
#[derive(new)]
#[must_use]
pub struct Pipe<A, B> {
	/// Upstream stage; its items feed `b`.
	a: A,
	/// Downstream stage; consumes the items of `a`.
	b: B,
}

// NOTE(review): `impl_par_dist!` is defined outside this file; presumably it also
// emits the `Distributed*` counterparts of the impls below — confirm.
impl_par_dist! {
	// A stream `A` whose items are fed through pipe `B` is itself a stream of `B`'s items.
	impl<A: ParallelStream, B: ParallelPipe<A::Item>> ParallelStream for Pipe<A, B> {
		type Item = B::Item;
		type Task = JoinTask<A::Task, B::Task>;

		// Pulls the next task from `a`; when there is one, pairs it with a fresh
		// task from `b`. Returns `None` (without touching `b`) once `a` is exhausted.
		fn next_task(&mut self) -> Option<Self::Task> {
			let a = self.a.next_task()?;
			let b = self.b.task();
			Some(JoinTask { a, b })
		}
		// Forwards `a`'s hint untouched; `b` is not consulted.
		// NOTE(review): this assumes `b` does not change the count — confirm.
		fn size_hint(&self) -> (usize, Option<usize>) {
			self.a.size_hint()
		}
	}
	// Two pipes chained together form a pipe from `A`'s source to `B`'s items.
	impl<A: ParallelPipe<Source>, B: ParallelPipe<A::Item>, Source> ParallelPipe<Source> for Pipe<A, B> {
		type Item = B::Item;
		type Task = JoinTask<A::Task, B::Task>;

		// Builds one task from each stage (`a` first, then `b`) and joins them.
		fn task(&self) -> Self::Task {
			JoinTask {
				a: self.a.task(),
				b: self.b.task(),
			}
		}
	}
}

/// A pipe `A` followed by a sink `B` is a sink over `A`'s source type.
///
/// Output and reducer types are taken directly from `B`; only the pipe part
/// changes, becoming `A` chained in front of `B`'s own pipe.
impl<A: ParallelPipe<Source>, B: ParallelSink<A::Item>, Source> ParallelSink<Source>
	for Pipe<A, B>
{
	type Output = B::Output;
	type Pipe = Pipe<A, B::Pipe>;
	type ReduceA = B::ReduceA;
	type ReduceC = B::ReduceC;

	/// Splits `b` into its pipe and reducers, then prepends `a` to that pipe.
	/// The reducers are returned as `b` supplied them.
	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
		let Self { a: head, b: sink } = self;
		let (tail, reduce_a, reduce_c) = sink.reducers();
		let pipe = Pipe::new(head, tail);
		(pipe, reduce_a, reduce_c)
	}
}
/// A distributed pipe `A` followed by a distributed sink `B` is a distributed
/// sink over `A`'s source type.
///
/// Output and all three reducer types are taken directly from `B`; the pipe
/// part becomes `A` chained in front of `B`'s own pipe.
impl<A: DistributedPipe<Source>, B: DistributedSink<A::Item>, Source> DistributedSink<Source>
	for Pipe<A, B>
{
	type Output = B::Output;
	type Pipe = Pipe<A, B::Pipe>;
	type ReduceA = B::ReduceA;
	type ReduceB = B::ReduceB;
	type ReduceC = B::ReduceC;

	/// Splits `b` into its pipe and reducers, then prepends `a` to that pipe.
	/// The reducers are returned as `b` supplied them.
	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
		let Self { a: head, b: sink } = self;
		let (tail, reduce_a, reduce_b, reduce_c) = sink.reducers();
		let pipe = Pipe::new(head, tail);
		(pipe, reduce_a, reduce_b, reduce_c)
	}
}

/// Task produced by a [`Pipe`]: an upstream task `a` paired with a downstream
/// task `b`.
///
/// Converting it with `into_async` (see the `StreamTask` / `PipeTask` impls in
/// this file) pipes the async form of `a` into the async form of `b`.
/// Serializable whenever both halves are.
#[pin_project]
#[derive(Serialize, Deserialize)]
pub struct JoinTask<A, B> {
	/// Upstream task.
	#[pin]
	a: A,
	/// Downstream task, consuming the items of `a`.
	#[pin]
	b: B,
}

/// A stream task joined to a pipe task is a stream task yielding the pipe's items.
impl<A: StreamTask, B: PipeTask<A::Item>> StreamTask for JoinTask<A, B> {
	type Item = B::Item;
	type Async = StreamPipe<A::Async, B::Async>;

	/// Converts both halves to their async forms (`a` first, then `b`) and
	/// pipes the upstream stream into the downstream pipe.
	fn into_async(self) -> Self::Async {
		let Self { a, b } = self;
		let upstream = a.into_async();
		let downstream = b.into_async();
		upstream.pipe(downstream)
	}
}

/// Two pipe tasks joined together form a pipe task from `A`'s source to `B`'s items.
impl<A: PipeTask<Source>, B: PipeTask<A::Item>, Source> PipeTask<Source> for JoinTask<A, B> {
	type Item = B::Item;
	type Async = PipePipe<A::Async, B::Async>;

	/// Converts both halves to their async forms (`a` first, then `b`) and
	/// chains the first pipe into the second.
	fn into_async(self) -> Self::Async {
		let Self { a, b } = self;
		let first = a.into_async();
		let second = b.into_async();
		first.pipe(second)
	}
}