rx_rust/operators/conditional_boolean/
sequence_equal.rs1use crate::safe_lock_option_observer;
2use crate::utils::types::{MarkerType, Mutable, MutableHelper, NecessarySendSync, Shared};
3use crate::utils::unsub_after_termination::subscribe_unsub_after_termination;
4use crate::{
5 disposable::subscription::Subscription,
6 observable::Observable,
7 observer::{Observer, Termination},
8};
9use educe::Educe;
10use std::collections::VecDeque;
11use std::marker::PhantomData;
12
13#[derive(Educe)]
43#[educe(Debug, Clone)]
44pub struct SequenceEqual<T, OE1, OE2> {
45 source_1: OE1,
46 source_2: OE2,
47 _marker: MarkerType<T>,
48}
49
50impl<T, OE1, OE2> SequenceEqual<T, OE1, OE2> {
51 pub fn new<'or, 'sub, E>(source_1: OE1, source_2: OE2) -> Self
52 where
53 OE1: Observable<'or, 'sub, T, E>,
54 OE2: Observable<'or, 'sub, T, E>,
55 {
56 Self {
57 source_1,
58 source_2,
59 _marker: PhantomData,
60 }
61 }
62}
63
64impl<'or, 'sub, T, E, OE1, OE2> Observable<'or, 'sub, bool, E> for SequenceEqual<T, OE1, OE2>
65where
66 T: PartialEq + NecessarySendSync + 'or,
67 OE1: Observable<'or, 'sub, T, E>,
68 OE2: Observable<'or, 'sub, T, E>,
69 'sub: 'or,
70{
71 fn subscribe(
72 self,
73 observer: impl Observer<bool, E> + NecessarySendSync + 'or,
74 ) -> Subscription<'sub> {
75 subscribe_unsub_after_termination(observer, |observer| {
76 let observer = Shared::new(Mutable::new(Some(observer)));
77 let state = Shared::new(Mutable::new(SequenceEqualObserverState::None(false)));
78 let observer_1 = SequenceEqualObserver {
79 observer: observer.clone(),
80 state: state.clone(),
81 is_one: true,
82 };
83 let observer_2 = SequenceEqualObserver {
84 observer: observer.clone(),
85 state: state.clone(),
86 is_one: false,
87 };
88 let subscription_1 = self.source_1.subscribe(observer_1);
89 let subscription_2 = self.source_2.subscribe(observer_2);
90 subscription_1 + subscription_2
91 })
92 }
93}
94
95enum SequenceEqualObserverState<T> {
96 None(bool),
97 One(VecDeque<T>, bool),
98 Two(VecDeque<T>, bool),
99}
100
101struct SequenceEqualObserver<T, OR> {
102 observer: Shared<Mutable<Option<OR>>>,
103 state: Shared<Mutable<SequenceEqualObserverState<T>>>,
104 is_one: bool,
105}
106
107impl<T, E, OR> Observer<T, E> for SequenceEqualObserver<T, OR>
108where
109 OR: Observer<bool, E>,
110 T: PartialEq,
111{
112 fn on_next(&mut self, value: T) {
113 self.state.lock_mut(|mut lock| match &mut *lock {
114 SequenceEqualObserverState::None(is_completed) => {
115 if *is_completed {
116 safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
117 } else if self.is_one {
118 *lock = SequenceEqualObserverState::One(VecDeque::from([value]), false);
119 } else {
120 *lock = SequenceEqualObserverState::Two(VecDeque::from([value]), false);
121 }
122 }
123 SequenceEqualObserverState::One(values, is_completed) => {
124 if self.is_one {
125 assert!(!*is_completed);
126 values.push_back(value);
127 } else {
128 let top = values.pop_front().unwrap();
129 if top == value {
130 if values.is_empty() {
131 *lock = SequenceEqualObserverState::None(*is_completed);
132 }
133 } else {
134 drop(lock);
135 safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
136 }
137 }
138 }
139 SequenceEqualObserverState::Two(values, is_completed) => {
140 if !self.is_one {
141 assert!(!*is_completed);
142 values.push_back(value);
143 } else {
144 let top = values.pop_front().unwrap();
145 if top == value {
146 if values.is_empty() {
147 *lock = SequenceEqualObserverState::None(*is_completed);
148 }
149 } else {
150 drop(lock);
151 safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
152 }
153 }
154 }
155 });
156 }
157
158 fn on_termination(self, termination: Termination<E>) {
159 match termination {
160 Termination::Completed => {
161 self.state.lock_mut(|mut lock| match &mut *lock {
162 SequenceEqualObserverState::None(is_completed) => {
163 if *is_completed {
164 safe_lock_option_observer!(on_next_and_termination: self.observer, true, Termination::Completed);
165 } else {
166 *is_completed = true;
167 }
168 }
169 SequenceEqualObserverState::One(_, is_completed) => {
170 if *is_completed {
171 safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
172 } else if self.is_one {
173 *is_completed = true;
174 } else {
175 safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
176 }
177 }
178 SequenceEqualObserverState::Two(_, is_completed) => {
179 if *is_completed {
180 safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
181 } else if !self.is_one {
182 *is_completed = true;
183 } else {
184 safe_lock_option_observer!(on_next_and_termination: self.observer, false, Termination::Completed);
185 }
186 }
187 });
188 }
189 Termination::Error(_) => {
190 safe_lock_option_observer!(on_termination: self.observer, termination);
191 }
192 }
193 }
194}