1use crate::prelude::*;
2use crate::{complete_proxy_impl, error_proxy_impl};
3
4#[derive(Clone)]
5pub struct FilterMapOp<S, F> {
6 pub(crate) source: S,
7 pub(crate) f: F,
8}
9
10#[doc(hidden)]
11macro_rules! observable_impl {
12 ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
13 fn actual_subscribe<O>(
14 self,
15 subscriber: Subscriber<O, $subscription>,
16 ) -> Self::Unsub
17 where O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf {
18 self.source.actual_subscribe(Subscriber {
19 observer: FilterMapObserver {
20 down_observer: subscriber.observer,
21 f: self.f,
22 _marker: TypeHint::new(),
23 },
24 subscription: subscriber.subscription,
25 })
26 }
27}
28}
29
30impl<'a, Item, S, F> Observable for FilterMapOp<S, F>
31where
32 S: Observable,
33 F: FnMut(S::Item) -> Option<Item>,
34{
35 type Item = Item;
36 type Err = S::Err;
37}
38
39impl<'a, Item, S, F> LocalObservable<'a> for FilterMapOp<S, F>
40where
41 S: LocalObservable<'a>,
42 F: FnMut(S::Item) -> Option<Item> + 'a,
43 S::Item: 'a,
44{
45 type Unsub = S::Unsub;
46 observable_impl!(LocalSubscription, 'a);
47}
48
49impl<Item, S, F> SharedObservable for FilterMapOp<S, F>
50where
51 S: SharedObservable,
52 F: FnMut(S::Item) -> Option<Item> + Send + Sync + 'static,
53 S::Item: 'static,
54{
55 type Unsub = S::Unsub;
56 observable_impl!(SharedSubscription, Send + Sync + 'static);
57}
58
59pub struct FilterMapObserver<O, F, Item> {
60 down_observer: O,
61 f: F,
62 _marker: TypeHint<*const Item>,
63}
64
65impl<O, F, Item, Err, OutputItem> Observer for FilterMapObserver<O, F, Item>
66where
67 O: Observer<Item = OutputItem, Err = Err>,
68 F: FnMut(Item) -> Option<OutputItem>,
69{
70 type Item = Item;
71 type Err = Err;
72 fn next(&mut self, value: Item) {
73 if let Some(v) = (self.f)(value) {
74 self.down_observer.next(v)
75 }
76 }
77 error_proxy_impl!(Err, down_observer);
78 complete_proxy_impl!(down_observer);
79
80 #[inline]
81 fn is_stopped(&self) -> bool { self.down_observer.is_stopped() }
82}
83
84#[cfg(test)]
85mod test {
86 use crate::prelude::*;
87
88 #[test]
89 fn map_types_mixed() {
90 let mut i = 0;
91 observable::from_iter(vec!['a', 'b', 'c'])
92 .filter_map(|_v| Some(1))
93 .subscribe(|v| i += v);
94 assert_eq!(i, 3);
95 }
96
97 #[test]
98 fn filter_map_shared_and_fork() {
99 observable::of(1)
100 .filter_map(|_| Some("str"))
101 .clone()
102 .into_shared()
103 .subscribe(|_| {});
104 }
105
106 #[test]
107 fn filter_map_return_ref() {
108 observable::of(&1)
109 .filter_map(Some)
110 .clone()
111 .into_shared()
112 .subscribe(|_| {});
113 }
114
115 #[test]
116 fn bench() { do_bench(); }
117
118 benchmark_group!(do_bench, bench_map_types_mixed);
119
120 fn bench_map_types_mixed(b: &mut bencher::Bencher) {
121 b.iter(map_types_mixed);
122 }
123}