1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use derive_new::new;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
	pin::Pin, task::{Context, Poll}
};

use super::{DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask};
use crate::{
	par_stream::{ParallelStream, StreamTask}, pipe::{Pipe as _, PipePipe, StreamExt, StreamPipe}
};

#[pin_project]
#[derive(new)]
#[must_use]
pub struct Pipe<A, B> {
	#[pin]
	a: A,
	b: B,
}

impl_par_dist! {
	impl<A: ParallelStream, B: ParallelPipe<A::Item>> ParallelStream for Pipe<A, B> {
		type Item = B::Output;
		type Task = JoinTask<A::Task, B::Task>;

		#[inline(always)]
		fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
			let self_ = self.project();
			let b = self_.b;
			self_.a.next_task(cx).map(|task| {
				task.map(|a| {
					let b = b.task();
					JoinTask { a, b }
				})
			})
		}
		#[inline(always)]
		fn size_hint(&self) -> (usize, Option<usize>) {
			self.a.size_hint()
		}
	}
	impl<A: ParallelPipe<Input>, B: ParallelPipe<A::Output>, Input> ParallelPipe<Input>
		for Pipe<A, B>
	{
		type Output = B::Output;
		type Task = JoinTask<A::Task, B::Task>;

		#[inline(always)]
		fn task(&self) -> Self::Task {
			let a = self.a.task();
			let b = self.b.task();
			JoinTask { a, b }
		}
	}
}

impl<A: ParallelPipe<Item>, B: ParallelSink<A::Output>, Item> ParallelSink<Item> for Pipe<A, B> {
	type Done = B::Done;
	type Pipe = Pipe<A, B::Pipe>;
	type ReduceA = B::ReduceA;
	type ReduceC = B::ReduceC;

	#[inline(always)]
	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
		let (a, b, c) = self.b.reducers();
		(Pipe::new(self.a, a), b, c)
	}
}
impl<A: DistributedPipe<Item>, B: DistributedSink<A::Output>, Item> DistributedSink<Item>
	for Pipe<A, B>
{
	type Done = B::Done;
	type Pipe = Pipe<A, B::Pipe>;
	type ReduceA = B::ReduceA;
	type ReduceB = B::ReduceB;
	type ReduceC = B::ReduceC;

	#[inline(always)]
	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
		let (a, b, c, d) = self.b.reducers();
		(Pipe::new(self.a, a), b, c, d)
	}
}

#[pin_project]
#[derive(Serialize, Deserialize)]
pub struct JoinTask<A, B> {
	#[pin]
	a: A,
	#[pin]
	b: B,
}

impl<A: StreamTask, B: PipeTask<A::Item>> StreamTask for JoinTask<A, B> {
	type Item = B::Output;
	type Async = StreamPipe<A::Async, B::Async>;

	#[inline(always)]
	fn into_async(self) -> Self::Async {
		self.a.into_async().pipe(self.b.into_async())
	}
}

impl<A: PipeTask<Input>, B: PipeTask<A::Output>, Input> PipeTask<Input> for JoinTask<A, B> {
	type Output = B::Output;
	type Async = PipePipe<A::Async, B::Async>;

	#[inline(always)]
	fn into_async(self) -> Self::Async {
		self.a.into_async().pipe(self.b.into_async())
	}
}