1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::internals::stream_controller::*;
use crate::prelude::*;

#[derive(Clone)]
pub struct SequenceEqual<'a, Item>
where
  Item: Clone + Send + Sync,
{
  zip_op: operators::Zip<'a, Item>,
}

impl<'a, Item> SequenceEqual<'a, Item>
where
  Item: Clone + Send + Sync + PartialEq,
{
  pub fn new(observables: &[Observable<'a, Item>]) -> SequenceEqual<'a, Item> {
    SequenceEqual { zip_op: operators::Zip::new(observables) }
  }
  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, bool> {
    let zip_op = self.zip_op.clone();

    Observable::create(move |s| {
      let source = source.clone();

      let sctl = StreamController::new(s);

      let sctl_next = sctl.clone();
      let sctl_error = sctl.clone();
      let sctl_complete = sctl.clone();

      zip_op.execute(source).inner_subscribe(sctl.new_observer(
        move |serial, x: Vec<Item>| {
          let check = x.get(0).unwrap();
          if !x.iter().all(|i| i == check) {
            sctl_next.upstream_abort_observe(&serial);
            sctl_next.sink_next(false);
            sctl_next.sink_complete(&serial);
          }
        },
        move |_, e| {
          sctl_error.sink_error(e);
        },
        move |serial| {
          sctl_complete.sink_next(true);
          sctl_complete.sink_complete(&serial);
        },
      ));
    })
  }
}

impl<'a, Item> Observable<'a, Item>
where
  Item: Clone + Send + Sync + PartialEq,
{
  pub fn sequence_equal(
    &self,
    observables: &[Observable<'a, Item>],
  ) -> Observable<'a, bool> {
    SequenceEqual::new(observables).execute(self.clone())
  }
}

#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  #[test]
  fn basic() {
    fn ob() -> Observable<'static, i32> {
      observables::from_iter(0..10)
    }
    ob().sequence_equal(&[ob(), ob()]).subscribe(
      |x| {
        println!("next - {}", x);
        assert_eq!(x, true);
      },
      print_error!(),
      print_complete!(),
    );
  }

  #[test]
  fn thread() {
    fn ob() -> Observable<'static, i32> {
      observables::from_iter(0..10)
        .observe_on(schedulers::new_thread_scheduler())
    }
    ob().sequence_equal(&[ob(), ob()]).subscribe(
      |x| {
        println!("next - {}", x);
        assert_eq!(x, true);
      },
      print_error!(),
      print_complete!(),
    );
    thread::sleep(time::Duration::from_millis(1000));
  }

  #[test]
  fn ne() {
    fn ob(start: i32, memo: &'static str) -> Observable<'static, i32> {
      observables::from_iter(start..start + 10)
        .observe_on(schedulers::new_thread_scheduler())
        .tap(
          move |x| println!("tap ({}) {}", memo, x),
          junk_error!(),
          junk_complete!(),
        )
    }
    ob(0, "A")
      .sequence_equal(&[ob(0, "B"), ob(1, "C")])
      .subscribe(
        |x| {
          println!("next - {}", x);
          assert_eq!(x, false);
        },
        print_error!(),
        print_complete!(),
      );
    thread::sleep(time::Duration::from_millis(1000));
  }
}