futures_util/stream/
map.rs1use futures_core::{Async, Poll, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct Map<S, F> {
12    stream: S,
13    f: F,
14}
15
16pub fn new<S, U, F>(s: S, f: F) -> Map<S, F>
17    where S: Stream,
18          F: FnMut(S::Item) -> U,
19{
20    Map {
21        stream: s,
22        f: f,
23    }
24}
25
26impl<S, F> Map<S, F> {
27    pub fn get_ref(&self) -> &S {
30        &self.stream
31    }
32
33    pub fn get_mut(&mut self) -> &mut S {
39        &mut self.stream
40    }
41
42    pub fn into_inner(self) -> S {
47        self.stream
48    }
49}
50
51impl<S, F> Sink for Map<S, F>
53    where S: Sink
54{
55    type SinkItem = S::SinkItem;
56    type SinkError = S::SinkError;
57
58    delegate_sink!(stream);
59}
60
61impl<S, F, U> Stream for Map<S, F>
62    where S: Stream,
63          F: FnMut(S::Item) -> U,
64{
65    type Item = U;
66    type Error = S::Error;
67
68    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<U>, S::Error> {
69        let option = try_ready!(self.stream.poll_next(cx));
70        Ok(Async::Ready(option.map(&mut self.f)))
71    }
72}