1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
use derive_new::new; use futures::Stream; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::{ marker::PhantomData, pin::Pin, task::{Context, Poll} }; use super::{ParallelPipe, PipeTask}; use crate::pipe::Pipe; #[derive(new)] #[must_use] pub struct Cloned<P, T, Input> { pipe: P, marker: PhantomData<fn() -> (Input, T)>, } impl_par_dist! { impl<'a, P, Input, T: 'a> ParallelPipe<&'a Input> for Cloned<P, T, Input> where P: ParallelPipe<&'a Input, Output = &'a T>, T: Clone, { type Output = T; type Task = ClonedTask<P::Task>; fn task(&self) -> Self::Task { let task = self.pipe.task(); ClonedTask { task } } } } #[pin_project] #[derive(Serialize, Deserialize)] pub struct ClonedTask<T> { #[pin] task: T, } impl<'a, C, Input, T: 'a> PipeTask<&'a Input> for ClonedTask<C> where C: PipeTask<&'a Input, Output = &'a T>, T: Clone, { type Output = T; type Async = ClonedTask<C::Async>; fn into_async(self) -> Self::Async { ClonedTask { task: self.task.into_async(), } } } impl<'a, C, Input: 'a, T: 'a> Pipe<&'a Input> for ClonedTask<C> where C: Pipe<&'a Input, Output = &'a T>, T: Clone, { type Output = T; fn poll_next( self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = &'a Input>>, ) -> Poll<Option<Self::Output>> { self.project() .task .poll_next(cx, stream) .map(Option::<&_>::cloned) } }