another_rxrust/operators/
sequence_equal.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct SequenceEqual<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  zip_op: operators::Zip<'a, Item>,
10}
11
12impl<'a, Item> SequenceEqual<'a, Item>
13where
14  Item: Clone + Send + Sync + PartialEq,
15{
16  pub fn new(observables: &[Observable<'a, Item>]) -> SequenceEqual<'a, Item> {
17    SequenceEqual { zip_op: operators::Zip::new(observables) }
18  }
19  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, bool> {
20    let zip_op = self.zip_op.clone();
21
22    Observable::create(move |s| {
23      let source = source.clone();
24
25      let sctl = StreamController::new(s);
26
27      let sctl_next = sctl.clone();
28      let sctl_error = sctl.clone();
29      let sctl_complete = sctl.clone();
30
31      zip_op.execute(source).inner_subscribe(sctl.new_observer(
32        move |serial, x: Vec<Item>| {
33          let check = x.get(0).unwrap();
34          if !x.iter().all(|i| i == check) {
35            sctl_next.upstream_abort_observe(&serial);
36            sctl_next.sink_next(false);
37            sctl_next.sink_complete(&serial);
38          }
39        },
40        move |_, e| {
41          sctl_error.sink_error(e);
42        },
43        move |serial| {
44          sctl_complete.sink_next(true);
45          sctl_complete.sink_complete(&serial);
46        },
47      ));
48    })
49  }
50}
51
52impl<'a, Item> Observable<'a, Item>
53where
54  Item: Clone + Send + Sync + PartialEq,
55{
56  pub fn sequence_equal(
57    &self,
58    observables: &[Observable<'a, Item>],
59  ) -> Observable<'a, bool> {
60    SequenceEqual::new(observables).execute(self.clone())
61  }
62}
63
// Tests for `sequence_equal`. They are compiled out when the `web` feature is
// enabled.
//
// NOTE(review): the assertions live inside the `next` callbacks, so these
// tests do not by themselves verify that an item was emitted at all; consider
// collecting the emitted values and asserting on them afterwards.
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Three identical synchronous sequences must compare as equal.
  #[test]
  fn basic() {
    fn ob() -> Observable<'static, i32> {
      observables::from_iter(0..10)
    }
    ob().sequence_equal(&[ob(), ob()]).subscribe(
      |x| {
        println!("next - {}", x);
        assert!(x);
      },
      print_error!(),
      print_complete!(),
    );
  }

  /// Same as `basic`, but every sequence is observed on its own new-thread
  /// scheduler.
  #[test]
  fn thread() {
    fn ob() -> Observable<'static, i32> {
      observables::from_iter(0..10)
        .observe_on(schedulers::new_thread_scheduler())
    }
    ob().sequence_equal(&[ob(), ob()]).subscribe(
      |x| {
        println!("next - {}", x);
        assert!(x);
      },
      print_error!(),
      print_complete!(),
    );
    // Give the scheduled work time to run before the test returns.
    thread::sleep(time::Duration::from_millis(1000));
  }

  /// One of the compared sequences starts at a different value, so the
  /// result must be `false`.
  #[test]
  fn ne() {
    fn ob(start: i32, memo: &'static str) -> Observable<'static, i32> {
      observables::from_iter(start..start + 10)
        .observe_on(schedulers::new_thread_scheduler())
        .tap(
          // Trace every item together with the label of its sequence.
          move |x| println!("tap ({}) {}", memo, x),
          junk_error!(),
          junk_complete!(),
        )
    }
    ob(0, "A")
      .sequence_equal(&[ob(0, "B"), ob(1, "C")])
      .subscribe(
        |x| {
          println!("next - {}", x);
          assert!(!x);
        },
        print_error!(),
        print_complete!(),
      );
    // Give the scheduled work time to run before the test returns.
    thread::sleep(time::Duration::from_millis(1000));
  }
}