1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use derive_new::new;
use futures::{pin_mut, Stream};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
marker::PhantomData, pin::Pin, task::{Context, Poll}
};
use super::{ParallelPipe, PipeTask, PipeTaskAsync};
use crate::sink::{Sink, SinkMap};
/// Pipe adaptor that wraps an inner pipe yielding `&T` and yields owned `T`
/// values instead, produced by cloning each borrowed item.
///
/// * `I` – the wrapped pipe.
/// * `T` – the item type behind the references the wrapped pipe yields.
/// * `Source` – the type behind the references fed into the pipe.
#[must_use]
#[derive(new)]
pub struct Cloned<I, T, Source> {
    /// The wrapped pipe whose borrowed items get cloned.
    i: I,
    /// Zero-sized marker that ties the otherwise unused `Source` and `T`
    /// parameters to this struct without storing values of either type.
    marker: PhantomData<fn(Source, T)>,
}
// Pipe implementation for `Cloned`: the wrapped pipe consumes `&'a Source` and
// yields `&'a T`; this adaptor exposes owned `T` items instead.
// Only plain `//` comments are used inside the macro invocation so the token
// stream handed to `impl_par_dist!` is unchanged.
impl_par_dist! {
    impl<'a, I, Source, T: 'a> ParallelPipe<&'a Source> for Cloned<I, T, Source>
    where
        I: ParallelPipe<&'a Source, Item = &'a T>,
        T: Clone,
    {
        type Item = T;
        type Task = ClonedTask<I::Task>;

        // Build the wrapped pipe's task and hand it to `ClonedTask`.
        fn task(&self) -> Self::Task {
            ClonedTask {
                task: self.i.task(),
            }
        }
    }
}
/// Task wrapper around an inner task `T`.
///
/// The struct is pin-projectable (the inner task is a pinned field) and
/// derives `Serialize` / `Deserialize`.
#[pin_project]
#[derive(Serialize, Deserialize)]
pub struct ClonedTask<T> {
    /// The wrapped task; structurally pinned so it can be polled in place.
    #[pin]
    task: T,
}
/// `ClonedTask` is a `PipeTask` whenever the wrapped task yields `&'a T` with
/// `T: Clone`; the wrapper's item type is the owned `T`.
impl<'a, C, Source, T: 'a> PipeTask<&'a Source> for ClonedTask<C>
where
    C: PipeTask<&'a Source, Item = &'a T>,
    T: Clone,
{
    type Item = T;
    type Async = ClonedTask<C::Async>;

    /// Converts the wrapped task into its async form and re-wraps it.
    fn into_async(self) -> Self::Async {
        let inner_async = self.task.into_async();
        ClonedTask { task: inner_async }
    }
}
/// Async driver for `ClonedTask`: delegates to the wrapped task, routing its
/// borrowed `&'a T` items through a cloning adaptor in front of the caller's
/// sink.
impl<'a, C, Source: 'a, T: 'a> PipeTaskAsync<&'a Source> for ClonedTask<C>
where
    C: PipeTaskAsync<&'a Source, Item = &'a T>,
    T: Clone,
{
    type Item = T;

    /// Polls the wrapped task with `stream` as its input.
    ///
    /// The caller's `sink` accepts owned `T`, while the wrapped task emits
    /// `&T`; a `SinkMap` that clones each item sits between the two.
    /// Returns whatever the wrapped task's `poll_run` returns.
    fn poll_run(
        self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = &'a Source>>,
        sink: Pin<&mut impl Sink<Item = Self::Item>>,
    ) -> Poll<()> {
        // Obtain pinned access to the wrapped task.
        let this = self.project();
        // Adapt the owned-item sink so it accepts references, cloning each one.
        let cloning_sink = SinkMap::new(sink, |borrowed: &T| T::clone(borrowed));
        // The adaptor must be pinned before it can be passed on.
        pin_mut!(cloning_sink);
        this.task.poll_run(cx, stream, cloning_sink)
    }
}