amadeus-core 0.3.7

Harmonious distributed data analysis in Rust.
Documentation
use derive_new::new;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
	pin::Pin, task::{Context, Poll}
};

use super::{DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask};
use crate::{
	par_stream::{ParallelStream, StreamTask}, pipe::{Pipe as _, PipePipe, StreamExt, StreamPipe}
};

#[pin_project]
#[derive(new)]
#[must_use]
pub struct Pipe<A, B> {
	#[pin]
	a: A,
	b: B,
}

impl_par_dist! {
	impl<A: ParallelStream, B: ParallelPipe<A::Item>> ParallelStream for Pipe<A, B> {
		type Item = B::Output;
		type Task = JoinTask<A::Task, B::Task>;

		#[inline(always)]
		fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
			let self_ = self.project();
			let b = self_.b;
			self_.a.next_task(cx).map(|task| {
				task.map(|a| {
					let b = b.task();
					JoinTask { a, b }
				})
			})
		}
		#[inline(always)]
		fn size_hint(&self) -> (usize, Option<usize>) {
			self.a.size_hint()
		}
	}
	impl<A: ParallelPipe<Input>, B: ParallelPipe<A::Output>, Input> ParallelPipe<Input>
		for Pipe<A, B>
	{
		type Output = B::Output;
		type Task = JoinTask<A::Task, B::Task>;

		#[inline(always)]
		fn task(&self) -> Self::Task {
			let a = self.a.task();
			let b = self.b.task();
			JoinTask { a, b }
		}
	}
}

impl<A: ParallelPipe<Item>, B: ParallelSink<A::Output>, Item> ParallelSink<Item> for Pipe<A, B> {
	type Done = B::Done;
	type Pipe = Pipe<A, B::Pipe>;
	type ReduceA = B::ReduceA;
	type ReduceC = B::ReduceC;

	#[inline(always)]
	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
		let (a, b, c) = self.b.reducers();
		(Pipe::new(self.a, a), b, c)
	}
}
impl<A: DistributedPipe<Item>, B: DistributedSink<A::Output>, Item> DistributedSink<Item>
	for Pipe<A, B>
{
	type Done = B::Done;
	type Pipe = Pipe<A, B::Pipe>;
	type ReduceA = B::ReduceA;
	type ReduceB = B::ReduceB;
	type ReduceC = B::ReduceC;

	#[inline(always)]
	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
		let (a, b, c, d) = self.b.reducers();
		(Pipe::new(self.a, a), b, c, d)
	}
}

#[pin_project]
#[derive(Serialize, Deserialize)]
pub struct JoinTask<A, B> {
	#[pin]
	a: A,
	#[pin]
	b: B,
}

impl<A: StreamTask, B: PipeTask<A::Item>> StreamTask for JoinTask<A, B> {
	type Item = B::Output;
	type Async = StreamPipe<A::Async, B::Async>;

	#[inline(always)]
	fn into_async(self) -> Self::Async {
		self.a.into_async().pipe(self.b.into_async())
	}
}

impl<A: PipeTask<Input>, B: PipeTask<A::Output>, Input> PipeTask<Input> for JoinTask<A, B> {
	type Output = B::Output;
	type Async = PipePipe<A::Async, B::Async>;

	#[inline(always)]
	fn into_async(self) -> Self::Async {
		self.a.into_async().pipe(self.b.into_async())
	}
}