amadeus_core/par_sink/
folder.rs

1#![allow(unused_imports, clippy::single_component_path_imports)]
2
3use derive_new::new;
4use educe::Educe;
5use futures::{ready, Stream};
6use pin_project::pin_project;
7use serde::{Deserialize, Serialize};
8use std::{
9	future::Future, marker::PhantomData, pin::Pin, task::{Context, Poll}
10};
11
12use super::{Reducer, ReducerProcessSend, ReducerSend};
13use crate::{pipe::Sink, pool::ProcessSend};
14
mod macros {
	/// Expands to the associated types and the `reducers` method of a two-stage
	/// (`ReduceA` then `ReduceC`) folder-backed sink.
	///
	/// Arguments: the folder type for stage A, the folder type for stage C, the
	/// `self` identifier, and expressions producing the two folder values.
	///
	/// The expansion site must provide a type `P` with an `Output` associated
	/// type, a `pipe` field on `self`, and `FolderSyncReducer` in scope.
	/// NOTE(review): presumably invoked inside `ParallelSink` impls — confirm.
	///
	/// All crate paths go through `$crate` so that the expansion resolves the
	/// same way regardless of which crate invokes the macro.
	#[macro_export]
	macro_rules! folder_par_sink {
		($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
			type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done;
			type Pipe = P;
			type ReduceA = FolderSyncReducer<P::Output, $folder_a, $crate::par_sink::Inter>;
			type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b, $crate::par_sink::Final>;

			fn reducers($self) -> (P, Self::ReduceA, Self::ReduceC) {
				// Evaluate the initialisers first, in argument order, before
				// `$self.pipe` is moved out.
				let init_a = $init_a;
				let init_b = $init_b;
				(
					$self.pipe,
					FolderSyncReducer::new(init_a),
					FolderSyncReducer::new(init_b),
				)
			}
		};
	}
	/// Expands to the associated types and the `reducers` method of a
	/// three-stage (`ReduceA`, `ReduceB`, `ReduceC`) folder-backed sink.
	///
	/// Takes the same arguments as `folder_par_sink!`. Stages B and C share the
	/// `$folder_b` folder type; the value of `$init_b` is cloned for stage B and
	/// moved into stage C, so its type must implement `Clone`.
	///
	/// The expansion site must provide a type `P` with an `Output` associated
	/// type, a `pipe` field on `self`, and `FolderSyncReducer` in scope.
	/// NOTE(review): presumably invoked inside `DistributedSink` impls — confirm.
	///
	/// All crate paths go through `$crate` so that the expansion resolves the
	/// same way regardless of which crate invokes the macro.
	#[macro_export]
	macro_rules! folder_dist_sink {
		($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
			type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done>>::Done;
			type Pipe = P;
			type ReduceA = FolderSyncReducer<P::Output, $folder_a, $crate::par_sink::Inter>;
			type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b, $crate::par_sink::Inter>;
			type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $folder_b, $crate::par_sink::Final>;

			fn reducers($self) -> (P, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
				// Evaluate the initialisers first, in argument order, before
				// `$self.pipe` is moved out.
				let init_a = $init_a;
				let init_b = $init_b;
				(
					$self.pipe,
					FolderSyncReducer::new(init_a),
					// Stage B gets its own copy; stage C takes the original.
					FolderSyncReducer::new(init_b.clone()),
					FolderSyncReducer::new(init_b),
				)
			}
		};
	}
	// Re-export by path so the macros can be imported like ordinary items
	// within this crate.
	pub(crate) use folder_dist_sink;
	pub(crate) use folder_par_sink;
}
59
60pub(crate) use macros::{folder_dist_sink, folder_par_sink};
61
/// A synchronous fold over values of type `Item`.
///
/// An implementor supplies a starting accumulator, a way to absorb one item
/// into that accumulator, and a conversion from the accumulator to the final
/// result.
pub trait FolderSync<Item> {
	/// Accumulator that is threaded through successive [`push`](Self::push) calls.
	type State;
	/// Result produced by [`done`](Self::done) from a finished accumulator.
	type Done;

	/// Creates a fresh accumulator.
	fn zero(&mut self) -> Self::State;

	/// Absorbs `item` into `state`.
	fn push(&mut self, state: &mut Self::State, item: Item);

	/// Consumes `state` and converts it into the final result.
	fn done(&mut self, state: Self::State) -> Self::Done;
}
70
/// Type-level tag for an intermediate reduction stage: reducers tagged with
/// it yield the folder's raw `State`.
pub struct Inter;

/// Type-level tag for the final reduction stage: reducers tagged with it
/// yield the folder's `Done` value.
pub struct Final;
73
74#[derive(Educe, Serialize, Deserialize, new)]
75#[educe(Clone(bound = "F: Clone"))]
76#[serde(
77	bound(serialize = "F: Serialize"),
78	bound(deserialize = "F: Deserialize<'de>")
79)]
80pub struct FolderSyncReducer<Item, F, Final> {
81	folder: F,
82	marker: PhantomData<fn() -> (Item, Final)>,
83}
84
85impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F, Inter>
86where
87	F: FolderSync<Item>,
88{
89	type Done = F::State;
90	type Async = FolderSyncReducerAsync<Item, F, F::State, Inter>;
91
92	fn into_async(mut self) -> Self::Async {
93		FolderSyncReducerAsync {
94			state: Some(self.folder.zero()),
95			folder: self.folder,
96			marker: PhantomData,
97		}
98	}
99}
100impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F, Inter>
101where
102	F: FolderSync<Item>,
103	F::State: ProcessSend + 'static,
104{
105	type Done = F::State;
106}
107impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F, Inter>
108where
109	F: FolderSync<Item>,
110	F::State: Send + 'static,
111{
112	type Done = F::State;
113}
114
115impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F, Final>
116where
117	F: FolderSync<Item>,
118{
119	type Done = F::Done;
120	type Async = FolderSyncReducerAsync<Item, F, F::State, Final>;
121
122	fn into_async(mut self) -> Self::Async {
123		FolderSyncReducerAsync {
124			state: Some(self.folder.zero()),
125			folder: self.folder,
126			marker: PhantomData,
127		}
128	}
129}
130impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F, Final>
131where
132	F: FolderSync<Item>,
133	F::Done: ProcessSend + 'static,
134{
135	type Done = F::Done;
136}
137impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F, Final>
138where
139	F: FolderSync<Item>,
140	F::Done: Send + 'static,
141{
142	type Done = F::Done;
143}
144
145#[pin_project]
146pub struct FolderSyncReducerAsync<Item, F, S, Final> {
147	state: Option<S>,
148	folder: F,
149	marker: PhantomData<fn() -> (Item, Final)>,
150}
151impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State, Inter>
152where
153	F: FolderSync<Item>,
154{
155	type Done = F::State;
156
157	#[inline(always)]
158	fn poll_forward(
159		self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
160	) -> Poll<Self::Done> {
161		let self_ = self.project();
162		let folder = self_.folder;
163		let state = self_.state.as_mut().unwrap();
164		while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
165			folder.push(state, item);
166		}
167		Poll::Ready(self_.state.take().unwrap())
168	}
169}
170impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State, Final>
171where
172	F: FolderSync<Item>,
173{
174	type Done = F::Done;
175
176	#[inline(always)]
177	fn poll_forward(
178		self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
179	) -> Poll<Self::Done> {
180		let self_ = self.project();
181		let folder = self_.folder;
182		let state = self_.state.as_mut().unwrap();
183		while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
184			folder.push(state, item);
185		}
186		Poll::Ready(folder.done(self_.state.take().unwrap()))
187	}
188}