1#![allow(unused_imports, clippy::single_component_path_imports)]
2
3use derive_new::new;
4use educe::Educe;
5use futures::{ready, Stream};
6use pin_project::pin_project;
7use serde::{Deserialize, Serialize};
8use std::{
9 future::Future, marker::PhantomData, pin::Pin, task::{Context, Poll}
10};
11
12use super::{Reducer, ReducerProcessSend, ReducerSend};
13use crate::{pipe::Sink, pool::ProcessSend};
14
mod macros {
    /// Expands to the associated items (`Done`, `Pipe`, `ReduceA`, `ReduceC`) and the
    /// `reducers` method of a two-stage folding sink.
    ///
    /// * `$folder_a` / `$folder_b` – folder types used by the first and the last stage.
    /// * `$self` – the `self` identifier, forwarded so the expressions below can use it.
    /// * `$init_a` / `$init_b` – expressions producing the folder for each stage.
    ///
    /// The invocation site must have a type parameter `P` with an `Output` associated
    /// type, a `pipe` field on `self`, and `FolderSyncReducer` in scope.
    #[macro_export]
    macro_rules! folder_par_sink {
        ($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
            type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done;
            type Pipe = P;
            // `$crate::` (not `crate::`) so the marker types resolve to the defining crate
            // wherever this exported macro is expanded.
            type ReduceA = FolderSyncReducer<P::Output, $folder_a, $crate::par_sink::Inter>;
            type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b, $crate::par_sink::Final>;

            fn reducers($self) -> (P, Self::ReduceA, Self::ReduceC) {
                // Both initialisers are evaluated before `$self.pipe` is moved out.
                let init_a = $init_a;
                let init_b = $init_b;
                (
                    $self.pipe,
                    FolderSyncReducer::new(init_a),
                    FolderSyncReducer::new(init_b),
                )
            }
        };
    }
    /// Expands to the associated items (`Done`, `Pipe`, `ReduceA`, `ReduceB`, `ReduceC`)
    /// and the `reducers` method of a three-stage folding sink.
    ///
    /// Arguments are the same as for `folder_par_sink!`. The value produced by `$init_b`
    /// is used for both the middle and the last stage, so it must be `Clone`.
    #[macro_export]
    macro_rules! folder_dist_sink {
        ($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
            type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done>>::Done;
            type Pipe = P;
            // `$crate::` (not `crate::`) so the marker types resolve to the defining crate
            // wherever this exported macro is expanded.
            type ReduceA = FolderSyncReducer<P::Output, $folder_a, $crate::par_sink::Inter>;
            type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b, $crate::par_sink::Inter>;
            type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $folder_b, $crate::par_sink::Final>;

            fn reducers($self) -> (P, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
                let init_a = $init_a;
                let init_b = $init_b;
                (
                    $self.pipe,
                    FolderSyncReducer::new(init_a),
                    // The middle stage gets its own copy; the last stage takes the original.
                    FolderSyncReducer::new(init_b.clone()),
                    FolderSyncReducer::new(init_b),
                )
            }
        };
    }
    pub(crate) use folder_dist_sink;
    pub(crate) use folder_par_sink;
}
59
60pub(crate) use macros::{folder_dist_sink, folder_par_sink};
61
/// A synchronous fold over a sequence of `Item`s.
///
/// The reducers in this module drive an implementation in three steps: they call
/// [`zero`](FolderSync::zero) once to obtain the initial state, call
/// [`push`](FolderSync::push) for every item received, and — only in the final stage —
/// call [`done`](FolderSync::done) to convert the state into the result.
pub trait FolderSync<Item> {
    /// Result produced by [`done`](FolderSync::done) from the accumulated state.
    type Done;
    /// Accumulator that is threaded through every [`push`](FolderSync::push) call.
    type State;

    /// Creates the initial accumulator.
    fn zero(&mut self) -> Self::State;
    /// Folds one `item` into `state`.
    fn push(&mut self, state: &mut Self::State, item: Item);
    /// Consumes the accumulator and produces the final result.
    fn done(&mut self, state: Self::State) -> Self::Done;
}
70
/// Stage marker for an intermediate reducer: its output is the folder's raw
/// `FolderSync::State`.
pub struct Inter;

/// Stage marker for the last reducer: its output is `FolderSync::Done`, obtained by
/// calling `FolderSync::done` on the accumulated state.
pub struct Final;
73
74#[derive(Educe, Serialize, Deserialize, new)]
75#[educe(Clone(bound = "F: Clone"))]
76#[serde(
77 bound(serialize = "F: Serialize"),
78 bound(deserialize = "F: Deserialize<'de>")
79)]
80pub struct FolderSyncReducer<Item, F, Final> {
81 folder: F,
82 marker: PhantomData<fn() -> (Item, Final)>,
83}
84
85impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F, Inter>
86where
87 F: FolderSync<Item>,
88{
89 type Done = F::State;
90 type Async = FolderSyncReducerAsync<Item, F, F::State, Inter>;
91
92 fn into_async(mut self) -> Self::Async {
93 FolderSyncReducerAsync {
94 state: Some(self.folder.zero()),
95 folder: self.folder,
96 marker: PhantomData,
97 }
98 }
99}
100impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F, Inter>
101where
102 F: FolderSync<Item>,
103 F::State: ProcessSend + 'static,
104{
105 type Done = F::State;
106}
107impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F, Inter>
108where
109 F: FolderSync<Item>,
110 F::State: Send + 'static,
111{
112 type Done = F::State;
113}
114
115impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F, Final>
116where
117 F: FolderSync<Item>,
118{
119 type Done = F::Done;
120 type Async = FolderSyncReducerAsync<Item, F, F::State, Final>;
121
122 fn into_async(mut self) -> Self::Async {
123 FolderSyncReducerAsync {
124 state: Some(self.folder.zero()),
125 folder: self.folder,
126 marker: PhantomData,
127 }
128 }
129}
130impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F, Final>
131where
132 F: FolderSync<Item>,
133 F::Done: ProcessSend + 'static,
134{
135 type Done = F::Done;
136}
137impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F, Final>
138where
139 F: FolderSync<Item>,
140 F::Done: Send + 'static,
141{
142 type Done = F::Done;
143}
144
145#[pin_project]
146pub struct FolderSyncReducerAsync<Item, F, S, Final> {
147 state: Option<S>,
148 folder: F,
149 marker: PhantomData<fn() -> (Item, Final)>,
150}
151impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State, Inter>
152where
153 F: FolderSync<Item>,
154{
155 type Done = F::State;
156
157 #[inline(always)]
158 fn poll_forward(
159 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
160 ) -> Poll<Self::Done> {
161 let self_ = self.project();
162 let folder = self_.folder;
163 let state = self_.state.as_mut().unwrap();
164 while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
165 folder.push(state, item);
166 }
167 Poll::Ready(self_.state.take().unwrap())
168 }
169}
170impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State, Final>
171where
172 F: FolderSync<Item>,
173{
174 type Done = F::Done;
175
176 #[inline(always)]
177 fn poll_forward(
178 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
179 ) -> Poll<Self::Done> {
180 let self_ = self.project();
181 let folder = self_.folder;
182 let state = self_.state.as_mut().unwrap();
183 while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
184 folder.push(state, item);
185 }
186 Poll::Ready(folder.done(self_.state.take().unwrap()))
187 }
188}