1#![no_std]
7#![warn(
8 missing_debug_implementations,
9 missing_docs,
10 rust_2018_idioms,
11 semicolon_in_expressions_from_macros,
12 unreachable_pub,
13 unused_import_braces,
14 unused_qualifications,
15 clippy::branches_sharing_code,
16 clippy::cloned_instead_of_copied,
17 clippy::dbg_macro,
18 clippy::empty_line_after_outer_attr,
19 clippy::inefficient_to_string,
20 clippy::macro_use_imports,
21 clippy::map_flatten,
22 clippy::mod_module_files,
23 clippy::mut_mut,
24 clippy::nonstandard_macro_braces,
25 clippy::semicolon_if_nothing_returned,
26 clippy::str_to_string,
27 clippy::todo,
28 clippy::unreadable_literal,
29 clippy::unseparated_literal_suffix,
30 clippy::wildcard_imports
31)]
32
33use core::{
34 mem,
35 pin::Pin,
36 task::{ready, Context, Poll},
37};
38
39#[cfg(feature = "alloc")]
40extern crate alloc;
41
42#[cfg(feature = "alloc")]
43use alloc::vec::Vec;
44use futures_core::Stream;
45use pin_project_lite::pin_project;
46
47pub trait StreamExt: Stream + Sized {
49 fn dedup(self) -> Dedup<Self>
57 where
58 Self::Item: Clone + PartialEq,
59 {
60 Dedup::new(self)
61 }
62
63 fn dedup_by_key<T, F>(self, key_fn: F) -> DedupByKey<Self, T, F>
66 where
67 T: PartialEq,
68 F: FnMut(&Self::Item) -> T,
69 {
70 DedupByKey::new(self, key_fn)
71 }
72
73 #[cfg(feature = "alloc")]
85 fn batch_with<S>(self, batch_done_stream: S) -> BatchWith<Self, S>
86 where
87 S: Stream<Item = ()>,
88 {
89 BatchWith::new(self, batch_done_stream)
90 }
91
92 fn switch(self) -> Switch<Self>
100 where
101 Self::Item: Stream,
102 {
103 Switch::new(self)
104 }
105}
106
107impl<S: Stream> StreamExt for S {}
108
109pin_project! {
110 pub struct Dedup<S: Stream> {
112 #[pin]
113 inner: S,
114 prev_item: Option<S::Item>,
115 }
116}
117
118impl<S: Stream> Dedup<S> {
119 fn new(inner: S) -> Self {
120 Self { inner, prev_item: None }
121 }
122}
123
124impl<S> Stream for Dedup<S>
125where
126 S: Stream,
127 S::Item: Clone + PartialEq,
128{
129 type Item = S::Item;
130 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
131 let mut this = self.project();
132 let next = loop {
133 let opt = ready!(this.inner.as_mut().poll_next(cx));
134 match opt {
135 Some(item) => {
136 if this.prev_item.as_ref() != Some(&item) {
137 *this.prev_item = Some(item.clone());
138 break Some(item);
139 }
140 }
141 None => break None,
142 }
143 };
144 Poll::Ready(next)
145 }
146}
147
148pin_project! {
149 pub struct DedupByKey<S, T, F> {
151 #[pin]
152 inner: S,
153 key_fn: F,
154 prev_key: Option<T>,
155 }
156}
157
158impl<S, T, F> DedupByKey<S, T, F> {
159 fn new(inner: S, key_fn: F) -> Self {
160 Self { inner, key_fn, prev_key: None }
161 }
162}
163
164impl<S, T, F> Stream for DedupByKey<S, T, F>
165where
166 S: Stream,
167 T: PartialEq,
168 F: FnMut(&S::Item) -> T,
169{
170 type Item = S::Item;
171
172 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
173 let mut this = self.project();
174 let next = loop {
175 let opt = ready!(this.inner.as_mut().poll_next(cx));
176 match opt {
177 Some(item) => {
178 let key = (this.key_fn)(&item);
179 if this.prev_key.as_ref() != Some(&key) {
180 *this.prev_key = Some(key);
181 break Some(item);
182 }
183 }
184 None => break None,
185 }
186 };
187 Poll::Ready(next)
188 }
189}
190
#[cfg(feature = "alloc")]
pin_project! {
    /// Stream adapter that buffers items of a primary stream and yields them
    /// as a `Vec` whenever a second "batch done" stream produces a value.
    ///
    /// When the primary stream ends, buffered items are yielded as one last
    /// batch and the adapter then ends. Empty batches are never yielded.
    #[must_use = "streams do nothing unless polled"]
    pub struct BatchWith<S1: Stream, S2> {
        #[pin]
        primary_stream: S1,
        #[pin]
        batch_done_stream: S2,
        // Items received since the last batch was handed out.
        batch: Vec<S1::Item>,
        // Set once `primary_stream` returned `None`, so that it is never
        // polled again after termination.
        primary_done: bool,
    }
}

#[cfg(feature = "alloc")]
impl<S1: Stream, S2> BatchWith<S1, S2> {
    /// Combine the two streams, starting with an empty batch.
    fn new(primary_stream: S1, batch_done_stream: S2) -> Self {
        Self { primary_stream, batch_done_stream, batch: Vec::new(), primary_done: false }
    }
}

#[cfg(feature = "alloc")]
impl<S1, S2> Stream for BatchWith<S1, S2>
where
    S1: Stream,
    S2: Stream<Item = ()>,
{
    type Item = Vec<S1::Item>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // The primary stream already ended and its final batch (if any) was
        // handed out on that poll. Polling a finished stream again is not
        // allowed by the `Stream` contract, so report the end directly.
        if *this.primary_done {
            return Poll::Ready(None);
        }

        loop {
            match this.primary_stream.as_mut().poll_next(cx) {
                // Primary stream produced a new item: buffer it and keep going.
                Poll::Ready(Some(item)) => this.batch.push(item),
                // Primary stream ended: flush what we have without waiting
                // for `batch_done_stream`, or end right away if nothing is buffered.
                Poll::Ready(None) => {
                    *this.primary_done = true;
                    let has_pending_items = !this.batch.is_empty();
                    return Poll::Ready(has_pending_items.then(|| mem::take(this.batch)));
                }
                // Primary stream is pending and has registered our waker.
                Poll::Pending => break,
            }
        }

        // All ready items are buffered; now check whether the batch is done.
        // NOTE(review): the polled value is discarded, so the end of
        // `batch_done_stream` (`None`) is treated like a regular signal and the
        // stream is polled again afterwards — confirm the intended behavior.
        ready!(this.batch_done_stream.poll_next(cx));

        if this.batch.is_empty() {
            // A signal arrived but nothing is buffered. The primary stream
            // returned `Pending` above, so we are woken when it has new items.
            Poll::Pending
        } else {
            Poll::Ready(Some(mem::take(this.batch)))
        }
    }
}
247
248pin_project! {
249 pub struct Switch<S: Stream> {
251 #[pin]
252 outer_stream: S,
253 #[pin]
254 state: SwitchState<S::Item>,
255 }
256}
257
258pin_project! {
259 #[project = SwitchStateProj]
260 enum SwitchState<S> {
261 None,
262 Some {
263 #[pin]
264 inner_stream: S,
265 }
266 }
267}
268
269impl<S: Stream> Switch<S> {
270 fn new(outer_stream: S) -> Self {
271 Self { outer_stream, state: SwitchState::None }
272 }
273}
274
275impl<S> Stream for Switch<S>
276where
277 S: Stream,
278 S::Item: Stream,
279{
280 type Item = <S::Item as Stream>::Item;
281
282 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
283 let mut this = self.project();
284
285 let mut outer_stream_closed = false;
286 while let Poll::Ready(ready) = this.outer_stream.as_mut().poll_next(cx) {
287 match ready {
288 Some(inner_stream) => {
289 this.state.set(SwitchState::Some { inner_stream });
290 }
291 None => {
292 outer_stream_closed = true;
293 break;
294 }
295 }
296 }
297
298 match this.state.project() {
299 SwitchStateProj::None => {
301 if outer_stream_closed {
302 Poll::Ready(None)
303 } else {
304 Poll::Pending
305 }
306 }
307 SwitchStateProj::Some { inner_stream } => match inner_stream.poll_next(cx) {
309 Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
311 Poll::Ready(None) if outer_stream_closed => Poll::Ready(None),
313 Poll::Ready(None) | Poll::Pending => Poll::Pending,
315 },
316 }
317 }
318}