1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
use derive_new::new; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::{ pin::Pin, task::{Context, Poll} }; use super::{DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask}; use crate::{ par_stream::{ParallelStream, StreamTask}, pipe::{Pipe as _, PipePipe, StreamExt, StreamPipe} }; #[pin_project] #[derive(new)] #[must_use] pub struct Pipe<A, B> { #[pin] a: A, b: B, } impl_par_dist! { impl<A: ParallelStream, B: ParallelPipe<A::Item>> ParallelStream for Pipe<A, B> { type Item = B::Output; type Task = JoinTask<A::Task, B::Task>; #[inline(always)] fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> { let self_ = self.project(); let b = self_.b; self_.a.next_task(cx).map(|task| { task.map(|a| { let b = b.task(); JoinTask { a, b } }) }) } #[inline(always)] fn size_hint(&self) -> (usize, Option<usize>) { self.a.size_hint() } } impl<A: ParallelPipe<Input>, B: ParallelPipe<A::Output>, Input> ParallelPipe<Input> for Pipe<A, B> { type Output = B::Output; type Task = JoinTask<A::Task, B::Task>; #[inline(always)] fn task(&self) -> Self::Task { let a = self.a.task(); let b = self.b.task(); JoinTask { a, b } } } } impl<A: ParallelPipe<Item>, B: ParallelSink<A::Output>, Item> ParallelSink<Item> for Pipe<A, B> { type Done = B::Done; type Pipe = Pipe<A, B::Pipe>; type ReduceA = B::ReduceA; type ReduceC = B::ReduceC; #[inline(always)] fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceC) { let (a, b, c) = self.b.reducers(); (Pipe::new(self.a, a), b, c) } } impl<A: DistributedPipe<Item>, B: DistributedSink<A::Output>, Item> DistributedSink<Item> for Pipe<A, B> { type Done = B::Done; type Pipe = Pipe<A, B::Pipe>; type ReduceA = B::ReduceA; type ReduceB = B::ReduceB; type ReduceC = B::ReduceC; #[inline(always)] fn reducers(self) -> (Self::Pipe, Self::ReduceA, Self::ReduceB, Self::ReduceC) { let (a, b, c, d) = self.b.reducers(); (Pipe::new(self.a, a), b, c, d) } } #[pin_project] #[derive(Serialize, Deserialize)] pub struct JoinTask<A, B> { #[pin] a: A, #[pin] b: B, } impl<A: StreamTask, B: PipeTask<A::Item>> StreamTask for JoinTask<A, B> { type Item = B::Output; type Async = StreamPipe<A::Async, B::Async>; #[inline(always)] fn into_async(self) -> Self::Async { self.a.into_async().pipe(self.b.into_async()) } } impl<A: PipeTask<Input>, B: PipeTask<A::Output>, Input> PipeTask<Input> for JoinTask<A, B> { type Output = B::Output; type Async = PipePipe<A::Async, B::Async>; #[inline(always)] fn into_async(self) -> Self::Async { self.a.into_async().pipe(self.b.into_async()) } }