rx_rust/operators/combining/
merge_all.rs

1use crate::disposable::Disposable;
2use crate::disposable::subscription::Subscription;
3use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5    observable::Observable,
6    observer::{Observer, Termination},
7    operators::creating::from_iter::FromIter,
8    utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
9};
10use crate::{safe_lock, safe_lock_option_observer, safe_lock_slot_map};
11use educe::Educe;
12use slotmap::{DefaultKey, SlotMap};
13use std::marker::PhantomData;
14
15/// Merges an Observable of Observables into a single Observable that emits all of their emissions.
16/// See <https://reactivex.io/documentation/operators/merge.html> (referencing merge operator for general concept)
17///
18/// # Examples
19/// ```rust
20/// use rx_rust::{
21///     observable::observable_ext::ObservableExt,
22///     observer::Termination,
23///     operators::{
24///         combining::merge_all::MergeAll,
25///         creating::from_iter::FromIter,
26///     },
27/// };
28///
29/// let mut values = Vec::new();
30/// let mut terminations = Vec::new();
31///
32/// let observable = MergeAll::new_from_iter([
33///     FromIter::new(vec![1, 3]),
34///     FromIter::new(vec![2, 4]),
35/// ]);
36/// observable.subscribe_with_callback(
37///     |value| values.push(value),
38///     |termination| terminations.push(termination),
39/// );
40///
41/// assert_eq!(values, vec![1, 3, 2, 4]);
42/// assert_eq!(terminations, vec![Termination::Completed]);
43/// ```
44#[derive(Educe)]
45#[educe(Debug, Clone)]
46pub struct MergeAll<OE, OE1> {
47    source: OE,
48    _marker: MarkerType<OE1>,
49}
50
51impl<OE, OE1> MergeAll<OE, OE1> {
52    pub fn new<'or, 'sub, T, E>(source: OE) -> Self
53    where
54        OE: Observable<'or, 'sub, OE1, E>,
55        OE1: Observable<'or, 'sub, T, E>,
56    {
57        Self {
58            source,
59            _marker: PhantomData,
60        }
61    }
62}
63
64impl<OE1, I> MergeAll<FromIter<I>, OE1> {
65    pub fn new_from_iter<'or, 'sub, T, E>(into_iterator: I) -> Self
66    where
67        I: IntoIterator<Item = OE1>,
68        OE1: Observable<'or, 'sub, T, E>,
69    {
70        Self {
71            source: FromIter::new(into_iterator),
72            _marker: PhantomData,
73        }
74    }
75}
76
77impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for MergeAll<OE, OE1>
78where
79    T: 'or,
80    OE: Observable<'or, 'sub, OE1, E>,
81    OE1: Observable<'or, 'sub, T, E>,
82    'sub: 'or,
83{
84    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
85        subscribe_unsub_after_termination(observer, |observer| {
86            let context = Shared::new(Mutable::new(MergeAllContext {
87                subscriptions: SlotMap::new(),
88                terminated: false,
89            }));
90            let observer = MergeAllObserver {
91                observer: Shared::new(Mutable::new(Some(observer))),
92                context: context.clone(),
93                _marker: PhantomData,
94            };
95            self.source.subscribe(observer) + context
96        })
97    }
98}
99
100struct MergeAllContext<'sub> {
101    subscriptions: SlotMap<DefaultKey, Subscription<'sub>>,
102    terminated: bool,
103}
104
105impl Disposable for Shared<Mutable<MergeAllContext<'_>>> {
106    fn dispose(self) {
107        safe_lock!(mem_take: self, subscriptions).clear();
108    }
109}
110
111struct MergeAllObserver<'sub, T, OR> {
112    observer: Shared<Mutable<Option<OR>>>,
113    context: Shared<Mutable<MergeAllContext<'sub>>>,
114    _marker: MarkerType<T>,
115}
116
117impl<'or, 'sub, T, E, OR, OE1> Observer<OE1, E> for MergeAllObserver<'sub, T, OR>
118where
119    OR: Observer<T, E> + NecessarySendSync + 'or,
120    OE1: Observable<'or, 'sub, T, E>,
121    'sub: 'or,
122{
123    fn on_next(&mut self, value: OE1) {
124        // Insert a placeholder subscription.
125        let key = safe_lock_slot_map!(insert: self.context, subscriptions, Subscription::default());
126
127        let observer = MergeAllInnerObserver {
128            observer: self.observer.clone(),
129            context: self.context.clone(),
130            key,
131        };
132        let sub = value.subscribe(observer);
133
134        self.context.lock_mut(|mut lock| {
135            if lock.subscriptions.contains_key(key) {
136                lock.subscriptions[key] = sub;
137            } else {
138                // already terminated
139            }
140        });
141    }
142
143    fn on_termination(self, termination: Termination<E>) {
144        match termination {
145            Termination::Completed => {
146                self.context.lock_mut(|mut lock| {
147                    if lock.subscriptions.is_empty() {
148                        drop(lock);
149                        safe_lock_option_observer!(on_termination: self.observer, termination);
150                    } else {
151                        lock.terminated = true;
152                    }
153                });
154            }
155            Termination::Error(_) => {
156                safe_lock_option_observer!(on_termination: self.observer, termination);
157            }
158        }
159    }
160}
161
162struct MergeAllInnerObserver<'sub, OR> {
163    observer: Shared<Mutable<Option<OR>>>,
164    context: Shared<Mutable<MergeAllContext<'sub>>>,
165    key: DefaultKey,
166}
167
168impl<T, E, OR> Observer<T, E> for MergeAllInnerObserver<'_, OR>
169where
170    OR: Observer<T, E>,
171{
172    fn on_next(&mut self, value: T) {
173        safe_lock_option_observer!(on_next: self.observer, value);
174    }
175
176    fn on_termination(self, termination: Termination<E>) {
177        self.context.lock_mut(|mut lock| {
178            lock.subscriptions.remove(self.key);
179            match termination {
180                Termination::Completed => {
181                    if lock.terminated && lock.subscriptions.is_empty() {
182                        drop(lock);
183                        safe_lock_option_observer!(on_termination: self.observer, termination);
184                    }
185                }
186                Termination::Error(_) => {
187                    drop(lock);
188                    safe_lock_option_observer!(on_termination: self.observer, termination);
189                }
190            }
191        });
192    }
193}