futures_util/sink/
map_err.rs

1use futures_core::{Poll, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
/// Sink adapter returned by the `Sink::sink_map_err` combinator.
///
/// It pairs an inner sink with a one-shot error-mapping function. The
/// function lives in an `Option` so it can be moved out when it is applied.
#[derive(Clone, Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct SinkMapErr<S, F> {
    /// The wrapped sink that all operations are forwarded to.
    sink: S,
    /// The error-mapping function; `None` once it has been taken.
    f: Option<F>,
}
12
13pub fn new<S, F>(s: S, f: F) -> SinkMapErr<S, F> {
14    SinkMapErr { sink: s, f: Some(f) }
15}
16
17impl<S, F> SinkMapErr<S, F> {
18    /// Get a shared reference to the inner sink.
19    pub fn get_ref(&self) -> &S {
20        &self.sink
21    }
22
23    /// Get a mutable reference to the inner sink.
24    pub fn get_mut(&mut self) -> &mut S {
25        &mut self.sink
26    }
27
28    /// Consumes this combinator, returning the underlying sink.
29    ///
30    /// Note that this may discard intermediate state of this combinator, so
31    /// care should be taken to avoid losing resources when this is called.
32    pub fn into_inner(self) -> S {
33        self.sink
34    }
35
36    fn expect_f(&mut self) -> F {
37        self.f.take().expect("cannot use MapErr after an error")
38    }
39}
40
41impl<S, F, E> Sink for SinkMapErr<S, F>
42    where S: Sink,
43          F: FnOnce(S::SinkError) -> E,
44{
45    type SinkItem = S::SinkItem;
46    type SinkError = E;
47
48    fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
49        self.sink.poll_ready(cx).map_err(|e| self.expect_f()(e))
50    }
51
52    fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
53        self.sink.start_send(item).map_err(|e| self.expect_f()(e))
54    }
55
56    fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
57        self.sink.poll_flush(cx).map_err(|e| self.expect_f()(e))
58    }
59
60    fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
61        self.sink.poll_close(cx).map_err(|e| self.expect_f()(e))
62    }
63}
64
65impl<S: Stream, F> Stream for SinkMapErr<S, F> {
66    type Item = S::Item;
67    type Error = S::Error;
68
69    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
70        self.sink.poll_next(cx)
71    }
72}