// amadeus-core 0.2.5
//
// Harmonious distributed data analysis in Rust.
// Documentation
use derive_new::new;
use futures::{ready, Stream};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
	marker::PhantomData, pin::Pin, task::{Context, Poll}
};

use super::{
	DistributedPipe, DistributedSink, Factory, ParallelPipe, ParallelSink, Reducer, ReducerAsync, ReducerProcessSend, ReducerSend
};
use crate::pool::ProcessSend;

/// Sink that yields `true` when the predicate `f` accepts at least one item
/// produced by the pipe `i`, and `false` otherwise.
///
/// The constructor (`Any::new(i, f)`) is generated by `#[derive(new)]`.
#[derive(new)]
#[must_use]
pub struct Any<I, F> {
	/// Pipe this sink is attached to; returned unchanged as `Self::Pipe` by `reducers`.
	i: I,
	/// Predicate evaluated against each item.
	f: F,
}

/// `DistributedSink` implementation of the "any" sink; the final output is a `bool`.
impl<I, Source, F> DistributedSink<Source> for Any<I, F>
where
	I: DistributedPipe<Source>,
	F: FnMut(I::Item) -> bool + Clone + ProcessSend + 'static,
{
	type Output = bool;
	type Pipe = I;
	type ReduceAFactory = AnyReducerFactory<I::Item, F>;
	type ReduceBFactory = BoolOrReducerFactory;
	type ReduceA = AnyReducer<I::Item, F>;
	type ReduceB = BoolOrReducer;
	type ReduceC = BoolOrReducer;

	/// Splits the sink into its pipe and the three reduction stages.
	///
	/// Returns, in order: the wrapped pipe, a factory producing `AnyReducer`s
	/// that carry the predicate, a `BoolOrReducerFactory`, and a `BoolOrReducer`.
	fn reducers(
		self,
	) -> (
		Self::Pipe,
		Self::ReduceAFactory,
		Self::ReduceBFactory,
		Self::ReduceC,
	) {
		let Self {
			i: pipe,
			f: predicate,
		} = self;
		let predicate_stage = AnyReducerFactory(predicate, PhantomData);
		// The flag starts as `true`, i.e. "nothing matched yet"; the reducers
		// report its negation from `poll_output`.
		let final_stage = BoolOrReducer(true);
		(pipe, predicate_stage, BoolOrReducerFactory, final_stage)
	}
}
/// `ParallelSink` implementation of the "any" sink; the final output is a `bool`.
impl<I, Source, F> ParallelSink<Source> for Any<I, F>
where
	I: ParallelPipe<Source>,
	F: FnMut(I::Item) -> bool + Clone + Send + 'static,
{
	type Output = bool;
	type Pipe = I;
	type ReduceAFactory = AnyReducerFactory<I::Item, F>;
	type ReduceA = AnyReducer<I::Item, F>;
	type ReduceC = BoolOrReducer;

	/// Splits the sink into its pipe and the two reduction stages.
	///
	/// Returns, in order: the wrapped pipe, a factory producing `AnyReducer`s
	/// that carry the predicate, and a `BoolOrReducer`.
	fn reducers(self) -> (Self::Pipe, Self::ReduceAFactory, Self::ReduceC) {
		let Self {
			i: pipe,
			f: predicate,
		} = self;
		let predicate_stage = AnyReducerFactory(predicate, PhantomData);
		// The flag starts as `true`, i.e. "nothing matched yet"; the reducers
		// report its negation from `poll_output`.
		let final_stage = BoolOrReducer(true);
		(pipe, predicate_stage, final_stage)
	}
}

/// Factory that builds [`AnyReducer`]s, each with its own clone of the predicate.
///
/// Field `0` is the predicate. Field `1` is a `PhantomData` marker that ties the
/// type to the item type `A` without storing a value of that type.
///
/// The explicit serde bounds make the derived impls require only
/// `F: Serialize` / `F: Deserialize<'de>`.
#[derive(Serialize, Deserialize)]
#[serde(
	bound(serialize = "F: Serialize"),
	bound(deserialize = "F: Deserialize<'de>")
)]
pub struct AnyReducerFactory<A, F>(F, PhantomData<fn() -> A>);
impl<A, F> Factory for AnyReducerFactory<A, F>
where
	F: FnMut(A) -> bool + Clone,
{
	type Item = AnyReducer<A, F>;

	/// Creates a fresh `AnyReducer` holding a clone of the predicate.
	fn make(&self) -> Self::Item {
		let predicate = self.0.clone();
		// `true` is the "no item has matched yet" starting state.
		AnyReducer(predicate, true, PhantomData)
	}
}
impl<A, F> Clone for AnyReducerFactory<A, F>
where
	F: Clone,
{
	fn clone(&self) -> Self {
		Self(self.0.clone(), PhantomData)
	}
}

/// Reducer that applies a predicate to incoming items and records whether any
/// of them was accepted.
///
/// Fields:
/// * `0` is the predicate.
/// * `1` is a flag that stays `true` while no item has matched; `poll_output`
///   reports its negation.
/// * `2` is a `PhantomData` marker for the item type `A`.
///
/// The explicit serde bounds make the derived impls require only
/// `F: Serialize` / `F: Deserialize<'de>`.
#[pin_project]
#[derive(Serialize, Deserialize)]
#[serde(
	bound(serialize = "F: Serialize"),
	bound(deserialize = "F: Deserialize<'de>")
)]
pub struct AnyReducer<A, F>(F, bool, PhantomData<fn() -> A>);

impl<A, F: FnMut(A) -> bool> Reducer for AnyReducer<A, F> {
	type Item = A;
	type Output = bool;
	type Async = Self;

	/// The reducer serves as its own asynchronous form, so it is returned as-is.
	fn into_async(self) -> Self::Async {
		self
	}
}
impl<A, F> ReducerAsync for AnyReducer<A, F>
where
	F: FnMut(A) -> bool,
{
	type Item = A;
	type Output = bool;

	/// Pulls items from `stream` and tests each with the predicate until one
	/// matches or the stream ends.
	///
	/// Yields `Poll::Pending` (through `ready!`) whenever the stream has no item
	/// available yet. Once a match has been recorded no further items are polled.
	#[inline(always)]
	fn poll_forward(
		self: Pin<&mut Self>, cx: &mut Context,
		mut stream: Pin<&mut impl Stream<Item = Self::Item>>,
	) -> Poll<()> {
		let this = self.project();
		loop {
			// Field 1 is `true` only while nothing has matched so far.
			if !*this.1 {
				break;
			}
			match ready!(stream.as_mut().poll_next(cx)) {
				Some(item) => {
					// The flag is known to be `true` here, so the new state
					// is simply "this item did not match".
					*this.1 = !this.0(item);
				}
				None => break,
			}
		}
		Poll::Ready(())
	}

	/// Reports `true` if some item satisfied the predicate.
	fn poll_output(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
		let nothing_matched = self.1;
		Poll::Ready(!nothing_matched)
	}
}
// Declares `bool` as the `ReducerProcessSend` output, the same type as
// `Reducer::Output` for this reducer.
impl<A, F> ReducerProcessSend for AnyReducer<A, F>
where
	F: FnMut(A) -> bool,
{
	type Output = bool;
}
// Declares `bool` as the `ReducerSend` output, the same type as
// `Reducer::Output` for this reducer.
impl<A, F> ReducerSend for AnyReducer<A, F>
where
	F: FnMut(A) -> bool,
{
	type Output = bool;
}

/// Stateless factory that builds [`BoolOrReducer`]s.
#[derive(Serialize, Deserialize)]
pub struct BoolOrReducerFactory;
impl Factory for BoolOrReducerFactory {
	type Item = BoolOrReducer;

	/// Creates a `BoolOrReducer` in its initial state.
	fn make(&self) -> Self::Item {
		// `true` means "no `true` input seen yet"; the reducer outputs the
		// negation of this flag.
		let nothing_true_yet = true;
		BoolOrReducer(nothing_true_yet)
	}
}

/// Reducer that combines a stream of `bool`s with logical OR.
///
/// The stored flag is inverted relative to the result: it stays `true` while
/// no `true` item has been received, and `poll_output` reports its negation.
#[pin_project]
#[derive(Serialize, Deserialize)]
pub struct BoolOrReducer(bool);

impl Reducer for BoolOrReducer {
	type Item = bool;
	type Output = bool;
	type Async = Self;
	/// The reducer serves as its own asynchronous form, so it is returned as-is.
	fn into_async(self) -> Self::Async {
		self
	}
}
impl ReducerAsync for BoolOrReducer {
	type Item = bool;
	type Output = bool;

	/// Pulls `bool`s from `stream` until one of them is `true` or the stream ends.
	///
	/// Yields `Poll::Pending` (through `ready!`) whenever the stream has no item
	/// available yet. Once a `true` has been recorded no further items are polled.
	#[inline(always)]
	fn poll_forward(
		mut self: Pin<&mut Self>, cx: &mut Context,
		mut stream: Pin<&mut impl Stream<Item = Self::Item>>,
	) -> Poll<()> {
		loop {
			// The flag is `true` only while no `true` input has arrived.
			if !self.0 {
				break;
			}
			match ready!(stream.as_mut().poll_next(cx)) {
				Some(partial) => {
					// The flag is known to be `true` here, so the new state
					// is simply "this input was not `true`".
					self.0 = !partial;
				}
				None => break,
			}
		}
		Poll::Ready(())
	}

	/// Reports `true` if at least one `true` input was received.
	fn poll_output(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
		let nothing_true_seen = self.0;
		Poll::Ready(!nothing_true_seen)
	}
}
// Declares `bool` as the `ReducerProcessSend` output, the same type as
// `Reducer::Output` for this reducer.
impl ReducerProcessSend for BoolOrReducer {
	type Output = bool;
}
// Declares `bool` as the `ReducerSend` output, the same type as
// `Reducer::Output` for this reducer.
impl ReducerSend for BoolOrReducer {
	type Output = bool;
}