futures_util/sink/
map_err.rs1use futures_core::{Poll, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
/// Wrapper that pairs an inner value `S` with an optional closure `F`.
///
/// The `Sink` implementation in this file forwards every operation to `sink`
/// and passes any error the inner sink reports through the stored closure.
/// The closure is kept in an `Option` so that it can be moved out by value
/// when it is invoked.
#[derive(Clone, Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct SinkMapErr<S, F> {
    /// The wrapped inner value that all operations are delegated to.
    sink: S,
    /// The error-mapping closure; `None` once it has been taken.
    f: Option<F>,
}
12
13pub fn new<S, F>(s: S, f: F) -> SinkMapErr<S, F> {
14 SinkMapErr { sink: s, f: Some(f) }
15}
16
17impl<S, F> SinkMapErr<S, F> {
18 pub fn get_ref(&self) -> &S {
20 &self.sink
21 }
22
23 pub fn get_mut(&mut self) -> &mut S {
25 &mut self.sink
26 }
27
28 pub fn into_inner(self) -> S {
33 self.sink
34 }
35
36 fn expect_f(&mut self) -> F {
37 self.f.take().expect("cannot use MapErr after an error")
38 }
39}
40
41impl<S, F, E> Sink for SinkMapErr<S, F>
42 where S: Sink,
43 F: FnOnce(S::SinkError) -> E,
44{
45 type SinkItem = S::SinkItem;
46 type SinkError = E;
47
48 fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
49 self.sink.poll_ready(cx).map_err(|e| self.expect_f()(e))
50 }
51
52 fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
53 self.sink.start_send(item).map_err(|e| self.expect_f()(e))
54 }
55
56 fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
57 self.sink.poll_flush(cx).map_err(|e| self.expect_f()(e))
58 }
59
60 fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
61 self.sink.poll_close(cx).map_err(|e| self.expect_f()(e))
62 }
63}
64
65impl<S: Stream, F> Stream for SinkMapErr<S, F> {
66 type Item = S::Item;
67 type Error = S::Error;
68
69 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
70 self.sink.poll_next(cx)
71 }
72}