// combine_latest/stream_combiners.rs
use std::future;

use either::Either;
use futures_core::Stream;
use futures_util::{stream::select, StreamExt};

7pub fn combine_latest<T1: Clone, T2: Clone>(
11 s1: impl Stream<Item = T1>,
12 s2: impl Stream<Item = T2>,
13) -> impl Stream<Item = (T1, T2)> {
14 combine_latest_opt(s1, s2).filter_map(|(v1, v2)| future::ready(v1.zip(v2)))
15}
16
17#[deprecated(since = "1.1.0", note = "Use combine_latest_opt instead")]
18pub fn combine_latest_optional<T1: Clone, T2: Clone>(
19 s1: impl Stream<Item = T1>,
20 s2: impl Stream<Item = T2>,
21) -> impl Stream<Item = (Option<T1>, Option<T2>)> {
22 combine_latest_opt(s1, s2)
23}
24
25pub fn combine_latest_opt<T1: Clone, T2: Clone>(
29 s1: impl Stream<Item = T1>,
30 s2: impl Stream<Item = T2>,
31) -> impl Stream<Item = (Option<T1>, Option<T2>)> {
32 let mut current1 = None;
33 let mut current2 = None;
34 select(s1.map(Either::Left), s2.map(Either::Right)).map(move |t1_or_t2| {
35 match t1_or_t2 {
36 Either::Left(t1) => current1 = Some(t1),
37 Either::Right(t2) => current2 = Some(t2),
38 };
39 (current1.clone(), current2.clone())
40 })
41}
42
43pub fn map_latest<T1, T2, U>(
48 s1: impl Stream<Item = T1>,
49 s2: impl Stream<Item = T2>,
50 mut f: impl for<'a, 'b> FnMut(&'a T1, &'b T2) -> U,
51) -> impl Stream<Item = U> {
52 let mut current1 = None;
53 let mut current2 = None;
54
55 select(s1.map(Either::Left), s2.map(Either::Right)).filter_map(move |t1_or_t2| {
56 match t1_or_t2 {
57 Either::Left(t1) => current1 = Some(t1),
58 Either::Right(t2) => current2 = Some(t2),
59 };
60 future::ready(
61 current1
62 .as_ref()
63 .zip(current2.as_ref())
64 .map(|args| f(args.0, args.1)),
65 )
66 })
67}
68
69pub fn map_latest_opt<T1, T2, U>(
73 s1: impl Stream<Item = T1>,
74 s2: impl Stream<Item = T2>,
75 mut f: impl for<'a, 'b> FnMut(Option<&'a T1>, Option<&'b T2>) -> U,
76) -> impl Stream<Item = U> {
77 let mut current1 = None;
78 let mut current2 = None;
79
80 select(s1.map(Either::Left), s2.map(Either::Right)).map(move |t1_or_t2| {
81 match t1_or_t2 {
82 Either::Left(t1) => current1 = Some(t1),
83 Either::Right(t2) => current2 = Some(t2),
84 };
85 f(current1.as_ref(), current2.as_ref())
86 })
87}