// rx_rust/operators/combining/zip.rs
use crate::safe_lock_option_observer;
use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
use crate::{
    disposable::subscription::Subscription,
    observable::Observable,
    observer::{Observer, Termination},
    utils::unsub_after_termination::subscribe_unsub_after_termination,
};
use educe::Educe;
use std::collections::VecDeque;

12#[derive(Educe)]
42#[educe(Debug, Clone)]
43pub struct Zip<OE1, OE2> {
44 source_1: OE1,
45 source_2: OE2,
46}
47
48impl<OE1, OE2> Zip<OE1, OE2> {
49 pub fn new<'or, 'sub, T1, T2, E>(source_1: OE1, source_2: OE2) -> Self
50 where
51 OE1: Observable<'or, 'sub, T1, E>,
52 OE2: Observable<'or, 'sub, T2, E>,
53 {
54 Self { source_1, source_2 }
55 }
56}
57
58impl<'or, 'sub, T1, T2, E, OE1, OE2> Observable<'or, 'sub, (T1, T2), E> for Zip<OE1, OE2>
59where
60 T1: NecessarySendSync + 'or,
61 T2: NecessarySendSync + 'or,
62 OE1: Observable<'or, 'sub, T1, E>,
63 OE2: Observable<'or, 'sub, T2, E>,
64 'sub: 'or,
65{
66 fn subscribe(
67 self,
68 observer: impl Observer<(T1, T2), E> + NecessarySendSync + 'or,
69 ) -> Subscription<'sub> {
70 subscribe_unsub_after_termination(observer, |observer| {
71 let observer = Shared::new(Mutable::new(Some(observer)));
72 let buffer = Shared::new(Mutable::new(ZipObserverBufferState::None));
73 let observer_1 = ZipObserver1 {
74 observer: observer.clone(),
75 buffer: buffer.clone(),
76 };
77 let observer_2 = ZipObserver2 { observer, buffer };
78 let subscription_1 = self.source_1.subscribe(observer_1);
79 let subscription_2 = self.source_2.subscribe(observer_2);
80 subscription_1 + subscription_2
81 })
82 }
83}
84
/// Unpaired items waiting for a partner from the other source.
///
/// At most one side can have pending items at a time, so a single queue is
/// enough. The code in this file creates `One`/`Two` with one item and resets
/// the state to `None` as soon as the queue drains, so a `One`/`Two` value is
/// expected to hold at least one item.
enum ZipObserverBufferState<T1, T2> {
    /// Neither source has unpaired items.
    None,
    /// The first source is ahead.
    One {
        /// Items from the first source, oldest at the front.
        buffer: VecDeque<T1>,
        /// Set once the first source has completed while `buffer` still held
        /// items.
        is_completed: bool,
    },
    /// The second source is ahead.
    Two {
        /// Items from the second source, oldest at the front.
        buffer: VecDeque<T2>,
        /// Set once the second source has completed while `buffer` still held
        /// items.
        is_completed: bool,
    },
}

97struct ZipObserver1<T1, T2, OR> {
98 observer: Shared<Mutable<Option<OR>>>,
99 buffer: Shared<Mutable<ZipObserverBufferState<T1, T2>>>,
100}
101
102impl<T1, T2, E, OR> Observer<T1, E> for ZipObserver1<T1, T2, OR>
103where
104 OR: Observer<(T1, T2), E>,
105{
106 fn on_next(&mut self, value: T1) {
107 self.buffer.lock_mut(|mut lock| match &mut *lock {
108 ZipObserverBufferState::None => {
109 *lock = ZipObserverBufferState::One {
110 buffer: VecDeque::from([value]),
111 is_completed: false,
112 };
113 }
114 ZipObserverBufferState::One { buffer: items, .. } => {
115 items.push_back(value);
116 }
117 ZipObserverBufferState::Two {
118 buffer: items,
119 is_completed,
120 } => {
121 let item = items.pop_front().unwrap();
122 let should_complete;
123 if items.is_empty() {
124 should_complete = *is_completed;
125 *lock = ZipObserverBufferState::None;
126 } else {
127 should_complete = false;
128 }
129 drop(lock);
130 if should_complete {
131 safe_lock_option_observer!(on_next_and_termination: self.observer, (value, item), Termination::Completed);
132 } else {
133 safe_lock_option_observer!(on_next: self.observer, (value, item));
134 }
135 }
136 });
137 }
138
139 fn on_termination(self, termination: Termination<E>) {
140 let should_terminate = match termination {
141 Termination::Completed => self.buffer.lock_mut(|mut lock| match &mut *lock {
142 ZipObserverBufferState::None => true,
143 ZipObserverBufferState::One { is_completed, .. } => {
144 *is_completed = true;
145 false
146 }
147 ZipObserverBufferState::Two { .. } => true,
148 }),
149 Termination::Error(_) => true,
150 };
151 if should_terminate {
152 safe_lock_option_observer!(on_termination: self.observer, termination);
153 }
154 }
155}
156
157struct ZipObserver2<T1, T2, OR> {
158 observer: Shared<Mutable<Option<OR>>>,
159 buffer: Shared<Mutable<ZipObserverBufferState<T1, T2>>>,
160}
161
162impl<T1, T2, E, OR> Observer<T2, E> for ZipObserver2<T1, T2, OR>
163where
164 OR: Observer<(T1, T2), E>,
165{
166 fn on_next(&mut self, value: T2) {
167 self.buffer.lock_mut(|mut lock| match &mut *lock {
168 ZipObserverBufferState::None => {
169 *lock = ZipObserverBufferState::Two {
170 buffer: VecDeque::from([value]),
171 is_completed: false,
172 };
173 }
174 ZipObserverBufferState::One {
175 buffer: items,
176 is_completed,
177 } => {
178 let item = items.pop_front().unwrap();
179 let should_complete;
180 if items.is_empty() {
181 should_complete = *is_completed;
182 *lock = ZipObserverBufferState::None;
183 } else {
184 should_complete = false;
185 }
186 drop(lock);
187 if should_complete {
188 safe_lock_option_observer!(on_next_and_termination: self.observer, (item, value), Termination::Completed);
189 } else {
190 safe_lock_option_observer!(on_next: self.observer, (item, value));
191 }
192 }
193 ZipObserverBufferState::Two { buffer: items, .. } => {
194 items.push_back(value);
195 }
196 });
197 }
198
199 fn on_termination(self, termination: Termination<E>) {
200 let should_terminate = match termination {
201 Termination::Completed => self.buffer.lock_mut(|mut lock| match &mut *lock {
202 ZipObserverBufferState::None => true,
203 ZipObserverBufferState::One { .. } => true,
204 ZipObserverBufferState::Two { is_completed, .. } => {
205 *is_completed = true;
206 false
207 }
208 }),
209 Termination::Error(_) => true,
210 };
211 if should_terminate {
212 safe_lock_option_observer!(on_termination: self.observer, termination);
213 }
214 }
215}