// amadeus-core 0.4.3
//
// Harmonious distributed data analysis in Rust.
// Documentation
#![allow(unused_imports, clippy::single_component_path_imports)]

use derive_new::new;
use educe::Educe;
use futures::{ready, Stream};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
	future::Future, marker::PhantomData, pin::Pin, task::{Context, Poll}
};

use super::{Reducer, ReducerProcessSend, ReducerSend};
use crate::{pipe::Sink, pool::ProcessSend};

// The macros live in a private module and are re-exported with `pub(crate) use`
// so that they can be imported by path like ordinary items.
mod macros {
	/// Expands to the associated items of a two-stage folding sink: the types
	/// `Done`, `Pipe`, `ReduceA`, `ReduceC` and the method `reducers`.
	///
	/// Meant to be invoked inside an `impl` block that expects exactly those
	/// items. The call site must provide:
	///
	/// * a type `P` in scope with an `Output` associated type, and a `pipe`
	///   field of type `P` on `self`;
	/// * `FolderSyncReducer` in scope (it is referenced unqualified).
	///
	/// Arguments:
	///
	/// * `$folder_a` – folder type for the first stage, fed `P::Output`; the
	///   stage hands on its raw fold state.
	/// * `$folder_b` – folder type for the last stage, fed the first stage's
	///   results; the stage hands on the folder's finished value.
	/// * `$self` – the `self` identifier, passed in because of macro hygiene.
	/// * `$init_a`, `$init_b` – expressions yielding the `$folder_a` and
	///   `$folder_b` values; each is evaluated once.
	#[macro_export]
	macro_rules! folder_par_sink {
		($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
			type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done;
			type Pipe = P;
			// `$crate::` (not `crate::`) so the marker types resolve in the defining
			// crate no matter where the macro is expanded.
			type ReduceA = FolderSyncReducer<P::Output, $folder_a, $crate::par_sink::Inter>;
			type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b, $crate::par_sink::Final>;

			fn reducers($self) -> (P, Self::ReduceA, Self::ReduceC) {
				let init_a = $init_a;
				let init_b = $init_b;
				(
					$self.pipe,
					FolderSyncReducer::new(init_a),
					FolderSyncReducer::new(init_b),
				)
			}
		};
	}
	/// Expands to the associated items of a three-stage folding sink: the types
	/// `Done`, `Pipe`, `ReduceA`, `ReduceB`, `ReduceC` and the method
	/// `reducers`.
	///
	/// Same call-site requirements and arguments as `folder_par_sink!`, with
	/// one extra stage: `$folder_b` is used for both the second stage (which
	/// hands on its raw fold state) and the third stage (which hands on the
	/// finished value). The value of `$init_b` is therefore cloned once, so it
	/// must support `.clone()`.
	#[macro_export]
	macro_rules! folder_dist_sink {
		($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
			type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done>>::Done;
			type Pipe = P;
			// `$crate::` (not `crate::`) so the marker types resolve in the defining
			// crate no matter where the macro is expanded.
			type ReduceA = FolderSyncReducer<P::Output, $folder_a, $crate::par_sink::Inter>;
			type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b, $crate::par_sink::Inter>;
			type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $folder_b, $crate::par_sink::Final>;

			fn reducers($self) -> (P, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
				let init_a = $init_a;
				let init_b = $init_b;
				(
					$self.pipe,
					FolderSyncReducer::new(init_a),
					// The second and third stage each need their own folder value.
					FolderSyncReducer::new(init_b.clone()),
					FolderSyncReducer::new(init_b),
				)
			}
		};
	}
	pub(crate) use folder_dist_sink;
	pub(crate) use folder_par_sink;
}

pub(crate) use macros::{folder_dist_sink, folder_par_sink};

/// A synchronous fold over `Item`s, split into three steps: create an
/// accumulator, absorb one item, and finish.
///
/// Within this module, [`zero`](FolderSync::zero) seeds the accumulator when a
/// reducer is turned into its async sink, [`push`](FolderSync::push) is called
/// once for every item pulled from the input stream, and
/// [`done`](FolderSync::done) is applied only by the `Final` stage.
pub trait FolderSync<Item> {
	/// Accumulator threaded through the fold. An `Inter` stage hands this on
	/// unchanged as its result.
	type State;
	/// Finished value produced from a `State` by [`done`](FolderSync::done).
	type Done;

	/// Creates a fresh accumulator.
	fn zero(&mut self) -> Self::State;
	/// Folds one `item` into `state`.
	fn push(&mut self, state: &mut Self::State, item: Item);
	/// Consumes the accumulator and produces the finished value.
	fn done(&mut self, state: Self::State) -> Self::Done;
}

/// Type-level marker for an intermediate fold stage: reducers and sinks tagged
/// with it yield the folder's raw `State`.
pub struct Inter;
/// Type-level marker for the last fold stage: reducers and sinks tagged with
/// it pass the accumulated `State` through `FolderSync::done` and yield the
/// resulting `Done` value.
pub struct Final;

/// Reducer that wraps a [`FolderSync`] implementation.
///
/// `Item` is the element type being folded and `Stage` is a marker type
/// ([`Inter`] or [`Final`]) selecting which value the reducer yields. Neither
/// is stored; only the folder `F` is, which is why the `Clone`, `Serialize`
/// and `Deserialize` bounds below mention `F` alone.
#[derive(Educe, Serialize, Deserialize, new)]
#[educe(Clone(bound = "F: Clone"))]
#[serde(
	bound(serialize = "F: Serialize"),
	bound(deserialize = "F: Deserialize<'de>")
)]
pub struct FolderSyncReducer<Item, F, Stage> {
	// The fold logic itself.
	folder: F,
	// Ties the otherwise unused `Item` and `Stage` parameters to the type.
	marker: PhantomData<fn() -> (Item, Stage)>,
}

/// Intermediate-stage reducer: its result is the folder's raw accumulator
/// (`F::State`).
impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F, Inter>
where
	F: FolderSync<Item>,
{
	type Done = F::State;
	type Async = FolderSyncReducerAsync<Item, F, F::State, Inter>;

	/// Builds the async sink, seeding its accumulator with one call to
	/// `FolderSync::zero` on the wrapped folder.
	fn into_async(self) -> Self::Async {
		let mut folder = self.folder;
		// `zero` needs `&mut folder`, so create the state before moving the folder.
		let state = Some(folder.zero());
		FolderSyncReducerAsync {
			state,
			folder,
			marker: PhantomData,
		}
	}
}
/// Opts the intermediate-stage reducer into `ReducerProcessSend` whenever the
/// fold state is `ProcessSend + 'static`; the reported result is that state.
impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F, Inter>
where
	F: FolderSync<Item>,
	F::State: ProcessSend + 'static,
{
	// Same type as `Reducer::Done` for this stage.
	type Done = F::State;
}
/// Opts the intermediate-stage reducer into `ReducerSend` whenever the fold
/// state is `Send + 'static`; the reported result is that state.
impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F, Inter>
where
	F: FolderSync<Item>,
	F::State: Send + 'static,
{
	// Same type as `Reducer::Done` for this stage.
	type Done = F::State;
}

/// Last-stage reducer: its result is the folder's finished value (`F::Done`),
/// while the sink it creates still accumulates an `F::State` internally.
impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F, Final>
where
	F: FolderSync<Item>,
{
	type Done = F::Done;
	type Async = FolderSyncReducerAsync<Item, F, F::State, Final>;

	/// Builds the async sink, seeding its accumulator with one call to
	/// `FolderSync::zero` on the wrapped folder.
	fn into_async(self) -> Self::Async {
		let mut folder = self.folder;
		// `zero` needs `&mut folder`, so create the state before moving the folder.
		let state = Some(folder.zero());
		FolderSyncReducerAsync {
			state,
			folder,
			marker: PhantomData,
		}
	}
}
/// Opts the last-stage reducer into `ReducerProcessSend` whenever the folder's
/// finished value is `ProcessSend + 'static`; the reported result is that
/// finished value.
impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F, Final>
where
	F: FolderSync<Item>,
	F::Done: ProcessSend + 'static,
{
	// Same type as `Reducer::Done` for this stage.
	type Done = F::Done;
}
/// Opts the last-stage reducer into `ReducerSend` whenever the folder's
/// finished value is `Send + 'static`; the reported result is that finished
/// value.
impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F, Final>
where
	F: FolderSync<Item>,
	F::Done: Send + 'static,
{
	// Same type as `Reducer::Done` for this stage.
	type Done = F::Done;
}

/// Async sink created by `FolderSyncReducer::into_async`.
///
/// Holds the folder together with the accumulator it is filling. `Item` and
/// `Stage` (one of the `Inter` / `Final` markers) only select which `Sink`
/// implementation applies; no value of either type is stored.
#[pin_project]
pub struct FolderSyncReducerAsync<Item, F, S, Stage> {
	// `Some` while folding; taken (left as `None`) when the sink completes.
	state: Option<S>,
	// The fold logic applied to every incoming item.
	folder: F,
	// Ties the otherwise unused `Item` and `Stage` parameters to the type.
	marker: PhantomData<fn() -> (Item, Stage)>,
}
impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State, Inter>
where
	F: FolderSync<Item>,
{
	type Done = F::State;

	#[inline(always)]
	fn poll_forward(
		self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
	) -> Poll<Self::Done> {
		let self_ = self.project();
		let folder = self_.folder;
		let state = self_.state.as_mut().unwrap();
		while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
			folder.push(state, item);
		}
		Poll::Ready(self_.state.take().unwrap())
	}
}
impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State, Final>
where
	F: FolderSync<Item>,
{
	type Done = F::Done;

	#[inline(always)]
	fn poll_forward(
		self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
	) -> Poll<Self::Done> {
		let self_ = self.project();
		let folder = self_.folder;
		let state = self_.state.as_mut().unwrap();
		while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
			folder.push(state, item);
		}
		Poll::Ready(folder.done(self_.state.take().unwrap()))
	}
}