another_rxrust/operators/
sequence_equal.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct SequenceEqual<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 zip_op: operators::Zip<'a, Item>,
10}
11
12impl<'a, Item> SequenceEqual<'a, Item>
13where
14 Item: Clone + Send + Sync + PartialEq,
15{
16 pub fn new(observables: &[Observable<'a, Item>]) -> SequenceEqual<'a, Item> {
17 SequenceEqual { zip_op: operators::Zip::new(observables) }
18 }
19 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, bool> {
20 let zip_op = self.zip_op.clone();
21
22 Observable::create(move |s| {
23 let source = source.clone();
24
25 let sctl = StreamController::new(s);
26
27 let sctl_next = sctl.clone();
28 let sctl_error = sctl.clone();
29 let sctl_complete = sctl.clone();
30
31 zip_op.execute(source).inner_subscribe(sctl.new_observer(
32 move |serial, x: Vec<Item>| {
33 let check = x.get(0).unwrap();
34 if !x.iter().all(|i| i == check) {
35 sctl_next.upstream_abort_observe(&serial);
36 sctl_next.sink_next(false);
37 sctl_next.sink_complete(&serial);
38 }
39 },
40 move |_, e| {
41 sctl_error.sink_error(e);
42 },
43 move |serial| {
44 sctl_complete.sink_next(true);
45 sctl_complete.sink_complete(&serial);
46 },
47 ));
48 })
49 }
50}
51
52impl<'a, Item> Observable<'a, Item>
53where
54 Item: Clone + Send + Sync + PartialEq,
55{
56 pub fn sequence_equal(
57 &self,
58 observables: &[Observable<'a, Item>],
59 ) -> Observable<'a, bool> {
60 SequenceEqual::new(observables).execute(self.clone())
61 }
62}
63
64#[cfg(all(test, not(feature = "web")))]
65mod test {
66 use crate::prelude::*;
67 use std::{thread, time};
68
69 #[test]
70 fn basic() {
71 fn ob() -> Observable<'static, i32> {
72 observables::from_iter(0..10)
73 }
74 ob().sequence_equal(&[ob(), ob()]).subscribe(
75 |x| {
76 println!("next - {}", x);
77 assert_eq!(x, true);
78 },
79 print_error!(),
80 print_complete!(),
81 );
82 }
83
84 #[test]
85 fn thread() {
86 fn ob() -> Observable<'static, i32> {
87 observables::from_iter(0..10)
88 .observe_on(schedulers::new_thread_scheduler())
89 }
90 ob().sequence_equal(&[ob(), ob()]).subscribe(
91 |x| {
92 println!("next - {}", x);
93 assert_eq!(x, true);
94 },
95 print_error!(),
96 print_complete!(),
97 );
98 thread::sleep(time::Duration::from_millis(1000));
99 }
100
101 #[test]
102 fn ne() {
103 fn ob(start: i32, memo: &'static str) -> Observable<'static, i32> {
104 observables::from_iter(start..start + 10)
105 .observe_on(schedulers::new_thread_scheduler())
106 .tap(
107 move |x| println!("tap ({}) {}", memo, x),
108 junk_error!(),
109 junk_complete!(),
110 )
111 }
112 ob(0, "A")
113 .sequence_equal(&[ob(0, "B"), ob(1, "C")])
114 .subscribe(
115 |x| {
116 println!("next - {}", x);
117 assert_eq!(x, false);
118 },
119 print_error!(),
120 print_complete!(),
121 );
122 thread::sleep(time::Duration::from_millis(1000));
123 }
124}