another_rxrust/operators/
all.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct All<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 filter_op: operators::Filter<'a, Item>,
10}
11
12impl<'a, Item> All<'a, Item>
13where
14 Item: Clone + Send + Sync,
15{
16 pub fn new<F>(f: F) -> All<'a, Item>
17 where
18 F: Fn(Item) -> bool + Send + Sync + 'a,
19 {
20 All {
21 filter_op: operators::Filter::new(move |x| !f(x)),
22 }
23 }
24
25 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, bool> {
26 let filter_op = self.filter_op.clone();
27
28 Observable::create(move |s| {
29 let source = source.clone();
30
31 let sctl = StreamController::new(s);
32
33 let sctl_next = sctl.clone();
34 let sctl_error = sctl.clone();
35 let sctl_complete = sctl.clone();
36
37 filter_op
38 .execute(source)
39 .take(1)
40 .inner_subscribe(sctl.new_observer(
41 move |serial, _| {
42 sctl_next.upstream_abort_observe(&serial);
43 sctl_next.sink_next(false);
44 sctl_next.sink_complete(&serial);
45 },
46 move |_, e| {
47 sctl_error.sink_error(e);
48 },
49 move |serial| {
50 sctl_complete.sink_next(true);
51 sctl_complete.sink_complete(&serial);
52 },
53 ));
54 })
55 }
56}
57
58impl<'a, Item> Observable<'a, Item>
59where
60 Item: Clone + Send + Sync,
61{
62 pub fn all<F>(&self, f: F) -> Observable<'a, bool>
63 where
64 F: Fn(Item) -> bool + Send + Sync + 'a,
65 {
66 All::new(f).execute(self.clone())
67 }
68}
69
70#[cfg(test)]
71mod test {
72 use crate::prelude::*;
73 use std::{thread, time};
74
75 #[test]
76 fn basic() {
77 observables::repeat(86).take(10).all(|x| x == 86).subscribe(
78 |x| {
79 println!("next - {}", x);
80 assert_eq!(x, true);
81 },
82 print_error!(),
83 print_complete!(),
84 );
85 thread::sleep(time::Duration::from_millis(1000));
86 }
87
88 #[test]
89 fn ne() {
90 observables::repeat(86).take(10).all(|x| x != 86).subscribe(
91 |x| {
92 println!("next - {}", x);
93 assert_eq!(x, false);
94 },
95 print_error!(),
96 print_complete!(),
97 );
98 thread::sleep(time::Duration::from_millis(1000));
99 }
100}