tokio_stream_util/try_stream/ext/inspect/
inspect_ok.rs1use crate::try_stream::IntoStream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7use crate::{FusedStream, TryStream};
8
9pub struct InspectOk<St, F> {
11 stream: IntoStream<St>,
12 f: F,
13}
14
15pub(crate) struct InspectOkProj<'pin, St: 'pin, F: 'pin> {
16 pub stream: Pin<&'pin mut IntoStream<St>>,
17 pub f: &'pin mut F,
18}
19
20impl<St, F> InspectOk<St, F>
21where
22 St: TryStream,
23 F: FnMut(&St::Ok),
24{
25 pub fn new(stream: St, f: F) -> Self {
27 let stream = IntoStream::new(stream);
28 Self { stream, f }
29 }
30
31 pub fn get_mut(&mut self) -> &mut St {
33 self.stream.get_mut()
37 }
38
39 pub(crate) fn project<'pin>(self: Pin<&'pin mut Self>) -> InspectOkProj<'pin, St, F> {
40 unsafe {
41 let this = self.get_unchecked_mut();
42 InspectOkProj {
43 stream: Pin::new_unchecked(&mut this.stream),
44 f: &mut this.f,
45 }
46 }
47 }
48
49 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
51 self.project().stream.get_pin_mut()
52 }
53
54 pub fn into_inner(self) -> St {
56 self.stream.into_inner()
57 }
58}
59
60impl<St, F> fmt::Debug for InspectOk<St, F>
65where
66 F: fmt::Debug,
67 St: fmt::Debug,
68{
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 fmt::Debug::fmt(&self.stream, f)?;
71 fmt::Debug::fmt(&self.f, f)
72 }
73}
74
75impl<St, F> tokio_stream::Stream for InspectOk<St, F>
80where
81 St: TryStream,
82 F: FnMut(&<St as crate::try_stream::TryStream>::Ok),
83{
84 type Item = Result<St::Ok, St::Error>;
85
86 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87 let proj = self.project();
88 match proj.stream.poll_next(cx) {
89 Poll::Ready(Some(Ok(ok))) => {
90 (proj.f)(&ok);
91 Poll::Ready(Some(Ok(ok)))
92 }
93 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
94 Poll::Ready(None) => Poll::Ready(None),
95 Poll::Pending => Poll::Pending,
96 }
97 }
98
99 fn size_hint(&self) -> (usize, Option<usize>) {
100 self.stream.size_hint()
101 }
102}
103
104impl<St, F> FusedStream for InspectOk<St, F>
105where
106 St: TryStream,
107 F: FnMut(&St::Ok),
108 IntoStream<St>: FusedStream + tokio_stream::Stream,
109{
110 fn is_terminated(&self) -> bool {
111 self.stream.is_terminated()
112 }
113}
114
115#[cfg(feature = "sink")]
116use async_sink::Sink;
117#[cfg(feature = "sink")]
118impl<St, Item, F> Sink<Item> for InspectOk<St, F>
119where
120 St: TryStream + Sink<Item>,
121 F: FnMut(&<St as crate::try_stream::TryStream>::Ok),
122{
123 type Error = <St as async_sink::Sink<Item>>::Error;
124
125 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
126 let this = unsafe { self.get_unchecked_mut() };
127 let into_stream = unsafe { Pin::new_unchecked(&mut this.stream) };
128 into_stream.get_pin_mut().poll_ready(cx)
129 }
130
131 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
132 let this = unsafe { self.get_unchecked_mut() };
133 let into_stream = unsafe { Pin::new_unchecked(&mut this.stream) };
134 into_stream.get_pin_mut().start_send(item)
135 }
136
137 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 let this = unsafe { self.get_unchecked_mut() };
139 let into_stream = unsafe { Pin::new_unchecked(&mut this.stream) };
140 into_stream.get_pin_mut().poll_flush(cx)
141 }
142
143 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
144 let this = unsafe { self.get_unchecked_mut() };
145 let into_stream = unsafe { Pin::new_unchecked(&mut this.stream) };
146 into_stream.get_pin_mut().poll_close(cx)
147 }
148}