async_sink/ext/
map_err.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3use tokio_stream::Stream;
4
5use super::Sink;
6
/// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method.
///
/// Pairs a wrapped sink with a closure that converts the wrapped sink's error
/// into another error type. The closure is stored in an `Option` so that it
/// can be moved out of the combinator when it is finally called.
#[must_use = "sinks do nothing unless polled"]
#[derive(Clone, Debug)]
pub struct SinkMapErr<Si, F> {
    /// The wrapped sink that receives every forwarded operation.
    sink: Si,
    /// The error-mapping closure; `None` once it has been taken.
    f: Option<F>,
}
14
15impl<Si, F> SinkMapErr<Si, F> {
16    pub(super) fn new(sink: Si, f: F) -> Self {
17        Self { sink, f: Some(f) }
18    }
19
20    /// Acquires a reference to the underlying sink.
21    pub fn get_ref(&self) -> &Si {
22        &self.sink
23    }
24
25    /// Acquires a mutable reference to the underlying sink.
26    ///
27    /// Note that care must be taken to avoid tampering with the state of the
28    /// sink which may otherwise confuse this combinator.
29    pub fn get_mut(&mut self) -> &mut Si {
30        &mut self.sink
31    }
32
33    /// Acquires a pinned mutable reference to the underlying sink.
34    ///
35    /// Note that care must be taken to avoid tampering with the state of the
36    /// sink which may otherwise confuse this combinator.
37    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Si> {
38        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().sink) }
39    }
40
41    /// Consumes this combinator, returning the underlying sink.
42    ///
43    /// Note that this may discard intermediate state of this combinator, so
44    /// care should be taken to avoid losing resources when this is called.
45    pub fn into_inner(self) -> Si {
46        self.sink
47    }
48
49    fn take_f(self: Pin<&mut Self>) -> F {
50        unsafe { self.get_unchecked_mut() }
51            .f
52            .take()
53            .expect("polled MapErr after completion")
54    }
55}
56
57impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F>
58where
59    E: core::error::Error,
60    Si: Sink<Item>,
61    F: FnOnce(Si::Error) -> E,
62{
63    type Error = E;
64
65    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
66        match self.as_mut().get_pin_mut().poll_ready(cx) {
67            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
68            Poll::Ready(Err(e)) => Poll::Ready(Err(self.take_f()(e))),
69            Poll::Pending => Poll::Pending,
70        }
71    }
72
73    fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
74        match self.as_mut().get_pin_mut().start_send(item) {
75            Ok(()) => Ok(()),
76            Err(e) => Err(self.take_f()(e)),
77        }
78    }
79
80    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81        match self.as_mut().get_pin_mut().poll_flush(cx) {
82            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
83            Poll::Ready(Err(e)) => Poll::Ready(Err(self.take_f()(e))),
84            Poll::Pending => Poll::Pending,
85        }
86    }
87
88    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
89        match self.as_mut().get_pin_mut().poll_close(cx) {
90            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
91            Poll::Ready(Err(e)) => Poll::Ready(Err(self.take_f()(e))),
92            Poll::Pending => Poll::Pending,
93        }
94    }
95}
96
97// Forwarding impl of Stream from the underlying sink
98impl<S: Stream, F> Stream for SinkMapErr<S, F> {
99    type Item = S::Item;
100
101    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
102        self.as_mut().get_pin_mut().poll_next(cx)
103    }
104
105    fn size_hint(&self) -> (usize, Option<usize>) {
106        self.sink.size_hint()
107    }
108}