rx_rust/operators/conditional_boolean/
sequence_equal.rs

1use crate::safe_lock_option_observer;
2use crate::utils::types::{MarkerType, Mutable, MutableHelper, NecessarySendSync, Shared};
3use crate::utils::unsub_after_termination::subscribe_unsub_after_termination;
4use crate::{
5    disposable::subscription::Subscription,
6    observable::Observable,
7    observer::{Observer, Termination},
8};
9use educe::Educe;
10use std::collections::VecDeque;
11use std::marker::PhantomData;
12
13/// Emits a single boolean value that indicates whether two Observables emit the same sequence of items.
14/// See <https://reactivex.io/documentation/operators/sequenceequal.html>
15///
16/// # Examples
17/// ```rust
18/// use rx_rust::{
19///     observable::observable_ext::ObservableExt,
20///     observer::Termination,
21///     operators::{
22///         conditional_boolean::sequence_equal::SequenceEqual,
23///         creating::from_iter::FromIter,
24///     },
25/// };
26///
27/// let mut values = Vec::new();
28/// let mut terminations = Vec::new();
29///
30/// let observable = SequenceEqual::new(
31///     FromIter::new(vec![1, 2]),
32///     FromIter::new(vec![1, 2]),
33/// );
34/// observable.subscribe_with_callback(
35///     |value| values.push(value),
36///     |termination| terminations.push(termination),
37/// );
38///
39/// assert_eq!(values, vec![true]);
40/// assert_eq!(terminations, vec![Termination::Completed]);
41/// ```
42#[derive(Educe)]
43#[educe(Debug, Clone)]
44pub struct SequenceEqual<T, OE1, OE2> {
45    source_1: OE1,
46    source_2: OE2,
47    _marker: MarkerType<T>,
48}
49
50impl<T, OE1, OE2> SequenceEqual<T, OE1, OE2> {
51    pub fn new<'or, 'sub, E>(source_1: OE1, source_2: OE2) -> Self
52    where
53        OE1: Observable<'or, 'sub, T, E>,
54        OE2: Observable<'or, 'sub, T, E>,
55    {
56        Self {
57            source_1,
58            source_2,
59            _marker: PhantomData,
60        }
61    }
62}
63
64impl<'or, 'sub, T, E, OE1, OE2> Observable<'or, 'sub, bool, E> for SequenceEqual<T, OE1, OE2>
65where
66    T: PartialEq + NecessarySendSync + 'or,
67    OE1: Observable<'or, 'sub, T, E>,
68    OE2: Observable<'or, 'sub, T, E>,
69    'sub: 'or,
70{
71    fn subscribe(
72        self,
73        observer: impl Observer<bool, E> + NecessarySendSync + 'or,
74    ) -> Subscription<'sub> {
75        subscribe_unsub_after_termination(observer, |observer| {
76            let observer = Shared::new(Mutable::new(Some(observer)));
77            let state = Shared::new(Mutable::new(SequenceEqualObserverState::None(false)));
78            let observer_1 = SequenceEqualObserver {
79                observer: observer.clone(),
80                state: state.clone(),
81                is_one: true,
82            };
83            let observer_2 = SequenceEqualObserver {
84                observer: observer.clone(),
85                state: state.clone(),
86                is_one: false,
87            };
88            let subscription_1 = self.source_1.subscribe(observer_1);
89            let subscription_2 = self.source_2.subscribe(observer_2);
90            subscription_1 + subscription_2
91        })
92    }
93}
94
/// Comparison state shared by the two source observers.
///
/// At most one source is "ahead" at any time. The items it has emitted that the other
/// source has not yet matched are kept in a queue, oldest first.
enum SequenceEqualObserverState<T> {
    /// Nothing is waiting to be matched. The flag is `true` once one of the sources has
    /// completed.
    None(bool),
    /// Source 1 is ahead: the queue holds its unmatched items, and the flag is `true`
    /// once source 1 has completed.
    One(VecDeque<T>, bool),
    /// Source 2 is ahead: the queue holds its unmatched items, and the flag is `true`
    /// once source 2 has completed.
    Two(VecDeque<T>, bool),
}
100
101struct SequenceEqualObserver<T, OR> {
102    observer: Shared<Mutable<Option<OR>>>,
103    state: Shared<Mutable<SequenceEqualObserverState<T>>>,
104    is_one: bool,
105}
106
107impl<T, E, OR> Observer<T, E> for SequenceEqualObserver<T, OR>
108where
109    OR: Observer<bool, E>,
110    T: PartialEq,
111{
112    fn on_next(&mut self, value: T) {
113        self.state.lock_mut(|mut lock| match &mut *lock {
114            SequenceEqualObserverState::None(is_completed) => {
115                if *is_completed {
116                    safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
117                } else if self.is_one {
118                    *lock = SequenceEqualObserverState::One(VecDeque::from([value]), false);
119                } else {
120                    *lock = SequenceEqualObserverState::Two(VecDeque::from([value]), false);
121                }
122            }
123            SequenceEqualObserverState::One(values, is_completed) => {
124                if self.is_one {
125                    assert!(!*is_completed);
126                    values.push_back(value);
127                } else {
128                    let top = values.pop_front().unwrap();
129                    if top == value {
130                        if values.is_empty() {
131                            *lock = SequenceEqualObserverState::None(*is_completed);
132                        }
133                    } else {
134                        drop(lock);
135                        safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
136                    }
137                }
138            }
139            SequenceEqualObserverState::Two(values, is_completed) => {
140                if !self.is_one {
141                    assert!(!*is_completed);
142                    values.push_back(value);
143                } else {
144                    let top = values.pop_front().unwrap();
145                    if top == value {
146                        if values.is_empty() {
147                            *lock = SequenceEqualObserverState::None(*is_completed);
148                        }
149                    } else {
150                        drop(lock);
151                        safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
152                    }
153                }
154            }
155        });
156    }
157
158    fn on_termination(self, termination: Termination<E>) {
159        match termination {
160            Termination::Completed => {
161                self.state.lock_mut(|mut lock| match &mut *lock {
162                    SequenceEqualObserverState::None(is_completed) => {
163                        if *is_completed {
164                            safe_lock_option_observer!(on_next_and_termination: self.observer, true, Termination::Completed);
165                        } else {
166                            *is_completed = true;
167                        }
168                    }
169                    SequenceEqualObserverState::One(_, is_completed) => {
170                        if *is_completed {
171                            safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
172                        } else if self.is_one {
173                            *is_completed = true;
174                        } else {
175                            safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
176                        }
177                    }
178                    SequenceEqualObserverState::Two(_, is_completed) => {
179                        if *is_completed {
180                            safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
181                        } else if !self.is_one {
182                            *is_completed = true;
183                        } else {
184                            safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
185                        }
186                    }
187                });
188            }
189            Termination::Error(_) => {
190                safe_lock_option_observer!(on_termination: self.observer, termination);
191            }
192        }
193    }
194}