amadeus-core 0.2.5

Harmonious distributed data analysis in Rust.
Documentation
#![allow(clippy::type_complexity)]

use derive_new::new;
use educe::Educe;
use futures::{ready, Stream};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
	marker::PhantomData, pin::Pin, task::{Context, Poll}
};

use super::{Factory, Reducer, ReducerAsync, ReducerProcessSend, ReducerSend};
use crate::pool::ProcessSend;

mod macros {
	#[macro_export]
	macro_rules! folder_par_sink {
		($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
			type Output = <Self::ReduceC as $crate::par_sink::Reducer>::Output;
			type Pipe = I;
			type ReduceAFactory = FolderSyncReducerFactory<I::Item, $folder_a>;
			type ReduceA = FolderSyncReducer<I::Item, $folder_a>;
			type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;

			fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceC) {
				let init_a = $init_a;
				let init_b = $init_b;
				(
					$self.i,
					FolderSyncReducerFactory::new(init_a),
					FolderSyncReducer::new(init_b),
				)
			}
		};
	}
	#[macro_export]
	macro_rules! folder_dist_sink {
		($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
			type Output = <Self::ReduceC as $crate::par_sink::Reducer>::Output;
			type Pipe = I;
			type ReduceAFactory = FolderSyncReducerFactory<I::Item, $folder_a>;
			type ReduceBFactory = FolderSyncReducerFactory<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;
			type ReduceA = FolderSyncReducer<I::Item, $folder_a>;
			type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer>::Output, $folder_b>;
			type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer>::Output, $folder_b>;

			fn reducers($self) -> (I, Self::ReduceAFactory, Self::ReduceBFactory, Self::ReduceC) {
				let init_a = $init_a;
				let init_b = $init_b;
				(
					$self.i,
					FolderSyncReducerFactory::new(init_a),
					FolderSyncReducerFactory::new(init_b.clone()),
					FolderSyncReducer::new(init_b),
				)
			}
		};
	}
	pub use folder_dist_sink;
	pub use folder_par_sink;
}

pub use macros::{folder_dist_sink, folder_par_sink};

pub trait FolderSync<A> {
	type Output;

	fn zero(&mut self) -> Self::Output;
	fn push(&mut self, state: &mut Self::Output, item: A);
}

#[derive(Educe, Serialize, Deserialize, new)]
#[educe(Clone(bound = "C: Clone"))]
#[serde(
	bound(serialize = "C: Serialize"),
	bound(deserialize = "C: Deserialize<'de>")
)]
pub struct FolderSyncReducerFactory<A, C> {
	folder: C,
	marker: PhantomData<fn() -> A>,
}
impl<A, C> Factory for FolderSyncReducerFactory<A, C>
where
	C: FolderSync<A> + Clone,
{
	type Item = FolderSyncReducer<A, C>;

	fn make(&self) -> Self::Item {
		FolderSyncReducer {
			folder: self.folder.clone(),
			marker: PhantomData,
		}
	}
}

#[derive(Educe, Serialize, Deserialize, new)]
#[educe(Clone(bound = "C: Clone"))]
#[serde(
	bound(serialize = "C: Serialize"),
	bound(deserialize = "C: Deserialize<'de>")
)]
pub struct FolderSyncReducer<A, C> {
	folder: C,
	marker: PhantomData<fn() -> A>,
}

impl<A, C> Reducer for FolderSyncReducer<A, C>
where
	C: FolderSync<A>,
{
	type Item = A;
	type Output = C::Output;
	type Async = FolderSyncReducerAsync<A, C, C::Output>;

	fn into_async(mut self) -> Self::Async {
		FolderSyncReducerAsync {
			state: Some(self.folder.zero()),
			folder: self.folder,
			marker: PhantomData,
		}
	}
}

#[pin_project]
pub struct FolderSyncReducerAsync<A, C, D> {
	state: Option<D>,
	folder: C,
	marker: PhantomData<fn() -> A>,
}
impl<A, C> ReducerAsync for FolderSyncReducerAsync<A, C, C::Output>
where
	C: FolderSync<A>,
{
	type Item = A;
	type Output = C::Output;

	#[inline(always)]
	fn poll_forward(
		self: Pin<&mut Self>, cx: &mut Context,
		mut stream: Pin<&mut impl Stream<Item = Self::Item>>,
	) -> Poll<()> {
		let self_ = self.project();
		let folder = self_.folder;
		while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
			folder.push(self_.state.as_mut().unwrap(), item);
		}
		Poll::Ready(())
	}
	fn poll_output(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
		Poll::Ready(self.project().state.take().unwrap())
	}
}
impl<A, C> ReducerProcessSend for FolderSyncReducer<A, C>
where
	C: FolderSync<A>,
	C::Output: ProcessSend + 'static,
{
	type Output = C::Output;
}
impl<A, C> ReducerSend for FolderSyncReducer<A, C>
where
	C: FolderSync<A>,
	C::Output: Send + 'static,
{
	type Output = C::Output;
}