rxrust/ops/
merge.rs

1use crate::next_proxy_impl;
2use crate::prelude::*;
3use std::cell::RefCell;
4use std::rc::Rc;
5use std::sync::{Arc, Mutex};
6
7#[derive(Clone)]
8pub struct MergeOp<S1, S2> {
9  pub(crate) source1: S1,
10  pub(crate) source2: S2,
11}
12
13impl<S1, S2> Observable for MergeOp<S1, S2>
14where
15  S1: Observable,
16  S2: Observable<Item = S1::Item, Err = S1::Err>,
17{
18  type Item = S1::Item;
19  type Err = S1::Err;
20}
21
22impl<'a, S1, S2> LocalObservable<'a> for MergeOp<S1, S2>
23where
24  S1: LocalObservable<'a>,
25  S2: LocalObservable<'a, Item = S1::Item, Err = S1::Err>,
26{
27  type Unsub = LocalSubscription;
28
29  fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
30    self,
31    subscriber: Subscriber<O, LocalSubscription>,
32  ) -> Self::Unsub {
33    let subscription = subscriber.subscription;
34    let merge_observer = Rc::new(RefCell::new(MergeObserver {
35      observer: subscriber.observer,
36      subscription: subscription.clone(),
37      completed_one: false,
38    }));
39    subscription.add(self.source1.actual_subscribe(Subscriber {
40      observer: merge_observer.clone(),
41      subscription: LocalSubscription::default(),
42    }));
43    subscription.add(self.source2.actual_subscribe(Subscriber {
44      observer: merge_observer,
45      subscription: LocalSubscription::default(),
46    }));
47    subscription
48  }
49}
50
51impl<S1, S2> SharedObservable for MergeOp<S1, S2>
52where
53  S1: SharedObservable,
54  S2: SharedObservable<Item = S1::Item, Err = S1::Err, Unsub = S1::Unsub>,
55  S1::Unsub: Send + Sync,
56{
57  type Unsub = SharedSubscription;
58  fn actual_subscribe<
59    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
60  >(
61    self,
62    subscriber: Subscriber<O, SharedSubscription>,
63  ) -> Self::Unsub {
64    let subscription = subscriber.subscription;
65    let merge_observer = Arc::new(Mutex::new(MergeObserver {
66      observer: subscriber.observer,
67      subscription: subscription.clone(),
68      completed_one: false,
69    }));
70    subscription.add(self.source1.actual_subscribe(Subscriber {
71      observer: merge_observer.clone(),
72      subscription: SharedSubscription::default(),
73    }));
74    subscription.add(self.source2.actual_subscribe(Subscriber {
75      observer: merge_observer,
76      subscription: SharedSubscription::default(),
77    }));
78    subscription
79  }
80}
81
82#[derive(Clone)]
83pub struct MergeObserver<O, Unsub> {
84  observer: O,
85  subscription: Unsub,
86  completed_one: bool,
87}
88
89impl<Item, Err, O, Unsub> Observer for MergeObserver<O, Unsub>
90where
91  O: Observer<Item = Item, Err = Err>,
92  Unsub: SubscriptionLike,
93{
94  type Item = Item;
95  type Err = Err;
96  next_proxy_impl!(Item, observer);
97  fn error(&mut self, err: Err) {
98    self.observer.error(err);
99    self.subscription.unsubscribe();
100  }
101
102  fn complete(&mut self) {
103    if self.completed_one {
104      self.observer.complete();
105    } else {
106      self.completed_one = true;
107    }
108  }
109
110  #[inline]
111  fn is_stopped(&self) -> bool { self.observer.is_stopped() }
112}
113
114#[cfg(test)]
115mod test {
116  use crate::prelude::*;
117  use std::sync::{
118    atomic::{AtomicBool, Ordering},
119    Arc, Mutex,
120  };
121
122  #[test]
123  fn odd_even_merge() {
124    // three collection to store streams emissions
125    let mut odd_store = vec![];
126    let mut even_store = vec![];
127    let mut numbers_store = vec![];
128
129    {
130      let mut numbers = LocalSubject::new();
131      // enabling multiple observers for even stream;
132      let even = numbers.clone().filter(|v| *v % 2 == 0);
133      // enabling multiple observers for odd stream;
134      let odd = numbers.clone().filter(|v| *v % 2 != 0);
135
136      // merge odd and even stream again
137      let merged = even.clone().merge(odd.clone());
138
139      //  attach observers
140      merged.subscribe(|v| numbers_store.push(v));
141      odd.subscribe(|v| odd_store.push(v));
142      even.subscribe(|v| even_store.push(v));
143
144      (0..10).for_each(|v| {
145        numbers.next(v);
146      });
147    }
148    assert_eq!(even_store, vec![0, 2, 4, 6, 8]);
149    assert_eq!(odd_store, vec![1, 3, 5, 7, 9]);
150    assert_eq!(numbers_store, (0..10).collect::<Vec<_>>());
151  }
152
153  #[test]
154  fn merge_unsubscribe_work() {
155    let mut numbers = LocalSubject::new();
156    // enabling multiple observers for even stream;
157    let even = numbers.clone().filter(|v| *v % 2 == 0);
158    // enabling multiple observers for odd stream;
159    let odd = numbers.clone().filter(|v| *v % 2 != 0);
160
161    even
162      .merge(odd)
163      .subscribe(|_| unreachable!("oh, unsubscribe not work."))
164      .unsubscribe();
165
166    numbers.next(&1);
167  }
168
169  #[test]
170  fn completed_test() {
171    let completed = Arc::new(AtomicBool::new(false));
172    let c_clone = completed.clone();
173    let mut even = LocalSubject::new();
174    let mut odd = LocalSubject::new();
175
176    even.clone().merge(odd.clone()).subscribe_complete(
177      |_: &()| {},
178      move || completed.store(true, Ordering::Relaxed),
179    );
180
181    even.complete();
182    assert!(!c_clone.load(Ordering::Relaxed));
183    odd.complete();
184    assert!(c_clone.load(Ordering::Relaxed));
185    c_clone.store(false, Ordering::Relaxed);
186    even.complete();
187    assert!(!c_clone.load(Ordering::Relaxed));
188  }
189
190  #[test]
191  fn error_test() {
192    let completed = Arc::new(Mutex::new(0));
193    let cc = completed.clone();
194    let error = Arc::new(Mutex::new(0));
195    let ec = error.clone();
196    let mut even = LocalSubject::new();
197    let mut odd = LocalSubject::new();
198
199    even.clone().merge(odd.clone()).subscribe_all(
200      |_: ()| {},
201      move |_| *error.lock().unwrap() += 1,
202      move || *completed.lock().unwrap() += 1,
203    );
204
205    odd.error("");
206    even.clone().error("");
207    even.complete();
208
209    // if error occur,  stream terminated.
210    assert_eq!(*cc.lock().unwrap(), 0);
211    // error should be hit just once
212    assert_eq!(*ec.lock().unwrap(), 1);
213  }
214
215  #[test]
216  fn merge_fork() {
217    let o = observable::create(|mut s| {
218      s.next(1);
219      s.next(2);
220      s.error(());
221    });
222
223    let m = o.clone().merge(o.clone());
224    m.clone().merge(m.clone()).subscribe(|_| {});
225  }
226
227  #[test]
228  fn merge_local_and_shared() {
229    let mut res = vec![];
230    let shared = observable::of(1);
231    let local = observable::of(2);
232
233    shared.merge(local).into_shared().subscribe(move |v| {
234      res.push(v);
235    });
236  }
237
238  #[test]
239  fn bench() { do_bench(); }
240
241  benchmark_group!(do_bench, bench_merge);
242
243  fn bench_merge(b: &mut bencher::Bencher) { b.iter(odd_even_merge); }
244}