rx_rust/operators/combining/
switch.rs1use crate::disposable::Disposable;
2use crate::disposable::subscription::Subscription;
3use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7 operators::creating::from_iter::FromIter,
8 utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
9};
10use crate::{safe_lock_option, safe_lock_option_disposable, safe_lock_option_observer};
11use educe::Educe;
12use std::marker::PhantomData;
13
14#[derive(Educe)]
43#[educe(Debug, Clone)]
44pub struct Switch<OE, OE1> {
45 source: OE,
46 _marker: MarkerType<OE1>,
47}
48
49impl<OE, OE1> Switch<OE, OE1> {
50 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
51 where
52 OE: Observable<'or, 'sub, OE1, E>,
53 OE1: Observable<'or, 'sub, T, E>,
54 {
55 Self {
56 source,
57 _marker: PhantomData,
58 }
59 }
60}
61
62impl<OE1, I> Switch<FromIter<I>, OE1> {
63 pub fn new_from_iter<'or, 'sub, T, E>(into_iterator: I) -> Self
64 where
65 I: IntoIterator<Item = OE1>,
66 OE1: Observable<'or, 'sub, T, E>,
67 {
68 Self {
69 source: FromIter::new(into_iterator),
70 _marker: PhantomData,
71 }
72 }
73}
74
75impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for Switch<OE, OE1>
76where
77 T: 'or,
78 OE: Observable<'or, 'sub, OE1, E>,
79 OE1: Observable<'or, 'sub, T, E>,
80 'sub: 'or,
81{
82 fn subscribe(
83 self,
84 observer: impl Observer<T, E> + NecessarySendSync + 'or,
85 ) -> Subscription<'sub> {
86 subscribe_unsub_after_termination(observer, |observer| {
87 let context = Shared::new(Mutable::new(SwitchContext {
88 on_going_sub: None,
89 completed: false,
90 }));
91 let observer = SwitchObserver {
92 observer: Shared::new(Mutable::new(Some(observer))),
93 context: context.clone(),
94 _marker: PhantomData,
95 };
96 self.source.subscribe(observer) + context
97 })
98 }
99}
100
101struct SwitchContext<'sub> {
102 on_going_sub: Option<Subscription<'sub>>,
103 completed: bool,
104}
105
106impl Disposable for Shared<Mutable<SwitchContext<'_>>> {
107 fn dispose(self) {
108 safe_lock_option_disposable!(dispose: self, on_going_sub);
109 }
110}
111
112struct SwitchObserver<'sub, T, OR> {
113 observer: Shared<Mutable<Option<OR>>>,
114 context: Shared<Mutable<SwitchContext<'sub>>>,
115 _marker: MarkerType<T>,
116}
117
118impl<'or, 'sub, T, E, OR, OE1> Observer<OE1, E> for SwitchObserver<'sub, T, OR>
119where
120 OR: Observer<T, E> + NecessarySendSync + 'or,
121 OE1: Observable<'or, 'sub, T, E>,
122 'sub: 'or,
123{
124 fn on_next(&mut self, value: OE1) {
125 if let Some(on_going_sub) =
127 safe_lock_option!(replace: self.context, on_going_sub, Subscription::default())
128 {
129 on_going_sub.dispose();
130 }
131 let observer = SwitchInnerObserver {
132 observer: self.observer.clone(),
133 context: self.context.clone(),
134 };
135 let sub = value.subscribe(observer);
136 self.context.lock_mut(|mut lock| {
137 if lock.on_going_sub.is_some() {
138 lock.on_going_sub = Some(sub);
139 } else {
140 }
142 });
143 }
144
145 fn on_termination(self, termination: Termination<E>) {
146 match termination {
147 Termination::Completed => {
148 self.context.lock_mut(|mut lock| {
149 lock.completed = true;
150 if lock.on_going_sub.is_none() {
151 drop(lock);
152 safe_lock_option_observer!(on_termination: self.observer, termination);
153 }
154 });
155 }
156 Termination::Error(_) => {
157 safe_lock_option_observer!(on_termination: self.observer, termination);
158 }
159 }
160 }
161}
162
163struct SwitchInnerObserver<'sub, OR> {
164 observer: Shared<Mutable<Option<OR>>>,
165 context: Shared<Mutable<SwitchContext<'sub>>>,
166}
167
168impl<T, E, OR> Observer<T, E> for SwitchInnerObserver<'_, OR>
169where
170 OR: Observer<T, E>,
171{
172 fn on_next(&mut self, value: T) {
173 safe_lock_option_observer!(on_next: self.observer, value);
174 }
175
176 fn on_termination(self, termination: Termination<E>) {
177 match termination {
178 Termination::Completed => {
179 self.context.lock_mut(|mut lock| {
180 if lock.completed {
181 drop(lock);
182 safe_lock_option_observer!(on_termination: self.observer, termination);
183 } else if let Some(on_going_sub) = lock.on_going_sub.take() {
184 drop(lock);
185 on_going_sub.dispose();
186 }
187 });
188 }
189 Termination::Error(_) => {
190 safe_lock_option_observer!(on_termination: self.observer, termination);
191 }
192 }
193 }
194}