tokio_stream_util/try_stream/ext/inspect/
inspect_ok.rs

1use crate::try_stream::IntoStream;
2
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7use crate::{FusedStream, TryStream};
8
9/// Stream for the [`inspect_ok`](crate::TryStreamExt::inspect_ok) method.
10pub struct InspectOk<St, F> {
11    stream: IntoStream<St>,
12    f: F,
13}
14
15pub(crate) struct InspectOkProj<'pin, St: 'pin, F: 'pin> {
16    pub stream: Pin<&'pin mut IntoStream<St>>,
17    pub f: &'pin mut F,
18}
19
20impl<St, F> InspectOk<St, F>
21where
22    St: TryStream,
23    F: FnMut(&St::Ok),
24{
25    /// Construct a new `InspectOk` wrapper.
26    pub fn new(stream: St, f: F) -> Self {
27        let stream = IntoStream::new(stream);
28        Self { stream, f }
29    }
30
31    /// Return a mutable reference to the underlying original stream `St`.
32    pub fn get_mut(&mut self) -> &mut St {
33        // Map/Inspect public API provide get_mut that returns &mut inner stream.
34        // First get mut reference to the inner `Inspect<IntoStream<St>, _>`,
35        // then to the `IntoStream<St>`, then to `St`.
36        self.stream.get_mut()
37    }
38
39    pub(crate) fn project<'pin>(self: Pin<&'pin mut Self>) -> InspectOkProj<'pin, St, F> {
40        unsafe {
41            let this = self.get_unchecked_mut();
42            InspectOkProj {
43                stream: Pin::new_unchecked(&mut this.stream),
44                f: &mut this.f,
45            }
46        }
47    }
48
49    /// Return a pinned mutable reference to the underlying original stream `St`.
50    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
51        self.project().stream.get_pin_mut()
52    }
53
54    /// Consume wrapper and return the underlying original stream `St`.
55    pub fn into_inner(self) -> St {
56        self.stream.into_inner()
57    }
58}
59
60//
61// Debug impls (delegate to inner types' Debug where available)
62//
63
64impl<St, F> fmt::Debug for InspectOk<St, F>
65where
66    F: fmt::Debug,
67    St: fmt::Debug,
68{
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        fmt::Debug::fmt(&self.stream, f)?;
71        fmt::Debug::fmt(&self.f, f)
72    }
73}
74
75//
76// Stream + FusedStream delegations
77//
78
79impl<St, F> tokio_stream::Stream for InspectOk<St, F>
80where
81    St: TryStream,
82    F: FnMut(&<St as crate::try_stream::TryStream>::Ok),
83{
84    type Item = Result<St::Ok, St::Error>;
85
86    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87        let proj = self.project();
88        match proj.stream.poll_next(cx) {
89            Poll::Ready(Some(Ok(ok))) => {
90                (proj.f)(&ok);
91                Poll::Ready(Some(Ok(ok)))
92            }
93            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
94            Poll::Ready(None) => Poll::Ready(None),
95            Poll::Pending => Poll::Pending,
96        }
97    }
98
99    fn size_hint(&self) -> (usize, Option<usize>) {
100        self.stream.size_hint()
101    }
102}
103
104impl<St, F> FusedStream for InspectOk<St, F>
105where
106    St: TryStream,
107    F: FnMut(&St::Ok),
108    IntoStream<St>: FusedStream + tokio_stream::Stream,
109{
110    fn is_terminated(&self) -> bool {
111        self.stream.is_terminated()
112    }
113}
114
115#[cfg(feature = "sink")]
116use async_sink::Sink;
/// Forwards the `Sink` half of the wrapped stream, when it has one.
///
/// Every method obtains a pinned reference to the original `St` through
/// [`InspectOk::get_pin_mut`] and calls the same-named `Sink` method on it; the
/// inspection closure is not involved.
#[cfg(feature = "sink")]
impl<St, Item, F> Sink<Item> for InspectOk<St, F>
where
    St: TryStream + Sink<Item>,
    F: FnMut(&<St as crate::try_stream::TryStream>::Ok),
{
    type Error = <St as async_sink::Sink<Item>>::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // The safe projection helper replaces a hand-rolled
        // `get_unchecked_mut` / `new_unchecked` pair.
        self.get_pin_mut().poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        self.get_pin_mut().start_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.get_pin_mut().poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.get_pin_mut().poll_close(cx)
    }
}