another_rxrust/operators/
contains.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct Contains<Item>
6where
7 Item: Clone + Send + Sync + PartialEq,
8{
9 target: Item,
10}
11
12impl<'a, Item> Contains<Item>
13where
14 Item: Clone + Send + Sync + PartialEq,
15{
16 pub fn new(target: Item) -> Contains<Item> {
17 Contains { target }
18 }
19 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, bool> {
20 let target = self.target.clone();
21
22 Observable::<bool>::create(move |s| {
23 let target = target.clone();
24 let sctl = StreamController::new(s);
25 let sctl_next = sctl.clone();
26 let sctl_error = sctl.clone();
27 let sctl_complete = sctl.clone();
28 source.inner_subscribe(sctl.new_observer(
29 move |serial, x| {
30 if x == target {
31 sctl_next.sink_next(true);
32 sctl_next.sink_complete(&serial);
33 }
34 },
35 move |serial, _| {
36 sctl_error.sink_next(false);
37 sctl_error.sink_complete(&serial);
38 },
39 move |serial| {
40 sctl_complete.sink_next(false);
41 sctl_complete.sink_complete(&serial);
42 },
43 ));
44 })
45 }
46}
47
48impl<'a, Item> Observable<'a, Item>
49where
50 Item: Clone + Send + Sync + PartialEq,
51{
52 pub fn contains(&self, target: Item) -> Observable<'a, bool> {
53 Contains::new(target).execute(self.clone())
54 }
55}
56
57#[cfg(test)]
58mod test {
59 use crate::prelude::*;
60
61 #[test]
62 fn basic() {
63 observables::from_iter(0..10).contains(5).subscribe(
64 |x| {
65 println!("{}", x);
66 assert_eq!(x, true);
67 },
68 print_error!(),
69 print_complete!(),
70 );
71 }
72
73 #[test]
74 fn not_found() {
75 observables::from_iter(0..10).contains(100).subscribe(
76 |x| {
77 println!("{}", x);
78 assert_eq!(x, false);
79 },
80 print_error!(),
81 print_complete!(),
82 );
83 }
84
85 #[test]
86 fn error() {
87 observables::error(RxError::from_error("ERR!"))
88 .contains(5)
89 .subscribe(
90 |x| {
91 println!("{}", x);
92 assert_eq!(x, false);
93 },
94 print_error_as!(&str),
95 print_complete!(),
96 );
97 }
98
99 #[test]
100 fn empty() {
101 observables::empty().contains(5).subscribe(
102 |x| {
103 println!("{}", x);
104 assert_eq!(x, false);
105 },
106 print_error!(),
107 print_complete!(),
108 );
109 }
110}