rx_rust/operators/combining/
combine_latest.rs1use crate::safe_lock_option_observer;
2use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7 utils::unsub_after_termination::subscribe_unsub_after_termination,
8};
9use educe::Educe;
10
11#[derive(Educe)]
46#[educe(Debug, Clone)]
47pub struct CombineLatest<OE1, OE2> {
48 source_1: OE1,
49 source_2: OE2,
50}
51
52impl<OE1, OE2> CombineLatest<OE1, OE2> {
53 pub fn new<'or, 'sub, T1, T2, E>(source_1: OE1, source_2: OE2) -> Self
54 where
55 OE1: Observable<'or, 'sub, T1, E>,
56 OE2: Observable<'or, 'sub, T2, E>,
57 {
58 Self { source_1, source_2 }
59 }
60}
61
62impl<'or, 'sub, T1, T2, E, OE1, OE2> Observable<'or, 'sub, (T1, T2), E> for CombineLatest<OE1, OE2>
63where
64 T1: Clone + NecessarySendSync + 'or,
65 T2: Clone + NecessarySendSync + 'or,
66 OE1: Observable<'or, 'sub, T1, E>,
67 OE2: Observable<'or, 'sub, T2, E>,
68 'sub: 'or,
69{
70 fn subscribe(
71 self,
72 observer: impl Observer<(T1, T2), E> + NecessarySendSync + 'or,
73 ) -> Subscription<'sub> {
74 subscribe_unsub_after_termination(observer, |observer| {
75 let observer = Shared::new(Mutable::new(Some(observer)));
76 let context = Shared::new(Mutable::new(CombineLatestContext {
77 latest_1: None,
78 latest_2: None,
79 should_completed: false,
80 }));
81 let observer_1 = CombineLatestObserver1 {
82 context: context.clone(),
83 observer: observer.clone(),
84 };
85 let observer_2 = CombineLatestObserver2 { context, observer };
86 let subscription_1 = self.source_1.subscribe(observer_1);
87 let subscription_2 = self.source_2.subscribe(observer_2);
88 subscription_1 + subscription_2
89 })
90 }
91}
92
/// State shared between the two inner observers of a `CombineLatest` subscription.
struct CombineLatestContext<T1, T2> {
    /// Most recent value received from the first source, `None` until it emits.
    latest_1: Option<T1>,
    /// Most recent value received from the second source, `None` until it emits.
    latest_2: Option<T2>,
    /// Set when one source completes after having emitted at least once; the other source's
    /// completion then finds it set and forwards the completion downstream.
    should_completed: bool,
}
98
99struct CombineLatestObserver1<T1, T2, OR> {
100 context: Shared<Mutable<CombineLatestContext<T1, T2>>>,
101 observer: Shared<Mutable<Option<OR>>>,
102}
103
104impl<T1, T2, E, OR> Observer<T1, E> for CombineLatestObserver1<T1, T2, OR>
105where
106 T1: Clone,
107 T2: Clone,
108 OR: Observer<(T1, T2), E>,
109{
110 fn on_next(&mut self, latest_1: T1) {
111 self.context.lock_mut(|mut lock| {
112 lock.latest_1 = Some(latest_1.clone());
113 if let Some(latest_2) = lock.latest_2.clone() {
114 drop(lock);
115 safe_lock_option_observer!(on_next: self.observer, (latest_1, latest_2));
116 }
117 });
118 }
119
120 fn on_termination(self, termination: Termination<E>) {
121 match termination {
122 Termination::Completed => {
123 self.context.lock_mut(|mut lock| {
124 if lock.should_completed || lock.latest_1.is_none() {
125 drop(lock);
126 safe_lock_option_observer!(on_termination: self.observer, termination);
127 } else {
128 lock.should_completed = true;
129 }
130 });
131 }
132 Termination::Error(_) => {
133 safe_lock_option_observer!(on_termination: self.observer, termination);
134 }
135 }
136 }
137}
138
139struct CombineLatestObserver2<T1, T2, OR> {
140 context: Shared<Mutable<CombineLatestContext<T1, T2>>>,
141 observer: Shared<Mutable<Option<OR>>>,
142}
143
144impl<T1, T2, E, OR> Observer<T2, E> for CombineLatestObserver2<T1, T2, OR>
145where
146 T1: Clone,
147 T2: Clone,
148 OR: Observer<(T1, T2), E>,
149{
150 fn on_next(&mut self, latest_2: T2) {
151 self.context.lock_mut(|mut lock| {
152 lock.latest_2 = Some(latest_2.clone());
153 if let Some(latest_1) = lock.latest_1.clone() {
154 drop(lock);
155 safe_lock_option_observer!(on_next: self.observer, (latest_1, latest_2));
156 }
157 });
158 }
159
160 fn on_termination(self, termination: Termination<E>) {
161 match termination {
162 Termination::Completed => {
163 self.context.lock_mut(|mut lock| {
164 if lock.should_completed || lock.latest_2.is_none() {
165 drop(lock);
166 safe_lock_option_observer!(on_termination: self.observer, termination);
167 } else {
168 lock.should_completed = true;
169 }
170 });
171 }
172 Termination::Error(_) => {
173 safe_lock_option_observer!(on_termination: self.observer, termination);
174 }
175 }
176 }
177}