1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
use derive_new::new; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use serde_closure::traits::FnMut; use std::{ pin::Pin, task::{Context, Poll} }; use super::{ParallelPipe, ParallelStream, PipeTask, StreamTask}; #[pin_project] #[derive(new)] #[must_use] pub struct Filter<P, F> { #[pin] pipe: P, f: F, } impl_par_dist! { impl<P: ParallelStream, F> ParallelStream for Filter<P, F> where F: for<'a> FnMut<(&'a P::Item,), Output = bool> + Clone + Send + 'static, { type Item = P::Item; type Task = FilterTask<P::Task, F>; fn size_hint(&self) -> (usize, Option<usize>) { (0, self.pipe.size_hint().1) } fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> { let self_ = self.project(); let f = self_.f; self_.pipe.next_task(cx).map(|task| { task.map(|task| { let f = f.clone(); FilterTask { task, f } }) }) } } impl<P: ParallelPipe<Input>, F, Input> ParallelPipe<Input> for Filter<P, F> where F: for<'a> FnMut<(&'a P::Output,), Output = bool> + Clone + Send + 'static, { type Output = P::Output; type Task = FilterTask<P::Task, F>; fn task(&self) -> Self::Task { let task = self.pipe.task(); let f = self.f.clone(); FilterTask { task, f } } } } #[derive(Serialize, Deserialize)] pub struct FilterTask<C, F> { task: C, f: F, } impl<C: StreamTask, F> StreamTask for FilterTask<C, F> where F: for<'a> FnMut<(&'a C::Item,), Output = bool>, { type Item = C::Item; type Async = crate::pipe::Filter<C::Async, F>; fn into_async(self) -> Self::Async { crate::pipe::Filter::new(self.task.into_async(), self.f) } } impl<C: PipeTask<Input>, F, Input> PipeTask<Input> for FilterTask<C, F> where F: for<'a> FnMut<(&'a C::Output,), Output = bool>, { type Output = C::Output; type Async = crate::pipe::Filter<C::Async, F>; fn into_async(self) -> Self::Async { crate::pipe::Filter::new(self.task.into_async(), self.f) } }