another_rxrust/operators/
combine_latest.rs1use crate::internals::function_wrapper::FunctionWrapper;
2use crate::internals::stream_controller::*;
3use crate::prelude::*;
4
5#[derive(Clone)]
6pub struct CombineLatest<'a, Item, Out>
7where
8 Item: Clone + Send + Sync,
9 Out: Clone + Send + Sync,
10{
11 zip_op: operators::Zip<'a, Item>,
12 combine_f: FunctionWrapper<'a, Vec<Item>, Out>,
13}
14
15impl<'a, Item, Out> CombineLatest<'a, Item, Out>
16where
17 Item: Clone + Send + Sync,
18 Out: Clone + Send + Sync,
19{
20 pub fn new<F>(
21 observables: &[Observable<'a, Item>],
22 f: F,
23 ) -> CombineLatest<'a, Item, Out>
24 where
25 F: Fn(Vec<Item>) -> Out + Send + Sync + 'a,
26 {
27 CombineLatest {
28 zip_op: operators::Zip::new(observables),
29 combine_f: FunctionWrapper::new(f),
30 }
31 }
32 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Out> {
33 let zip_op = self.zip_op.clone();
34 let combine_f = self.combine_f.clone();
35
36 Observable::create(move |s| {
37 let source = source.clone();
38 let combine_f = combine_f.clone();
39
40 let sctl = StreamController::new(s);
41
42 let sctl_next = sctl.clone();
43 let sctl_error = sctl.clone();
44 let sctl_complete = sctl.clone();
45
46 zip_op.execute(source).inner_subscribe(sctl.new_observer(
47 move |_, x| {
48 sctl_next.sink_next(combine_f.call(x));
49 },
50 move |_, e| {
51 sctl_error.sink_error(e);
52 },
53 move |serial| {
54 sctl_complete.sink_complete(&serial);
55 },
56 ));
57 })
58 }
59}
60
61impl<'a, Item> Observable<'a, Item>
62where
63 Item: Clone + Send + Sync,
64{
65 pub fn combine_latest<Out, F>(
66 &self,
67 observables: &[Observable<'a, Item>],
68 f: F,
69 ) -> Observable<'a, Out>
70 where
71 Out: Clone + Send + Sync,
72 F: Fn(Vec<Item>) -> Out + Send + Sync + 'a,
73 {
74 CombineLatest::new(observables, f).execute(self.clone())
75 }
76}
77
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use std::{thread, time};

    /// Smoke test: combines three iterator-backed observables and prints
    /// whatever the operator emits. It makes no assertions.
    #[test]
    fn basic() {
        let source = observables::from_iter(0..10);
        let companions = [
            observables::from_iter(10..20),
            observables::from_iter(20..30),
        ];

        source
            .combine_latest(&companions, |v| format!("function {:?}", v))
            .subscribe(
                print_next_fmt!("{:?}"),
                print_error!(),
                print_complete!(),
            );
    }

    /// Same smoke test with every source observed on its own new-thread
    /// scheduler; sleeps afterwards so the worker threads get time to run.
    #[test]
    fn thread() {
        {
            let source = observables::from_iter(0..10)
                .observe_on(schedulers::new_thread_scheduler());
            let companions = [
                observables::from_iter(10..20)
                    .observe_on(schedulers::new_thread_scheduler()),
                observables::from_iter(20..30)
                    .observe_on(schedulers::new_thread_scheduler()),
            ];

            source
                .combine_latest(&companions, |v| format!("function {:?}", v))
                .subscribe(
                    print_next_fmt!("{:?}"),
                    print_error!(),
                    print_complete!(),
                );
        }
        thread::sleep(time::Duration::from_millis(1000));
    }
}