1#![no_std]
7#![warn(
8 missing_debug_implementations,
9 missing_docs,
10 rust_2018_idioms,
11 semicolon_in_expressions_from_macros,
12 unreachable_pub,
13 unused_import_braces,
14 unused_qualifications,
15 clippy::branches_sharing_code,
16 clippy::cloned_instead_of_copied,
17 clippy::dbg_macro,
18 clippy::empty_line_after_outer_attr,
19 clippy::inefficient_to_string,
20 clippy::macro_use_imports,
21 clippy::map_flatten,
22 clippy::mod_module_files,
23 clippy::mut_mut,
24 clippy::nonstandard_macro_braces,
25 clippy::semicolon_if_nothing_returned,
26 clippy::str_to_string,
27 clippy::todo,
28 clippy::unreadable_literal,
29 clippy::unseparated_literal_suffix,
30 clippy::wildcard_imports
31)]
32
33use core::{
34 mem,
35 pin::Pin,
36 task::{ready, Context, Poll},
37};
38
39#[cfg(feature = "alloc")]
40extern crate alloc;
41
42#[cfg(feature = "alloc")]
43use alloc::vec::Vec;
44use futures_core::{FusedStream, Stream};
45use pin_project_lite::pin_project;
46
47pub trait StreamExt: Stream + Sized {
49 fn dedup(self) -> Dedup<Self>
57 where
58 Self::Item: Clone + PartialEq,
59 {
60 Dedup::new(self)
61 }
62
63 fn dedup_by_key<T, F>(self, key_fn: F) -> DedupByKey<Self, T, F>
66 where
67 T: PartialEq,
68 F: FnMut(&Self::Item) -> T,
69 {
70 DedupByKey::new(self, key_fn)
71 }
72
73 #[cfg(feature = "alloc")]
85 fn batch_with<S>(self, batch_done_stream: S) -> BatchWith<Self, S>
86 where
87 S: Stream<Item = ()>,
88 {
89 BatchWith::new(self, batch_done_stream)
90 }
91
92 fn switch(self) -> Switch<Self>
100 where
101 Self: FusedStream,
102 Self::Item: Stream,
103 {
104 Switch::new(self)
105 }
106}
107
108impl<S: Stream> StreamExt for S {}
109
110pin_project! {
111 pub struct Dedup<S: Stream> {
113 #[pin]
114 inner: S,
115 prev_item: Option<S::Item>,
116 }
117}
118
119impl<S: Stream> Dedup<S> {
120 fn new(inner: S) -> Self {
121 Self { inner, prev_item: None }
122 }
123}
124
125impl<S> Stream for Dedup<S>
126where
127 S: Stream,
128 S::Item: Clone + PartialEq,
129{
130 type Item = S::Item;
131 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
132 let mut this = self.project();
133 let next = loop {
134 let opt = ready!(this.inner.as_mut().poll_next(cx));
135 match opt {
136 Some(item) => {
137 if this.prev_item.as_ref() != Some(&item) {
138 *this.prev_item = Some(item.clone());
139 break Some(item);
140 }
141 }
142 None => break None,
143 }
144 };
145 Poll::Ready(next)
146 }
147}
148
149pin_project! {
150 pub struct DedupByKey<S, T, F> {
152 #[pin]
153 inner: S,
154 key_fn: F,
155 prev_key: Option<T>,
156 }
157}
158
159impl<S, T, F> DedupByKey<S, T, F> {
160 fn new(inner: S, key_fn: F) -> Self {
161 Self { inner, key_fn, prev_key: None }
162 }
163}
164
165impl<S, T, F> Stream for DedupByKey<S, T, F>
166where
167 S: Stream,
168 T: PartialEq,
169 F: FnMut(&S::Item) -> T,
170{
171 type Item = S::Item;
172
173 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
174 let mut this = self.project();
175 let next = loop {
176 let opt = ready!(this.inner.as_mut().poll_next(cx));
177 match opt {
178 Some(item) => {
179 let key = (this.key_fn)(&item);
180 if this.prev_key.as_ref() != Some(&key) {
181 *this.prev_key = Some(key);
182 break Some(item);
183 }
184 }
185 None => break None,
186 }
187 };
188 Poll::Ready(next)
189 }
190}
191
#[cfg(feature = "alloc")]
pin_project! {
    /// Stream adapter produced by [`StreamExt::batch_with`].
    ///
    /// Collects the items of a primary stream into a `Vec` and yields that
    /// `Vec` whenever the batch-done stream produces an item.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct BatchWith<S1: Stream, S2> {
        // Stream whose items are collected into batches.
        #[pin]
        primary_stream: S1,
        // Stream whose items mark the end of a batch.
        #[pin]
        batch_done_stream: S2,
        // Items received since the last batch was yielded.
        batch: Vec<S1::Item>,
        // Set once `primary_stream` has returned `None`, so that it is never
        // polled again afterwards.
        primary_done: bool,
        // Set once `batch_done_stream` has returned `None`, so that it is
        // never polled again afterwards.
        batch_done_closed: bool,
    }
}

#[cfg(feature = "alloc")]
impl<S1: Stream, S2> BatchWith<S1, S2> {
    /// Creates the adapter with an empty batch and both streams still open.
    fn new(primary_stream: S1, batch_done_stream: S2) -> Self {
        Self {
            primary_stream,
            batch_done_stream,
            batch: Vec::new(),
            primary_done: false,
            batch_done_closed: false,
        }
    }
}

#[cfg(feature = "alloc")]
impl<S1, S2> Stream for BatchWith<S1, S2>
where
    S1: Stream,
    S2: Stream<Item = ()>,
{
    type Item = Vec<S1::Item>;

    /// Drains all ready items of the primary stream into the current batch,
    /// then yields the batch if the batch-done stream signals.
    ///
    /// * When the primary stream ends, the remaining items (if any) are
    ///   yielded as a final batch; every later poll returns `None` without
    ///   touching either inner stream.
    /// * A batch-done signal that arrives while the batch is empty is
    ///   consumed and no empty batch is yielded.
    /// * Once the batch-done stream has ended it is treated as permanently
    ///   signalling, i.e. a non-empty batch is yielded as soon as the primary
    ///   stream is pending.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // The primary stream already reported its end; polling it again would
        // violate the `Stream` contract.
        if *this.primary_done {
            return Poll::Ready(None);
        }

        loop {
            match this.primary_stream.as_mut().poll_next(cx) {
                // Primary stream produced a new item.
                Poll::Ready(Some(item)) => this.batch.push(item),
                // Primary stream is closed, don't wait for the batch-done stream.
                Poll::Ready(None) => {
                    *this.primary_done = true;
                    let has_pending_items = !this.batch.is_empty();
                    return Poll::Ready(has_pending_items.then(|| mem::take(this.batch)));
                }
                // Primary stream is pending; its waker is registered, so this
                // task is woken when new items arrive.
                Poll::Pending => break,
            }
        }

        // Check for a batch-done signal, unless that stream has already ended.
        if !*this.batch_done_closed {
            let signal = ready!(this.batch_done_stream.as_mut().poll_next(cx));
            *this.batch_done_closed = signal.is_none();
        }

        if this.batch.is_empty() {
            // Nothing to yield; wait for the primary stream to wake this task.
            Poll::Pending
        } else {
            Poll::Ready(Some(mem::take(this.batch)))
        }
    }
}
248
249pin_project! {
250 pub struct Switch<S: Stream> {
252 #[pin]
253 outer_stream: S,
254 #[pin]
255 state: Option<S::Item>,
256 }
257}
258
259impl<S> Switch<S>
260where
261 S: FusedStream,
262{
263 fn new(outer_stream: S) -> Self {
264 Self { outer_stream, state: None }
265 }
266}
267
268impl<S> Stream for Switch<S>
269where
270 S: FusedStream,
271 S::Item: Stream,
272{
273 type Item = <S::Item as Stream>::Item;
274
275 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
276 let mut this = self.project();
277
278 let mut outer_stream_is_closed = this.outer_stream.is_terminated();
279
280 if !outer_stream_is_closed {
281 while let Poll::Ready(ready) = this.outer_stream.as_mut().poll_next(cx) {
282 match ready {
283 Some(inner_stream) => {
284 this.state.set(Some(inner_stream));
285 }
286 None => {
287 outer_stream_is_closed = true;
288 break;
289 }
290 }
291 }
292 }
293
294 match this.state.as_mut().as_pin_mut() {
295 None => {
297 if outer_stream_is_closed {
298 Poll::Ready(None)
299 } else {
300 Poll::Pending
301 }
302 }
303 Some(inner_stream) => match inner_stream.poll_next(cx) {
305 Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
307 Poll::Ready(None) if outer_stream_is_closed => Poll::Ready(None),
309 Poll::Ready(None) | Poll::Pending => Poll::Pending,
311 },
312 }
313 }
314}