1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
use derive_new::new; use futures::{pin_mut, Stream}; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::{ marker::PhantomData, pin::Pin, task::{Context, Poll} }; use super::{ DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask, PipeTaskAsync }; use crate::sink::Sink; #[derive(new)] #[must_use] pub struct Pipe<A, B> { a: A, b: B, } impl_par_dist! { impl<A: ParallelPipe<Source>, B: ParallelPipe<A::Item>, Source> ParallelPipe<Source> for Pipe<A, B> { type Item = B::Item; type Task = JoinTask<A::Task, B::Task>; fn task(&self) -> Self::Task { let a = self.a.task(); let b = self.b.task(); JoinTask { a, b } } } } impl<A: ParallelPipe<Source>, B: ParallelSink<A::Item>, Source> ParallelSink<Source> for Pipe<A, B> { type Output = B::Output; type Pipe = Pipe<A, B::Pipe>; type ReduceAFactory = B::ReduceAFactory; type ReduceA = B::ReduceA; type ReduceC = B::ReduceC; fn reducers(self) -> (Self::Pipe, Self::ReduceAFactory, Self::ReduceC) { let (a, b, c) = self.b.reducers(); (Pipe::new(self.a, a), b, c) } } impl<A: DistributedPipe<Source>, B: DistributedSink<A::Item>, Source> DistributedSink<Source> for Pipe<A, B> { type Output = B::Output; type Pipe = Pipe<A, B::Pipe>; type ReduceAFactory = B::ReduceAFactory; type ReduceBFactory = B::ReduceBFactory; type ReduceA = B::ReduceA; type ReduceB = B::ReduceB; type ReduceC = B::ReduceC; fn reducers( self, ) -> ( Self::Pipe, Self::ReduceAFactory, Self::ReduceBFactory, Self::ReduceC, ) { let (a, b, c, d) = self.b.reducers(); (Pipe::new(self.a, a), b, c, d) } } #[pin_project] #[derive(Serialize, Deserialize)] pub struct JoinTask<A, B> { #[pin] a: A, #[pin] b: B, } impl<A: PipeTask<Source>, B: PipeTask<A::Item>, Source> PipeTask<Source> for JoinTask<A, B> { type Item = B::Item; type Async = JoinTask<A::Async, B::Async>; fn into_async(self) -> Self::Async { JoinTask { a: self.a.into_async(), b: self.b.into_async(), } } } impl<A: PipeTaskAsync<Source>, B: PipeTaskAsync<A::Item>, Source> PipeTaskAsync<Source> for JoinTask<A, B> { type Item = B::Item; fn poll_run( self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Source>>, sink: Pin<&mut impl Sink<Item = Self::Item>>, ) -> Poll<()> { #[pin_project] struct Proxy<'a, I, B, Item>(#[pin] I, Pin<&'a mut B>, PhantomData<fn() -> Item>); impl<'a, I, B, Item> Sink for Proxy<'a, I, B, Item> where I: Sink<Item = B::Item>, B: PipeTaskAsync<Item>, { type Item = Item; fn poll_forward( self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Self::Item>>, ) -> Poll<()> { let self_ = self.project(); self_.1.as_mut().poll_run(cx, stream, self_.0) } } let self_ = self.project(); let sink = Proxy(sink, self_.b, PhantomData); pin_mut!(sink); self_.a.poll_run(cx, stream, sink) } }