Skip to main content

async_rx/
lib.rs

1//! Utility functions for async reactive programming.
2//!
3//! This crate is intentionally very small as it only provides utilities that
4//! are not already found in `futures-util`. It is meant as a supplement, not a
5//! replacement for the existing well-known futures crates.
6#![no_std]
7#![warn(
8    missing_debug_implementations,
9    missing_docs,
10    rust_2018_idioms,
11    semicolon_in_expressions_from_macros,
12    unreachable_pub,
13    unused_import_braces,
14    unused_qualifications,
15    clippy::branches_sharing_code,
16    clippy::cloned_instead_of_copied,
17    clippy::dbg_macro,
18    clippy::empty_line_after_outer_attr,
19    clippy::inefficient_to_string,
20    clippy::macro_use_imports,
21    clippy::map_flatten,
22    clippy::mod_module_files,
23    clippy::mut_mut,
24    clippy::nonstandard_macro_braces,
25    clippy::semicolon_if_nothing_returned,
26    clippy::str_to_string,
27    clippy::todo,
28    clippy::unreadable_literal,
29    clippy::unseparated_literal_suffix,
30    clippy::wildcard_imports
31)]
32
33use core::{
34    mem,
35    pin::Pin,
36    task::{ready, Context, Poll},
37};
38
39#[cfg(feature = "alloc")]
40extern crate alloc;
41
42#[cfg(feature = "alloc")]
43use alloc::vec::Vec;
44use futures_core::{FusedStream, Stream};
45use pin_project_lite::pin_project;
46
47/// Extensions to the [`Stream`] trait.
48pub trait StreamExt: Stream + Sized {
49    /// Deduplicate consecutive identical items.
50    ///
51    /// To be able to immediately yield items of the underlying stream once it
52    /// is produced, but still compare them to the next ones, `Dedup` keeps a
53    /// clone of the value that was produced last. If cloning the inner value
54    /// is expensive but only part of it is used for comparison, you can use
55    /// [`dedup_by_key`][Self::dedup_by_key] as a more efficient alternative.
56    fn dedup(self) -> Dedup<Self>
57    where
58        Self::Item: Clone + PartialEq,
59    {
60        Dedup::new(self)
61    }
62
63    /// Deduplicate consecutive items that the given function produces the same
64    /// key for.
65    fn dedup_by_key<T, F>(self, key_fn: F) -> DedupByKey<Self, T, F>
66    where
67        T: PartialEq,
68        F: FnMut(&Self::Item) -> T,
69    {
70        DedupByKey::new(self, key_fn)
71    }
72
73    /// Buffer the items from `self` until `batch_done_stream` produces a value,
74    /// and return all buffered values in one batch.
75    ///
76    /// `batch_done_stream` is polled after all ready items from `self` has been
77    /// read.
78    ///
79    /// Examples for possible `batch_done_stream`s:
80    ///
81    /// - `futures_channel::mpsc::Receiver<()>`
82    /// - `tokio_stream::wrappers::IntervalStream` with its item type mapped to
83    ///   `()` using `.map(|_| ())` (`use tokio_stream::StreamExt` for `map`)
84    #[cfg(feature = "alloc")]
85    fn batch_with<S>(self, batch_done_stream: S) -> BatchWith<Self, S>
86    where
87        S: Stream<Item = ()>,
88    {
89        BatchWith::new(self, batch_done_stream)
90    }
91
92    /// Flattens a stream of streams by always keeping one inner stream and
93    /// yielding its items until the outer stream produces a new inner stream,
94    /// at which point the inner stream to yield items from is switched to the
95    /// new one.
96    ///
97    /// Equivalent to RxJS'es
98    /// [`switchAll`](https://rxjs.dev/api/index/function/switchAll).
99    fn switch(self) -> Switch<Self>
100    where
101        Self: FusedStream,
102        Self::Item: Stream,
103    {
104        Switch::new(self)
105    }
106}
107
108impl<S: Stream> StreamExt for S {}
109
110pin_project! {
111    /// Stream adapter produced by [`StreamExt::dedup`].
112    pub struct Dedup<S: Stream> {
113        #[pin]
114        inner: S,
115        prev_item: Option<S::Item>,
116    }
117}
118
119impl<S: Stream> Dedup<S> {
120    fn new(inner: S) -> Self {
121        Self { inner, prev_item: None }
122    }
123}
124
125impl<S> Stream for Dedup<S>
126where
127    S: Stream,
128    S::Item: Clone + PartialEq,
129{
130    type Item = S::Item;
131    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
132        let mut this = self.project();
133        let next = loop {
134            let opt = ready!(this.inner.as_mut().poll_next(cx));
135            match opt {
136                Some(item) => {
137                    if this.prev_item.as_ref() != Some(&item) {
138                        *this.prev_item = Some(item.clone());
139                        break Some(item);
140                    }
141                }
142                None => break None,
143            }
144        };
145        Poll::Ready(next)
146    }
147}
148
149pin_project! {
150    /// Stream adapter produced by [`StreamExt::dedup_by_key`].
151    pub struct DedupByKey<S, T, F> {
152        #[pin]
153        inner: S,
154        key_fn: F,
155        prev_key: Option<T>,
156    }
157}
158
159impl<S, T, F> DedupByKey<S, T, F> {
160    fn new(inner: S, key_fn: F) -> Self {
161        Self { inner, key_fn, prev_key: None }
162    }
163}
164
165impl<S, T, F> Stream for DedupByKey<S, T, F>
166where
167    S: Stream,
168    T: PartialEq,
169    F: FnMut(&S::Item) -> T,
170{
171    type Item = S::Item;
172
173    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
174        let mut this = self.project();
175        let next = loop {
176            let opt = ready!(this.inner.as_mut().poll_next(cx));
177            match opt {
178                Some(item) => {
179                    let key = (this.key_fn)(&item);
180                    if this.prev_key.as_ref() != Some(&key) {
181                        *this.prev_key = Some(key);
182                        break Some(item);
183                    }
184                }
185                None => break None,
186            }
187        };
188        Poll::Ready(next)
189    }
190}
191
192#[cfg(feature = "alloc")]
193pin_project! {
194    /// Stream adapter produced by [`StreamExt::batch_with`].
195    pub struct BatchWith<S1: Stream, S2> {
196        #[pin]
197        primary_stream: S1,
198        #[pin]
199        batch_done_stream: S2,
200        batch: Vec<S1::Item>,
201    }
202}
203
204#[cfg(feature = "alloc")]
205impl<S1: Stream, S2> BatchWith<S1, S2> {
206    fn new(primary_stream: S1, batch_done_stream: S2) -> Self {
207        Self { primary_stream, batch_done_stream, batch: Vec::new() }
208    }
209}
210
211#[cfg(feature = "alloc")]
212impl<S1, S2> Stream for BatchWith<S1, S2>
213where
214    S1: Stream,
215    S2: Stream<Item = ()>,
216{
217    type Item = Vec<S1::Item>;
218
219    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
220        let mut this = self.project();
221        loop {
222            match this.primary_stream.as_mut().poll_next(cx) {
223                // Primary stream produced a new item
224                Poll::Ready(Some(item)) => this.batch.push(item),
225                // Primary stream is closed, don't wait for batch_done_stream
226                Poll::Ready(None) => {
227                    let has_pending_items = !this.batch.is_empty();
228                    return Poll::Ready(has_pending_items.then(|| mem::take(this.batch)));
229                }
230                // Primary stream is pending (and this task is scheduled for wakeup on new items)
231                Poll::Pending => break,
232            }
233        }
234
235        // Primary stream is pending, check the batch_done_stream
236        ready!(this.batch_done_stream.poll_next(cx));
237
238        // batch_done_stream produced an item …
239        if this.batch.is_empty() {
240            // … but we didn't queue any items from the primary stream.
241            Poll::Pending
242        } else {
243            // … and we have some queued items.
244            Poll::Ready(Some(mem::take(this.batch)))
245        }
246    }
247}
248
249pin_project! {
250    /// Stream adapter produced by [`StreamExt::switch`].
251    pub struct Switch<S: Stream> {
252        #[pin]
253        outer_stream: S,
254        #[pin]
255        state: Option<S::Item>,
256    }
257}
258
259impl<S> Switch<S>
260where
261    S: FusedStream,
262{
263    fn new(outer_stream: S) -> Self {
264        Self { outer_stream, state: None }
265    }
266}
267
268impl<S> Stream for Switch<S>
269where
270    S: FusedStream,
271    S::Item: Stream,
272{
273    type Item = <S::Item as Stream>::Item;
274
275    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
276        let mut this = self.project();
277
278        let mut outer_stream_is_closed = this.outer_stream.is_terminated();
279
280        if !outer_stream_is_closed {
281            while let Poll::Ready(ready) = this.outer_stream.as_mut().poll_next(cx) {
282                match ready {
283                    Some(inner_stream) => {
284                        this.state.set(Some(inner_stream));
285                    }
286                    None => {
287                        outer_stream_is_closed = true;
288                        break;
289                    }
290                }
291            }
292        }
293
294        match this.state.as_mut().as_pin_mut() {
295            // No inner stream has been produced yet.
296            None => {
297                if outer_stream_is_closed {
298                    Poll::Ready(None)
299                } else {
300                    Poll::Pending
301                }
302            }
303            // An inner stream exists => poll it.
304            Some(inner_stream) => match inner_stream.poll_next(cx) {
305                // Inner stream produced an item.
306                Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
307                // Both inner and outer stream are closed.
308                Poll::Ready(None) if outer_stream_is_closed => Poll::Ready(None),
309                // Only inner stream is closed, or inner stream is pending.
310                Poll::Ready(None) | Poll::Pending => Poll::Pending,
311            },
312        }
313    }
314}