amadeus_core/par_stream/
cloned.rs1use derive_new::new;
2use futures::Stream;
3use pin_project::pin_project;
4use serde::{Deserialize, Serialize};
5use std::{
6 marker::PhantomData, pin::Pin, task::{Context, Poll}
7};
8
9use super::{ParallelPipe, PipeTask};
10use crate::pipe::Pipe;
11
12#[derive(new)]
13#[must_use]
14pub struct Cloned<P, T, Input> {
15 pipe: P,
16 marker: PhantomData<fn() -> (Input, T)>,
17}
18
19impl_par_dist! {
20 impl<'a, P, Input, T: 'a> ParallelPipe<&'a Input> for Cloned<P, T, Input>
21 where
22 P: ParallelPipe<&'a Input, Output = &'a T>,
23 T: Clone,
24 {
25 type Output = T;
26 type Task = ClonedTask<P::Task>;
27
28 fn task(&self) -> Self::Task {
29 let task = self.pipe.task();
30 ClonedTask { task }
31 }
32 }
33}
34
35#[pin_project]
36#[derive(Serialize, Deserialize)]
37pub struct ClonedTask<T> {
38 #[pin]
39 task: T,
40}
41impl<'a, C, Input, T: 'a> PipeTask<&'a Input> for ClonedTask<C>
42where
43 C: PipeTask<&'a Input, Output = &'a T>,
44 T: Clone,
45{
46 type Output = T;
47 type Async = ClonedTask<C::Async>;
48
49 fn into_async(self) -> Self::Async {
50 ClonedTask {
51 task: self.task.into_async(),
52 }
53 }
54}
55
56impl<'a, C, Input: 'a, T: 'a> Pipe<&'a Input> for ClonedTask<C>
57where
58 C: Pipe<&'a Input, Output = &'a T>,
59 T: Clone,
60{
61 type Output = T;
62
63 fn poll_next(
64 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = &'a Input>>,
65 ) -> Poll<Option<Self::Output>> {
66 self.project()
67 .task
68 .poll_next(cx, stream)
69 .map(Option::<&_>::cloned)
70 }
71}