1use crate::prelude::*;
2use crate::{complete_proxy_impl, error_proxy_impl, is_stopped_proxy_impl};
3use std::cell::RefCell;
4use std::collections::VecDeque;
5use std::rc::Rc;
6use std::sync::{Arc, Mutex};
7
/// Operator state for `zip`: holds the two source observables to be combined.
///
/// Its `Observable` item type is the tuple `(A::Item, B::Item)`; the pairing
/// itself is performed by `ZipObserver` once the operator is subscribed.
#[derive(Clone)]
pub struct ZipOp<A, B> {
  /// First source; its items become the left element of each emitted pair.
  pub(crate) a: A,
  /// Second source; its items become the right element of each emitted pair.
  pub(crate) b: B,
}
17
18impl<A, B> Observable for ZipOp<A, B>
19where
20 A: Observable,
21 B: Observable<Err = A::Err>,
22{
23 type Item = (A::Item, B::Item);
24 type Err = A::Err;
25}
26
27impl<'a, A, B> LocalObservable<'a> for ZipOp<A, B>
28where
29 A: LocalObservable<'a>,
30 B: LocalObservable<'a, Err = A::Err>,
31 A::Item: 'a,
32 B::Item: 'a,
33{
34 type Unsub = LocalSubscription;
35 fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
36 self,
37 subscriber: Subscriber<O, LocalSubscription>,
38 ) -> Self::Unsub {
39 let sub = subscriber.subscription;
40 let o_zip = ZipObserver::new(subscriber.observer, sub.clone());
41 let o_zip = Rc::new(RefCell::new(o_zip));
42 sub.add(self.a.actual_subscribe(Subscriber {
43 observer: AObserver(o_zip.clone(), TypeHint::new()),
44 subscription: LocalSubscription::default(),
45 }));
46
47 sub.add(self.b.actual_subscribe(Subscriber {
48 observer: BObserver(o_zip, TypeHint::new()),
49 subscription: LocalSubscription::default(),
50 }));
51 sub
52 }
53}
54
55impl<A, B> SharedObservable for ZipOp<A, B>
56where
57 A: SharedObservable,
58 B: SharedObservable<Err = A::Err>,
59 A::Item: Send + Sync + 'static,
60 B::Item: Send + Sync + 'static,
61 A::Unsub: Send + Sync,
62 B::Unsub: Send + Sync,
63{
64 type Unsub = SharedSubscription;
65 fn actual_subscribe<
66 O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
67 >(
68 self,
69 subscriber: Subscriber<O, SharedSubscription>,
70 ) -> Self::Unsub {
71 let sub = subscriber.subscription;
72 let o_zip = ZipObserver::new(subscriber.observer, sub.clone());
73 let o_zip = Arc::new(Mutex::new(o_zip));
74 sub.add(self.a.actual_subscribe(Subscriber {
75 observer: AObserver(o_zip.clone(), TypeHint::new()),
76 subscription: SharedSubscription::default(),
77 }));
78
79 sub.add(self.b.actual_subscribe(Subscriber {
80 observer: BObserver(o_zip, TypeHint::new()),
81 subscription: SharedSubscription::default(),
82 }));
83 sub
84 }
85}
86
/// A value tagged with the source it came from, so a single `ZipObserver`
/// can receive the items of both sources through one `next` method.
enum ZipItem<A, B> {
  /// Item produced by source `a` (wrapped by `AObserver`).
  ItemA(A),
  /// Item produced by source `b` (wrapped by `BObserver`).
  ItemB(B),
}
91
/// Shared state behind both source observers of a `zip`.
///
/// Items that arrive before their counterpart are parked in a FIFO queue per
/// source and released as pairs in arrival order.
struct ZipObserver<O, U, A, B> {
  /// Downstream observer that receives the `(A, B)` pairs.
  observer: O,
  /// Subscription that is unsubscribed on error or on final completion.
  subscription: U,
  /// Items from source `a` still waiting for a partner from `b`.
  a: VecDeque<A>,
  /// Items from source `b` still waiting for a partner from `a`.
  b: VecDeque<B>,
  /// Set once the first `complete` notification has been seen.
  completed_one: bool,
}
99
100impl<O, U, A, B> ZipObserver<O, U, A, B> {
101 fn new(o: O, u: U) -> Self {
102 ZipObserver {
103 observer: o,
104 subscription: u,
105 a: VecDeque::default(),
106 b: VecDeque::default(),
107 completed_one: false,
108 }
109 }
110}
111
112impl<O, U, A, B, Err> Observer for ZipObserver<O, U, A, B>
113where
114 O: Observer<Item = (A, B), Err = Err>,
115 U: SubscriptionLike,
116{
117 type Item = ZipItem<A, B>;
118 type Err = Err;
119 fn next(&mut self, value: ZipItem<A, B>) {
120 match value {
121 ZipItem::ItemA(v) => {
122 if !self.b.is_empty() {
123 self.observer.next((v, self.b.pop_front().unwrap()))
124 } else {
125 self.a.push_back(v);
126 }
127 }
128 ZipItem::ItemB(v) => {
129 if !self.a.is_empty() {
130 self.observer.next((self.a.pop_front().unwrap(), v))
131 } else {
132 self.b.push_back(v)
133 }
134 }
135 }
136 }
137
138 fn error(&mut self, err: Err) {
139 self.observer.error(err);
140 self.subscription.unsubscribe();
141 }
142
143 fn complete(&mut self) {
144 if self.completed_one {
145 self.subscription.unsubscribe();
146 self.observer.complete();
147 } else {
148 self.completed_one = true;
149 }
150 }
151
152 is_stopped_proxy_impl!(observer);
153}
154
155struct AObserver<O, B>(O, TypeHint<B>);
156
157impl<O, A, B, Err> Observer for AObserver<O, B>
158where
159 O: Observer<Item = ZipItem<A, B>, Err = Err>,
160{
161 type Item = A;
162 type Err = Err;
163 fn next(&mut self, value: A) { self.0.next(ZipItem::ItemA(value)); }
164
165 error_proxy_impl!(Err, 0);
166 complete_proxy_impl!(0);
167 is_stopped_proxy_impl!(0);
168}
169
170struct BObserver<O, A>(O, TypeHint<A>);
171
172impl<O, A, B, Err> Observer for BObserver<O, A>
173where
174 O: Observer<Item = ZipItem<A, B>, Err = Err>,
175{
176 type Item = B;
177 type Err = Err;
178 fn next(&mut self, value: B) { self.0.next(ZipItem::ItemB(value)); }
179
180 error_proxy_impl!(Err, 0);
181 complete_proxy_impl!(0);
182 is_stopped_proxy_impl!(0);
183}
184
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::sync::atomic::{AtomicUsize, Ordering};
  use std::sync::Arc;

  /// Zipping two `0..10` ranges yields ten pairs whose element sums total 90.
  #[test]
  fn smoke() {
    let zipped =
      observable::from_iter(0..10).zip(observable::from_iter(0..10));

    // Count the emitted pairs on a clone so `zipped` stays usable below.
    let pair_count = Arc::new(AtomicUsize::new(0));
    let pair_count_reader = pair_count.clone();
    zipped
      .clone()
      .count()
      .subscribe(|n| pair_count.store(n, Ordering::Relaxed));
    assert_eq!(pair_count_reader.load(Ordering::Relaxed), 10);

    // Sum both elements of every pair.
    let mut total = 0;
    zipped.map(|(l, r)| l + r).sum().subscribe(|s| total = s);
    assert_eq!(total, 90);
  }

  /// Downstream completes only after both zipped sources have completed.
  #[test]
  fn complete() {
    let mut completed = false;

    // Only one of the two sources completes: no completion downstream.
    {
      let mut left = LocalSubject::new();
      left
        .clone()
        .zip(LocalSubject::new())
        .subscribe_complete(|((), ())| {}, || completed = true);

      left.complete();
    }
    assert!(!completed);

    // Both sources complete: completion is forwarded downstream.
    {
      let mut left = LocalSubject::new();
      let mut right = LocalSubject::new();
      left
        .clone()
        .zip(right.clone())
        .subscribe_complete(|((), ())| {}, || completed = true);

      left.complete();
      right.complete();
    }
    assert!(completed);
  }

  /// Runs the benchmark group once as a regular test.
  #[test]
  fn bench() {
    do_bench();
  }

  benchmark_group!(do_bench, bench_zip);

  /// Benchmark body: repeatedly executes the `smoke` scenario.
  fn bench_zip(b: &mut bencher::Bencher) {
    b.iter(smoke);
  }
}