amadeus_core/par_sink/
all.rs

1use derive_new::new;
2use educe::Educe;
3use futures::{ready, Stream};
4use pin_project::pin_project;
5use serde::{Deserialize, Serialize};
6use serde_closure::traits::FnMut;
7use std::{
8	marker::PhantomData, pin::Pin, task::{Context, Poll}
9};
10
11use super::{
12	DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend, ReducerSend
13};
14use crate::{pipe::Sink, pool::ProcessSend};
15
16#[derive(new)]
17#[must_use]
18pub struct All<P, F> {
19	pipe: P,
20	f: F,
21}
22
23impl<P: ParallelPipe<Item>, Item, F> ParallelSink<Item> for All<P, F>
24where
25	F: FnMut<(P::Output,), Output = bool> + Clone + Send + 'static,
26{
27	type Done = bool;
28	type Pipe = P;
29	type ReduceA = AllReducer<P::Output, F>;
30	type ReduceC = BoolAndReducer;
31
32	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
33		(self.pipe, AllReducer(self.f, PhantomData), BoolAndReducer)
34	}
35}
36impl<P: DistributedPipe<Item>, Item, F> DistributedSink<Item> for All<P, F>
37where
38	F: FnMut<(P::Output,), Output = bool> + Clone + ProcessSend + 'static,
39{
40	type Done = bool;
41	type Pipe = P;
42	type ReduceA = AllReducer<P::Output, F>;
43	type ReduceB = BoolAndReducer;
44	type ReduceC = BoolAndReducer;
45
46	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
47		(
48			self.pipe,
49			AllReducer(self.f, PhantomData),
50			BoolAndReducer,
51			BoolAndReducer,
52		)
53	}
54}
55
56#[derive(Educe, Serialize, Deserialize)]
57#[educe(Clone(bound = "F: Clone"))]
58#[serde(
59	bound(serialize = "F: Serialize"),
60	bound(deserialize = "F: Deserialize<'de>")
61)]
62pub struct AllReducer<Item, F>(F, PhantomData<fn() -> Item>);
63
64impl<Item, F> Reducer<Item> for AllReducer<Item, F>
65where
66	F: FnMut<(Item,), Output = bool>,
67{
68	type Done = bool;
69	type Async = AllReducerAsync<Item, F>;
70
71	fn into_async(self) -> Self::Async {
72		AllReducerAsync(self.0, true, PhantomData)
73	}
74}
75impl<Item, F> ReducerProcessSend<Item> for AllReducer<Item, F>
76where
77	F: FnMut<(Item,), Output = bool>,
78{
79	type Done = bool;
80}
81impl<Item, F> ReducerSend<Item> for AllReducer<Item, F>
82where
83	F: FnMut<(Item,), Output = bool>,
84{
85	type Done = bool;
86}
87
88#[pin_project]
89#[derive(Serialize, Deserialize)]
90#[serde(
91	bound(serialize = "F: Serialize"),
92	bound(deserialize = "F: Deserialize<'de>")
93)]
94pub struct AllReducerAsync<Item, F>(F, bool, PhantomData<fn() -> Item>);
95
96impl<Item, F> Sink<Item> for AllReducerAsync<Item, F>
97where
98	F: FnMut<(Item,), Output = bool>,
99{
100	type Done = bool;
101
102	#[inline(always)]
103	fn poll_forward(
104		self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
105	) -> Poll<Self::Done> {
106		let self_ = self.project();
107		while *self_.1 {
108			if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
109				*self_.1 = *self_.1 && self_.0.call_mut((item,));
110			} else {
111				break;
112			}
113		}
114		Poll::Ready(*self_.1)
115	}
116}
117
118#[derive(Clone, Serialize, Deserialize)]
119pub struct BoolAndReducer;
120
121impl Reducer<bool> for BoolAndReducer {
122	type Done = bool;
123	type Async = BoolAndReducerAsync;
124
125	fn into_async(self) -> Self::Async {
126		BoolAndReducerAsync(true)
127	}
128}
129impl ReducerProcessSend<bool> for BoolAndReducer {
130	type Done = bool;
131}
132impl ReducerSend<bool> for BoolAndReducer {
133	type Done = bool;
134}
135
136#[pin_project]
137pub struct BoolAndReducerAsync(bool);
138impl Sink<bool> for BoolAndReducerAsync {
139	type Done = bool;
140
141	#[inline(always)]
142	fn poll_forward(
143		mut self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = bool>>,
144	) -> Poll<Self::Done> {
145		while self.0 {
146			if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
147				self.0 = self.0 && item;
148			} else {
149				break;
150			}
151		}
152		Poll::Ready(self.0)
153	}
154}