another_rxrust/operators/
all.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct All<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  filter_op: operators::Filter<'a, Item>,
10}
11
12impl<'a, Item> All<'a, Item>
13where
14  Item: Clone + Send + Sync,
15{
16  pub fn new<F>(f: F) -> All<'a, Item>
17  where
18    F: Fn(Item) -> bool + Send + Sync + 'a,
19  {
20    All {
21      filter_op: operators::Filter::new(move |x| !f(x)),
22    }
23  }
24
25  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, bool> {
26    let filter_op = self.filter_op.clone();
27
28    Observable::create(move |s| {
29      let source = source.clone();
30
31      let sctl = StreamController::new(s);
32
33      let sctl_next = sctl.clone();
34      let sctl_error = sctl.clone();
35      let sctl_complete = sctl.clone();
36
37      filter_op
38        .execute(source)
39        .take(1)
40        .inner_subscribe(sctl.new_observer(
41          move |serial, _| {
42            sctl_next.upstream_abort_observe(&serial);
43            sctl_next.sink_next(false);
44            sctl_next.sink_complete(&serial);
45          },
46          move |_, e| {
47            sctl_error.sink_error(e);
48          },
49          move |serial| {
50            sctl_complete.sink_next(true);
51            sctl_complete.sink_complete(&serial);
52          },
53        ));
54    })
55  }
56}
57
58impl<'a, Item> Observable<'a, Item>
59where
60  Item: Clone + Send + Sync,
61{
62  pub fn all<F>(&self, f: F) -> Observable<'a, bool>
63  where
64    F: Fn(Item) -> bool + Send + Sync + 'a,
65  {
66    All::new(f).execute(self.clone())
67  }
68}
69
70#[cfg(test)]
71mod test {
72  use crate::prelude::*;
73  use std::{thread, time};
74
75  #[test]
76  fn basic() {
77    observables::repeat(86).take(10).all(|x| x == 86).subscribe(
78      |x| {
79        println!("next - {}", x);
80        assert_eq!(x, true);
81      },
82      print_error!(),
83      print_complete!(),
84    );
85    thread::sleep(time::Duration::from_millis(1000));
86  }
87
88  #[test]
89  fn ne() {
90    observables::repeat(86).take(10).all(|x| x != 86).subscribe(
91      |x| {
92        println!("next - {}", x);
93        assert_eq!(x, false);
94      },
95      print_error!(),
96      print_complete!(),
97    );
98    thread::sleep(time::Duration::from_millis(1000));
99  }
100}