rx_rust/operators/combining/
switch.rs1use crate::disposable::Disposable;
2use crate::disposable::subscription::Subscription;
3use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7 operators::creating::from_iter::FromIter,
8 utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
9};
10use crate::{safe_lock_option, safe_lock_option_disposable, safe_lock_option_observer};
11use educe::Educe;
12use std::marker::PhantomData;
13
14#[derive(Educe)]
43#[educe(Debug, Clone)]
44pub struct Switch<OE, OE1> {
45 source: OE,
46 _marker: MarkerType<OE1>,
47}
48
49impl<OE, OE1> Switch<OE, OE1> {
50 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
51 where
52 OE: Observable<'or, 'sub, OE1, E>,
53 OE1: Observable<'or, 'sub, T, E>,
54 {
55 Self {
56 source,
57 _marker: PhantomData,
58 }
59 }
60}
61
62impl<OE1, I> Switch<FromIter<I>, OE1> {
63 pub fn new_from_iter<'or, 'sub, T, E>(into_iterator: I) -> Self
64 where
65 I: IntoIterator<Item = OE1>,
66 OE1: Observable<'or, 'sub, T, E>,
67 {
68 Self {
69 source: FromIter::new(into_iterator),
70 _marker: PhantomData,
71 }
72 }
73}
74
75impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for Switch<OE, OE1>
76where
77 T: 'or,
78 OE: Observable<'or, 'sub, OE1, E>,
79 OE1: Observable<'or, 'sub, T, E>,
80 'sub: 'or,
81{
82 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
83 subscribe_unsub_after_termination(observer, |observer| {
84 let context = Shared::new(Mutable::new(SwitchContext {
85 on_going_sub: None,
86 completed: false,
87 }));
88 let observer = SwitchObserver {
89 observer: Shared::new(Mutable::new(Some(observer))),
90 context: context.clone(),
91 _marker: PhantomData,
92 };
93 self.source.subscribe(observer) + context
94 })
95 }
96}
97
98struct SwitchContext<'sub> {
99 on_going_sub: Option<Subscription<'sub>>,
100 completed: bool,
101}
102
103impl Disposable for Shared<Mutable<SwitchContext<'_>>> {
104 fn dispose(self) {
105 safe_lock_option_disposable!(dispose: self, on_going_sub);
106 }
107}
108
109struct SwitchObserver<'sub, T, OR> {
110 observer: Shared<Mutable<Option<OR>>>,
111 context: Shared<Mutable<SwitchContext<'sub>>>,
112 _marker: MarkerType<T>,
113}
114
115impl<'or, 'sub, T, E, OR, OE1> Observer<OE1, E> for SwitchObserver<'sub, T, OR>
116where
117 OR: Observer<T, E> + NecessarySendSync + 'or,
118 OE1: Observable<'or, 'sub, T, E>,
119 'sub: 'or,
120{
121 fn on_next(&mut self, value: OE1) {
122 if let Some(on_going_sub) =
124 safe_lock_option!(replace: self.context, on_going_sub, Subscription::default())
125 {
126 on_going_sub.dispose();
127 }
128 let observer = SwitchInnerObserver {
129 observer: self.observer.clone(),
130 context: self.context.clone(),
131 };
132 let sub = value.subscribe(observer);
133 self.context.lock_mut(|mut lock| {
134 if lock.on_going_sub.is_some() {
135 lock.on_going_sub = Some(sub);
136 } else {
137 }
139 });
140 }
141
142 fn on_termination(self, termination: Termination<E>) {
143 match termination {
144 Termination::Completed => {
145 self.context.lock_mut(|mut lock| {
146 lock.completed = true;
147 if lock.on_going_sub.is_none() {
148 drop(lock);
149 safe_lock_option_observer!(on_termination: self.observer, termination);
150 }
151 });
152 }
153 Termination::Error(_) => {
154 safe_lock_option_observer!(on_termination: self.observer, termination);
155 }
156 }
157 }
158}
159
160struct SwitchInnerObserver<'sub, OR> {
161 observer: Shared<Mutable<Option<OR>>>,
162 context: Shared<Mutable<SwitchContext<'sub>>>,
163}
164
165impl<T, E, OR> Observer<T, E> for SwitchInnerObserver<'_, OR>
166where
167 OR: Observer<T, E>,
168{
169 fn on_next(&mut self, value: T) {
170 safe_lock_option_observer!(on_next: self.observer, value);
171 }
172
173 fn on_termination(self, termination: Termination<E>) {
174 match termination {
175 Termination::Completed => {
176 self.context.lock_mut(|mut lock| {
177 if lock.completed {
178 drop(lock);
179 safe_lock_option_observer!(on_termination: self.observer, termination);
180 } else if let Some(on_going_sub) = lock.on_going_sub.take() {
181 drop(lock);
182 on_going_sub.dispose();
183 }
184 });
185 }
186 Termination::Error(_) => {
187 safe_lock_option_observer!(on_termination: self.observer, termination);
188 }
189 }
190 }
191}