another_rxrust/operators/
merge.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Merge<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  observables: Vec<Observable<'a, Item>>,
10}
11
12impl<'a, Item> Merge<'a, Item>
13where
14  Item: Clone + Send + Sync,
15{
16  pub fn new(observables: &[Observable<'a, Item>]) -> Merge<'a, Item> {
17    Merge { observables: observables.to_vec() }
18  }
19  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
20    let observables = self.observables.clone();
21    Observable::<Item>::create(move |s| {
22      let sctl = StreamController::new(s);
23
24      // prepare subscribers
25      let mut sbs = {
26        let sctl = sctl.clone();
27        Vec::from_iter(
28          (0..(observables.len() + 1)).map(move |_| {
29            let sctl_next = sctl.clone();
30            let sctl_error = sctl.clone();
31            let sctl_complete = sctl.clone();
32
33            sctl.new_observer(
34              move |_, x| {
35                sctl_next.sink_next(x);
36              },
37              move |_, e| {
38                sctl_error.sink_error(e);
39              },
40              move |serial| sctl_complete.sink_complete(&serial),
41            )
42          }),
43        )
44      };
45
46      source.inner_subscribe(sbs.pop().unwrap());
47      observables.iter().for_each(|o| {
48        o.inner_subscribe(sbs.pop().unwrap());
49      });
50    })
51  }
52}
53
54impl<'a, Item> Observable<'a, Item>
55where
56  Item: Clone + Send + Sync,
57{
58  pub fn merge(
59    &self,
60    observables: &[Observable<'a, Item>],
61  ) -> Observable<'a, Item> {
62    Merge::new(observables).execute(self.clone())
63  }
64}
65
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Merges four iterator-backed observables and prints whatever the merged
  /// stream emits.
  #[test]
  fn basic() {
    // Emits `len` formatted strings tagged with `maker`.
    fn labelled(len: usize, maker: &'static str) -> Observable<String> {
      let numbers = observables::from_iter(0..len);
      numbers.map(move |x| format!("{} - {} / {}", maker, x + 1, len))
    }

    let first = labelled(5, "#1");
    let others = [labelled(3, "#2"), labelled(2, "#3"), labelled(6, "#4")];
    first.merge(&others).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// Merges four interval-driven observables, each built with its own
  /// `new_thread_scheduler()`, and prints whatever the merged stream emits.
  #[test]
  fn thread() {
    // Takes `len` items from a 100 ms interval and formats each as a string
    // tagged with `maker`.
    fn ticking(len: usize, maker: &'static str) -> Observable<String> {
      let period = time::Duration::from_millis(100);
      let ticks =
        observables::interval(period, schedulers::new_thread_scheduler());
      ticks
        .take(len)
        .map(move |x| format!("{} - {} / {}", maker, x + 1, len))
    }

    // Scoped so the observables are dropped before the sleep below.
    {
      let first = ticking(5, "#1");
      let others = [ticking(3, "#2"), ticking(2, "#3"), ticking(6, "#4")];
      first.merge(&others).subscribe(
        print_next_fmt!("{}"),
        print_error!(),
        print_complete!(),
      );
    }

    // Presumably gives the interval sources time to emit before the test
    // returns — confirm against the scheduler implementation.
    thread::sleep(time::Duration::from_millis(1500));
  }
}