1use derive_new::new;
2use educe::Educe;
3use futures::{pin_mut, ready, Stream, StreamExt};
4use pin_project::pin_project;
5use serde::{Deserialize, Serialize};
6use std::{
7 collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque}, hash::{BuildHasher, Hash}, iter, marker::PhantomData, pin::Pin, task::{Context, Poll}
8};
9
10use super::{
11 DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend, ReducerSend
12};
13use crate::{pipe::Sink, pool::ProcessSend};
14
/// Sink adaptor that pairs a pipe `P` with the collection type `A` it should be gathered into.
///
/// `A` only appears inside `PhantomData`, so no value of that type is stored here.
/// The constructor comes from the `derive_new::new` derive.
#[derive(new)]
#[must_use]
pub struct Collect<P, A> {
 // Pipe whose output is collected.
 pipe: P,
 // Records the target collection type without owning a value of it.
 marker: PhantomData<fn() -> A>,
}
21
22impl<P: ParallelPipe<Item>, Item, T: FromParallelStream<P::Output>> ParallelSink<Item>
23 for Collect<P, T>
24{
25 type Done = T;
26 type Pipe = P;
27 type ReduceA = T::ReduceA;
28 type ReduceC = T::ReduceC;
29
30 fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
31 let (a, b) = T::reducers();
32 (self.pipe, a, b)
33 }
34}
35impl<P: DistributedPipe<Item>, Item, T: FromDistributedStream<P::Output>> DistributedSink<Item>
36 for Collect<P, T>
37{
38 type Done = T;
39 type Pipe = P;
40 type ReduceA = T::ReduceA;
41 type ReduceB = T::ReduceB;
42 type ReduceC = T::ReduceC;
43
44 fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
45 let (a, b, c) = T::reducers();
46 (self.pipe, a, b, c)
47 }
48}
49
/// Types that can be assembled from a parallel stream of `T` using two reducer stages.
///
/// NOTE(review): where each stage runs (per thread, per task, ...) is decided by the
/// caller of `reducers` and is not visible in this file.
pub trait FromParallelStream<T>: Sized {
 /// First-stage reducer: consumes the stream's `T` items. Must be `Clone + Send`.
 type ReduceA: ReducerSend<T> + Clone + Send;
 /// Final-stage reducer: consumes the `Done` values of `ReduceA` and produces `Self`.
 type ReduceC: Reducer<<Self::ReduceA as ReducerSend<T>>::Done, Done = Self>;

 /// Creates one instance of each reducer stage.
 fn reducers() -> (Self::ReduceA, Self::ReduceC);
}
56
/// Types that can be assembled from a distributed stream of `T` using three reducer stages.
///
/// NOTE(review): where each stage runs (per thread, per process, ...) is decided by the
/// caller of `reducers` and is not visible in this file.
pub trait FromDistributedStream<T>: Sized {
 /// First-stage reducer: consumes the stream's `T` items. Must be `Clone + ProcessSend`.
 type ReduceA: ReducerSend<T> + Clone + ProcessSend;
 /// Middle-stage reducer: consumes the `Done` values of `ReduceA`.
 type ReduceB: ReducerProcessSend<<Self::ReduceA as ReducerSend<T>>::Done> + Clone + ProcessSend;
 /// Final-stage reducer: consumes the `Done` values of `ReduceB` and produces `Self`.
 type ReduceC: Reducer<
 <Self::ReduceB as ReducerProcessSend<<Self::ReduceA as ReducerSend<T>>::Done>>::Done,
 Done = Self,
 >;

 /// Creates one instance of each reducer stage.
 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC);
 }
68
69#[derive(Educe, Serialize, Deserialize, new)]
70#[educe(Clone, Default)]
71#[serde(bound = "")]
72pub struct PushReducer<Item, T = Item>(PhantomData<fn() -> (T, Item)>);
73
74impl<Item, T: Default + Extend<Item>> Reducer<Item> for PushReducer<Item, T> {
75 type Done = T;
76 type Async = PushReducerAsync<Item, T>;
77
78 fn into_async(self) -> Self::Async {
79 PushReducerAsync(Some(Default::default()), PhantomData)
80 }
81}
82impl<Item, T: Default + Extend<Item>> ReducerProcessSend<Item> for PushReducer<Item, T>
83where
84 T: ProcessSend + 'static,
85{
86 type Done = T;
87}
88impl<Item, T: Default + Extend<Item>> ReducerSend<Item> for PushReducer<Item, T>
89where
90 T: Send + 'static,
91{
92 type Done = T;
93}
94
95#[pin_project]
96pub struct PushReducerAsync<Item, T = Item>(Option<T>, PhantomData<fn() -> Item>);
97impl<Item, T: Extend<Item>> Sink<Item> for PushReducerAsync<Item, T> {
98 type Done = T;
99
100 #[inline]
101 fn poll_forward(
102 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
103 ) -> Poll<Self::Done> {
104 let self_ = self.project();
105 while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
106 self_.0.as_mut().unwrap().extend(iter::once(item));
107 }
108 Poll::Ready(self_.0.take().unwrap())
109 }
110}
111
112#[derive(Educe, Serialize, Deserialize)]
113#[educe(Clone, Default)]
114#[serde(bound = "")]
115pub struct ExtendReducer<Item, T = Item>(PhantomData<fn() -> (T, Item)>);
116impl<Item: IntoIterator<Item = B>, T: Default + Extend<B>, B> Reducer<Item>
117 for ExtendReducer<Item, T>
118{
119 type Done = T;
120 type Async = ExtendReducerAsync<Item, T>;
121
122 fn into_async(self) -> Self::Async {
123 ExtendReducerAsync(Some(T::default()), PhantomData)
124 }
125}
126impl<Item: IntoIterator<Item = B>, T: Default + Extend<B>, B> ReducerProcessSend<Item>
127 for ExtendReducer<Item, T>
128where
129 T: ProcessSend + 'static,
130{
131 type Done = T;
132}
133impl<Item: IntoIterator<Item = B>, T: Default + Extend<B>, B> ReducerSend<Item>
134 for ExtendReducer<Item, T>
135where
136 T: Send + 'static,
137{
138 type Done = T;
139}
140
141#[pin_project]
142pub struct ExtendReducerAsync<Item, T = Item>(Option<T>, PhantomData<fn() -> Item>);
143impl<Item: IntoIterator<Item = B>, T: Extend<B>, B> Sink<Item> for ExtendReducerAsync<Item, T> {
144 type Done = T;
145
146 #[inline]
147 fn poll_forward(
148 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
149 ) -> Poll<Self::Done> {
150 let self_ = self.project();
151 while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
152 self_.0.as_mut().unwrap().extend(item);
153 }
154 Poll::Ready(self_.0.take().unwrap())
155 }
156}
157
158#[derive(Educe, Serialize, Deserialize)]
159#[educe(Clone(bound = "R: Clone"), Default(bound = "R: Default"))]
160#[serde(
161 bound(serialize = "R: Serialize"),
162 bound(deserialize = "R: Deserialize<'de>")
163)]
164pub struct IntoReducer<R, T>(R, PhantomData<fn() -> T>);
165
166impl<R: Reducer<Item>, T, Item> Reducer<Item> for IntoReducer<R, T>
167where
168 R::Done: Into<T>,
169{
170 type Done = T;
171 type Async = IntoReducerAsync<R::Async, T>;
172
173 fn into_async(self) -> Self::Async {
174 IntoReducerAsync(self.0.into_async(), PhantomData)
175 }
176}
177
178#[pin_project]
179pub struct IntoReducerAsync<R, T>(#[pin] R, PhantomData<fn() -> T>);
180
181impl<R: Sink<Item>, T, Item> Sink<Item> for IntoReducerAsync<R, T>
182where
183 R::Done: Into<T>,
184{
185 type Done = T;
186
187 #[inline]
188 fn poll_forward(
189 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
190 ) -> Poll<Self::Done> {
191 let stream = stream.map(Into::into);
192 pin_mut!(stream);
193 self.project().0.poll_forward(cx, stream).map(Into::into)
194 }
195}
196
197#[derive(Clone, Default, Serialize, Deserialize)]
198pub struct OptionReducer<R>(R);
199impl<R: Reducer<Item>, Item> Reducer<Option<Item>> for OptionReducer<R> {
200 type Done = Option<R::Done>;
201 type Async = OptionReducerAsync<R::Async>;
202
203 fn into_async(self) -> Self::Async {
204 OptionReducerAsync(Some(self.0.into_async()))
205 }
206}
207impl<R: Reducer<Item>, Item> ReducerProcessSend<Option<Item>> for OptionReducer<R>
208where
209 R::Done: ProcessSend + 'static,
210{
211 type Done = Option<R::Done>;
212}
213impl<R: Reducer<Item>, Item> ReducerSend<Option<Item>> for OptionReducer<R>
214where
215 R::Done: Send + 'static,
216{
217 type Done = Option<R::Done>;
218}
219
220#[pin_project]
221pub struct OptionReducerAsync<R>(#[pin] Option<R>);
222
223impl<R: Sink<Item>, Item> Sink<Option<Item>> for OptionReducerAsync<R> {
224 type Done = Option<R::Done>;
225
226 #[inline]
227 fn poll_forward(
228 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Option<Item>>>,
229 ) -> Poll<Self::Done> {
230 let stream = stream.map(|x|x.expect("Not yet implemented: Tracking at https://github.com/constellation-rs/amadeus/projects/3#card-40276549"));
231 pin_mut!(stream);
232 match self.project().0.as_pin_mut() {
233 Some(a) => a.poll_forward(cx, stream).map(Some),
234 None => Poll::Ready(None),
235 }
236 }
237}
238
239#[derive(Educe, Serialize, Deserialize)]
240#[educe(Clone(bound = "R: Clone"), Default(bound = "R: Default"))]
241#[serde(
242 bound(serialize = "R: Serialize"),
243 bound(deserialize = "R: Deserialize<'de>")
244)]
245pub struct ResultReducer<R, E>(R, PhantomData<fn() -> E>);
246
247impl<R: Reducer<Item>, E, Item> Reducer<Result<Item, E>> for ResultReducer<R, E> {
248 type Done = Result<R::Done, E>;
249 type Async = ResultReducerAsync<R::Async, E>;
250
251 fn into_async(self) -> Self::Async {
252 ResultReducerAsync::Ok(self.0.into_async())
253 }
254}
255impl<R: Reducer<Item>, E, Item> ReducerProcessSend<Result<Item, E>> for ResultReducer<R, E>
256where
257 R::Done: ProcessSend + 'static,
258 E: ProcessSend + 'static,
259{
260 type Done = Result<R::Done, E>;
261}
262impl<R: Reducer<Item>, E, Item> ReducerSend<Result<Item, E>> for ResultReducer<R, E>
263where
264 R::Done: Send + 'static,
265 E: Send + 'static,
266{
267 type Done = Result<R::Done, E>;
268}
269
270#[pin_project(project = ResultReducerAsyncProj)]
271pub enum ResultReducerAsync<R, E> {
272 Ok(#[pin] R),
273 Err(Option<E>),
274}
275impl<R: Sink<Item>, E, Item> Sink<Result<Item, E>> for ResultReducerAsync<R, E> {
276 type Done = Result<R::Done, E>;
277
278 #[inline]
279 fn poll_forward(
280 self: Pin<&mut Self>, cx: &mut Context,
281 stream: Pin<&mut impl Stream<Item = Result<Item, E>>>,
282 ) -> Poll<Self::Done> {
283 let stream = stream.map(|x|x.ok().expect("Not yet implemented: Tracking at https://github.com/constellation-rs/amadeus/projects/3#card-40276549"));
284 pin_mut!(stream);
285 match self.project() {
286 ResultReducerAsyncProj::Ok(a) => a.poll_forward(cx, stream).map(Ok),
287 ResultReducerAsyncProj::Err(b) => Poll::Ready(Err(b.take().unwrap())),
288 }
289 }
290}
291
292impl<T> FromParallelStream<T> for Vec<T>
293where
294 T: Send + 'static,
295{
296 type ReduceA = PushReducer<T, Self>;
297 type ReduceC = ExtendReducer<Self>;
298
299 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
300 Default::default()
301 }
302}
303
304impl<T> FromParallelStream<T> for VecDeque<T>
305where
306 T: Send + 'static,
307{
308 type ReduceA = PushReducer<T, Vec<T>>;
309 type ReduceC = IntoReducer<ExtendReducer<Vec<T>>, Self>;
310
311 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
312 Default::default()
313 }
314}
315
316impl<T: Ord> FromParallelStream<T> for BinaryHeap<T>
317where
318 T: Send + 'static,
319{
320 type ReduceA = PushReducer<T, Vec<T>>;
321 type ReduceC = IntoReducer<ExtendReducer<Vec<T>>, Self>;
322
323 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
324 Default::default()
325 }
326}
327
328impl<T> FromParallelStream<T> for LinkedList<T>
329where
330 T: Send + 'static,
331{
332 type ReduceA = PushReducer<T, Self>;
333 type ReduceC = ExtendReducer<Self>;
334
335 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
336 Default::default()
337 }
338}
339
340impl<T, S> FromParallelStream<T> for HashSet<T, S>
341where
342 T: Eq + Hash + Send + 'static,
343 S: BuildHasher + Default + Send + 'static,
344{
345 type ReduceA = PushReducer<T, Self>;
346 type ReduceC = ExtendReducer<Self>;
347
348 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
349 Default::default()
350 }
351}
352
353impl<K, V, S> FromParallelStream<(K, V)> for HashMap<K, V, S>
354where
355 K: Eq + Hash + Send + 'static,
356 V: Send + 'static,
357 S: BuildHasher + Default + Send + 'static,
358{
359 type ReduceA = PushReducer<(K, V), Self>;
360 type ReduceC = ExtendReducer<Self>;
361
362 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
363 Default::default()
364 }
365}
366
367impl<T> FromParallelStream<T> for BTreeSet<T>
368where
369 T: Ord + Send + 'static,
370{
371 type ReduceA = PushReducer<T, Self>;
372 type ReduceC = ExtendReducer<Self>;
373
374 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
375 Default::default()
376 }
377}
378
379impl<K, V> FromParallelStream<(K, V)> for BTreeMap<K, V>
380where
381 K: Ord + Send + 'static,
382 V: Send + 'static,
383{
384 type ReduceA = PushReducer<(K, V), Self>;
385 type ReduceC = ExtendReducer<Self>;
386
387 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
388 Default::default()
389 }
390}
391
392impl FromParallelStream<char> for String {
393 type ReduceA = PushReducer<char, Self>;
394 type ReduceC = PushReducer<Self>;
395
396 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
397 Default::default()
398 }
399}
400
401impl FromParallelStream<Self> for String {
402 type ReduceA = PushReducer<Self>;
403 type ReduceC = PushReducer<Self>;
404
405 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
406 Default::default()
407 }
408}
409
410impl FromParallelStream<()> for () {
411 type ReduceA = PushReducer<Self>;
412 type ReduceC = PushReducer<Self>;
413
414 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
415 Default::default()
416 }
417}
418
419impl<T, C: FromParallelStream<T>> FromParallelStream<Option<T>> for Option<C> {
420 type ReduceA = OptionReducer<C::ReduceA>;
421 type ReduceC = OptionReducer<C::ReduceC>;
422
423 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
424 let (a, c) = C::reducers();
425 (OptionReducer(a), OptionReducer(c))
426 }
427}
428
429impl<T, C: FromParallelStream<T>, E> FromParallelStream<Result<T, E>> for Result<C, E>
430where
431 E: Send + 'static,
432{
433 type ReduceA = ResultReducer<C::ReduceA, E>;
434 type ReduceC = ResultReducer<C::ReduceC, E>;
435
436 fn reducers() -> (Self::ReduceA, Self::ReduceC) {
437 let (a, c) = C::reducers();
438 (ResultReducer(a, PhantomData), ResultReducer(c, PhantomData))
439 }
440}
441
442impl<T> FromDistributedStream<T> for Vec<T>
443where
444 T: ProcessSend + 'static,
445{
446 type ReduceA = PushReducer<T, Self>;
447 type ReduceB = ExtendReducer<Self>;
448 type ReduceC = ExtendReducer<Self>;
449
450 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
451 Default::default()
452 }
453}
454
455impl<T> FromDistributedStream<T> for VecDeque<T>
456where
457 T: ProcessSend + 'static,
458{
459 type ReduceA = PushReducer<T, Vec<T>>;
460 type ReduceB = ExtendReducer<Vec<T>>;
461 type ReduceC = IntoReducer<ExtendReducer<Vec<T>>, Self>;
462
463 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
464 Default::default()
465 }
466}
467
468impl<T: Ord> FromDistributedStream<T> for BinaryHeap<T>
469where
470 T: ProcessSend + 'static,
471{
472 type ReduceA = PushReducer<T, Vec<T>>;
473 type ReduceB = ExtendReducer<Vec<T>>;
474 type ReduceC = IntoReducer<ExtendReducer<Vec<T>>, Self>;
475
476 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
477 Default::default()
478 }
479}
480
481impl<T> FromDistributedStream<T> for LinkedList<T>
482where
483 T: ProcessSend + 'static,
484{
485 type ReduceA = PushReducer<T, Self>;
486 type ReduceB = ExtendReducer<Self>;
487 type ReduceC = ExtendReducer<Self>;
488
489 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
490 Default::default()
491 }
492}
493
494impl<T, S> FromDistributedStream<T> for HashSet<T, S>
495where
496 T: Eq + Hash + ProcessSend + 'static,
497 S: BuildHasher + Default + Send + 'static,
498{
499 type ReduceA = PushReducer<T, Self>;
500 type ReduceB = ExtendReducer<Self>;
501 type ReduceC = ExtendReducer<Self>;
502
503 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
504 Default::default()
505 }
506}
507
508impl<K, V, S> FromDistributedStream<(K, V)> for HashMap<K, V, S>
509where
510 K: Eq + Hash + ProcessSend + 'static,
511 V: ProcessSend + 'static,
512 S: BuildHasher + Default + Send + 'static,
513{
514 type ReduceA = PushReducer<(K, V), Self>;
515 type ReduceB = ExtendReducer<Self>;
516 type ReduceC = ExtendReducer<Self>;
517
518 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
519 Default::default()
520 }
521}
522
523impl<T> FromDistributedStream<T> for BTreeSet<T>
524where
525 T: Ord + ProcessSend + 'static,
526{
527 type ReduceA = PushReducer<T, Self>;
528 type ReduceB = ExtendReducer<Self>;
529 type ReduceC = ExtendReducer<Self>;
530
531 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
532 Default::default()
533 }
534}
535
536impl<K, V> FromDistributedStream<(K, V)> for BTreeMap<K, V>
537where
538 K: Ord + ProcessSend + 'static,
539 V: ProcessSend + 'static,
540{
541 type ReduceA = PushReducer<(K, V), Self>;
542 type ReduceB = ExtendReducer<Self>;
543 type ReduceC = ExtendReducer<Self>;
544
545 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
546 Default::default()
547 }
548}
549
550impl FromDistributedStream<char> for String {
551 type ReduceA = PushReducer<char, Self>;
552 type ReduceB = PushReducer<Self>;
553 type ReduceC = PushReducer<Self>;
554
555 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
556 Default::default()
557 }
558}
559
560impl FromDistributedStream<Self> for String {
561 type ReduceA = PushReducer<Self>;
562 type ReduceB = PushReducer<Self>;
563 type ReduceC = PushReducer<Self>;
564
565 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
566 Default::default()
567 }
568}
569
570impl FromDistributedStream<()> for () {
571 type ReduceA = PushReducer<Self>;
572 type ReduceB = PushReducer<Self>;
573 type ReduceC = PushReducer<Self>;
574
575 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
576 Default::default()
577 }
578}
579
580impl<T, C: FromDistributedStream<T>> FromDistributedStream<Option<T>> for Option<C> {
581 type ReduceA = OptionReducer<C::ReduceA>;
582 type ReduceB = OptionReducer<C::ReduceB>;
583 type ReduceC = OptionReducer<C::ReduceC>;
584
585 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
586 let (a, b, c) = C::reducers();
587 (OptionReducer(a), OptionReducer(b), OptionReducer(c))
588 }
589}
590
591impl<T, C: FromDistributedStream<T>, E> FromDistributedStream<Result<T, E>> for Result<C, E>
592where
593 E: ProcessSend + 'static,
594{
595 type ReduceA = ResultReducer<C::ReduceA, E>;
596 type ReduceB = ResultReducer<C::ReduceB, E>;
597 type ReduceC = ResultReducer<C::ReduceC, E>;
598
599 fn reducers() -> (Self::ReduceA, Self::ReduceB, Self::ReduceC) {
600 let (a, b, c) = C::reducers();
601 (
602 ResultReducer(a, PhantomData),
603 ResultReducer(b, PhantomData),
604 ResultReducer(c, PhantomData),
605 )
606 }
607}