// rx_rust/operators/combining/merge_all.rs

use crate::disposable::Disposable;
use crate::disposable::subscription::Subscription;
use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
use crate::{
    observable::Observable,
    observer::{Observer, Termination},
    operators::creating::from_iter::FromIter,
    utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
};
use crate::{safe_lock, safe_lock_option_observer, safe_lock_slot_map};
use educe::Educe;
use slotmap::{DefaultKey, SlotMap};
use std::marker::PhantomData;

15/// Merges an Observable of Observables into a single Observable that emits all of their emissions.
16/// See <https://reactivex.io/documentation/operators/merge.html> (referencing merge operator for general concept)
17///
18/// # Examples
19/// ```rust
20/// use rx_rust::{
21///     observable::observable_ext::ObservableExt,
22///     observer::Termination,
23///     operators::{
24///         combining::merge_all::MergeAll,
25///         creating::from_iter::FromIter,
26///     },
27/// };
28///
29/// let mut values = Vec::new();
30/// let mut terminations = Vec::new();
31///
32/// let observable = MergeAll::new_from_iter([
33///     FromIter::new(vec![1, 3]),
34///     FromIter::new(vec![2, 4]),
35/// ]);
36/// observable.subscribe_with_callback(
37///     |value| values.push(value),
38///     |termination| terminations.push(termination),
39/// );
40///
41/// assert_eq!(values, vec![1, 3, 2, 4]);
42/// assert_eq!(terminations, vec![Termination::Completed]);
43/// ```
44#[derive(Educe)]
45#[educe(Debug, Clone)]
46pub struct MergeAll<OE, OE1> {
47    source: OE,
48    _marker: MarkerType<OE1>,
49}
50
51impl<OE, OE1> MergeAll<OE, OE1> {
52    pub fn new<'or, 'sub, T, E>(source: OE) -> Self
53    where
54        OE: Observable<'or, 'sub, OE1, E>,
55        OE1: Observable<'or, 'sub, T, E>,
56    {
57        Self {
58            source,
59            _marker: PhantomData,
60        }
61    }
62}
63
64impl<OE1, I> MergeAll<FromIter<I>, OE1> {
65    pub fn new_from_iter<'or, 'sub, T, E>(into_iterator: I) -> Self
66    where
67        I: IntoIterator<Item = OE1>,
68        OE1: Observable<'or, 'sub, T, E>,
69    {
70        Self {
71            source: FromIter::new(into_iterator),
72            _marker: PhantomData,
73        }
74    }
75}
76
77impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for MergeAll<OE, OE1>
78where
79    T: 'or,
80    OE: Observable<'or, 'sub, OE1, E>,
81    OE1: Observable<'or, 'sub, T, E>,
82    'sub: 'or,
83{
84    fn subscribe(
85        self,
86        observer: impl Observer<T, E> + NecessarySendSync + 'or,
87    ) -> Subscription<'sub> {
88        subscribe_unsub_after_termination(observer, |observer| {
89            let context = Shared::new(Mutable::new(MergeAllContext {
90                subscriptions: SlotMap::new(),
91                terminated: false,
92            }));
93            let observer = MergeAllObserver {
94                observer: Shared::new(Mutable::new(Some(observer))),
95                context: context.clone(),
96                _marker: PhantomData,
97            };
98            self.source.subscribe(observer) + context
99        })
100    }
101}
102
103struct MergeAllContext<'sub> {
104    subscriptions: SlotMap<DefaultKey, Subscription<'sub>>,
105    terminated: bool,
106}
107
108impl Disposable for Shared<Mutable<MergeAllContext<'_>>> {
109    fn dispose(self) {
110        safe_lock!(mem_take: self, subscriptions).clear();
111    }
112}
113
114struct MergeAllObserver<'sub, T, OR> {
115    observer: Shared<Mutable<Option<OR>>>,
116    context: Shared<Mutable<MergeAllContext<'sub>>>,
117    _marker: MarkerType<T>,
118}
119
120impl<'or, 'sub, T, E, OR, OE1> Observer<OE1, E> for MergeAllObserver<'sub, T, OR>
121where
122    OR: Observer<T, E> + NecessarySendSync + 'or,
123    OE1: Observable<'or, 'sub, T, E>,
124    'sub: 'or,
125{
126    fn on_next(&mut self, value: OE1) {
127        // Insert a placeholder subscription.
128        let key = safe_lock_slot_map!(insert: self.context, subscriptions, Subscription::default());
129
130        let observer = MergeAllInnerObserver {
131            observer: self.observer.clone(),
132            context: self.context.clone(),
133            key,
134        };
135        let sub = value.subscribe(observer);
136
137        self.context.lock_mut(|mut lock| {
138            if lock.subscriptions.contains_key(key) {
139                lock.subscriptions[key] = sub;
140            } else {
141                // already terminated
142            }
143        });
144    }
145
146    fn on_termination(self, termination: Termination<E>) {
147        match termination {
148            Termination::Completed => {
149                self.context.lock_mut(|mut lock| {
150                    if lock.subscriptions.is_empty() {
151                        drop(lock);
152                        safe_lock_option_observer!(on_termination: self.observer, termination);
153                    } else {
154                        lock.terminated = true;
155                    }
156                });
157            }
158            Termination::Error(_) => {
159                safe_lock_option_observer!(on_termination: self.observer, termination);
160            }
161        }
162    }
163}
164
165struct MergeAllInnerObserver<'sub, OR> {
166    observer: Shared<Mutable<Option<OR>>>,
167    context: Shared<Mutable<MergeAllContext<'sub>>>,
168    key: DefaultKey,
169}
170
171impl<T, E, OR> Observer<T, E> for MergeAllInnerObserver<'_, OR>
172where
173    OR: Observer<T, E>,
174{
175    fn on_next(&mut self, value: T) {
176        safe_lock_option_observer!(on_next: self.observer, value);
177    }
178
179    fn on_termination(self, termination: Termination<E>) {
180        self.context.lock_mut(|mut lock| {
181            lock.subscriptions.remove(self.key);
182            match termination {
183                Termination::Completed => {
184                    if lock.terminated && lock.subscriptions.is_empty() {
185                        drop(lock);
186                        safe_lock_option_observer!(on_termination: self.observer, termination);
187                    }
188                }
189                Termination::Error(_) => {
190                    drop(lock);
191                    safe_lock_option_observer!(on_termination: self.observer, termination);
192                }
193            }
194        });
195    }
196}