amadeus_core/par_sink/
pipe.rs1use derive_new::new;
2use pin_project::pin_project;
3use serde::{Deserialize, Serialize};
4use std::{
5 pin::Pin, task::{Context, Poll}
6};
7
8use super::{DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask};
9use crate::{
10 par_stream::{ParallelStream, StreamTask}, pipe::{Pipe as _, PipePipe, StreamExt, StreamPipe}
11};
12
13#[pin_project]
14#[derive(new)]
15#[must_use]
16pub struct Pipe<A, B> {
17 #[pin]
18 a: A,
19 b: B,
20}
21
22impl_par_dist! {
23 impl<A: ParallelStream, B: ParallelPipe<A::Item>> ParallelStream for Pipe<A, B> {
24 type Item = B::Output;
25 type Task = JoinTask<A::Task, B::Task>;
26
27 #[inline(always)]
28 fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
29 let self_ = self.project();
30 let b = self_.b;
31 self_.a.next_task(cx).map(|task| {
32 task.map(|a| {
33 let b = b.task();
34 JoinTask { a, b }
35 })
36 })
37 }
38 #[inline(always)]
39 fn size_hint(&self) -> (usize, Option<usize>) {
40 self.a.size_hint()
41 }
42 }
43 impl<A: ParallelPipe<Input>, B: ParallelPipe<A::Output>, Input> ParallelPipe<Input>
44 for Pipe<A, B>
45 {
46 type Output = B::Output;
47 type Task = JoinTask<A::Task, B::Task>;
48
49 #[inline(always)]
50 fn task(&self) -> Self::Task {
51 let a = self.a.task();
52 let b = self.b.task();
53 JoinTask { a, b }
54 }
55 }
56}
57
58impl<A: ParallelPipe<Item>, B: ParallelSink<A::Output>, Item> ParallelSink<Item> for Pipe<A, B> {
59 type Done = B::Done;
60 type Pipe = Pipe<A, B::Pipe>;
61 type ReduceA = B::ReduceA;
62 type ReduceC = B::ReduceC;
63
64 #[inline(always)]
65 fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) {
66 let (a, b, c) = self.b.reducers();
67 (Pipe::new(self.a, a), b, c)
68 }
69}
70impl<A: DistributedPipe<Item>, B: DistributedSink<A::Output>, Item> DistributedSink<Item>
71 for Pipe<A, B>
72{
73 type Done = B::Done;
74 type Pipe = Pipe<A, B::Pipe>;
75 type ReduceA = B::ReduceA;
76 type ReduceB = B::ReduceB;
77 type ReduceC = B::ReduceC;
78
79 #[inline(always)]
80 fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
81 let (a, b, c, d) = self.b.reducers();
82 (Pipe::new(self.a, a), b, c, d)
83 }
84}
85
86#[pin_project]
87#[derive(Serialize, Deserialize)]
88pub struct JoinTask<A, B> {
89 #[pin]
90 a: A,
91 #[pin]
92 b: B,
93}
94
95impl<A: StreamTask, B: PipeTask<A::Item>> StreamTask for JoinTask<A, B> {
96 type Item = B::Output;
97 type Async = StreamPipe<A::Async, B::Async>;
98
99 #[inline(always)]
100 fn into_async(self) -> Self::Async {
101 self.a.into_async().pipe(self.b.into_async())
102 }
103}
104
105impl<A: PipeTask<Input>, B: PipeTask<A::Output>, Input> PipeTask<Input> for JoinTask<A, B> {
106 type Output = B::Output;
107 type Async = PipePipe<A::Async, B::Async>;
108
109 #[inline(always)]
110 fn into_async(self) -> Self::Async {
111 self.a.into_async().pipe(self.b.into_async())
112 }
113}