amadeus_core/par_sink/
all.rs1use derive_new::new;
2use educe::Educe;
3use futures::{ready, Stream};
4use pin_project::pin_project;
5use serde::{Deserialize, Serialize};
6use serde_closure::traits::FnMut;
7use std::{
8 marker::PhantomData, pin::Pin, task::{Context, Poll}
9};
10
11use super::{
12 DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend, ReducerSend
13};
14use crate::{pipe::Sink, pool::ProcessSend};
15
16#[derive(new)]
17#[must_use]
18pub struct All<P, F> {
19 pipe: P,
20 f: F,
21}
22
23impl<P: ParallelPipe<Item>, Item, F> ParallelSink<Item> for All<P, F>
24where
25 F: FnMut<(P::Output,), Output = bool> + Clone + Send + 'static,
26{
27 type Done = bool;
28 type Pipe = P;
29 type ReduceA = AllReducer<P::Output, F>;
30 type ReduceC = BoolAndReducer;
31
32 fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
33 (self.pipe, AllReducer(self.f, PhantomData), BoolAndReducer)
34 }
35}
36impl<P: DistributedPipe<Item>, Item, F> DistributedSink<Item> for All<P, F>
37where
38 F: FnMut<(P::Output,), Output = bool> + Clone + ProcessSend + 'static,
39{
40 type Done = bool;
41 type Pipe = P;
42 type ReduceA = AllReducer<P::Output, F>;
43 type ReduceB = BoolAndReducer;
44 type ReduceC = BoolAndReducer;
45
46 fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
47 (
48 self.pipe,
49 AllReducer(self.f, PhantomData),
50 BoolAndReducer,
51 BoolAndReducer,
52 )
53 }
54}
55
56#[derive(Educe, Serialize, Deserialize)]
57#[educe(Clone(bound = "F: Clone"))]
58#[serde(
59 bound(serialize = "F: Serialize"),
60 bound(deserialize = "F: Deserialize<'de>")
61)]
62pub struct AllReducer<Item, F>(F, PhantomData<fn() -> Item>);
63
64impl<Item, F> Reducer<Item> for AllReducer<Item, F>
65where
66 F: FnMut<(Item,), Output = bool>,
67{
68 type Done = bool;
69 type Async = AllReducerAsync<Item, F>;
70
71 fn into_async(self) -> Self::Async {
72 AllReducerAsync(self.0, true, PhantomData)
73 }
74}
75impl<Item, F> ReducerProcessSend<Item> for AllReducer<Item, F>
76where
77 F: FnMut<(Item,), Output = bool>,
78{
79 type Done = bool;
80}
81impl<Item, F> ReducerSend<Item> for AllReducer<Item, F>
82where
83 F: FnMut<(Item,), Output = bool>,
84{
85 type Done = bool;
86}
87
88#[pin_project]
89#[derive(Serialize, Deserialize)]
90#[serde(
91 bound(serialize = "F: Serialize"),
92 bound(deserialize = "F: Deserialize<'de>")
93)]
94pub struct AllReducerAsync<Item, F>(F, bool, PhantomData<fn() -> Item>);
95
96impl<Item, F> Sink<Item> for AllReducerAsync<Item, F>
97where
98 F: FnMut<(Item,), Output = bool>,
99{
100 type Done = bool;
101
102 #[inline(always)]
103 fn poll_forward(
104 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
105 ) -> Poll<Self::Done> {
106 let self_ = self.project();
107 while *self_.1 {
108 if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
109 *self_.1 = *self_.1 && self_.0.call_mut((item,));
110 } else {
111 break;
112 }
113 }
114 Poll::Ready(*self_.1)
115 }
116}
117
118#[derive(Clone, Serialize, Deserialize)]
119pub struct BoolAndReducer;
120
121impl Reducer<bool> for BoolAndReducer {
122 type Done = bool;
123 type Async = BoolAndReducerAsync;
124
125 fn into_async(self) -> Self::Async {
126 BoolAndReducerAsync(true)
127 }
128}
129impl ReducerProcessSend<bool> for BoolAndReducer {
130 type Done = bool;
131}
132impl ReducerSend<bool> for BoolAndReducer {
133 type Done = bool;
134}
135
136#[pin_project]
137pub struct BoolAndReducerAsync(bool);
138impl Sink<bool> for BoolAndReducerAsync {
139 type Done = bool;
140
141 #[inline(always)]
142 fn poll_forward(
143 mut self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = bool>>,
144 ) -> Poll<Self::Done> {
145 while self.0 {
146 if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
147 self.0 = self.0 && item;
148 } else {
149 break;
150 }
151 }
152 Poll::Ready(self.0)
153 }
154}