async_sink/ext/
map_err.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3use tokio_stream::Stream;
4use tokio_stream_util::FusedStream;
5
6use super::Sink;
7
/// Sink returned by the [`sink_map_err`](super::SinkExt::sink_map_err) method.
///
/// Pairs a wrapped sink with a function that converts the wrapped sink's
/// error into a different error type.
#[must_use = "sinks do nothing unless polled"]
#[derive(Debug, Clone)]
pub struct SinkMapErr<Si, F> {
    /// The wrapped sink that every sink operation is forwarded to.
    sink: Si,
    /// The error-mapping function; becomes `None` once it has been moved out
    /// to map an error.
    f: Option<F>,
}
15
16impl<Si, F> SinkMapErr<Si, F> {
17    pub(super) fn new(sink: Si, f: F) -> Self {
18        Self { sink, f: Some(f) }
19    }
20
21    /// Acquires a reference to the underlying sink.
22    pub fn get_ref(&self) -> &Si {
23        &self.sink
24    }
25
26    /// Acquires a mutable reference to the underlying sink.
27    ///
28    /// Note that care must be taken to avoid tampering with the state of the
29    /// sink which may otherwise confuse this combinator.
30    pub fn get_mut(&mut self) -> &mut Si {
31        &mut self.sink
32    }
33
34    /// Acquires a pinned mutable reference to the underlying sink.
35    ///
36    /// Note that care must be taken to avoid tampering with the state of the
37    /// sink which may otherwise confuse this combinator.
38    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Si> {
39        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().sink) }
40    }
41
42    /// Consumes this combinator, returning the underlying sink.
43    ///
44    /// Note that this may discard intermediate state of this combinator, so
45    /// care should be taken to avoid losing resources when this is called.
46    pub fn into_inner(self) -> Si {
47        self.sink
48    }
49
50    fn take_f(self: Pin<&mut Self>) -> F {
51        unsafe { self.get_unchecked_mut() }
52            .f
53            .take()
54            .expect("polled MapErr after completion")
55    }
56}
57
58impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F>
59where
60    Si: Sink<Item>,
61    F: FnOnce(Si::Error) -> E,
62{
63    type Error = E;
64
65    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
66        match self.as_mut().get_pin_mut().poll_ready(cx) {
67            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
68            Poll::Ready(Err(e)) => Poll::Ready(Err(self.take_f()(e))),
69            Poll::Pending => Poll::Pending,
70        }
71    }
72
73    fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
74        match self.as_mut().get_pin_mut().start_send(item) {
75            Ok(()) => Ok(()),
76            Err(e) => Err(self.take_f()(e)),
77        }
78    }
79
80    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81        match self.as_mut().get_pin_mut().poll_flush(cx) {
82            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
83            Poll::Ready(Err(e)) => Poll::Ready(Err(self.take_f()(e))),
84            Poll::Pending => Poll::Pending,
85        }
86    }
87
88    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
89        match self.as_mut().get_pin_mut().poll_close(cx) {
90            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
91            Poll::Ready(Err(e)) => Poll::Ready(Err(self.take_f()(e))),
92            Poll::Pending => Poll::Pending,
93        }
94    }
95}
96
97// Forwarding impl of Stream from the underlying sink
98impl<S: Stream, F> Stream for SinkMapErr<S, F> {
99    type Item = S::Item;
100
101    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102        self.as_mut().get_pin_mut().poll_next(cx)
103    }
104
105    fn size_hint(&self) -> (usize, Option<usize>) {
106        self.sink.size_hint()
107    }
108}
109
110impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {
111    fn is_terminated(&self) -> bool {
112        self.sink.is_terminated()
113    }
114}