rx_rust/operators/combining/
merge_all.rs1use crate::disposable::Disposable;
2use crate::disposable::subscription::Subscription;
3use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7 operators::creating::from_iter::FromIter,
8 utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
9};
10use crate::{safe_lock, safe_lock_option_observer, safe_lock_slot_map};
11use educe::Educe;
12use slotmap::{DefaultKey, SlotMap};
13use std::marker::PhantomData;
14
15#[derive(Educe)]
45#[educe(Debug, Clone)]
46pub struct MergeAll<OE, OE1> {
47 source: OE,
48 _marker: MarkerType<OE1>,
49}
50
51impl<OE, OE1> MergeAll<OE, OE1> {
52 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
53 where
54 OE: Observable<'or, 'sub, OE1, E>,
55 OE1: Observable<'or, 'sub, T, E>,
56 {
57 Self {
58 source,
59 _marker: PhantomData,
60 }
61 }
62}
63
64impl<OE1, I> MergeAll<FromIter<I>, OE1> {
65 pub fn new_from_iter<'or, 'sub, T, E>(into_iterator: I) -> Self
66 where
67 I: IntoIterator<Item = OE1>,
68 OE1: Observable<'or, 'sub, T, E>,
69 {
70 Self {
71 source: FromIter::new(into_iterator),
72 _marker: PhantomData,
73 }
74 }
75}
76
77impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for MergeAll<OE, OE1>
78where
79 T: 'or,
80 OE: Observable<'or, 'sub, OE1, E>,
81 OE1: Observable<'or, 'sub, T, E>,
82 'sub: 'or,
83{
84 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
85 subscribe_unsub_after_termination(observer, |observer| {
86 let context = Shared::new(Mutable::new(MergeAllContext {
87 subscriptions: SlotMap::new(),
88 terminated: false,
89 }));
90 let observer = MergeAllObserver {
91 observer: Shared::new(Mutable::new(Some(observer))),
92 context: context.clone(),
93 _marker: PhantomData,
94 };
95 self.source.subscribe(observer) + context
96 })
97 }
98}
99
100struct MergeAllContext<'sub> {
101 subscriptions: SlotMap<DefaultKey, Subscription<'sub>>,
102 terminated: bool,
103}
104
105impl Disposable for Shared<Mutable<MergeAllContext<'_>>> {
106 fn dispose(self) {
107 safe_lock!(mem_take: self, subscriptions).clear();
108 }
109}
110
111struct MergeAllObserver<'sub, T, OR> {
112 observer: Shared<Mutable<Option<OR>>>,
113 context: Shared<Mutable<MergeAllContext<'sub>>>,
114 _marker: MarkerType<T>,
115}
116
117impl<'or, 'sub, T, E, OR, OE1> Observer<OE1, E> for MergeAllObserver<'sub, T, OR>
118where
119 OR: Observer<T, E> + NecessarySendSync + 'or,
120 OE1: Observable<'or, 'sub, T, E>,
121 'sub: 'or,
122{
123 fn on_next(&mut self, value: OE1) {
124 let key = safe_lock_slot_map!(insert: self.context, subscriptions, Subscription::default());
126
127 let observer = MergeAllInnerObserver {
128 observer: self.observer.clone(),
129 context: self.context.clone(),
130 key,
131 };
132 let sub = value.subscribe(observer);
133
134 self.context.lock_mut(|mut lock| {
135 if lock.subscriptions.contains_key(key) {
136 lock.subscriptions[key] = sub;
137 } else {
138 }
140 });
141 }
142
143 fn on_termination(self, termination: Termination<E>) {
144 match termination {
145 Termination::Completed => {
146 self.context.lock_mut(|mut lock| {
147 if lock.subscriptions.is_empty() {
148 drop(lock);
149 safe_lock_option_observer!(on_termination: self.observer, termination);
150 } else {
151 lock.terminated = true;
152 }
153 });
154 }
155 Termination::Error(_) => {
156 safe_lock_option_observer!(on_termination: self.observer, termination);
157 }
158 }
159 }
160}
161
162struct MergeAllInnerObserver<'sub, OR> {
163 observer: Shared<Mutable<Option<OR>>>,
164 context: Shared<Mutable<MergeAllContext<'sub>>>,
165 key: DefaultKey,
166}
167
168impl<T, E, OR> Observer<T, E> for MergeAllInnerObserver<'_, OR>
169where
170 OR: Observer<T, E>,
171{
172 fn on_next(&mut self, value: T) {
173 safe_lock_option_observer!(on_next: self.observer, value);
174 }
175
176 fn on_termination(self, termination: Termination<E>) {
177 self.context.lock_mut(|mut lock| {
178 lock.subscriptions.remove(self.key);
179 match termination {
180 Termination::Completed => {
181 if lock.terminated && lock.subscriptions.is_empty() {
182 drop(lock);
183 safe_lock_option_observer!(on_termination: self.observer, termination);
184 }
185 }
186 Termination::Error(_) => {
187 drop(lock);
188 safe_lock_option_observer!(on_termination: self.observer, termination);
189 }
190 }
191 });
192 }
193}