tokio_stream_util/try_stream/ext/
err_into.rs

1use super::TryStream;
2use core::marker::PhantomData;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use tokio_stream::Stream;
6
/// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
///
/// Wraps an inner stream `St` and records the target error type `E` purely at
/// the type level through [`PhantomData`]; no value of type `E` is stored.
#[must_use = "streams do nothing unless polled"]
pub struct ErrInto<St, E> {
    /// The wrapped stream that items are pulled from.
    stream: St,
    /// Zero-sized marker carrying the target error type `E`.
    _phantom: PhantomData<E>,
}

// `Clone` is written by hand instead of derived: `#[derive(Clone)]` would add
// an `E: Clone` bound, even though `E` only appears inside `PhantomData` and
// is never actually cloned. Only the wrapped stream needs to be `Clone`.
impl<St: Clone, E> Clone for ErrInto<St, E> {
    fn clone(&self) -> Self {
        Self {
            stream: self.stream.clone(),
            _phantom: PhantomData,
        }
    }
}
14
15impl<St, E> ErrInto<St, E> {
16    pub(super) fn new(stream: St) -> Self {
17        Self {
18            stream,
19            _phantom: PhantomData,
20        }
21    }
22
23    /// Acquires a reference to the underlying stream that this combinator is
24    /// pulling from.
25    ///
26    /// Note that care must be taken to avoid tampering with the state of the
27    /// stream which may otherwise confuse this combinator.
28    ///
29    /// # SAFETY
30    /// The returned reference is valid as long as `self` is valid.
31    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
32        // SAFETY: `stream` is pinned because it is inside a `Pin<ErrInto>`.
33        unsafe { self.map_unchecked_mut(|s| &mut s.stream) }
34    }
35}
36
37impl<St, E> Stream for ErrInto<St, E>
38where
39    St: TryStream,
40    St::Error: Into<E>,
41{
42    type Item = Result<St::Ok, E>;
43
44    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45        match self.get_pin_mut().try_poll_next(cx) {
46            Poll::Ready(Some(Ok(ok))) => Poll::Ready(Some(Ok(ok))),
47            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))),
48            Poll::Ready(None) => Poll::Ready(None),
49            Poll::Pending => Poll::Pending,
50        }
51    }
52}
53
54#[cfg(feature = "sink")]
55use async_sink::Sink;
#[cfg(feature = "sink")]
impl<St, E, Item> async_sink::Sink<Item> for ErrInto<St, E>
where
    E: From<St::Error> + core::error::Error,
    St: Sink<Item>,
{
    /// Errors reported by the wrapped sink are converted into `E`.
    type Error = E;

    /// Forwards to the wrapped sink's `poll_ready`, converting any error to `E`.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = self.get_pin_mut();
        inner.poll_ready(cx).map_err(E::from)
    }

    /// Forwards `item` to the wrapped sink's `start_send`, converting any
    /// error to `E`.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        let inner = self.get_pin_mut();
        inner.start_send(item).map_err(E::from)
    }

    /// Forwards to the wrapped sink's `poll_flush`, converting any error to `E`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = self.get_pin_mut();
        inner.poll_flush(cx).map_err(E::from)
    }

    /// Forwards to the wrapped sink's `poll_close`, converting any error to `E`.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = self.get_pin_mut();
        inner.poll_close(cx).map_err(E::from)
    }
}