combine_latest/
stream_combiners.rs

1use std::future;
2
3use either::Either;
4use futures_core::Stream;
5use futures_util::{stream::select, StreamExt};
6
7/// Combines two streams into a new stream that always contains the latest items from both streams
8/// as a tuple. This stream won't yield a tuple until both input streams have yielded at least one
9/// item each.
10pub fn combine_latest<T1: Clone, T2: Clone>(
11    s1: impl Stream<Item = T1>,
12    s2: impl Stream<Item = T2>,
13) -> impl Stream<Item = (T1, T2)> {
14    combine_latest_opt(s1, s2).filter_map(|(v1, v2)| future::ready(v1.zip(v2)))
15}
16
17#[deprecated(since = "1.1.0", note = "Use combine_latest_opt instead")]
18pub fn combine_latest_optional<T1: Clone, T2: Clone>(
19    s1: impl Stream<Item = T1>,
20    s2: impl Stream<Item = T2>,
21) -> impl Stream<Item = (Option<T1>, Option<T2>)> {
22    combine_latest_opt(s1, s2)
23}
24
25/// Combines two streams into a new stream, yielding tuples of `(Option<T1>, Option<T2>)`. The
26/// stream starts yielding tuples as soon as one of the input streams yields an item, and the one
27/// that has not yet yielded has a corresponding `None` in its field of the tuple.
28pub fn combine_latest_opt<T1: Clone, T2: Clone>(
29    s1: impl Stream<Item = T1>,
30    s2: impl Stream<Item = T2>,
31) -> impl Stream<Item = (Option<T1>, Option<T2>)> {
32    let mut current1 = None;
33    let mut current2 = None;
34    select(s1.map(Either::Left), s2.map(Either::Right)).map(move |t1_or_t2| {
35        match t1_or_t2 {
36            Either::Left(t1) => current1 = Some(t1),
37            Either::Right(t2) => current2 = Some(t2),
38        };
39        (current1.clone(), current2.clone())
40    })
41}
42
43/// Combines two streams into a new stream and apply the given function to each item. The function
44/// takes references as arguments, so unlike `combine_latest` the types T1 and T2 don't have to
45/// implement `Clone`. The returned stream won't yield until both streams have yielded at least one
46/// item each.
47pub fn map_latest<T1, T2, U>(
48    s1: impl Stream<Item = T1>,
49    s2: impl Stream<Item = T2>,
50    mut f: impl for<'a, 'b> FnMut(&'a T1, &'b T2) -> U,
51) -> impl Stream<Item = U> {
52    let mut current1 = None;
53    let mut current2 = None;
54
55    select(s1.map(Either::Left), s2.map(Either::Right)).filter_map(move |t1_or_t2| {
56        match t1_or_t2 {
57            Either::Left(t1) => current1 = Some(t1),
58            Either::Right(t2) => current2 = Some(t2),
59        };
60        future::ready(
61            current1
62                .as_ref()
63                .zip(current2.as_ref())
64                .map(|args| f(args.0, args.1)),
65        )
66    })
67}
68
69/// Combines two streams into a new stream and apply the given function to each item. The function
70/// takes references as arguments, so unlike `combine_latest` the types T1 and T2 don't have to
71/// implement `Clone`. The returned stream will yield as soon as one input stream yields.
72pub fn map_latest_opt<T1, T2, U>(
73    s1: impl Stream<Item = T1>,
74    s2: impl Stream<Item = T2>,
75    mut f: impl for<'a, 'b> FnMut(Option<&'a T1>, Option<&'b T2>) -> U,
76) -> impl Stream<Item = U> {
77    let mut current1 = None;
78    let mut current2 = None;
79
80    select(s1.map(Either::Left), s2.map(Either::Right)).map(move |t1_or_t2| {
81        match t1_or_t2 {
82            Either::Left(t1) => current1 = Some(t1),
83            Either::Right(t2) => current2 = Some(t2),
84        };
85        f(current1.as_ref(), current2.as_ref())
86    })
87}