amadeus_core/par_sink/
any.rs1use derive_new::new;
2use educe::Educe;
3use futures::{ready, Stream};
4use pin_project::pin_project;
5use serde::{Deserialize, Serialize};
6use serde_closure::traits::FnMut;
7use std::{
8 marker::PhantomData, pin::Pin, task::{Context, Poll}
9};
10
11use super::{
12 DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend, ReducerSend
13};
14use crate::{pipe::Sink, pool::ProcessSend};
15
16#[derive(new)]
17#[must_use]
18pub struct Any<P, F> {
19 pipe: P,
20 f: F,
21}
22
23impl<P: ParallelPipe<Item>, Item, F> ParallelSink<Item> for Any<P, F>
24where
25 F: FnMut<(P::Output,), Output = bool> + Clone + Send + 'static,
26{
27 type Done = bool;
28 type Pipe = P;
29 type ReduceA = AnyReducer<P::Output, F>;
30 type ReduceC = BoolOrReducer;
31
32 fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
33 (self.pipe, AnyReducer(self.f, PhantomData), BoolOrReducer)
34 }
35}
36impl<P: DistributedPipe<Item>, Item, F> DistributedSink<Item> for Any<P, F>
37where
38 F: FnMut<(P::Output,), Output = bool> + Clone + ProcessSend + 'static,
39{
40 type Done = bool;
41 type Pipe = P;
42 type ReduceA = AnyReducer<P::Output, F>;
43 type ReduceB = BoolOrReducer;
44 type ReduceC = BoolOrReducer;
45
46 fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
47 (
48 self.pipe,
49 AnyReducer(self.f, PhantomData),
50 BoolOrReducer,
51 BoolOrReducer,
52 )
53 }
54}
55
56#[derive(Educe, Serialize, Deserialize)]
57#[educe(Clone(bound = "F: Clone"))]
58#[serde(
59 bound(serialize = "F: Serialize"),
60 bound(deserialize = "F: Deserialize<'de>")
61)]
62pub struct AnyReducer<Item, F>(F, PhantomData<fn() -> Item>);
63
64impl<Item, F> Reducer<Item> for AnyReducer<Item, F>
65where
66 F: FnMut<(Item,), Output = bool>,
67{
68 type Done = bool;
69 type Async = AnyReducerAsync<Item, F>;
70
71 fn into_async(self) -> Self::Async {
72 AnyReducerAsync(self.0, true, PhantomData)
73 }
74}
75impl<Item, F> ReducerProcessSend<Item> for AnyReducer<Item, F>
76where
77 F: FnMut<(Item,), Output = bool>,
78{
79 type Done = bool;
80}
81impl<Item, F> ReducerSend<Item> for AnyReducer<Item, F>
82where
83 F: FnMut<(Item,), Output = bool>,
84{
85 type Done = bool;
86}
87
88#[pin_project]
89#[derive(Serialize, Deserialize)]
90#[serde(
91 bound(serialize = "F: Serialize"),
92 bound(deserialize = "F: Deserialize<'de>")
93)]
94pub struct AnyReducerAsync<Item, F>(F, bool, PhantomData<fn() -> Item>);
95
96impl<Item, F> Sink<Item> for AnyReducerAsync<Item, F>
97where
98 F: FnMut<(Item,), Output = bool>,
99{
100 type Done = bool;
101
102 #[inline(always)]
103 fn poll_forward(
104 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
105 ) -> Poll<Self::Done> {
106 let self_ = self.project();
107 while *self_.1 {
108 if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
109 *self_.1 = *self_.1 && !self_.0.call_mut((item,));
110 } else {
111 break;
112 }
113 }
114 Poll::Ready(!*self_.1)
115 }
116}
117
118#[derive(Clone, Serialize, Deserialize)]
119pub struct BoolOrReducer;
120
121impl Reducer<bool> for BoolOrReducer {
122 type Done = bool;
123 type Async = BoolOrReducerAsync;
124
125 fn into_async(self) -> Self::Async {
126 BoolOrReducerAsync(true)
127 }
128}
129impl ReducerProcessSend<bool> for BoolOrReducer {
130 type Done = bool;
131}
132impl ReducerSend<bool> for BoolOrReducer {
133 type Done = bool;
134}
135
136#[pin_project]
137#[derive(Serialize, Deserialize)]
138pub struct BoolOrReducerAsync(bool);
139
140impl Sink<bool> for BoolOrReducerAsync {
141 type Done = bool;
142
143 #[inline(always)]
144 fn poll_forward(
145 mut self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = bool>>,
146 ) -> Poll<Self::Done> {
147 while self.0 {
148 if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
149 self.0 = self.0 && !item;
150 } else {
151 break;
152 }
153 }
154 Poll::Ready(!self.0)
155 }
156}