use derive_new::new;
use futures::{pin_mut, Stream};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
marker::PhantomData, pin::Pin, task::{Context, Poll}
};
use super::{ParallelPipe, PipeTask, PipeTaskAsync};
use crate::sink::{Sink, SinkMap};
#[derive(new)]
#[must_use]
pub struct Cloned<I, T, Source> {
i: I,
marker: PhantomData<fn(Source, T)>,
}
impl_par_dist! {
impl<'a, I, Source, T: 'a> ParallelPipe<&'a Source> for Cloned<I, T, Source>
where
I: ParallelPipe<&'a Source, Item = &'a T>,
T: Clone,
{
type Item = T;
type Task = ClonedTask<I::Task>;
fn task(&self) -> Self::Task {
let task = self.i.task();
ClonedTask { task }
}
}
}
#[pin_project]
#[derive(Serialize, Deserialize)]
pub struct ClonedTask<T> {
#[pin]
task: T,
}
impl<'a, C, Source, T: 'a> PipeTask<&'a Source> for ClonedTask<C>
where
C: PipeTask<&'a Source, Item = &'a T>,
T: Clone,
{
type Item = T;
type Async = ClonedTask<C::Async>;
fn into_async(self) -> Self::Async {
ClonedTask {
task: self.task.into_async(),
}
}
}
impl<'a, C, Source: 'a, T: 'a> PipeTaskAsync<&'a Source> for ClonedTask<C>
where
C: PipeTaskAsync<&'a Source, Item = &'a T>,
T: Clone,
{
type Item = T;
fn poll_run(
self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = &'a Source>>,
sink: Pin<&mut impl Sink<Item = Self::Item>>,
) -> Poll<()> {
let sink = SinkMap::new(sink, |item: &T| item.clone());
pin_mut!(sink);
self.project().task.poll_run(cx, stream, sink)
}
}