rxrust/ops/
zip.rs

1use crate::prelude::*;
2use crate::{complete_proxy_impl, error_proxy_impl, is_stopped_proxy_impl};
3use std::cell::RefCell;
4use std::collections::VecDeque;
5use std::rc::Rc;
6use std::sync::{Arc, Mutex};
7
/// An observable that pairs up the items of two source observables.
///
/// Every emitted item is a tuple `(A::Item, B::Item)` holding one value from
/// each source. Both sources must share the same error type.
///
/// This struct is created by the zip method on
/// [Observable](Observable::zip). See its documentation for more.
#[derive(Clone)]
pub struct ZipOp<A, B> {
  /// Source providing the first element of every pair.
  pub(crate) a: A,
  /// Source providing the second element of every pair.
  pub(crate) b: B,
}
17
18impl<A, B> Observable for ZipOp<A, B>
19where
20  A: Observable,
21  B: Observable<Err = A::Err>,
22{
23  type Item = (A::Item, B::Item);
24  type Err = A::Err;
25}
26
27impl<'a, A, B> LocalObservable<'a> for ZipOp<A, B>
28where
29  A: LocalObservable<'a>,
30  B: LocalObservable<'a, Err = A::Err>,
31  A::Item: 'a,
32  B::Item: 'a,
33{
34  type Unsub = LocalSubscription;
35  fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
36    self,
37    subscriber: Subscriber<O, LocalSubscription>,
38  ) -> Self::Unsub {
39    let sub = subscriber.subscription;
40    let o_zip = ZipObserver::new(subscriber.observer, sub.clone());
41    let o_zip = Rc::new(RefCell::new(o_zip));
42    sub.add(self.a.actual_subscribe(Subscriber {
43      observer: AObserver(o_zip.clone(), TypeHint::new()),
44      subscription: LocalSubscription::default(),
45    }));
46
47    sub.add(self.b.actual_subscribe(Subscriber {
48      observer: BObserver(o_zip, TypeHint::new()),
49      subscription: LocalSubscription::default(),
50    }));
51    sub
52  }
53}
54
55impl<A, B> SharedObservable for ZipOp<A, B>
56where
57  A: SharedObservable,
58  B: SharedObservable<Err = A::Err>,
59  A::Item: Send + Sync + 'static,
60  B::Item: Send + Sync + 'static,
61  A::Unsub: Send + Sync,
62  B::Unsub: Send + Sync,
63{
64  type Unsub = SharedSubscription;
65  fn actual_subscribe<
66    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
67  >(
68    self,
69    subscriber: Subscriber<O, SharedSubscription>,
70  ) -> Self::Unsub {
71    let sub = subscriber.subscription;
72    let o_zip = ZipObserver::new(subscriber.observer, sub.clone());
73    let o_zip = Arc::new(Mutex::new(o_zip));
74    sub.add(self.a.actual_subscribe(Subscriber {
75      observer: AObserver(o_zip.clone(), TypeHint::new()),
76      subscription: SharedSubscription::default(),
77    }));
78
79    sub.add(self.b.actual_subscribe(Subscriber {
80      observer: BObserver(o_zip, TypeHint::new()),
81      subscription: SharedSubscription::default(),
82    }));
83    sub
84  }
85}
86
/// A value tagged with the side of the zip it arrived from, so that a single
/// observer can receive items from both sources.
enum ZipItem<A, B> {
  /// An item produced by the first source.
  ItemA(A),
  /// An item produced by the second source.
  ItemB(B),
}
91
/// State shared by both sides of a zip.
///
/// Items that have no partner yet wait in `a` or `b` until a value arrives
/// from the opposite side.
struct ZipObserver<O, U, A, B> {
  /// Downstream observer receiving the `(A, B)` pairs.
  observer: O,
  /// Subscription that is unsubscribed on error or final completion.
  subscription: U,
  /// Unpaired items from the first source, oldest first.
  a: VecDeque<A>,
  /// Unpaired items from the second source, oldest first.
  b: VecDeque<B>,
  /// Set once `complete` has been received for the first time.
  completed_one: bool,
}

impl<O, U, A, B> ZipObserver<O, U, A, B> {
  /// Builds a zip observer around downstream observer `o` and subscription
  /// `u`, with empty queues and no completion recorded.
  fn new(o: O, u: U) -> Self {
    let (observer, subscription) = (o, u);
    Self {
      observer,
      subscription,
      a: VecDeque::new(),
      b: VecDeque::new(),
      completed_one: false,
    }
  }
}
111
112impl<O, U, A, B, Err> Observer for ZipObserver<O, U, A, B>
113where
114  O: Observer<Item = (A, B), Err = Err>,
115  U: SubscriptionLike,
116{
117  type Item = ZipItem<A, B>;
118  type Err = Err;
119  fn next(&mut self, value: ZipItem<A, B>) {
120    match value {
121      ZipItem::ItemA(v) => {
122        if !self.b.is_empty() {
123          self.observer.next((v, self.b.pop_front().unwrap()))
124        } else {
125          self.a.push_back(v);
126        }
127      }
128      ZipItem::ItemB(v) => {
129        if !self.a.is_empty() {
130          self.observer.next((self.a.pop_front().unwrap(), v))
131        } else {
132          self.b.push_back(v)
133        }
134      }
135    }
136  }
137
138  fn error(&mut self, err: Err) {
139    self.observer.error(err);
140    self.subscription.unsubscribe();
141  }
142
143  fn complete(&mut self) {
144    if self.completed_one {
145      self.subscription.unsubscribe();
146      self.observer.complete();
147    } else {
148      self.completed_one = true;
149    }
150  }
151
152  is_stopped_proxy_impl!(observer);
153}
154
155struct AObserver<O, B>(O, TypeHint<B>);
156
157impl<O, A, B, Err> Observer for AObserver<O, B>
158where
159  O: Observer<Item = ZipItem<A, B>, Err = Err>,
160{
161  type Item = A;
162  type Err = Err;
163  fn next(&mut self, value: A) { self.0.next(ZipItem::ItemA(value)); }
164
165  error_proxy_impl!(Err, 0);
166  complete_proxy_impl!(0);
167  is_stopped_proxy_impl!(0);
168}
169
170struct BObserver<O, A>(O, TypeHint<A>);
171
172impl<O, A, B, Err> Observer for BObserver<O, A>
173where
174  O: Observer<Item = ZipItem<A, B>, Err = Err>,
175{
176  type Item = B;
177  type Err = Err;
178  fn next(&mut self, value: B) { self.0.next(ZipItem::ItemB(value)); }
179
180  error_proxy_impl!(Err, 0);
181  complete_proxy_impl!(0);
182  is_stopped_proxy_impl!(0);
183}
184
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::sync::atomic::{AtomicUsize, Ordering};
  use std::sync::Arc;

  /// Zipping `0..10` with `0..10` gives ten pairs whose element sums add up
  /// to 90.
  #[test]
  fn smoke() {
    let pairs = observable::from_iter(0..10).zip(observable::from_iter(0..10));

    let pair_count = Arc::new(AtomicUsize::new(0));
    let pair_count_view = pair_count.clone();
    pairs
      .clone()
      .count()
      .subscribe(|n| pair_count.store(n, Ordering::Relaxed));
    assert_eq!(pair_count_view.load(Ordering::Relaxed), 10);

    let mut total = 0;
    pairs.map(|(a, b)| a + b).sum().subscribe(|v| total = v);
    assert_eq!(total, 90);
  }

  /// The zipped stream completes only after both sources have completed.
  #[test]
  fn complete() {
    let mut done = false;

    // Only one of the two subjects completes: no completion downstream.
    {
      let mut first = LocalSubject::new();
      first
        .clone()
        .zip(LocalSubject::new())
        .subscribe_complete(|((), ())| {}, || done = true);

      first.complete();
    }
    assert!(!done);

    // Both subjects complete: the completion callback runs.
    {
      let mut first = LocalSubject::new();
      let mut second = LocalSubject::new();
      first
        .clone()
        .zip(second.clone())
        .subscribe_complete(|((), ())| {}, || done = true);

      first.complete();
      second.complete();
    }
    assert!(done);
  }

  /// Runs the benchmark group once as part of the ordinary test suite.
  #[test]
  fn bench() {
    do_bench();
  }

  // Declares `do_bench`, which `bench` above calls, from `bench_zip`.
  benchmark_group!(do_bench, bench_zip);

  /// Benchmarks the `smoke` scenario.
  fn bench_zip(b: &mut bencher::Bencher) {
    b.iter(smoke);
  }
}
236
237  fn bench_zip(b: &mut bencher::Bencher) { b.iter(smoke); }
238}