another_rxrust/operators/
filter.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Filter<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  predicate_f: FunctionWrapper<'a, Item, bool>,
10}
11
12impl<'a, Item> Filter<'a, Item>
13where
14  Item: Clone + Send + Sync,
15{
16  pub fn new<F>(f: F) -> Filter<'a, Item>
17  where
18    F: Fn(Item) -> bool + Send + Sync + 'a,
19  {
20    Filter { predicate_f: FunctionWrapper::new(f) }
21  }
22  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
23    let f = self.predicate_f.clone();
24
25    Observable::<Item>::create(move |s| {
26      let f = f.clone();
27
28      let sctl = StreamController::new(s);
29      let sctl_next = sctl.clone();
30      let sctl_error = sctl.clone();
31      let sctl_complete = sctl.clone();
32
33      source.inner_subscribe(sctl.new_observer(
34        move |_, x: Item| {
35          if f.call(x.clone()) {
36            sctl_next.sink_next(x);
37          }
38        },
39        move |_, e| {
40          sctl_error.sink_error(e);
41        },
42        move |serial| sctl_complete.sink_complete(&serial),
43      ));
44    })
45  }
46}
47
48impl<'a, Item> Observable<'a, Item>
49where
50  Item: Clone + Send + Sync,
51{
52  pub fn filter<F>(&self, f: F) -> Observable<'a, Item>
53  where
54    F: Fn(Item) -> bool + Send + Sync + 'a,
55  {
56    Filter::new(f).execute(self.clone())
57  }
58}
59
#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// Emits 0..10, keeps the values in 3..=5, and checks that every value
  /// reaching `tap` lies inside that range.
  #[test]
  fn basic() {
    let source = Observable::create(|s| {
      (0..10).for_each(|n| {
        s.next(n);
      });
      s.complete();
    });

    source
      .filter(|v| v >= 3 && v <= 5)
      .tap(
        |v| {
          // Anything outside the filtered range must never get here.
          assert!(v >= 3 && v <= 5);
        },
        junk_error!(),
        junk_complete!(),
      )
      .subscribe(
        print_next_fmt!("{}"),
        print_error!(),
        print_complete!(),
      );
  }
}