// another_rxrust/operators/merge.rs
use crate::internals::stream_controller::*;
use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Merge<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 observables: Vec<Observable<'a, Item>>,
10}
11
12impl<'a, Item> Merge<'a, Item>
13where
14 Item: Clone + Send + Sync,
15{
16 pub fn new(observables: &[Observable<'a, Item>]) -> Merge<'a, Item> {
17 Merge { observables: observables.to_vec() }
18 }
19 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
20 let observables = self.observables.clone();
21 Observable::<Item>::create(move |s| {
22 let sctl = StreamController::new(s);
23
24 let mut sbs = {
26 let sctl = sctl.clone();
27 Vec::from_iter(
28 (0..(observables.len() + 1)).map(move |_| {
29 let sctl_next = sctl.clone();
30 let sctl_error = sctl.clone();
31 let sctl_complete = sctl.clone();
32
33 sctl.new_observer(
34 move |_, x| {
35 sctl_next.sink_next(x);
36 },
37 move |_, e| {
38 sctl_error.sink_error(e);
39 },
40 move |serial| sctl_complete.sink_complete(&serial),
41 )
42 }),
43 )
44 };
45
46 source.inner_subscribe(sbs.pop().unwrap());
47 observables.iter().for_each(|o| {
48 o.inner_subscribe(sbs.pop().unwrap());
49 });
50 })
51 }
52}
53
54impl<'a, Item> Observable<'a, Item>
55where
56 Item: Clone + Send + Sync,
57{
58 pub fn merge(
59 &self,
60 observables: &[Observable<'a, Item>],
61 ) -> Observable<'a, Item> {
62 Merge::new(observables).execute(self.clone())
63 }
64}
65
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use std::{thread, time};

    /// Merges four iterator-backed observables and prints every signal.
    #[test]
    fn basic() {
        // Emits `count` labelled strings built from the range `0..count`.
        fn labelled(count: usize, label: &'static str) -> Observable<String> {
            observables::from_iter(0..count)
                .map(move |i| format!("{} - {} / {}", label, i + 1, count))
        }

        labelled(5, "#1")
            .merge(&[labelled(3, "#2"), labelled(2, "#3"), labelled(6, "#4")])
            .subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
    }

    /// Merges four interval-driven observables running on a new-thread
    /// scheduler and prints every signal.
    #[test]
    fn thread() {
        // Emits `count` labelled strings, one per 100 ms interval tick.
        fn labelled(count: usize, label: &'static str) -> Observable<String> {
            observables::interval(
                time::Duration::from_millis(100),
                schedulers::new_thread_scheduler(),
            )
            .take(count)
            .map(move |i| format!("{} - {} / {}", label, i + 1, count))
        }

        labelled(5, "#1")
            .merge(&[labelled(3, "#2"), labelled(2, "#3"), labelled(6, "#4")])
            .subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());

        // Keep the test thread alive while the scheduled emissions run.
        let wait = time::Duration::from_millis(1500);
        thread::sleep(wait);
    }
}