1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use derive_new::new;
use futures::{pin_mut, Stream};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
	marker::PhantomData, pin::Pin, task::{Context, Poll}
};

use super::{
	DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask, PipeTaskAsync
};
use crate::sink::Sink;

#[derive(new)]
#[must_use]
pub struct Pipe<A, B> {
	a: A,
	b: B,
}

impl_par_dist! {
	impl<A: ParallelPipe<Source>, B: ParallelPipe<A::Item>, Source> ParallelPipe<Source> for Pipe<A, B> {
		type Item = B::Item;
		type Task = JoinTask<A::Task, B::Task>;

		fn task(&self) -> Self::Task {
			let a = self.a.task();
			let b = self.b.task();
			JoinTask { a, b }
		}
	}
}

impl<A: ParallelPipe<Source>, B: ParallelSink<A::Item>, Source> ParallelSink<Source>
	for Pipe<A, B>
{
	type Output = B::Output;
	type Pipe = Pipe<A, B::Pipe>;
	type ReduceAFactory = B::ReduceAFactory;
	type ReduceA = B::ReduceA;
	type ReduceC = B::ReduceC;

	fn reducers(self) -> (Self::Pipe, Self::ReduceAFactory, Self::ReduceC) {
		let (a, b, c) = self.b.reducers();
		(Pipe::new(self.a, a), b, c)
	}
}
impl<A: DistributedPipe<Source>, B: DistributedSink<A::Item>, Source> DistributedSink<Source>
	for Pipe<A, B>
{
	type Output = B::Output;
	type Pipe = Pipe<A, B::Pipe>;
	type ReduceAFactory = B::ReduceAFactory;
	type ReduceBFactory = B::ReduceBFactory;
	type ReduceA = B::ReduceA;
	type ReduceB = B::ReduceB;
	type ReduceC = B::ReduceC;

	fn reducers(
		self,
	) -> (
		Self::Pipe,
		Self::ReduceAFactory,
		Self::ReduceBFactory,
		Self::ReduceC,
	) {
		let (a, b, c, d) = self.b.reducers();
		(Pipe::new(self.a, a), b, c, d)
	}
}

#[pin_project]
#[derive(Serialize, Deserialize)]
pub struct JoinTask<A, B> {
	#[pin]
	a: A,
	#[pin]
	b: B,
}

impl<A: PipeTask<Source>, B: PipeTask<A::Item>, Source> PipeTask<Source> for JoinTask<A, B> {
	type Item = B::Item;
	type Async = JoinTask<A::Async, B::Async>;
	fn into_async(self) -> Self::Async {
		JoinTask {
			a: self.a.into_async(),
			b: self.b.into_async(),
		}
	}
}

impl<A: PipeTaskAsync<Source>, B: PipeTaskAsync<A::Item>, Source> PipeTaskAsync<Source>
	for JoinTask<A, B>
{
	type Item = B::Item;

	fn poll_run(
		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Source>>,
		sink: Pin<&mut impl Sink<Item = Self::Item>>,
	) -> Poll<()> {
		#[pin_project]
		struct Proxy<'a, I, B, Item>(#[pin] I, Pin<&'a mut B>, PhantomData<fn() -> Item>);
		impl<'a, I, B, Item> Sink for Proxy<'a, I, B, Item>
		where
			I: Sink<Item = B::Item>,
			B: PipeTaskAsync<Item>,
		{
			type Item = Item;

			fn poll_forward(
				self: Pin<&mut Self>, cx: &mut Context,
				stream: Pin<&mut impl Stream<Item = Self::Item>>,
			) -> Poll<()> {
				let self_ = self.project();
				self_.1.as_mut().poll_run(cx, stream, self_.0)
			}
		}
		let self_ = self.project();
		let sink = Proxy(sink, self_.b, PhantomData);
		pin_mut!(sink);
		self_.a.poll_run(cx, stream, sink)
	}
}