async_rx/
lib.rs

1//! Utility functions for async reactive programming.
2//!
3//! This crate is intentionally very small as it only provides utilities that
4//! are not already found in `futures-util`. It is meant as a supplement, not a
5//! replacement for the existing well-known futures crates.
6#![no_std]
7#![warn(
8    missing_debug_implementations,
9    missing_docs,
10    rust_2018_idioms,
11    semicolon_in_expressions_from_macros,
12    unreachable_pub,
13    unused_import_braces,
14    unused_qualifications,
15    clippy::branches_sharing_code,
16    clippy::cloned_instead_of_copied,
17    clippy::dbg_macro,
18    clippy::empty_line_after_outer_attr,
19    clippy::inefficient_to_string,
20    clippy::macro_use_imports,
21    clippy::map_flatten,
22    clippy::mod_module_files,
23    clippy::mut_mut,
24    clippy::nonstandard_macro_braces,
25    clippy::semicolon_if_nothing_returned,
26    clippy::str_to_string,
27    clippy::todo,
28    clippy::unreadable_literal,
29    clippy::unseparated_literal_suffix,
30    clippy::wildcard_imports
31)]
32
33use core::{
34    mem,
35    pin::Pin,
36    task::{ready, Context, Poll},
37};
38
39#[cfg(feature = "alloc")]
40extern crate alloc;
41
42#[cfg(feature = "alloc")]
43use alloc::vec::Vec;
44use futures_core::Stream;
45use pin_project_lite::pin_project;
46
47/// Extensions to the [`Stream`] trait.
48pub trait StreamExt: Stream + Sized {
49    /// Deduplicate consecutive identical items.
50    ///
51    /// To be able to immediately yield items of the underlying stream once it
52    /// is produced, but still compare them to the next ones, `Dedup` keeps a
53    /// clone of the value that was produced last. If cloning the inner value
54    /// is expensive but only part of it is used for comparison, you can use
55    /// [`dedup_by_key`][Self::dedup_by_key] as a more efficient alternative.
56    fn dedup(self) -> Dedup<Self>
57    where
58        Self::Item: Clone + PartialEq,
59    {
60        Dedup::new(self)
61    }
62
63    /// Deduplicate consecutive items that the given function produces the same
64    /// key for.
65    fn dedup_by_key<T, F>(self, key_fn: F) -> DedupByKey<Self, T, F>
66    where
67        T: PartialEq,
68        F: FnMut(&Self::Item) -> T,
69    {
70        DedupByKey::new(self, key_fn)
71    }
72
73    /// Buffer the items from `self` until `batch_done_stream` produces a value,
74    /// and return all buffered values in one batch.
75    ///
76    /// `batch_done_stream` is polled after all ready items from `self` has been
77    /// read.
78    ///
79    /// Examples for possible `batch_done_stream`s:
80    ///
81    /// - `futures_channel::mpsc::Receiver<()>`
82    /// - `tokio_stream::wrappers::IntervalStream` with its item type mapped to
83    ///   `()` using `.map(|_| ())` (`use tokio_stream::StreamExt` for `map`)
84    #[cfg(feature = "alloc")]
85    fn batch_with<S>(self, batch_done_stream: S) -> BatchWith<Self, S>
86    where
87        S: Stream<Item = ()>,
88    {
89        BatchWith::new(self, batch_done_stream)
90    }
91
92    /// Flattens a stream of streams by always keeping one inner stream and
93    /// yielding its items until the outer stream produces a new inner stream,
94    /// at which point the inner stream to yield items from is switched to the
95    /// new one.
96    ///
97    /// Equivalent to RxJS'es
98    /// [`switchAll`](https://rxjs.dev/api/index/function/switchAll).
99    fn switch(self) -> Switch<Self>
100    where
101        Self::Item: Stream,
102    {
103        Switch::new(self)
104    }
105}
106
107impl<S: Stream> StreamExt for S {}
108
pin_project! {
    /// Stream adapter produced by [`StreamExt::dedup`].
    ///
    /// Yields the items of the wrapped stream, skipping every item that is
    /// equal to the item yielded immediately before it.
    pub struct Dedup<S: Stream> {
        // The wrapped stream; polled in place through the pin projection.
        #[pin]
        inner: S,
        // Clone of the most recently yielded item; `None` until the first item.
        prev_item: Option<S::Item>,
    }
}
117
118impl<S: Stream> Dedup<S> {
119    fn new(inner: S) -> Self {
120        Self { inner, prev_item: None }
121    }
122}
123
124impl<S> Stream for Dedup<S>
125where
126    S: Stream,
127    S::Item: Clone + PartialEq,
128{
129    type Item = S::Item;
130    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
131        let mut this = self.project();
132        let next = loop {
133            let opt = ready!(this.inner.as_mut().poll_next(cx));
134            match opt {
135                Some(item) => {
136                    if this.prev_item.as_ref() != Some(&item) {
137                        *this.prev_item = Some(item.clone());
138                        break Some(item);
139                    }
140                }
141                None => break None,
142            }
143        };
144        Poll::Ready(next)
145    }
146}
147
pin_project! {
    /// Stream adapter produced by [`StreamExt::dedup_by_key`].
    ///
    /// Yields the items of the wrapped stream, skipping every item whose key
    /// equals the key of the item yielded immediately before it.
    pub struct DedupByKey<S, T, F> {
        // The wrapped stream; polled in place through the pin projection.
        #[pin]
        inner: S,
        // Computes the comparison key for an item.
        key_fn: F,
        // Key of the most recently yielded item; `None` until the first item.
        prev_key: Option<T>,
    }
}
157
158impl<S, T, F> DedupByKey<S, T, F> {
159    fn new(inner: S, key_fn: F) -> Self {
160        Self { inner, key_fn, prev_key: None }
161    }
162}
163
164impl<S, T, F> Stream for DedupByKey<S, T, F>
165where
166    S: Stream,
167    T: PartialEq,
168    F: FnMut(&S::Item) -> T,
169{
170    type Item = S::Item;
171
172    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
173        let mut this = self.project();
174        let next = loop {
175            let opt = ready!(this.inner.as_mut().poll_next(cx));
176            match opt {
177                Some(item) => {
178                    let key = (this.key_fn)(&item);
179                    if this.prev_key.as_ref() != Some(&key) {
180                        *this.prev_key = Some(key);
181                        break Some(item);
182                    }
183                }
184                None => break None,
185            }
186        };
187        Poll::Ready(next)
188    }
189}
190
#[cfg(feature = "alloc")]
pin_project! {
    /// Stream adapter produced by [`StreamExt::batch_with`].
    ///
    /// Buffers the items of the primary stream and yields them as one `Vec`
    /// whenever the batch-done stream produces a value. Empty batches are
    /// never yielded.
    pub struct BatchWith<S1: Stream, S2> {
        #[pin]
        primary_stream: S1,
        #[pin]
        batch_done_stream: S2,
        // Items read from `primary_stream` that have not been yielded yet.
        batch: Vec<S1::Item>,
        // Set once `primary_stream` has returned `None`; it must not be
        // polled again after that.
        primary_stream_done: bool,
        // Set once `batch_done_stream` has returned `None`; it must not be
        // polled again after that.
        batch_done_stream_done: bool,
    }
}

#[cfg(feature = "alloc")]
impl<S1: Stream, S2> BatchWith<S1, S2> {
    /// Combine the two streams, starting with an empty batch.
    fn new(primary_stream: S1, batch_done_stream: S2) -> Self {
        Self {
            primary_stream,
            batch_done_stream,
            batch: Vec::new(),
            primary_stream_done: false,
            batch_done_stream_done: false,
        }
    }
}

#[cfg(feature = "alloc")]
impl<S1, S2> Stream for BatchWith<S1, S2>
where
    S1: Stream,
    S2: Stream<Item = ()>,
{
    type Item = Vec<S1::Item>;

    /// Read all ready items of the primary stream into the batch, then yield
    /// the batch if the batch-done stream signals.
    ///
    /// - When the primary stream ends, the remaining items (if any) are
    ///   yielded as a final batch and the stream ends afterwards.
    /// - When the batch-done stream ends, that end counts as a batch-done
    ///   signal, and so does every later poll that finds the primary stream
    ///   pending. This matches what happened before for batch-done streams
    ///   that keep returning `None` after completion.
    ///
    /// Neither inner stream is polled again after it has returned `None`,
    /// because the `Stream` contract allows a finished stream to panic or
    /// misbehave when polled again.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        if *this.primary_stream_done {
            // The final batch (if there was one) has already been handed out.
            return Poll::Ready(None);
        }

        loop {
            match this.primary_stream.as_mut().poll_next(cx) {
                // Primary stream produced a new item
                Poll::Ready(Some(item)) => this.batch.push(item),
                // Primary stream is closed, don't wait for batch_done_stream
                Poll::Ready(None) => {
                    *this.primary_stream_done = true;
                    let has_pending_items = !this.batch.is_empty();
                    return Poll::Ready(has_pending_items.then(|| mem::take(this.batch)));
                }
                // Primary stream is pending (and this task is scheduled for wakeup on new items)
                Poll::Pending => break,
            }
        }

        // Primary stream is pending, check the batch_done_stream (unless it
        // has already finished, in which case it must not be polled again).
        if !*this.batch_done_stream_done {
            match ready!(this.batch_done_stream.as_mut().poll_next(cx)) {
                Some(()) => {}
                None => *this.batch_done_stream_done = true,
            }
        }

        // A batch-done signal is in effect …
        if this.batch.is_empty() {
            // … but we didn't queue any items from the primary stream. The
            // primary stream was polled to `Pending` above, so this task is
            // woken up when new items arrive.
            Poll::Pending
        } else {
            // … and we have some queued items.
            Poll::Ready(Some(mem::take(this.batch)))
        }
    }
}
247
248pin_project! {
249    /// Stream adapter produced by [`StreamExt::switch`].
250    pub struct Switch<S: Stream> {
251        #[pin]
252        outer_stream: S,
253        #[pin]
254        state: SwitchState<S::Item>,
255    }
256}
257
258pin_project! {
259    #[project = SwitchStateProj]
260    enum SwitchState<S> {
261        None,
262        Some {
263            #[pin]
264            inner_stream: S,
265        }
266    }
267}
268
269impl<S: Stream> Switch<S> {
270    fn new(outer_stream: S) -> Self {
271        Self { outer_stream, state: SwitchState::None }
272    }
273}
274
275impl<S> Stream for Switch<S>
276where
277    S: Stream,
278    S::Item: Stream,
279{
280    type Item = <S::Item as Stream>::Item;
281
282    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
283        let mut this = self.project();
284
285        let mut outer_stream_closed = false;
286        while let Poll::Ready(ready) = this.outer_stream.as_mut().poll_next(cx) {
287            match ready {
288                Some(inner_stream) => {
289                    this.state.set(SwitchState::Some { inner_stream });
290                }
291                None => {
292                    outer_stream_closed = true;
293                    break;
294                }
295            }
296        }
297
298        match this.state.project() {
299            // No inner stream has been produced yet.
300            SwitchStateProj::None => {
301                if outer_stream_closed {
302                    Poll::Ready(None)
303                } else {
304                    Poll::Pending
305                }
306            }
307            // An inner stream exists => poll it.
308            SwitchStateProj::Some { inner_stream } => match inner_stream.poll_next(cx) {
309                // Inner stream produced an item.
310                Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
311                // Both inner and outer stream are closed.
312                Poll::Ready(None) if outer_stream_closed => Poll::Ready(None),
313                // Only inner stream is closed, or inner stream is pending.
314                Poll::Ready(None) | Poll::Pending => Poll::Pending,
315            },
316        }
317    }
318}