1use derive_new::new;
2use futures::Stream;
3use pin_project::pin_project;
4use serde_closure::traits::FnMut;
5use std::{
6 pin::Pin, task::{Context, Poll}
7};
8
9use super::Pipe;
10
11#[pin_project]
12#[derive(new)]
13pub struct Map<P, F> {
14 #[pin]
15 pipe: P,
16 f: F,
17}
18
19impl<P: Stream, F> Stream for Map<P, F>
20where
21 F: FnMut<(P::Item,)>,
22{
23 type Item = F::Output;
24
25 #[inline]
26 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
27 let mut self_ = self.project();
28 let (mut pipe, f) = (self_.pipe, &mut self_.f);
29 pipe.as_mut()
30 .poll_next(cx)
31 .map(|t| t.map(|t| f.call_mut((t,))))
32 }
33}
34
35impl<P: Pipe<Input>, F, Input> Pipe<Input> for Map<P, F>
36where
37 F: FnMut<(P::Output,)>,
38{
39 type Output = F::Output;
40
41 #[inline]
42 fn poll_next(
43 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Input>>,
44 ) -> Poll<Option<Self::Output>> {
45 let mut self_ = self.project();
46 let (mut pipe, f) = (self_.pipe, &mut self_.f);
47 pipe.as_mut()
48 .poll_next(cx, stream.as_mut())
49 .map(|t| t.map(|t| f.call_mut((t,))))
50 }
51}