another_rxrust/operators/
combine_latest.rs

1use crate::internals::function_wrapper::FunctionWrapper;
2use crate::internals::stream_controller::*;
3use crate::prelude::*;
4
5#[derive(Clone)]
6pub struct CombineLatest<'a, Item, Out>
7where
8  Item: Clone + Send + Sync,
9  Out: Clone + Send + Sync,
10{
11  zip_op: operators::Zip<'a, Item>,
12  combine_f: FunctionWrapper<'a, Vec<Item>, Out>,
13}
14
15impl<'a, Item, Out> CombineLatest<'a, Item, Out>
16where
17  Item: Clone + Send + Sync,
18  Out: Clone + Send + Sync,
19{
20  pub fn new<F>(
21    observables: &[Observable<'a, Item>],
22    f: F,
23  ) -> CombineLatest<'a, Item, Out>
24  where
25    F: Fn(Vec<Item>) -> Out + Send + Sync + 'a,
26  {
27    CombineLatest {
28      zip_op: operators::Zip::new(observables),
29      combine_f: FunctionWrapper::new(f),
30    }
31  }
32  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Out> {
33    let zip_op = self.zip_op.clone();
34    let combine_f = self.combine_f.clone();
35
36    Observable::create(move |s| {
37      let source = source.clone();
38      let combine_f = combine_f.clone();
39
40      let sctl = StreamController::new(s);
41
42      let sctl_next = sctl.clone();
43      let sctl_error = sctl.clone();
44      let sctl_complete = sctl.clone();
45
46      zip_op.execute(source).inner_subscribe(sctl.new_observer(
47        move |_, x| {
48          sctl_next.sink_next(combine_f.call(x));
49        },
50        move |_, e| {
51          sctl_error.sink_error(e);
52        },
53        move |serial| {
54          sctl_complete.sink_complete(&serial);
55        },
56      ));
57    })
58  }
59}
60
61impl<'a, Item> Observable<'a, Item>
62where
63  Item: Clone + Send + Sync,
64{
65  pub fn combine_latest<Out, F>(
66    &self,
67    observables: &[Observable<'a, Item>],
68    f: F,
69  ) -> Observable<'a, Out>
70  where
71    Out: Clone + Send + Sync,
72    F: Fn(Vec<Item>) -> Out + Send + Sync + 'a,
73  {
74    CombineLatest::new(observables, f).execute(self.clone())
75  }
76}
77
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::thread;
  use std::time::Duration;

  // Smoke test: everything is created with `from_iter` and no scheduler is
  // attached; the test only prints what the subscriber receives.
  #[test]
  fn basic() {
    let source = observables::from_iter(0..10);
    let others = [
      observables::from_iter(10..20),
      observables::from_iter(20..30),
    ];

    let combined =
      source.combine_latest(&others, |v| format!("function {:?}", v));

    combined.subscribe(
      print_next_fmt!("{:?}"),
      print_error!(),
      print_complete!(),
    );
  }

  // Same pipeline, but every source is observed on its own new-thread
  // scheduler. The test only prints; the sleep gives the worker threads
  // time to finish before the test function returns.
  #[test]
  fn thread() {
    {
      // Scoped so the pipeline values are dropped before sleeping.
      let source = observables::from_iter(0..10)
        .observe_on(schedulers::new_thread_scheduler());
      let others = [
        observables::from_iter(10..20)
          .observe_on(schedulers::new_thread_scheduler()),
        observables::from_iter(20..30)
          .observe_on(schedulers::new_thread_scheduler()),
      ];

      let combined =
        source.combine_latest(&others, |v| format!("function {:?}", v));

      combined.subscribe(
        print_next_fmt!("{:?}"),
        print_error!(),
        print_complete!(),
      );
    }
    thread::sleep(Duration::from_millis(1000));
  }
}