// tokio_stream_util/try_stream/ext/inspect/inspect_err.rs
use crate::try_stream::IntoStream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7use crate::{FusedStream, TryStream};
8
9pub struct InspectErr<St, F> {
11 stream: IntoStream<St>,
12 f: F,
13}
14
15pub(crate) struct InspectErrProj<'pin, St: 'pin, F: 'pin> {
16 pub stream: Pin<&'pin mut IntoStream<St>>,
17 pub f: &'pin mut F,
18}
19
20impl<St, F> InspectErr<St, F>
21where
22 St: TryStream,
23 F: FnMut(&St::Error),
24{
25 pub fn new(stream: St, f: F) -> Self {
27 let stream = IntoStream::new(stream);
28 Self { stream, f }
29 }
30
31 pub fn get_mut(&mut self) -> &mut St {
33 self.stream.get_mut()
34 }
35
36 pub(crate) fn project<'pin>(self: Pin<&'pin mut Self>) -> InspectErrProj<'pin, St, F> {
37 unsafe {
38 let this = self.get_unchecked_mut();
39 InspectErrProj {
40 stream: Pin::new_unchecked(&mut this.stream),
41 f: &mut this.f,
42 }
43 }
44 }
45
46 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
48 self.project().stream.get_pin_mut()
49 }
50
51 pub fn into_inner(self) -> St {
53 self.stream.into_inner()
54 }
55}
56
57impl<St, F> fmt::Debug for InspectErr<St, F>
61where
62 F: fmt::Debug,
63 St: fmt::Debug,
64{
65 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66 fmt::Debug::fmt(&self.stream, f)?;
67 fmt::Debug::fmt(&self.f, f)
68 }
69}
70
71impl<St, F> tokio_stream::Stream for InspectErr<St, F>
75where
76 St: TryStream,
77 F: FnMut(&<St as crate::try_stream::TryStream>::Error),
78{
79 type Item = Result<St::Ok, St::Error>;
80
81 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82 let proj = self.project();
83 match proj.stream.poll_next(cx) {
84 Poll::Ready(Some(Ok(ok))) => Poll::Ready(Some(Ok(ok))),
85 Poll::Ready(Some(Err(err))) => {
86 (proj.f)(&err);
87 Poll::Ready(Some(Err(err)))
88 }
89 Poll::Ready(None) => Poll::Ready(None),
90 Poll::Pending => Poll::Pending,
91 }
92 }
93
94 fn size_hint(&self) -> (usize, Option<usize>) {
95 self.stream.size_hint()
96 }
97}
98
99impl<St, F> FusedStream for InspectErr<St, F>
100where
101 St: TryStream,
102 F: FnMut(&St::Error),
103 IntoStream<St>: FusedStream + tokio_stream::Stream,
104{
105 fn is_terminated(&self) -> bool {
106 self.stream.is_terminated()
107 }
108}
109
110#[cfg(feature = "sink")]
111use async_sink::Sink;
/// Sink pass-through: every `Sink` operation is forwarded to the underlying
/// `St`, reached through the type's own pin projection. The inspection
/// closure is not involved in any of these calls.
#[cfg(feature = "sink")]
impl<St, Item, F> Sink<Item> for InspectErr<St, F>
where
    St: TryStream + Sink<Item>,
    F: FnMut(&<St as crate::try_stream::TryStream>::Error),
{
    type Error = <St as async_sink::Sink<Item>>::Error;

    /// Forwards readiness polling to the underlying sink.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Reuse the shared projection instead of repeating the unsafe
        // `get_unchecked_mut` / `new_unchecked` pair in every method.
        self.project().stream.get_pin_mut().poll_ready(cx)
    }

    /// Forwards `item` to the underlying sink.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        self.project().stream.get_pin_mut().start_send(item)
    }

    /// Forwards flushing to the underlying sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().stream.get_pin_mut().poll_flush(cx)
    }

    /// Forwards closing to the underlying sink.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().stream.get_pin_mut().poll_close(cx)
    }
}