tokio_stream_util/try_stream/ext/
try_skip_while.rs1#[cfg(feature = "sink")]
2use async_sink::Sink;
3use core::fmt;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use futures_core::future::TryFuture;
7use tokio_stream::Stream;
8
9use super::{FusedStream, TryStream};
10
11#[must_use = "streams do nothing unless polled"]
14pub struct TrySkipWhile<St, Fut, F>
15where
16 St: TryStream,
17{
18 stream: St,
19 f: F,
20 pending_fut: Option<Fut>,
21 pending_item: Option<St::Ok>,
22 done_skipping: bool,
23}
24
25impl<St, Fut, F> Unpin for TrySkipWhile<St, Fut, F>
26where
27 St: TryStream + Unpin,
28 Fut: Unpin,
29{
30}
31
32impl<St, Fut, F> fmt::Debug for TrySkipWhile<St, Fut, F>
33where
34 St: TryStream + fmt::Debug,
35 St::Ok: fmt::Debug,
36 Fut: fmt::Debug,
37{
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.debug_struct("TrySkipWhile")
40 .field("stream", &self.stream)
41 .field("pending_fut", &self.pending_fut)
42 .field("pending_item", &self.pending_item)
43 .field("done_skipping", &self.done_skipping)
44 .finish()
45 }
46}
47
48impl<St, Fut, F> TrySkipWhile<St, Fut, F>
49where
50 St: TryStream,
51 F: FnMut(&St::Ok) -> Fut,
52 Fut: TryFuture<Ok = bool, Error = St::Error>,
53{
54 pub(super) fn new(stream: St, f: F) -> Self {
55 Self {
56 stream,
57 f,
58 pending_fut: None,
59 pending_item: None,
60 done_skipping: false,
61 }
62 }
63
64 pub fn get_ref(&self) -> &St {
67 &self.stream
68 }
69
70 pub fn get_mut(&mut self) -> &mut St {
76 &mut self.stream
77 }
78
79 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
85 unsafe { self.map_unchecked_mut(|s| &mut s.stream) }
86 }
87
88 pub fn into_inner(self) -> St {
93 self.stream
94 }
95}
96
97impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>
98where
99 St: TryStream,
100 F: FnMut(&St::Ok) -> Fut,
101 Fut: TryFuture<Ok = bool, Error = St::Error>,
102{
103 type Item = Result<St::Ok, St::Error>;
104
105 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106 let this = unsafe { self.get_unchecked_mut() };
107
108 if this.done_skipping {
109 let stream = unsafe { Pin::new_unchecked(&mut this.stream) };
110 return stream.try_poll_next(cx);
111 }
112
113 loop {
114 if this.pending_fut.is_some() {
115 let mut fut = unsafe { Pin::new_unchecked(this.pending_fut.as_mut().unwrap()) };
116 let skipped = match fut.as_mut().try_poll(cx) {
117 Poll::Ready(Ok(skipped)) => skipped,
118 Poll::Ready(Err(e)) => {
119 this.done_skipping = true;
120 this.pending_fut = None;
121 this.pending_item = None;
122 return Poll::Ready(Some(Err(e)));
123 }
124 Poll::Pending => return Poll::Pending,
125 };
126
127 this.pending_fut = None;
128 let item = this.pending_item.take();
129
130 if !skipped {
131 this.done_skipping = true;
132 return Poll::Ready(item.map(Ok));
133 }
134 } else {
135 let mut stream = unsafe { Pin::new_unchecked(&mut this.stream) };
136 match stream.as_mut().try_poll_next(cx) {
137 Poll::Ready(Some(Ok(item))) => {
138 this.pending_fut = Some((this.f)(&item));
139 this.pending_item = Some(item);
140 }
141 Poll::Ready(Some(Err(e))) => {
142 this.done_skipping = true;
143 return Poll::Ready(Some(Err(e)));
144 }
145 Poll::Ready(None) => {
146 this.done_skipping = true;
147 return Poll::Ready(None);
148 }
149 Poll::Pending => return Poll::Pending,
150 }
151 }
152 }
153 }
154
155 fn size_hint(&self) -> (usize, Option<usize>) {
156 let pending_len = usize::from(self.pending_item.is_some());
157 let (_, upper) = self.stream.size_hint();
158 let upper = match upper {
159 Some(x) => x.checked_add(pending_len),
160 None => None,
161 };
162 (0, upper) }
164}
165
166impl<St, Fut, F> FusedStream for TrySkipWhile<St, Fut, F>
167where
168 St: TryStream + FusedStream,
169 F: FnMut(&St::Ok) -> Fut,
170 Fut: TryFuture<Ok = bool, Error = St::Error>,
171{
172 fn is_terminated(&self) -> bool {
173 self.pending_item.is_none() && self.stream.is_terminated()
174 }
175}
176
177#[cfg(feature = "sink")]
179impl<St, Fut, F, Item, E> Sink<Item> for TrySkipWhile<St, Fut, F>
180where
181 E: core::error::Error,
182 St: TryStream + Sink<Item, Error = E>,
183 Fut: TryFuture,
184{
185 type Error = E;
186
187 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
188 unsafe { self.map_unchecked_mut(|s| &mut s.stream) }.poll_ready(cx)
189 }
190
191 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
192 unsafe { self.map_unchecked_mut(|s| &mut s.stream) }.start_send(item)
193 }
194
195 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
196 unsafe { self.map_unchecked_mut(|s| &mut s.stream) }.poll_flush(cx)
197 }
198
199 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
200 unsafe { self.map_unchecked_mut(|s| &mut s.stream) }.poll_close(cx)
201 }
202}