1use crate::next_proxy_impl;
2use crate::prelude::*;
3use std::cell::RefCell;
4use std::rc::Rc;
5use std::sync::{Arc, Mutex};
6
7#[derive(Clone)]
8pub struct MergeOp<S1, S2> {
9 pub(crate) source1: S1,
10 pub(crate) source2: S2,
11}
12
13impl<S1, S2> Observable for MergeOp<S1, S2>
14where
15 S1: Observable,
16 S2: Observable<Item = S1::Item, Err = S1::Err>,
17{
18 type Item = S1::Item;
19 type Err = S1::Err;
20}
21
22impl<'a, S1, S2> LocalObservable<'a> for MergeOp<S1, S2>
23where
24 S1: LocalObservable<'a>,
25 S2: LocalObservable<'a, Item = S1::Item, Err = S1::Err>,
26{
27 type Unsub = LocalSubscription;
28
29 fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
30 self,
31 subscriber: Subscriber<O, LocalSubscription>,
32 ) -> Self::Unsub {
33 let subscription = subscriber.subscription;
34 let merge_observer = Rc::new(RefCell::new(MergeObserver {
35 observer: subscriber.observer,
36 subscription: subscription.clone(),
37 completed_one: false,
38 }));
39 subscription.add(self.source1.actual_subscribe(Subscriber {
40 observer: merge_observer.clone(),
41 subscription: LocalSubscription::default(),
42 }));
43 subscription.add(self.source2.actual_subscribe(Subscriber {
44 observer: merge_observer,
45 subscription: LocalSubscription::default(),
46 }));
47 subscription
48 }
49}
50
51impl<S1, S2> SharedObservable for MergeOp<S1, S2>
52where
53 S1: SharedObservable,
54 S2: SharedObservable<Item = S1::Item, Err = S1::Err, Unsub = S1::Unsub>,
55 S1::Unsub: Send + Sync,
56{
57 type Unsub = SharedSubscription;
58 fn actual_subscribe<
59 O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
60 >(
61 self,
62 subscriber: Subscriber<O, SharedSubscription>,
63 ) -> Self::Unsub {
64 let subscription = subscriber.subscription;
65 let merge_observer = Arc::new(Mutex::new(MergeObserver {
66 observer: subscriber.observer,
67 subscription: subscription.clone(),
68 completed_one: false,
69 }));
70 subscription.add(self.source1.actual_subscribe(Subscriber {
71 observer: merge_observer.clone(),
72 subscription: SharedSubscription::default(),
73 }));
74 subscription.add(self.source2.actual_subscribe(Subscriber {
75 observer: merge_observer,
76 subscription: SharedSubscription::default(),
77 }));
78 subscription
79 }
80}
81
82#[derive(Clone)]
83pub struct MergeObserver<O, Unsub> {
84 observer: O,
85 subscription: Unsub,
86 completed_one: bool,
87}
88
89impl<Item, Err, O, Unsub> Observer for MergeObserver<O, Unsub>
90where
91 O: Observer<Item = Item, Err = Err>,
92 Unsub: SubscriptionLike,
93{
94 type Item = Item;
95 type Err = Err;
96 next_proxy_impl!(Item, observer);
97 fn error(&mut self, err: Err) {
98 self.observer.error(err);
99 self.subscription.unsubscribe();
100 }
101
102 fn complete(&mut self) {
103 if self.completed_one {
104 self.observer.complete();
105 } else {
106 self.completed_one = true;
107 }
108 }
109
110 #[inline]
111 fn is_stopped(&self) -> bool { self.observer.is_stopped() }
112}
113
114#[cfg(test)]
115mod test {
116 use crate::prelude::*;
117 use std::sync::{
118 atomic::{AtomicBool, Ordering},
119 Arc, Mutex,
120 };
121
122 #[test]
123 fn odd_even_merge() {
124 let mut odd_store = vec![];
126 let mut even_store = vec![];
127 let mut numbers_store = vec![];
128
129 {
130 let mut numbers = LocalSubject::new();
131 let even = numbers.clone().filter(|v| *v % 2 == 0);
133 let odd = numbers.clone().filter(|v| *v % 2 != 0);
135
136 let merged = even.clone().merge(odd.clone());
138
139 merged.subscribe(|v| numbers_store.push(v));
141 odd.subscribe(|v| odd_store.push(v));
142 even.subscribe(|v| even_store.push(v));
143
144 (0..10).for_each(|v| {
145 numbers.next(v);
146 });
147 }
148 assert_eq!(even_store, vec![0, 2, 4, 6, 8]);
149 assert_eq!(odd_store, vec![1, 3, 5, 7, 9]);
150 assert_eq!(numbers_store, (0..10).collect::<Vec<_>>());
151 }
152
153 #[test]
154 fn merge_unsubscribe_work() {
155 let mut numbers = LocalSubject::new();
156 let even = numbers.clone().filter(|v| *v % 2 == 0);
158 let odd = numbers.clone().filter(|v| *v % 2 != 0);
160
161 even
162 .merge(odd)
163 .subscribe(|_| unreachable!("oh, unsubscribe not work."))
164 .unsubscribe();
165
166 numbers.next(&1);
167 }
168
169 #[test]
170 fn completed_test() {
171 let completed = Arc::new(AtomicBool::new(false));
172 let c_clone = completed.clone();
173 let mut even = LocalSubject::new();
174 let mut odd = LocalSubject::new();
175
176 even.clone().merge(odd.clone()).subscribe_complete(
177 |_: &()| {},
178 move || completed.store(true, Ordering::Relaxed),
179 );
180
181 even.complete();
182 assert!(!c_clone.load(Ordering::Relaxed));
183 odd.complete();
184 assert!(c_clone.load(Ordering::Relaxed));
185 c_clone.store(false, Ordering::Relaxed);
186 even.complete();
187 assert!(!c_clone.load(Ordering::Relaxed));
188 }
189
190 #[test]
191 fn error_test() {
192 let completed = Arc::new(Mutex::new(0));
193 let cc = completed.clone();
194 let error = Arc::new(Mutex::new(0));
195 let ec = error.clone();
196 let mut even = LocalSubject::new();
197 let mut odd = LocalSubject::new();
198
199 even.clone().merge(odd.clone()).subscribe_all(
200 |_: ()| {},
201 move |_| *error.lock().unwrap() += 1,
202 move || *completed.lock().unwrap() += 1,
203 );
204
205 odd.error("");
206 even.clone().error("");
207 even.complete();
208
209 assert_eq!(*cc.lock().unwrap(), 0);
211 assert_eq!(*ec.lock().unwrap(), 1);
213 }
214
215 #[test]
216 fn merge_fork() {
217 let o = observable::create(|mut s| {
218 s.next(1);
219 s.next(2);
220 s.error(());
221 });
222
223 let m = o.clone().merge(o.clone());
224 m.clone().merge(m.clone()).subscribe(|_| {});
225 }
226
227 #[test]
228 fn merge_local_and_shared() {
229 let mut res = vec![];
230 let shared = observable::of(1);
231 let local = observable::of(2);
232
233 shared.merge(local).into_shared().subscribe(move |v| {
234 res.push(v);
235 });
236 }
237
238 #[test]
239 fn bench() { do_bench(); }
240
241 benchmark_group!(do_bench, bench_merge);
242
243 fn bench_merge(b: &mut bencher::Bencher) { b.iter(odd_even_merge); }
244}