rxrust/ops/
take.rs

1use crate::prelude::*;
2use crate::{complete_proxy_impl, error_proxy_impl, is_stopped_proxy_impl};
3
4#[derive(Clone)]
5pub struct TakeOp<S> {
6  pub(crate) source: S,
7  pub(crate) count: u32,
8}
9
10#[doc(hidden)]
11macro_rules! observable_impl {
12  ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
13  fn actual_subscribe<O>(
14    self,
15    subscriber: Subscriber<O, $subscription>,
16  ) -> Self::Unsub
17  where O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf {
18    let subscriber = Subscriber {
19      observer: TakeObserver {
20        observer: subscriber.observer,
21        subscription: subscriber.subscription.clone(),
22        count: self.count,
23        hits: 0,
24      },
25      subscription: subscriber.subscription,
26    };
27    self.source.actual_subscribe(subscriber)
28  }
29}
30}
31
32observable_proxy_impl!(TakeOp, S);
33
34impl<'a, S> LocalObservable<'a> for TakeOp<S>
35where
36  S: LocalObservable<'a>,
37{
38  type Unsub = S::Unsub;
39  observable_impl!(LocalSubscription, 'a);
40}
41
42impl<S> SharedObservable for TakeOp<S>
43where
44  S: SharedObservable,
45{
46  type Unsub = S::Unsub;
47  observable_impl!(SharedSubscription, Send + Sync + 'static);
48}
49
50pub struct TakeObserver<O, S> {
51  observer: O,
52  subscription: S,
53  count: u32,
54  hits: u32,
55}
56
57impl<O, U, Item, Err> Observer for TakeObserver<O, U>
58where
59  O: Observer<Item = Item, Err = Err>,
60  U: SubscriptionLike,
61{
62  type Item = Item;
63  type Err = Err;
64  fn next(&mut self, value: Item) {
65    if self.hits < self.count {
66      self.hits += 1;
67      self.observer.next(value);
68      if self.hits == self.count {
69        self.complete();
70        self.subscription.unsubscribe();
71      }
72    }
73  }
74  error_proxy_impl!(Err, observer);
75  complete_proxy_impl!(observer);
76  is_stopped_proxy_impl!(observer);
77}
78
79#[cfg(test)]
80mod test {
81  use crate::prelude::*;
82
83  #[test]
84  fn base_function() {
85    let mut completed = false;
86    let mut next_count = 0;
87
88    observable::from_iter(0..100)
89      .take(5)
90      .subscribe_complete(|_| next_count += 1, || completed = true);
91
92    assert_eq!(next_count, 5);
93    assert!(completed);
94  }
95
96  #[test]
97  fn take_support_fork() {
98    let mut nc1 = 0;
99    let mut nc2 = 0;
100    {
101      let take5 = observable::from_iter(0..100).take(5);
102      let f1 = take5.clone();
103      let f2 = take5;
104
105      f1.take(5).subscribe(|_| nc1 += 1);
106      f2.take(5).subscribe(|_| nc2 += 1);
107    }
108    assert_eq!(nc1, 5);
109    assert_eq!(nc2, 5);
110  }
111
112  #[test]
113  fn ininto_shared() {
114    observable::from_iter(0..100)
115      .take(5)
116      .take(5)
117      .into_shared()
118      .subscribe(|_| {});
119  }
120
121  #[test]
122  fn bench() { do_bench(); }
123
124  benchmark_group!(do_bench, bench_take);
125
126  fn bench_take(b: &mut bencher::Bencher) { b.iter(base_function); }
127}