rx_rust/operators/combining/
merge_all.rs1use crate::disposable::Disposable;
2use crate::disposable::subscription::Subscription;
3use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7 operators::creating::from_iter::FromIter,
8 utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
9};
10use crate::{safe_lock, safe_lock_option_observer, safe_lock_slot_map};
11use educe::Educe;
12use slotmap::{DefaultKey, SlotMap};
13use std::marker::PhantomData;
14
15#[derive(Educe)]
45#[educe(Debug, Clone)]
46pub struct MergeAll<OE, OE1> {
47 source: OE,
48 _marker: MarkerType<OE1>,
49}
50
51impl<OE, OE1> MergeAll<OE, OE1> {
52 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
53 where
54 OE: Observable<'or, 'sub, OE1, E>,
55 OE1: Observable<'or, 'sub, T, E>,
56 {
57 Self {
58 source,
59 _marker: PhantomData,
60 }
61 }
62}
63
64impl<OE1, I> MergeAll<FromIter<I>, OE1> {
65 pub fn new_from_iter<'or, 'sub, T, E>(into_iterator: I) -> Self
66 where
67 I: IntoIterator<Item = OE1>,
68 OE1: Observable<'or, 'sub, T, E>,
69 {
70 Self {
71 source: FromIter::new(into_iterator),
72 _marker: PhantomData,
73 }
74 }
75}
76
77impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for MergeAll<OE, OE1>
78where
79 T: 'or,
80 OE: Observable<'or, 'sub, OE1, E>,
81 OE1: Observable<'or, 'sub, T, E>,
82 'sub: 'or,
83{
84 fn subscribe(
85 self,
86 observer: impl Observer<T, E> + NecessarySendSync + 'or,
87 ) -> Subscription<'sub> {
88 subscribe_unsub_after_termination(observer, |observer| {
89 let context = Shared::new(Mutable::new(MergeAllContext {
90 subscriptions: SlotMap::new(),
91 terminated: false,
92 }));
93 let observer = MergeAllObserver {
94 observer: Shared::new(Mutable::new(Some(observer))),
95 context: context.clone(),
96 _marker: PhantomData,
97 };
98 self.source.subscribe(observer) + context
99 })
100 }
101}
102
103struct MergeAllContext<'sub> {
104 subscriptions: SlotMap<DefaultKey, Subscription<'sub>>,
105 terminated: bool,
106}
107
108impl Disposable for Shared<Mutable<MergeAllContext<'_>>> {
109 fn dispose(self) {
110 safe_lock!(mem_take: self, subscriptions).clear();
111 }
112}
113
114struct MergeAllObserver<'sub, T, OR> {
115 observer: Shared<Mutable<Option<OR>>>,
116 context: Shared<Mutable<MergeAllContext<'sub>>>,
117 _marker: MarkerType<T>,
118}
119
120impl<'or, 'sub, T, E, OR, OE1> Observer<OE1, E> for MergeAllObserver<'sub, T, OR>
121where
122 OR: Observer<T, E> + NecessarySendSync + 'or,
123 OE1: Observable<'or, 'sub, T, E>,
124 'sub: 'or,
125{
126 fn on_next(&mut self, value: OE1) {
127 let key = safe_lock_slot_map!(insert: self.context, subscriptions, Subscription::default());
129
130 let observer = MergeAllInnerObserver {
131 observer: self.observer.clone(),
132 context: self.context.clone(),
133 key,
134 };
135 let sub = value.subscribe(observer);
136
137 self.context.lock_mut(|mut lock| {
138 if lock.subscriptions.contains_key(key) {
139 lock.subscriptions[key] = sub;
140 } else {
141 }
143 });
144 }
145
146 fn on_termination(self, termination: Termination<E>) {
147 match termination {
148 Termination::Completed => {
149 self.context.lock_mut(|mut lock| {
150 if lock.subscriptions.is_empty() {
151 drop(lock);
152 safe_lock_option_observer!(on_termination: self.observer, termination);
153 } else {
154 lock.terminated = true;
155 }
156 });
157 }
158 Termination::Error(_) => {
159 safe_lock_option_observer!(on_termination: self.observer, termination);
160 }
161 }
162 }
163}
164
165struct MergeAllInnerObserver<'sub, OR> {
166 observer: Shared<Mutable<Option<OR>>>,
167 context: Shared<Mutable<MergeAllContext<'sub>>>,
168 key: DefaultKey,
169}
170
171impl<T, E, OR> Observer<T, E> for MergeAllInnerObserver<'_, OR>
172where
173 OR: Observer<T, E>,
174{
175 fn on_next(&mut self, value: T) {
176 safe_lock_option_observer!(on_next: self.observer, value);
177 }
178
179 fn on_termination(self, termination: Termination<E>) {
180 self.context.lock_mut(|mut lock| {
181 lock.subscriptions.remove(self.key);
182 match termination {
183 Termination::Completed => {
184 if lock.terminated && lock.subscriptions.is_empty() {
185 drop(lock);
186 safe_lock_option_observer!(on_termination: self.observer, termination);
187 }
188 }
189 Termination::Error(_) => {
190 drop(lock);
191 safe_lock_option_observer!(on_termination: self.observer, termination);
192 }
193 }
194 });
195 }
196}