another_rxrust/operators/
zip.rs

use crate::internals::stream_controller::*;
use crate::prelude::*;
use std::{
  collections::VecDeque,
  sync::{Arc, RwLock},
};

8#[derive(Clone)]
9pub struct Zip<'a, Item>
10where
11  Item: Clone + Send + Sync,
12{
13  observables: Vec<Observable<'a, Item>>,
14}
15
16impl<'a, Item> Zip<'a, Item>
17where
18  Item: Clone + Send + Sync,
19{
20  pub fn new(observables: &[Observable<'a, Item>]) -> Zip<'a, Item> {
21    Zip { observables: observables.to_vec() }
22  }
23  pub fn execute(
24    &self,
25    source: Observable<'a, Item>,
26  ) -> Observable<'a, Vec<Item>> {
27    let observables = self.observables.clone();
28    Observable::<Vec<Item>>::create(move |s| {
29      let sctl = StreamController::new(s);
30
31      let results = Arc::new(RwLock::new({
32        let mut r = Vec::<VecDeque<Item>>::new();
33        (0..(observables.len() + 1)).for_each(|_| {
34          r.push(VecDeque::<Item>::new());
35        });
36        r
37      }));
38
39      let sctl_f = sctl.clone();
40      let results_f = Arc::clone(&results);
41      let register = move |id: &usize, item: Item| {
42        results_f
43          .write()
44          .unwrap()
45          .get_mut(id.clone())
46          .unwrap()
47          .push_back(item);
48
49        let re = Arc::clone(&results_f);
50        let get = move || {
51          let mut re = re.write().unwrap();
52          let filled = re.iter().filter(|x| x.len() > 0).count();
53          if filled == re.len() {
54            let mut v = Vec::new();
55            for items in re.iter_mut() {
56              v.push(items.pop_front().unwrap());
57            }
58            Some(v)
59          } else {
60            None
61          }
62        };
63        while let Some(items) = get() {
64          if !sctl_f.is_subscribed() {
65            break;
66          }
67          sctl_f.sink_next(items);
68        }
69      };
70
71      // prepare subscribers
72      let mut sbs = {
73        let sctl = sctl.clone();
74        VecDeque::from_iter(
75          (0..(observables.len() + 1)).map(move |id| {
76            let register = register.clone();
77            let sctl_error = sctl.clone();
78            let sctl_complete = sctl.clone();
79            sctl.new_observer(
80              move |_, x| register(&id, x),
81              move |_, e| {
82                sctl_error.sink_error(e);
83              },
84              move |serial| {
85                sctl_complete.sink_complete(&serial);
86              },
87            )
88          }),
89        )
90      };
91
92      source.inner_subscribe(sbs.pop_front().unwrap());
93      observables.iter().for_each(|o| {
94        o.inner_subscribe(sbs.pop_front().unwrap());
95      });
96    })
97  }
98}
99
100impl<'a, Item> Observable<'a, Item>
101where
102  Item: Clone + Send + Sync,
103{
104  pub fn zip(
105    &self,
106    observables: &[Observable<'a, Item>],
107  ) -> Observable<'a, Vec<Item>> {
108    Zip::new(observables).execute(self.clone())
109  }
110}
111
// Smoke tests for `zip`. They print what the zipped observable emits and make
// no assertions. Compiled only for test builds without the "web" feature.
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  // Zips an observable with two mapped versions of itself.
  #[test]
  fn basic() {
    let ob = observables::from_iter(0..10);

    ob.zip(&[ob.map(|x| x + 10), ob.map(|x| x + 20)]).subscribe(
      print_next_fmt!("{:?}"),
      print_error!(),
      print_complete!(),
    );
  }

  // Zips three observables that are each observed on a
  // `new_thread_scheduler()`.
  #[test]
  fn thread() {
    observables::from_iter(0..10)
      .observe_on(schedulers::new_thread_scheduler())
      .zip(&[
        observables::from_iter(10..20)
          .observe_on(schedulers::new_thread_scheduler()),
        observables::from_iter(20..30)
          .observe_on(schedulers::new_thread_scheduler()),
      ])
      .subscribe(
        print_next_fmt!("{:?}"),
        print_error!(),
        print_complete!(),
      );
    // NOTE(review): presumably gives the scheduled work time to finish before
    // the test returns - confirm against the scheduler implementation.
    thread::sleep(time::Duration::from_millis(1000));
  }
}