amadeus_core/par_sink/
collect.rs

1use derive_new::new;
2use educe::Educe;
3use futures::{pin_mut, ready, Stream, StreamExt};
4use pin_project::pin_project;
5use serde::{Deserialize, Serialize};
6use std::{
7	collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque}, hash::{BuildHasher, Hash}, iter, marker::PhantomData, pin::Pin, task::{Context, Poll}
8};
9
10use super::{
11	DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend, ReducerSend
12};
13use crate::{pipe::Sink, pool::ProcessSend};
14
15#[derive(new)]
16#[must_use]
17pub struct Collect<P, A> {
18	pipe: P,
19	marker: PhantomData<fn() -> A>,
20}
21
22impl<P: ParallelPipe<Item>, Item, T: FromParallelStream<P::Output>> ParallelSink<Item>
23	for Collect<P, T>
24{
25	type Done = T;
26	type Pipe = P;
27	type ReduceA = T::ReduceA;
28	type ReduceC = T::ReduceC;
29
30	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
31		let (a, b) = T::reducers();
32		(self.pipe, a, b)
33	}
34}
35impl<P: DistributedPipe<Item>, Item, T: FromDistributedStream<P::Output>> DistributedSink<Item>
36	for Collect<P, T>
37{
38	type Done = T;
39	type Pipe = P;
40	type ReduceA = T::ReduceA;
41	type ReduceB = T::ReduceB;
42	type ReduceC = T::ReduceC;
43
44	fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
45		let (a, b, c) = T::reducers();
46		(self.pipe, a, b, c)
47	}
48}
49
50pub trait FromParallelStream<T>: Sized {
51	type ReduceA: ReducerSend<T> + Clone + Send;
52	type ReduceC: Reducer<<Self::ReduceA as ReducerSend<T>>::Done, Done = Self>;
53
54	fn reducers() -> (Self::ReduceA, Self::ReduceC);
55}
56
57pub trait FromDistributedStream<T>: Sized {
58	type ReduceA: ReducerSend<T> + Clone + ProcessSend;
59	type ReduceB: ReducerProcessSend<<Self::ReduceA as ReducerSend<T>>::Done> + Clone + ProcessSend;
60	type ReduceC: Reducer<
61		<Self::ReduceB as ReducerProcessSend<<Self::ReduceA as ReducerSend<T>>::Done>>::Done,
62		Done = Self,
63	>;
64
65	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC);
66	// 	fn from_dist_stream<P>(dist_stream: P, pool: &Pool) -> Self where T: Serialize + DeserializeOwned + Send + 'static, P: IntoDistributedStream<Item = T>, <<P as IntoDistributedStream>::Iter as DistributedStream>::Task: Serialize + DeserializeOwned + Send + 'static;
67}
68
69#[derive(Educe, Serialize, Deserialize, new)]
70#[educe(Clone, Default)]
71#[serde(bound = "")]
72pub struct PushReducer<Item, T = Item>(PhantomData<fn() -> (T, Item)>);
73
74impl<Item, T: Default + Extend<Item>> Reducer<Item> for PushReducer<Item, T> {
75	type Done = T;
76	type Async = PushReducerAsync<Item, T>;
77
78	fn into_async(self) -> Self::Async {
79		PushReducerAsync(Some(Default::default()), PhantomData)
80	}
81}
82impl<Item, T: Default + Extend<Item>> ReducerProcessSend<Item> for PushReducer<Item, T>
83where
84	T: ProcessSend + 'static,
85{
86	type Done = T;
87}
88impl<Item, T: Default + Extend<Item>> ReducerSend<Item> for PushReducer<Item, T>
89where
90	T: Send + 'static,
91{
92	type Done = T;
93}
94
95#[pin_project]
96pub struct PushReducerAsync<Item, T = Item>(Option<T>, PhantomData<fn() -> Item>);
97impl<Item, T: Extend<Item>> Sink<Item> for PushReducerAsync<Item, T> {
98	type Done = T;
99
100	#[inline]
101	fn poll_forward(
102		self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
103	) -> Poll<Self::Done> {
104		let self_ = self.project();
105		while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
106			self_.0.as_mut().unwrap().extend(iter::once(item));
107		}
108		Poll::Ready(self_.0.take().unwrap())
109	}
110}
111
112#[derive(Educe, Serialize, Deserialize)]
113#[educe(Clone, Default)]
114#[serde(bound = "")]
115pub struct ExtendReducer<Item, T = Item>(PhantomData<fn() -> (T, Item)>);
116impl<Item: IntoIterator<Item = B>, T: Default + Extend<B>, B> Reducer<Item>
117	for ExtendReducer<Item, T>
118{
119	type Done = T;
120	type Async = ExtendReducerAsync<Item, T>;
121
122	fn into_async(self) -> Self::Async {
123		ExtendReducerAsync(Some(T::default()), PhantomData)
124	}
125}
126impl<Item: IntoIterator<Item = B>, T: Default + Extend<B>, B> ReducerProcessSend<Item>
127	for ExtendReducer<Item, T>
128where
129	T: ProcessSend + 'static,
130{
131	type Done = T;
132}
133impl<Item: IntoIterator<Item = B>, T: Default + Extend<B>, B> ReducerSend<Item>
134	for ExtendReducer<Item, T>
135where
136	T: Send + 'static,
137{
138	type Done = T;
139}
140
141#[pin_project]
142pub struct ExtendReducerAsync<Item, T = Item>(Option<T>, PhantomData<fn() -> Item>);
143impl<Item: IntoIterator<Item = B>, T: Extend<B>, B> Sink<Item> for ExtendReducerAsync<Item, T> {
144	type Done = T;
145
146	#[inline]
147	fn poll_forward(
148		self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
149	) -> Poll<Self::Done> {
150		let self_ = self.project();
151		while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
152			self_.0.as_mut().unwrap().extend(item);
153		}
154		Poll::Ready(self_.0.take().unwrap())
155	}
156}
157
158#[derive(Educe, Serialize, Deserialize)]
159#[educe(Clone(bound = "R: Clone"), Default(bound = "R: Default"))]
160#[serde(
161	bound(serialize = "R: Serialize"),
162	bound(deserialize = "R: Deserialize<'de>")
163)]
164pub struct IntoReducer<R, T>(R, PhantomData<fn() -> T>);
165
166impl<R: Reducer<Item>, T, Item> Reducer<Item> for IntoReducer<R, T>
167where
168	R::Done: Into<T>,
169{
170	type Done = T;
171	type Async = IntoReducerAsync<R::Async, T>;
172
173	fn into_async(self) -> Self::Async {
174		IntoReducerAsync(self.0.into_async(), PhantomData)
175	}
176}
177
178#[pin_project]
179pub struct IntoReducerAsync<R, T>(#[pin] R, PhantomData<fn() -> T>);
180
181impl<R: Sink<Item>, T, Item> Sink<Item> for IntoReducerAsync<R, T>
182where
183	R::Done: Into<T>,
184{
185	type Done = T;
186
187	#[inline]
188	fn poll_forward(
189		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
190	) -> Poll<Self::Done> {
191		let stream = stream.map(Into::into);
192		pin_mut!(stream);
193		self.project().0.poll_forward(cx, stream).map(Into::into)
194	}
195}
196
197#[derive(Clone, Default, Serialize, Deserialize)]
198pub struct OptionReducer<R>(R);
199impl<R: Reducer<Item>, Item> Reducer<Option<Item>> for OptionReducer<R> {
200	type Done = Option<R::Done>;
201	type Async = OptionReducerAsync<R::Async>;
202
203	fn into_async(self) -> Self::Async {
204		OptionReducerAsync(Some(self.0.into_async()))
205	}
206}
207impl<R: Reducer<Item>, Item> ReducerProcessSend<Option<Item>> for OptionReducer<R>
208where
209	R::Done: ProcessSend + 'static,
210{
211	type Done = Option<R::Done>;
212}
213impl<R: Reducer<Item>, Item> ReducerSend<Option<Item>> for OptionReducer<R>
214where
215	R::Done: Send + 'static,
216{
217	type Done = Option<R::Done>;
218}
219
220#[pin_project]
221pub struct OptionReducerAsync<R>(#[pin] Option<R>);
222
223impl<R: Sink<Item>, Item> Sink<Option<Item>> for OptionReducerAsync<R> {
224	type Done = Option<R::Done>;
225
226	#[inline]
227	fn poll_forward(
228		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Option<Item>>>,
229	) -> Poll<Self::Done> {
230		let stream = stream.map(|x|x.expect("Not yet implemented: Tracking at https://github.com/constellation-rs/amadeus/projects/3#card-40276549"));
231		pin_mut!(stream);
232		match self.project().0.as_pin_mut() {
233			Some(a) => a.poll_forward(cx, stream).map(Some),
234			None => Poll::Ready(None),
235		}
236	}
237}
238
239#[derive(Educe, Serialize, Deserialize)]
240#[educe(Clone(bound = "R: Clone"), Default(bound = "R: Default"))]
241#[serde(
242	bound(serialize = "R: Serialize"),
243	bound(deserialize = "R: Deserialize<'de>")
244)]
245pub struct ResultReducer<R, E>(R, PhantomData<fn() -> E>);
246
247impl<R: Reducer<Item>, E, Item> Reducer<Result<Item, E>> for ResultReducer<R, E> {
248	type Done = Result<R::Done, E>;
249	type Async = ResultReducerAsync<R::Async, E>;
250
251	fn into_async(self) -> Self::Async {
252		ResultReducerAsync::Ok(self.0.into_async())
253	}
254}
255impl<R: Reducer<Item>, E, Item> ReducerProcessSend<Result<Item, E>> for ResultReducer<R, E>
256where
257	R::Done: ProcessSend + 'static,
258	E: ProcessSend + 'static,
259{
260	type Done = Result<R::Done, E>;
261}
262impl<R: Reducer<Item>, E, Item> ReducerSend<Result<Item, E>> for ResultReducer<R, E>
263where
264	R::Done: Send + 'static,
265	E: Send + 'static,
266{
267	type Done = Result<R::Done, E>;
268}
269
270#[pin_project(project = ResultReducerAsyncProj)]
271pub enum ResultReducerAsync<R, E> {
272	Ok(#[pin] R),
273	Err(Option<E>),
274}
275impl<R: Sink<Item>, E, Item> Sink<Result<Item, E>> for ResultReducerAsync<R, E> {
276	type Done = Result<R::Done, E>;
277
278	#[inline]
279	fn poll_forward(
280		self: Pin<&mut Self>, cx: &mut Context,
281		stream: Pin<&mut impl Stream<Item = Result<Item, E>>>,
282	) -> Poll<Self::Done> {
283		let stream = stream.map(|x|x.ok().expect("Not yet implemented: Tracking at https://github.com/constellation-rs/amadeus/projects/3#card-40276549"));
284		pin_mut!(stream);
285		match self.project() {
286			ResultReducerAsyncProj::Ok(a) => a.poll_forward(cx, stream).map(Ok),
287			ResultReducerAsyncProj::Err(b) => Poll::Ready(Err(b.take().unwrap())),
288		}
289	}
290}
291
292impl<T> FromParallelStream<T> for Vec<T>
293where
294	T: Send + 'static,
295{
296	type ReduceA = PushReducer<T, Self>;
297	type ReduceC = ExtendReducer<Self>;
298
299	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
300		Default::default()
301	}
302}
303
304impl<T> FromParallelStream<T> for VecDeque<T>
305where
306	T: Send + 'static,
307{
308	type ReduceA = PushReducer<T, Vec<T>>;
309	type ReduceC = IntoReducer<ExtendReducer<Vec<T>>, Self>;
310
311	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
312		Default::default()
313	}
314}
315
316impl<T: Ord> FromParallelStream<T> for BinaryHeap<T>
317where
318	T: Send + 'static,
319{
320	type ReduceA = PushReducer<T, Vec<T>>;
321	type ReduceC = IntoReducer<ExtendReducer<Vec<T>>, Self>;
322
323	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
324		Default::default()
325	}
326}
327
328impl<T> FromParallelStream<T> for LinkedList<T>
329where
330	T: Send + 'static,
331{
332	type ReduceA = PushReducer<T, Self>;
333	type ReduceC = ExtendReducer<Self>;
334
335	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
336		Default::default()
337	}
338}
339
340impl<T, S> FromParallelStream<T> for HashSet<T, S>
341where
342	T: Eq + Hash + Send + 'static,
343	S: BuildHasher + Default + Send + 'static,
344{
345	type ReduceA = PushReducer<T, Self>;
346	type ReduceC = ExtendReducer<Self>;
347
348	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
349		Default::default()
350	}
351}
352
353impl<K, V, S> FromParallelStream<(K, V)> for HashMap<K, V, S>
354where
355	K: Eq + Hash + Send + 'static,
356	V: Send + 'static,
357	S: BuildHasher + Default + Send + 'static,
358{
359	type ReduceA = PushReducer<(K, V), Self>;
360	type ReduceC = ExtendReducer<Self>;
361
362	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
363		Default::default()
364	}
365}
366
367impl<T> FromParallelStream<T> for BTreeSet<T>
368where
369	T: Ord + Send + 'static,
370{
371	type ReduceA = PushReducer<T, Self>;
372	type ReduceC = ExtendReducer<Self>;
373
374	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
375		Default::default()
376	}
377}
378
379impl<K, V> FromParallelStream<(K, V)> for BTreeMap<K, V>
380where
381	K: Ord + Send + 'static,
382	V: Send + 'static,
383{
384	type ReduceA = PushReducer<(K, V), Self>;
385	type ReduceC = ExtendReducer<Self>;
386
387	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
388		Default::default()
389	}
390}
391
392impl FromParallelStream<char> for String {
393	type ReduceA = PushReducer<char, Self>;
394	type ReduceC = PushReducer<Self>;
395
396	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
397		Default::default()
398	}
399}
400
401impl FromParallelStream<Self> for String {
402	type ReduceA = PushReducer<Self>;
403	type ReduceC = PushReducer<Self>;
404
405	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
406		Default::default()
407	}
408}
409
410impl FromParallelStream<()> for () {
411	type ReduceA = PushReducer<Self>;
412	type ReduceC = PushReducer<Self>;
413
414	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
415		Default::default()
416	}
417}
418
419impl<T, C: FromParallelStream<T>> FromParallelStream<Option<T>> for Option<C> {
420	type ReduceA = OptionReducer<C::ReduceA>;
421	type ReduceC = OptionReducer<C::ReduceC>;
422
423	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
424		let (a, c) = C::reducers();
425		(OptionReducer(a), OptionReducer(c))
426	}
427}
428
429impl<T, C: FromParallelStream<T>, E> FromParallelStream<Result<T, E>> for Result<C, E>
430where
431	E: Send + 'static,
432{
433	type ReduceA = ResultReducer<C::ReduceA, E>;
434	type ReduceC = ResultReducer<C::ReduceC, E>;
435
436	fn reducers() -> (Self::ReduceA, Self::ReduceC) {
437		let (a, c) = C::reducers();
438		(ResultReducer(a, PhantomData), ResultReducer(c, PhantomData))
439	}
440}
441
442impl<T> FromDistributedStream<T> for Vec<T>
443where
444	T: ProcessSend + 'static,
445{
446	type ReduceA = PushReducer<T, Self>;
447	type ReduceB = ExtendReducer<Self>;
448	type ReduceC = ExtendReducer<Self>;
449
450	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
451		Default::default()
452	}
453}
454
455impl<T> FromDistributedStream<T> for VecDeque<T>
456where
457	T: ProcessSend + 'static,
458{
459	type ReduceA = PushReducer<T, Vec<T>>;
460	type ReduceB = ExtendReducer<Vec<T>>;
461	type ReduceC = IntoReducer<ExtendReducer<Vec<T>>, Self>;
462
463	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
464		Default::default()
465	}
466}
467
468impl<T: Ord> FromDistributedStream<T> for BinaryHeap<T>
469where
470	T: ProcessSend + 'static,
471{
472	type ReduceA = PushReducer<T, Vec<T>>;
473	type ReduceB = ExtendReducer<Vec<T>>;
474	type ReduceC = IntoReducer<ExtendReducer<Vec<T>>, Self>;
475
476	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
477		Default::default()
478	}
479}
480
481impl<T> FromDistributedStream<T> for LinkedList<T>
482where
483	T: ProcessSend + 'static,
484{
485	type ReduceA = PushReducer<T, Self>;
486	type ReduceB = ExtendReducer<Self>;
487	type ReduceC = ExtendReducer<Self>;
488
489	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
490		Default::default()
491	}
492}
493
494impl<T, S> FromDistributedStream<T> for HashSet<T, S>
495where
496	T: Eq + Hash + ProcessSend + 'static,
497	S: BuildHasher + Default + Send + 'static,
498{
499	type ReduceA = PushReducer<T, Self>;
500	type ReduceB = ExtendReducer<Self>;
501	type ReduceC = ExtendReducer<Self>;
502
503	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
504		Default::default()
505	}
506}
507
508impl<K, V, S> FromDistributedStream<(K, V)> for HashMap<K, V, S>
509where
510	K: Eq + Hash + ProcessSend + 'static,
511	V: ProcessSend + 'static,
512	S: BuildHasher + Default + Send + 'static,
513{
514	type ReduceA = PushReducer<(K, V), Self>;
515	type ReduceB = ExtendReducer<Self>;
516	type ReduceC = ExtendReducer<Self>;
517
518	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
519		Default::default()
520	}
521}
522
523impl<T> FromDistributedStream<T> for BTreeSet<T>
524where
525	T: Ord + ProcessSend + 'static,
526{
527	type ReduceA = PushReducer<T, Self>;
528	type ReduceB = ExtendReducer<Self>;
529	type ReduceC = ExtendReducer<Self>;
530
531	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
532		Default::default()
533	}
534}
535
536impl<K, V> FromDistributedStream<(K, V)> for BTreeMap<K, V>
537where
538	K: Ord + ProcessSend + 'static,
539	V: ProcessSend + 'static,
540{
541	type ReduceA = PushReducer<(K, V), Self>;
542	type ReduceB = ExtendReducer<Self>;
543	type ReduceC = ExtendReducer<Self>;
544
545	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
546		Default::default()
547	}
548}
549
550impl FromDistributedStream<char> for String {
551	type ReduceA = PushReducer<char, Self>;
552	type ReduceB = PushReducer<Self>;
553	type ReduceC = PushReducer<Self>;
554
555	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
556		Default::default()
557	}
558}
559
560impl FromDistributedStream<Self> for String {
561	type ReduceA = PushReducer<Self>;
562	type ReduceB = PushReducer<Self>;
563	type ReduceC = PushReducer<Self>;
564
565	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
566		Default::default()
567	}
568}
569
570impl FromDistributedStream<()> for () {
571	type ReduceA = PushReducer<Self>;
572	type ReduceB = PushReducer<Self>;
573	type ReduceC = PushReducer<Self>;
574
575	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
576		Default::default()
577	}
578}
579
580impl<T, C: FromDistributedStream<T>> FromDistributedStream<Option<T>> for Option<C> {
581	type ReduceA = OptionReducer<C::ReduceA>;
582	type ReduceB = OptionReducer<C::ReduceB>;
583	type ReduceC = OptionReducer<C::ReduceC>;
584
585	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
586		let (a, b, c) = C::reducers();
587		(OptionReducer(a), OptionReducer(b), OptionReducer(c))
588	}
589}
590
591impl<T, C: FromDistributedStream<T>, E> FromDistributedStream<Result<T, E>> for Result<C, E>
592where
593	E: ProcessSend + 'static,
594{
595	type ReduceA = ResultReducer<C::ReduceA, E>;
596	type ReduceB = ResultReducer<C::ReduceB, E>;
597	type ReduceC = ResultReducer<C::ReduceC, E>;
598
599	fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
600		let (a, b, c) = C::reducers();
601		(
602			ResultReducer(a, PhantomData),
603			ResultReducer(b, PhantomData),
604			ResultReducer(c, PhantomData),
605		)
606	}
607}