another_rxrust/operators/
contains.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
/// Operator state for `contains`: holds the value that incoming items are
/// compared against.
#[derive(Clone)]
pub struct Contains<Item: Clone + Send + Sync + PartialEq> {
  /// Value searched for in the source stream.
  target: Item,
}
11
12impl<'a, Item> Contains<Item>
13where
14  Item: Clone + Send + Sync + PartialEq,
15{
16  pub fn new(target: Item) -> Contains<Item> {
17    Contains { target }
18  }
19  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, bool> {
20    let target = self.target.clone();
21
22    Observable::<bool>::create(move |s| {
23      let target = target.clone();
24      let sctl = StreamController::new(s);
25      let sctl_next = sctl.clone();
26      let sctl_error = sctl.clone();
27      let sctl_complete = sctl.clone();
28      source.inner_subscribe(sctl.new_observer(
29        move |serial, x| {
30          if x == target {
31            sctl_next.sink_next(true);
32            sctl_next.sink_complete(&serial);
33          }
34        },
35        move |serial, _| {
36          sctl_error.sink_next(false);
37          sctl_error.sink_complete(&serial);
38        },
39        move |serial| {
40          sctl_complete.sink_next(false);
41          sctl_complete.sink_complete(&serial);
42        },
43      ));
44    })
45  }
46}
47
48impl<'a, Item> Observable<'a, Item>
49where
50  Item: Clone + Send + Sync + PartialEq,
51{
52  pub fn contains(&self, target: Item) -> Observable<'a, bool> {
53    Contains::new(target).execute(self.clone())
54  }
55}
56
#[cfg(test)]
mod test {
  use crate::prelude::*;

  // The searched value (5) is inside 0..10, so the emitted value must be `true`.
  #[test]
  fn basic() {
    let found = observables::from_iter(0..10).contains(5);
    found.subscribe(
      |hit| {
        println!("{}", hit);
        assert_eq!(hit, true);
      },
      print_error!(),
      print_complete!(),
    );
  }

  // The searched value (100) is outside 0..10, so the emitted value must be `false`.
  #[test]
  fn not_found() {
    let found = observables::from_iter(0..10).contains(100);
    found.subscribe(
      |hit| {
        println!("{}", hit);
        assert_eq!(hit, false);
      },
      print_error!(),
      print_complete!(),
    );
  }

  // The source is an error observable; any value that is emitted must be `false`.
  #[test]
  fn error() {
    let failing = observables::error(RxError::from_error("ERR!"));
    let found = failing.contains(5);
    found.subscribe(
      |hit| {
        println!("{}", hit);
        assert_eq!(hit, false);
      },
      print_error_as!(&str),
      print_complete!(),
    );
  }

  // An empty source cannot contain the value, so the emitted value must be `false`.
  #[test]
  fn empty() {
    let found = observables::empty().contains(5);
    found.subscribe(
      |hit| {
        println!("{}", hit);
        assert_eq!(hit, false);
      },
      print_error!(),
      print_complete!(),
    );
  }
}