rxrust/ops/
filter_map.rs

1use crate::prelude::*;
2use crate::{complete_proxy_impl, error_proxy_impl};
3
4#[derive(Clone)]
5pub struct FilterMapOp<S, F> {
6  pub(crate) source: S,
7  pub(crate) f: F,
8}
9
10#[doc(hidden)]
11macro_rules! observable_impl {
12    ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
13  fn actual_subscribe<O>(
14    self,
15    subscriber: Subscriber<O, $subscription>,
16  ) -> Self::Unsub
17  where O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf {
18    self.source.actual_subscribe(Subscriber {
19      observer: FilterMapObserver {
20        down_observer: subscriber.observer,
21        f: self.f,
22        _marker: TypeHint::new(),
23      },
24      subscription: subscriber.subscription,
25    })
26  }
27}
28}
29
30impl<'a, Item, S, F> Observable for FilterMapOp<S, F>
31where
32  S: Observable,
33  F: FnMut(S::Item) -> Option<Item>,
34{
35  type Item = Item;
36  type Err = S::Err;
37}
38
39impl<'a, Item, S, F> LocalObservable<'a> for FilterMapOp<S, F>
40where
41  S: LocalObservable<'a>,
42  F: FnMut(S::Item) -> Option<Item> + 'a,
43  S::Item: 'a,
44{
45  type Unsub = S::Unsub;
46  observable_impl!(LocalSubscription, 'a);
47}
48
49impl<Item, S, F> SharedObservable for FilterMapOp<S, F>
50where
51  S: SharedObservable,
52  F: FnMut(S::Item) -> Option<Item> + Send + Sync + 'static,
53  S::Item: 'static,
54{
55  type Unsub = S::Unsub;
56  observable_impl!(SharedSubscription, Send + Sync + 'static);
57}
58
59pub struct FilterMapObserver<O, F, Item> {
60  down_observer: O,
61  f: F,
62  _marker: TypeHint<*const Item>,
63}
64
65impl<O, F, Item, Err, OutputItem> Observer for FilterMapObserver<O, F, Item>
66where
67  O: Observer<Item = OutputItem, Err = Err>,
68  F: FnMut(Item) -> Option<OutputItem>,
69{
70  type Item = Item;
71  type Err = Err;
72  fn next(&mut self, value: Item) {
73    if let Some(v) = (self.f)(value) {
74      self.down_observer.next(v)
75    }
76  }
77  error_proxy_impl!(Err, down_observer);
78  complete_proxy_impl!(down_observer);
79
80  #[inline]
81  fn is_stopped(&self) -> bool { self.down_observer.is_stopped() }
82}
83
84#[cfg(test)]
85mod test {
86  use crate::prelude::*;
87
88  #[test]
89  fn map_types_mixed() {
90    let mut i = 0;
91    observable::from_iter(vec!['a', 'b', 'c'])
92      .filter_map(|_v| Some(1))
93      .subscribe(|v| i += v);
94    assert_eq!(i, 3);
95  }
96
97  #[test]
98  fn filter_map_shared_and_fork() {
99    observable::of(1)
100      .filter_map(|_| Some("str"))
101      .clone()
102      .into_shared()
103      .subscribe(|_| {});
104  }
105
106  #[test]
107  fn filter_map_return_ref() {
108    observable::of(&1)
109      .filter_map(Some)
110      .clone()
111      .into_shared()
112      .subscribe(|_| {});
113  }
114
115  #[test]
116  fn bench() { do_bench(); }
117
118  benchmark_group!(do_bench, bench_map_types_mixed);
119
120  fn bench_map_types_mixed(b: &mut bencher::Bencher) {
121    b.iter(map_types_mixed);
122  }
123}