another_rxrust/operators/
filter.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Filter<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 predicate_f: FunctionWrapper<'a, Item, bool>,
10}
11
12impl<'a, Item> Filter<'a, Item>
13where
14 Item: Clone + Send + Sync,
15{
16 pub fn new<F>(f: F) -> Filter<'a, Item>
17 where
18 F: Fn(Item) -> bool + Send + Sync + 'a,
19 {
20 Filter { predicate_f: FunctionWrapper::new(f) }
21 }
22 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
23 let f = self.predicate_f.clone();
24
25 Observable::<Item>::create(move |s| {
26 let f = f.clone();
27
28 let sctl = StreamController::new(s);
29 let sctl_next = sctl.clone();
30 let sctl_error = sctl.clone();
31 let sctl_complete = sctl.clone();
32
33 source.inner_subscribe(sctl.new_observer(
34 move |_, x: Item| {
35 if f.call(x.clone()) {
36 sctl_next.sink_next(x);
37 }
38 },
39 move |_, e| {
40 sctl_error.sink_error(e);
41 },
42 move |serial| sctl_complete.sink_complete(&serial),
43 ));
44 })
45 }
46}
47
48impl<'a, Item> Observable<'a, Item>
49where
50 Item: Clone + Send + Sync,
51{
52 pub fn filter<F>(&self, f: F) -> Observable<'a, Item>
53 where
54 F: Fn(Item) -> bool + Send + Sync + 'a,
55 {
56 Filter::new(f).execute(self.clone())
57 }
58}
59
#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// Emits 0..10, keeps the values 3 through 5, and asserts in a `tap`
  /// stage that nothing outside that range gets past the filter.
  #[test]
  fn basic() {
    // Source: the integers 0 to 9 followed by completion.
    let source = Observable::create(|s| {
      for n in 0..10 {
        s.next(n);
      }
      s.complete();
    });

    // Keep only values in the closed range [3, 5].
    let filtered = source.filter(|x| x >= 3 && x <= 5);

    // Check every value that made it through the filter.
    let checked = filtered.tap(
      |x| {
        assert!(3 <= x && x <= 5);
      },
      junk_error!(),
      junk_complete!(),
    );

    checked.subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
  }
}