async_sink/ext/
map_err.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3use tokio_stream::Stream;
4
5use super::Sink;
6
7/// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method.
8#[derive(Debug, Clone)]
9#[must_use = "sinks do nothing unless polled"]
10pub struct SinkMapErr<Si, F> {
11    sink: Si,
12    f: Option<F>,
13}
14
15impl<Si, F> SinkMapErr<Si, F> {
16    pub(super) fn new(sink: Si, f: F) -> Self {
17        Self { sink, f: Some(f) }
18    }
19
20    /// Acquires a reference to the underlying sink.
21    pub fn get_ref(&self) -> &Si {
22        &self.sink
23    }
24
25    /// Acquires a mutable reference to the underlying sink.
26    ///
27    /// Note that care must be taken to avoid tampering with the state of the
28    /// sink which may otherwise confuse this combinator.
29    pub fn get_mut(&mut self) -> &mut Si {
30        &mut self.sink
31    }
32
33    /// Acquires a pinned mutable reference to the underlying sink.
34    ///
35    /// Note that care must be taken to avoid tampering with the state of the
36    /// sink which may otherwise confuse this combinator.
37    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Si> {
38        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().sink) }
39    }
40
41    /// Consumes this combinator, returning the underlying sink.
42    ///
43    /// Note that this may discard intermediate state of this combinator, so
44    /// care should be taken to avoid losing resources when this is called.
45    pub fn into_inner(self) -> Si {
46        self.sink
47    }
48
49    fn take_f(self: Pin<&mut Self>) -> F {
50        unsafe { self.get_unchecked_mut() }
51            .f
52            .take()
53            .expect("polled MapErr after completion")
54    }
55}
56
57impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F>
58where
59    Si: Sink<Item>,
60    F: FnOnce(Si::Error) -> E,
61{
62    type Error = E;
63
64    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
65        match self.as_mut().get_pin_mut().poll_ready(cx) {
66            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
67            Poll::Ready(Err(e)) => Poll::Ready(Err(self.take_f()(e))),
68            Poll::Pending => Poll::Pending,
69        }
70    }
71
72    fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
73        match self.as_mut().get_pin_mut().start_send(item) {
74            Ok(()) => Ok(()),
75            Err(e) => Err(self.take_f()(e)),
76        }
77    }
78
79    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80        match self.as_mut().get_pin_mut().poll_flush(cx) {
81            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
82            Poll::Ready(Err(e)) => Poll::Ready(Err(self.take_f()(e))),
83            Poll::Pending => Poll::Pending,
84        }
85    }
86
87    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
88        match self.as_mut().get_pin_mut().poll_close(cx) {
89            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
90            Poll::Ready(Err(e)) => Poll::Ready(Err(self.take_f()(e))),
91            Poll::Pending => Poll::Pending,
92        }
93    }
94}
95
96// Forwarding impl of Stream from the underlying sink
97impl<S: Stream, F> Stream for SinkMapErr<S, F> {
98    type Item = S::Item;
99
100    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101        self.as_mut().get_pin_mut().poll_next(cx)
102    }
103
104    fn size_hint(&self) -> (usize, Option<usize>) {
105        self.sink.size_hint()
106    }
107}