tokio_stream_util/try_stream/ext/inspect/
inspect_err.rs

1use crate::try_stream::IntoStream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7use crate::{FusedStream, TryStream};
8
9/// Stream for the [`inspect_err`](crate::TryStreamExt::inspect_err) method.
10pub struct InspectErr<St, F> {
11    stream: IntoStream<St>,
12    f: F,
13}
14
15pub(crate) struct InspectErrProj<'pin, St: 'pin, F: 'pin> {
16    pub stream: Pin<&'pin mut IntoStream<St>>,
17    pub f: &'pin mut F,
18}
19
20impl<St, F> InspectErr<St, F>
21where
22    St: TryStream,
23    F: FnMut(&St::Error),
24{
25    /// Construct a new `InspectErr` wrapper.
26    pub fn new(stream: St, f: F) -> Self {
27        let stream = IntoStream::new(stream);
28        Self { stream, f }
29    }
30
31    /// Return a mutable reference to the underlying original stream `St`.
32    pub fn get_mut(&mut self) -> &mut St {
33        self.stream.get_mut()
34    }
35
36    pub(crate) fn project<'pin>(self: Pin<&'pin mut Self>) -> InspectErrProj<'pin, St, F> {
37        unsafe {
38            let this = self.get_unchecked_mut();
39            InspectErrProj {
40                stream: Pin::new_unchecked(&mut this.stream),
41                f: &mut this.f,
42            }
43        }
44    }
45
46    /// Return a pinned mutable reference to the underlying original stream `St`.
47    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
48        self.project().stream.get_pin_mut()
49    }
50
51    /// Consume wrapper and return the underlying original stream `St`.
52    pub fn into_inner(self) -> St {
53        self.stream.into_inner()
54    }
55}
56
57//
58// Debug impls (delegate to inner types' Debug where available)
59//
60impl<St, F> fmt::Debug for InspectErr<St, F>
61where
62    F: fmt::Debug,
63    St: fmt::Debug,
64{
65    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66        fmt::Debug::fmt(&self.stream, f)?;
67        fmt::Debug::fmt(&self.f, f)
68    }
69}
70
71//
72// Stream + FusedStream delegations
73//
74impl<St, F> tokio_stream::Stream for InspectErr<St, F>
75where
76    St: TryStream,
77    F: FnMut(&<St as crate::try_stream::TryStream>::Error),
78{
79    type Item = Result<St::Ok, St::Error>;
80
81    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82        let proj = self.project();
83        match proj.stream.poll_next(cx) {
84            Poll::Ready(Some(Ok(ok))) => Poll::Ready(Some(Ok(ok))),
85            Poll::Ready(Some(Err(err))) => {
86                (proj.f)(&err);
87                Poll::Ready(Some(Err(err)))
88            }
89            Poll::Ready(None) => Poll::Ready(None),
90            Poll::Pending => Poll::Pending,
91        }
92    }
93
94    fn size_hint(&self) -> (usize, Option<usize>) {
95        self.stream.size_hint()
96    }
97}
98
99impl<St, F> FusedStream for InspectErr<St, F>
100where
101    St: TryStream,
102    F: FnMut(&St::Error),
103    IntoStream<St>: FusedStream + tokio_stream::Stream,
104{
105    fn is_terminated(&self) -> bool {
106        self.stream.is_terminated()
107    }
108}
109
110#[cfg(feature = "sink")]
111use async_sink::Sink;
112#[cfg(feature = "sink")]
113impl<St, Item, F> Sink<Item> for InspectErr<St, F>
114where
115    St: TryStream + Sink<Item>,
116    F: FnMut(&<St as crate::try_stream::TryStream>::Error),
117{
118    type Error = <St as async_sink::Sink<Item>>::Error;
119
120    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
121        let this = unsafe { self.get_unchecked_mut() };
122        let into_stream = unsafe { Pin::new_unchecked(&mut this.stream) };
123        into_stream.get_pin_mut().poll_ready(cx)
124    }
125
126    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
127        let this = unsafe { self.get_unchecked_mut() };
128        let into_stream = unsafe { Pin::new_unchecked(&mut this.stream) };
129        into_stream.get_pin_mut().start_send(item)
130    }
131
132    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
133        let this = unsafe { self.get_unchecked_mut() };
134        let into_stream = unsafe { Pin::new_unchecked(&mut this.stream) };
135        into_stream.get_pin_mut().poll_flush(cx)
136    }
137
138    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
139        let this = unsafe { self.get_unchecked_mut() };
140        let into_stream = unsafe { Pin::new_unchecked(&mut this.stream) };
141        into_stream.get_pin_mut().poll_close(cx)
142    }
143}