//! `Cloned` pipe adaptor and its task type `ClonedTask`: turns a pipe that
//! yields `&T` into one that yields owned `T` by cloning each item.
use derive_new::new;
use futures::{pin_mut, Stream};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
	marker::PhantomData, pin::Pin, task::{Context, Poll}
};

use super::{ParallelPipe, PipeTask, PipeTaskAsync};
use crate::sink::{Sink, SinkMap};

/// Pipe adaptor that wraps an inner pipe `I` yielding `&'a T` and yields owned
/// `T` values instead, obtained by cloning each borrowed item.
///
/// `Source` is the type the pipe is fed references to and `T` is the type of
/// the items produced after cloning. The constructor is generated by
/// `#[derive(new)]`.
#[derive(new)]
#[must_use]
pub struct Cloned<I, T, Source> {
	/// The wrapped inner pipe whose borrowed items are cloned.
	i: I,
	/// Ties the otherwise-unused `Source` and `T` type parameters to the
	/// struct without storing values of either type.
	marker: PhantomData<fn(Source, T)>,
}

// impl<'a,I,T:'a> DistributedStream for Cloned<I>
// where
// 	I: DistributedStream<Item = &'a T>,
// 	T: Clone,
// {
// 	type Item = T;
// 	type Task = ClonedTask<I::Task>;
// 	fn size_hint(&self) -> (usize, Option<usize>) {
// 		self.i.size_hint()
// 	}
// 	fn next_task(&mut self) -> Option<Self::Task> {
// 		self.i.next_task().map(|task| ClonedTask { task })
// 	}
// }

// https://github.com/rust-lang/rust/issues/55731
// https://play.rust-lang.org/?version=nightly&mode=debug&edition=2015&gist=238651c4992913bcd62b68b4832fcd9a
// https://play.rust-lang.org/?version=nightly&mode=debug&edition=2015&gist=2f1da304878b050cc313c0279047b0fa

impl_par_dist! {
	// Cloning adaptor: given an inner pipe over `&'a Source` that yields
	// `&'a T`, expose a pipe over the same input that yields owned `T`.
	impl<'a, I, Source, T> ParallelPipe<&'a Source> for Cloned<I, T, Source>
	where
		T: Clone + 'a,
		I: ParallelPipe<&'a Source, Item = &'a T>,
	{
		type Item = T;
		type Task = ClonedTask<I::Task>;

		// Ask the inner pipe for its task and wrap it in a `ClonedTask`.
		fn task(&self) -> Self::Task {
			ClonedTask {
				task: self.i.task(),
			}
		}
	}
}

/// Task wrapper used by `Cloned`: holds the inner pipe's task (or, after
/// `into_async`, the inner task's async form) and clones each `&T` item it
/// produces into an owned `T`.
///
/// Derives `Serialize`/`Deserialize`, and uses `pin_project` so the wrapped
/// task can be polled through a pinned projection.
#[pin_project]
#[derive(Serialize, Deserialize)]
pub struct ClonedTask<T> {
	/// The wrapped inner task; structurally pinned so `project()` hands out
	/// a `Pin<&mut T>` to it.
	#[pin]
	task: T,
}
/// A `ClonedTask` over a task yielding `&'a T` is itself a task yielding
/// owned `T`; its async form is a `ClonedTask` around the inner task's async
/// form.
impl<'a, C, Source, T> PipeTask<&'a Source> for ClonedTask<C>
where
	T: Clone + 'a,
	C: PipeTask<&'a Source, Item = &'a T>,
{
	type Item = T;
	type Async = ClonedTask<C::Async>;

	/// Converts the wrapped task into its async form and re-wraps it.
	fn into_async(self) -> Self::Async {
		let task = self.task.into_async();
		ClonedTask { task }
	}
}
// impl<'a,C,T:'a> StreamTask for  ClonedTask<C>
// where
// 	C: StreamTask<Item = &'a T>,
// 	T: Clone,
// {
// 	type Item = T;
// 	fn run(self, i: &mut impl FnMut(Self::Item) -> bool) -> bool {
// 		self.task.run(&mut |item| i(item.clone()))
// 	}
// }
/// Async driver for the cloning adaptor: forwards polling to the wrapped
/// task, cloning every `&'a T` it emits before it reaches the caller's sink.
impl<'a, C, Source, T> PipeTaskAsync<&'a Source> for ClonedTask<C>
where
	Source: 'a,
	T: Clone + 'a,
	C: PipeTaskAsync<&'a Source, Item = &'a T>,
{
	type Item = T;

	/// Polls the inner task with `stream` as input, routing its borrowed
	/// output through a mapping sink that clones each item into `sink`.
	fn poll_run(
		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = &'a Source>>,
		sink: Pin<&mut impl Sink<Item = Self::Item>>,
	) -> Poll<()> {
		// Pinned projection of the wrapped task.
		let inner = self.project().task;
		// Adapt the caller's sink of `T` into a sink of `&T` by cloning.
		let cloning_sink = SinkMap::new(sink, |borrowed: &T| T::clone(borrowed));
		// The inner task takes its sink as `Pin<&mut _>`, so pin it on the stack.
		pin_mut!(cloning_sink);
		inner.poll_run(cx, stream, cloning_sink)
	}
}