rx_rust/operators/combining/
concat_all.rs

1use crate::disposable::Disposable;
2use crate::disposable::subscription::Subscription;
3use crate::utils::types::{MutGuard, Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5    observable::Observable,
6    observer::{Observer, Termination},
7    operators::creating::from_iter::FromIter,
8    utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
9};
10use crate::{safe_lock_option, safe_lock_option_disposable, safe_lock_option_observer};
11use educe::Educe;
12use std::{
13    collections::VecDeque,
14    marker::PhantomData,
15    sync::atomic::{AtomicBool, Ordering},
16};
17
18/// Concatenates an Observable of Observables, emitting all values from each inner Observable in sequence.
19/// See <https://reactivex.io/documentation/operators/concat.html> (referencing concat operator for general concept)
20///
21/// # Examples
22/// ```rust
23/// use rx_rust::{
24///     observable::observable_ext::ObservableExt,
25///     observer::Termination,
26///     operators::{
27///         combining::concat_all::ConcatAll,
28///         creating::from_iter::FromIter,
29///     },
30/// };
31///
32/// let mut values = Vec::new();
33/// let mut terminations = Vec::new();
34///
35/// let observable = ConcatAll::new_from_iter([
36///     FromIter::new(vec![1, 2]),
37///     FromIter::new(vec![3, 4]),
38/// ]);
39/// observable.subscribe_with_callback(
40///     |value| values.push(value),
41///     |termination| terminations.push(termination),
42/// );
43///
44/// assert_eq!(values, vec![1, 2, 3, 4]);
45/// assert_eq!(terminations, vec![Termination::Completed]);
46/// ```
47#[derive(Educe)]
48#[educe(Debug, Clone)]
49pub struct ConcatAll<OE, OE1> {
50    source: OE,
51    _marker: MarkerType<OE1>,
52}
53
54impl<OE, OE1> ConcatAll<OE, OE1> {
55    pub fn new<'or, 'sub, T, E>(source: OE) -> Self
56    where
57        OE: Observable<'or, 'sub, OE1, E>,
58        OE1: Observable<'or, 'sub, T, E>,
59    {
60        Self {
61            source,
62            _marker: PhantomData,
63        }
64    }
65}
66
67impl<OE1, I> ConcatAll<FromIter<I>, OE1> {
68    pub fn new_from_iter<'or, 'sub, T, E>(into_iterator: I) -> Self
69    where
70        I: IntoIterator<Item = OE1>,
71        OE1: Observable<'or, 'sub, T, E>,
72    {
73        Self {
74            source: FromIter::new(into_iterator),
75            _marker: PhantomData,
76        }
77    }
78}
79
80impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for ConcatAll<OE, OE1>
81where
82    T: 'or,
83    OE: Observable<'or, 'sub, OE1, E>,
84    OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'sub,
85    'sub: 'or,
86{
87    fn subscribe(
88        self,
89        observer: impl Observer<T, E> + NecessarySendSync + 'or,
90    ) -> Subscription<'sub> {
91        subscribe_unsub_after_termination(observer, |observer| {
92            let context = Shared::new(Mutable::new(ConcatAllContext {
93                pending_observables: VecDeque::new(),
94                on_going_sub: None,
95                completed: false,
96            }));
97            let observer = ConcatAllObserver {
98                observer: Shared::new(Mutable::new(Some(observer))),
99                context: context.clone(),
100                _marker: PhantomData,
101            };
102            self.source.subscribe(observer) + context
103        })
104    }
105}
106
107struct ConcatAllContext<'sub, OE1> {
108    pending_observables: VecDeque<OE1>,
109    on_going_sub: Option<Subscription<'sub>>,
110    completed: bool,
111}
112
113impl<OE1> Disposable for Shared<Mutable<ConcatAllContext<'_, OE1>>> {
114    fn dispose(self) {
115        safe_lock_option_disposable!(dispose: self, on_going_sub);
116    }
117}
118
119fn subscribe_next<'or, 'sub, T, E, OR, OE1>(
120    lock: Option<MutGuard<'_, ConcatAllContext<'sub, OE1>>>,
121    context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
122    observer: Shared<Mutable<Option<OR>>>,
123) where
124    OR: Observer<T, E> + NecessarySendSync + 'or,
125    OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
126    'sub: 'or,
127{
128    let implementation = |mut lock: MutGuard<'_, ConcatAllContext<'sub, OE1>>| {
129        if let Some(observable) = lock.pending_observables.pop_front() {
130            drop(lock);
131            let terminated = Shared::new(AtomicBool::new(false));
132            let observer = ConcatAllInnerObserver {
133                observer: observer.clone(),
134                context: context.clone(),
135                terminated: terminated.clone(),
136            };
137            let sub = observable.subscribe(observer);
138            if !terminated.load(Ordering::SeqCst) {
139                safe_lock_option!(replace: context, on_going_sub, sub);
140            }
141        } else if lock.completed {
142            drop(lock);
143            safe_lock_option_observer!(on_termination: observer, Termination::Completed);
144        } else {
145            lock.on_going_sub.take();
146        }
147    };
148    if let Some(lock) = lock {
149        implementation(lock);
150    } else {
151        context.lock_mut(implementation);
152    }
153}
154
155struct ConcatAllObserver<'sub, T, OR, OE1> {
156    observer: Shared<Mutable<Option<OR>>>,
157    context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
158    _marker: MarkerType<T>,
159}
160
161impl<'or, 'sub, T, E, OR, OE1> Observer<OE1, E> for ConcatAllObserver<'sub, T, OR, OE1>
162where
163    OR: Observer<T, E> + NecessarySendSync + 'or,
164    OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
165    'sub: 'or,
166{
167    fn on_next(&mut self, value: OE1) {
168        self.context.lock_mut(|mut lock| {
169            lock.pending_observables.push_back(value);
170            if lock.on_going_sub.is_none() {
171                subscribe_next(Some(lock), self.context.clone(), self.observer.clone());
172            }
173        });
174    }
175
176    fn on_termination(self, termination: Termination<E>) {
177        match termination {
178            Termination::Completed => {
179                self.context.lock_mut(|mut lock| {
180                    lock.completed = true;
181                    if lock.on_going_sub.is_none() && lock.pending_observables.is_empty() {
182                        drop(lock);
183                        safe_lock_option_observer!(on_termination: self.observer, termination);
184                    }
185                });
186            }
187            Termination::Error(_) => {
188                safe_lock_option_observer!(on_termination: self.observer, termination);
189            }
190        }
191    }
192}
193
194struct ConcatAllInnerObserver<'sub, OR, OE1> {
195    observer: Shared<Mutable<Option<OR>>>,
196    context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
197    terminated: Shared<AtomicBool>,
198}
199
200impl<'or, 'sub, T, E, OR, OE1> Observer<T, E> for ConcatAllInnerObserver<'sub, OR, OE1>
201where
202    OR: Observer<T, E> + NecessarySendSync + 'or,
203    OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
204    'sub: 'or,
205{
206    fn on_next(&mut self, value: T) {
207        safe_lock_option_observer!(on_next: self.observer, value);
208    }
209
210    fn on_termination(self, termination: Termination<E>) {
211        self.terminated.store(true, Ordering::SeqCst);
212        match termination {
213            Termination::Completed => subscribe_next(None, self.context, self.observer),
214            Termination::Error(_) => {
215                safe_lock_option_observer!(on_termination: self.observer, termination);
216            }
217        }
218    }
219}