amadeus_core/par_stream/
cloned.rs

1use derive_new::new;
2use futures::Stream;
3use pin_project::pin_project;
4use serde::{Deserialize, Serialize};
5use std::{
6	marker::PhantomData, pin::Pin, task::{Context, Poll}
7};
8
9use super::{ParallelPipe, PipeTask};
10use crate::pipe::Pipe;
11
/// Pipe adaptor that turns a pipe yielding `&T` into one yielding owned `T`.
///
/// The `ParallelPipe` implementation in this file accepts `&'a Input` items,
/// requires the wrapped pipe to output `&'a T` with `T: Clone`, and exposes
/// `T` as its own output. The constructor is generated by `derive_new`.
#[derive(new)]
#[must_use]
pub struct Cloned<P, T, Input> {
	/// The wrapped pipe whose borrowed output is cloned.
	pipe: P,
	/// Zero-sized marker that uses the otherwise-unused `Input` and `T`
	/// type parameters; no value of either type is stored.
	marker: PhantomData<fn() -> (Input, T)>,
}
18
19impl_par_dist! {
20	impl<'a, P, Input, T: 'a> ParallelPipe<&'a Input> for Cloned<P, T, Input>
21	where
22		P: ParallelPipe<&'a Input, Output = &'a T>,
23		T: Clone,
24	{
25		type Output = T;
26		type Task = ClonedTask<P::Task>;
27
28		fn task(&self) -> Self::Task {
29			let task = self.pipe.task();
30			ClonedTask { task }
31		}
32	}
33}
34
/// Task produced by `Cloned`: wraps an inner task and clones each borrowed
/// item the inner task yields.
///
/// The same wrapper type is used both before and after `into_async`; only
/// the inner `T` changes. Serde's `Serialize`/`Deserialize` are derived.
#[pin_project]
#[derive(Serialize, Deserialize)]
pub struct ClonedTask<T> {
	/// The wrapped task. Marked `#[pin]` so that `poll_next` can reach it
	/// through `self.project()` as a pinned reference.
	#[pin]
	task: T,
}
41impl<'a, C, Input, T: 'a> PipeTask<&'a Input> for ClonedTask<C>
42where
43	C: PipeTask<&'a Input, Output = &'a T>,
44	T: Clone,
45{
46	type Output = T;
47	type Async = ClonedTask<C::Async>;
48
49	fn into_async(self) -> Self::Async {
50		ClonedTask {
51			task: self.task.into_async(),
52		}
53	}
54}
55
56impl<'a, C, Input: 'a, T: 'a> Pipe<&'a Input> for ClonedTask<C>
57where
58	C: Pipe<&'a Input, Output = &'a T>,
59	T: Clone,
60{
61	type Output = T;
62
63	fn poll_next(
64		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = &'a Input>>,
65	) -> Poll<Option<Self::Output>> {
66		self.project()
67			.task
68			.poll_next(cx, stream)
69			.map(Option::<&_>::cloned)
70	}
71}