1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
use crate::observer::{complete_proxy_impl, error_proxy_impl}; use crate::prelude::*; #[derive(Clone)] pub struct FilterMapOp<S, F> { pub(crate) source: S, pub(crate) f: F, } #[doc(hidden)] macro observable_impl($subscription:ty, $($marker:ident +)* $lf: lifetime) { fn actual_subscribe<O: Observer<Self::Item, Self::Err> + $($marker +)* $lf>( self, subscriber: Subscriber<O, $subscription>, ) -> Self::Unsub { self.source.actual_subscribe(Subscriber { observer: FilterMapObserver { down_observer: subscriber.observer, f: self.f, }, subscription: subscriber.subscription, }) } } impl<'a, Item, S, F> Observable for FilterMapOp<S, F> where S: Observable, F: FnMut(S::Item) -> Option<Item>, { type Item = Item; type Err = S::Err; } impl<'a, Item, S, F> LocalObservable<'a> for FilterMapOp<S, F> where S: LocalObservable<'a>, F: FnMut(S::Item) -> Option<Item> + 'a, { type Unsub = S::Unsub; observable_impl!(LocalSubscription, 'a); } impl<Item, S, F> SharedObservable for FilterMapOp<S, F> where S: SharedObservable, F: FnMut(S::Item) -> Option<Item> + Send + Sync + 'static, { type Unsub = S::Unsub; observable_impl!(SharedSubscription, Send + Sync + 'static); } pub struct FilterMapObserver<O, F> { down_observer: O, f: F, } impl<O, F, Item, Err, OutputItem> Observer<Item, Err> for FilterMapObserver<O, F> where O: Observer<OutputItem, Err>, F: FnMut(Item) -> Option<OutputItem>, { fn next(&mut self, value: Item) { if let Some(v) = (self.f)(value) { self.down_observer.next(v) } } error_proxy_impl!(Err, down_observer); complete_proxy_impl!(down_observer); #[inline] fn is_stopped(&self) -> bool { self.down_observer.is_stopped() } } #[cfg(test)] mod test { use crate::prelude::*; #[test] fn map_types_mixed() { let mut i = 0; observable::from_iter(vec!['a', 'b', 'c']) .filter_map(|_v| Some(1)) .subscribe(|v| i += v); assert_eq!(i, 3); } #[test] fn filter_map_shared_and_fork() { observable::of(1) .filter_map(|_| Some("str")) .clone() .to_shared() .subscribe(|_| {}); } #[test] fn filter_map_return_ref() { observable::of(&1) .filter_map(Some) .clone() .to_shared() .subscribe(|_| {}); } }