1use crate::prelude::*;
2use crate::{complete_proxy_impl, error_proxy_impl, is_stopped_proxy_impl};
3
4#[derive(Clone)]
5pub struct TakeOp<S> {
6 pub(crate) source: S,
7 pub(crate) count: u32,
8}
9
10#[doc(hidden)]
11macro_rules! observable_impl {
12 ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
13 fn actual_subscribe<O>(
14 self,
15 subscriber: Subscriber<O, $subscription>,
16 ) -> Self::Unsub
17 where O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf {
18 let subscriber = Subscriber {
19 observer: TakeObserver {
20 observer: subscriber.observer,
21 subscription: subscriber.subscription.clone(),
22 count: self.count,
23 hits: 0,
24 },
25 subscription: subscriber.subscription,
26 };
27 self.source.actual_subscribe(subscriber)
28 }
29}
30}
31
32observable_proxy_impl!(TakeOp, S);
33
34impl<'a, S> LocalObservable<'a> for TakeOp<S>
35where
36 S: LocalObservable<'a>,
37{
38 type Unsub = S::Unsub;
39 observable_impl!(LocalSubscription, 'a);
40}
41
42impl<S> SharedObservable for TakeOp<S>
43where
44 S: SharedObservable,
45{
46 type Unsub = S::Unsub;
47 observable_impl!(SharedSubscription, Send + Sync + 'static);
48}
49
50pub struct TakeObserver<O, S> {
51 observer: O,
52 subscription: S,
53 count: u32,
54 hits: u32,
55}
56
57impl<O, U, Item, Err> Observer for TakeObserver<O, U>
58where
59 O: Observer<Item = Item, Err = Err>,
60 U: SubscriptionLike,
61{
62 type Item = Item;
63 type Err = Err;
64 fn next(&mut self, value: Item) {
65 if self.hits < self.count {
66 self.hits += 1;
67 self.observer.next(value);
68 if self.hits == self.count {
69 self.complete();
70 self.subscription.unsubscribe();
71 }
72 }
73 }
74 error_proxy_impl!(Err, observer);
75 complete_proxy_impl!(observer);
76 is_stopped_proxy_impl!(observer);
77}
78
79#[cfg(test)]
80mod test {
81 use crate::prelude::*;
82
83 #[test]
84 fn base_function() {
85 let mut completed = false;
86 let mut next_count = 0;
87
88 observable::from_iter(0..100)
89 .take(5)
90 .subscribe_complete(|_| next_count += 1, || completed = true);
91
92 assert_eq!(next_count, 5);
93 assert!(completed);
94 }
95
96 #[test]
97 fn take_support_fork() {
98 let mut nc1 = 0;
99 let mut nc2 = 0;
100 {
101 let take5 = observable::from_iter(0..100).take(5);
102 let f1 = take5.clone();
103 let f2 = take5;
104
105 f1.take(5).subscribe(|_| nc1 += 1);
106 f2.take(5).subscribe(|_| nc2 += 1);
107 }
108 assert_eq!(nc1, 5);
109 assert_eq!(nc2, 5);
110 }
111
112 #[test]
113 fn ininto_shared() {
114 observable::from_iter(0..100)
115 .take(5)
116 .take(5)
117 .into_shared()
118 .subscribe(|_| {});
119 }
120
121 #[test]
122 fn bench() { do_bench(); }
123
124 benchmark_group!(do_bench, bench_take);
125
126 fn bench_take(b: &mut bencher::Bencher) { b.iter(base_function); }
127}