1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use derive_new::new;
use futures::Stream;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
	marker::PhantomData, pin::Pin, task::{Context, Poll}
};

use super::{ParallelPipe, PipeTask};
use crate::pipe::Pipe;

/// Pipe adapter wrapping an inner pipe `I`.
///
/// In this file it implements `ParallelPipe<&'a Source>` when `I` yields
/// `&'a T` and `T: Clone`; the adapter's own item type is then `T`.
///
/// The constructor is generated by `#[derive(new)]`.
#[derive(new)]
#[must_use]
pub struct Cloned<I, T, Source> {
	/// The wrapped inner pipe.
	i: I,
	/// Zero-sized marker that lets the struct carry the otherwise unused
	/// `Source` and `T` type parameters.
	marker: PhantomData<fn(Source, T)>,
}

// impl<'a,I,T:'a> DistributedStream for Cloned<I>
// where
// 	I: DistributedStream<Item = &'a T>,
// 	T: Clone,
// {
// 	type Item = T;
// 	type Task = ClonedTask<I::Task>;
// 	fn size_hint(&self) -> (usize, Option<usize>) {
// 		self.i.size_hint()
// 	}
// 	fn next_task(&mut self) -> Option<Self::Task> {
// 		self.i.next_task().map(|task| ClonedTask { task })
// 	}
// }

// https://github.com/rust-lang/rust/issues/55731
// https://play.rust-lang.org/?version=nightly&mode=debug&edition=2015&gist=238651c4992913bcd62b68b4832fcd9a
// https://play.rust-lang.org/?version=nightly&mode=debug&edition=2015&gist=2f1da304878b050cc313c0279047b0fa

// `Cloned` as a pipe over `&'a Source`: the inner pipe `I` must yield `&'a T`
// with `T: Clone`, and this pipe yields `T`. Its task is the inner pipe's task
// wrapped in `ClonedTask`.
impl_par_dist! {
	impl<'a, I, Source, T: 'a> ParallelPipe<&'a Source> for Cloned<I, T, Source>
	where
		T: Clone,
		I: ParallelPipe<&'a Source, Item = &'a T>,
	{
		type Item = T;
		type Task = ClonedTask<I::Task>;

		fn task(&self) -> Self::Task {
			// Build the wrapper directly around the inner pipe's task.
			ClonedTask {
				task: self.i.task(),
			}
		}
	}
}

/// Task wrapper around an inner task (or, after `into_async`, an inner
/// asynchronous pipe) of type `T`.
///
/// The implementations in this file require the inner value to yield `&'a U`
/// with `U: Clone`, and yield `U` themselves.
#[pin_project]
#[derive(Serialize, Deserialize)]
pub struct ClonedTask<T> {
	/// The wrapped inner task; marked `#[pin]` so `project()` exposes it as a
	/// pinned reference for polling.
	#[pin]
	task: T,
}
/// `ClonedTask` as a `PipeTask` over `&'a Source`: the inner task `C` must
/// yield `&'a T` with `T: Clone`, and this task yields `T`.
impl<'a, C, Source, T: 'a> PipeTask<&'a Source> for ClonedTask<C>
where
	T: Clone,
	C: PipeTask<&'a Source, Item = &'a T>,
{
	type Item = T;
	type Async = ClonedTask<C::Async>;

	/// Converts the inner task to its asynchronous form and re-wraps it.
	fn into_async(self) -> Self::Async {
		let task = self.task.into_async();
		ClonedTask { task }
	}
}
// impl<'a,C,T:'a> StreamTask for  ClonedTask<C>
// where
// 	C: StreamTask<Item = &'a T>,
// 	T: Clone,
// {
// 	type Item = T;
// 	fn run(self, i: &mut impl FnMut(Self::Item) -> bool) -> bool {
// 		self.task.run(&mut |item| i(item.clone()))
// 	}
// }
/// `ClonedTask` as an asynchronous `Pipe` over `&'a Source`: the inner pipe
/// `C` must yield `&'a T` with `T: Clone`, and this pipe yields `T`.
impl<'a, C, Source: 'a, T: 'a> Pipe<&'a Source> for ClonedTask<C>
where
	T: Clone,
	C: Pipe<&'a Source, Item = &'a T>,
{
	type Item = T;

	/// Polls the inner pipe with the given `stream` and clones any item it
	/// produces; `Pending` and end-of-stream (`None`) pass through unchanged.
	fn poll_next(
		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = &'a Source>>,
	) -> Poll<Option<Self::Item>> {
		// Pin-project to reach the inner pipe as `Pin<&mut C>`.
		let inner = self.project().task;
		match inner.poll_next(cx, stream) {
			// `item` is `Option<&'a T>`; turn it into `Option<T>` by cloning.
			Poll::Ready(item) => Poll::Ready(item.cloned()),
			Poll::Pending => Poll::Pending,
		}
	}
}