tokio_stream_util/try_stream/ext/
try_skip_while.rs

1#[cfg(feature = "sink")]
2use async_sink::Sink;
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use futures_core::future::TryFuture;
7use tokio_stream::Stream;
8
9use super::{FusedStream, TryStream};
10
11/// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while)
12/// method.
13#[must_use = "streams do nothing unless polled"]
14pub struct TrySkipWhile<St, Fut, F>
15where
16    St: TryStream,
17{
18    stream: St,
19    f: F,
20    pending_fut: Option<Fut>,
21    pending_item: Option<St::Ok>,
22    done_skipping: bool,
23}
24
25impl<St, Fut, F> Unpin for TrySkipWhile<St, Fut, F>
26where
27    St: TryStream + Unpin,
28    Fut: Unpin,
29{
30}
31
32impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F>
33where
34    St: TryStream + fmt::Debug,
35    St::Ok: fmt::Debug,
36    Fut: fmt::Debug,
37{
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_struct("TrySkipWhile")
40            .field("stream", &self.stream)
41            .field("pending_fut", &self.pending_fut)
42            .field("pending_item", &self.pending_item)
43            .field("done_skipping", &self.done_skipping)
44            .finish()
45    }
46}
47
48impl<St, Fut, F> TrySkipWhile<St, Fut, F>
49where
50    St: TryStream,
51    F: FnMut(&St::Ok) -> Fut,
52    Fut: TryFuture<Ok = bool, Error = St::Error>,
53{
54    pub(super) fn new(stream: St, f: F) -> Self {
55        Self {
56            stream,
57            f,
58            pending_fut: None,
59            pending_item: None,
60            done_skipping: false,
61        }
62    }
63
64    /// Acquires a reference to the underlying stream that this combinator is
65    /// pulling from.
66    pub fn get_ref(&self) -> &St {
67        &self.stream
68    }
69
70    /// Acquires a mutable reference to the underlying stream that this
71    /// combinator is pulling from.
72    ///
73    /// Note that care must be taken to avoid tampering with the state of the
74    /// stream which may otherwise confuse this combinator.
75    pub fn get_mut(&mut self) -> &mut St {
76        &mut self.stream
77    }
78
79    /// Acquires a pinned mutable reference to the underlying stream that this
80    /// combinator is pulling from.
81    ///
82    /// Note that care must be taken to avoid tampering with the state of the
83    /// stream which may otherwise confuse this combinator.
84    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
85        unsafe { self.map_unchecked_mut(|s| &mut s.stream) }
86    }
87
88    /// Consumes this combinator, returning the underlying stream.
89    ///
90    /// Note that this may discard intermediate state of this combinator, so
91    /// care should be taken to avoid losing resources when this is called.
92    pub fn into_inner(self) -> St {
93        self.stream
94    }
95}
96
97impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
98where
99    St: TryStream,
100    F: FnMut(&St::Ok) -> Fut,
101    Fut: TryFuture<Ok = bool, Error = St::Error>,
102{
103    type Item = Result<St::Ok, St::Error>;
104
105    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106        let this = unsafe { self.get_unchecked_mut() };
107
108        if this.done_skipping {
109            let stream = unsafe { Pin::new_unchecked(&mut this.stream) };
110            return stream.try_poll_next(cx);
111        }
112
113        loop {
114            if this.pending_fut.is_some() {
115                let mut fut = unsafe { Pin::new_unchecked(this.pending_fut.as_mut().unwrap()) };
116                let skipped = match fut.as_mut().try_poll(cx) {
117                    Poll::Ready(Ok(skipped)) => skipped,
118                    Poll::Ready(Err(e)) => {
119                        this.done_skipping = true;
120                        this.pending_fut = None;
121                        this.pending_item = None;
122                        return Poll::Ready(Some(Err(e)));
123                    }
124                    Poll::Pending => return Poll::Pending,
125                };
126
127                this.pending_fut = None;
128                let item = this.pending_item.take();
129
130                if !skipped {
131                    this.done_skipping = true;
132                    return Poll::Ready(item.map(Ok));
133                }
134            } else {
135                let mut stream = unsafe { Pin::new_unchecked(&mut this.stream) };
136                match stream.as_mut().try_poll_next(cx) {
137                    Poll::Ready(Some(Ok(item))) => {
138                        this.pending_fut = Some((this.f)(&item));
139                        this.pending_item = Some(item);
140                    }
141                    Poll::Ready(Some(Err(e))) => {
142                        this.done_skipping = true;
143                        return Poll::Ready(Some(Err(e)));
144                    }
145                    Poll::Ready(None) => {
146                        this.done_skipping = true;
147                        return Poll::Ready(None);
148                    }
149                    Poll::Pending => return Poll::Pending,
150                }
151            }
152        }
153    }
154
155    fn size_hint(&self) -> (usize, Option<usize>) {
156        let pending_len = usize::from(self.pending_item.is_some());
157        let (_, upper) = self.stream.size_hint();
158        let upper = match upper {
159            Some(x) => x.checked_add(pending_len),
160            None => None,
161        };
162        (0, upper) // can't know a lower bound, due to the predicate
163    }
164}
165
166impl<St, Fut, F> FusedStream for TrySkipWhile<St, Fut, F>
167where
168    St: TryStream + FusedStream,
169    F: FnMut(&St::Ok) -> Fut,
170    Fut: TryFuture<Ok = bool, Error = St::Error>,
171{
172    fn is_terminated(&self) -> bool {
173        self.pending_item.is_none() && self.stream.is_terminated()
174    }
175}
176
// `Sink` passthrough: every method is delegated to the wrapped stream.
#[cfg(feature = "sink")]
impl<St, Fut, F, Item, E> Sink<Item> for TrySkipWhile<St, Fut, F>
where
    E: core::error::Error,
    St: TryStream + Sink<Item, Error = E>,
    Fut: TryFuture,
{
    type Error = E;

    /// Delegates to the wrapped stream's `poll_ready`.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // SAFETY: only projects the pin onto the `stream` field; the field is
        // not moved.
        let inner = unsafe { self.map_unchecked_mut(|this| &mut this.stream) };
        inner.poll_ready(cx)
    }

    /// Delegates to the wrapped stream's `start_send`.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        // SAFETY: only projects the pin onto the `stream` field; the field is
        // not moved.
        let inner = unsafe { self.map_unchecked_mut(|this| &mut this.stream) };
        inner.start_send(item)
    }

    /// Delegates to the wrapped stream's `poll_flush`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // SAFETY: only projects the pin onto the `stream` field; the field is
        // not moved.
        let inner = unsafe { self.map_unchecked_mut(|this| &mut this.stream) };
        inner.poll_flush(cx)
    }

    /// Delegates to the wrapped stream's `poll_close`.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // SAFETY: only projects the pin onto the `stream` field; the field is
        // not moved.
        let inner = unsafe { self.map_unchecked_mut(|this| &mut this.stream) };
        inner.poll_close(cx)
    }
}