rx_rust/operators/combining/
concat_all.rs

1use crate::disposable::Disposable;
2use crate::disposable::subscription::Subscription;
3use crate::utils::types::{MutGuard, Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5    observable::Observable,
6    observer::{Observer, Termination},
7    operators::creating::from_iter::FromIter,
8    utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
9};
10use crate::{safe_lock_option, safe_lock_option_disposable, safe_lock_option_observer};
11use educe::Educe;
12use std::{
13    collections::VecDeque,
14    marker::PhantomData,
15    sync::atomic::{AtomicBool, Ordering},
16};
17
18/// Concatenates an Observable of Observables, emitting all values from each inner Observable in sequence.
19/// See <https://reactivex.io/documentation/operators/concat.html> (referencing concat operator for general concept)
20///
21/// # Examples
22/// ```rust
23/// use rx_rust::{
24///     observable::observable_ext::ObservableExt,
25///     observer::Termination,
26///     operators::{
27///         combining::concat_all::ConcatAll,
28///         creating::from_iter::FromIter,
29///     },
30/// };
31///
32/// let mut values = Vec::new();
33/// let mut terminations = Vec::new();
34///
35/// let observable = ConcatAll::new_from_iter([
36///     FromIter::new(vec![1, 2]),
37///     FromIter::new(vec![3, 4]),
38/// ]);
39/// observable.subscribe_with_callback(
40///     |value| values.push(value),
41///     |termination| terminations.push(termination),
42/// );
43///
44/// assert_eq!(values, vec![1, 2, 3, 4]);
45/// assert_eq!(terminations, vec![Termination::Completed]);
46/// ```
47#[derive(Educe)]
48#[educe(Debug, Clone)]
49pub struct ConcatAll<OE, OE1> {
50    source: OE,
51    _marker: MarkerType<OE1>,
52}
53
54impl<OE, OE1> ConcatAll<OE, OE1> {
55    pub fn new<'or, 'sub, T, E>(source: OE) -> Self
56    where
57        OE: Observable<'or, 'sub, OE1, E>,
58        OE1: Observable<'or, 'sub, T, E>,
59    {
60        Self {
61            source,
62            _marker: PhantomData,
63        }
64    }
65}
66
67impl<OE1, I> ConcatAll<FromIter<I>, OE1> {
68    pub fn new_from_iter<'or, 'sub, T, E>(into_iterator: I) -> Self
69    where
70        I: IntoIterator<Item = OE1>,
71        OE1: Observable<'or, 'sub, T, E>,
72    {
73        Self {
74            source: FromIter::new(into_iterator),
75            _marker: PhantomData,
76        }
77    }
78}
79
80impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for ConcatAll<OE, OE1>
81where
82    T: 'or,
83    OE: Observable<'or, 'sub, OE1, E>,
84    OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'sub,
85    'sub: 'or,
86{
87    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
88        subscribe_unsub_after_termination(observer, |observer| {
89            let context = Shared::new(Mutable::new(ConcatAllContext {
90                pending_observables: VecDeque::new(),
91                on_going_sub: None,
92                completed: false,
93            }));
94            let observer = ConcatAllObserver {
95                observer: Shared::new(Mutable::new(Some(observer))),
96                context: context.clone(),
97                _marker: PhantomData,
98            };
99            self.source.subscribe(observer) + context
100        })
101    }
102}
103
104struct ConcatAllContext<'sub, OE1> {
105    pending_observables: VecDeque<OE1>,
106    on_going_sub: Option<Subscription<'sub>>,
107    completed: bool,
108}
109
110impl<OE1> Disposable for Shared<Mutable<ConcatAllContext<'_, OE1>>> {
111    fn dispose(self) {
112        safe_lock_option_disposable!(dispose: self, on_going_sub);
113    }
114}
115
116fn subscribe_next<'or, 'sub, T, E, OR, OE1>(
117    lock: Option<MutGuard<'_, ConcatAllContext<'sub, OE1>>>,
118    context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
119    observer: Shared<Mutable<Option<OR>>>,
120) where
121    OR: Observer<T, E> + NecessarySendSync + 'or,
122    OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
123    'sub: 'or,
124{
125    let implementation = |mut lock: MutGuard<'_, ConcatAllContext<'sub, OE1>>| {
126        if let Some(observable) = lock.pending_observables.pop_front() {
127            drop(lock);
128            let terminated = Shared::new(AtomicBool::new(false));
129            let observer = ConcatAllInnerObserver {
130                observer: observer.clone(),
131                context: context.clone(),
132                terminated: terminated.clone(),
133            };
134            let sub = observable.subscribe(observer);
135            if !terminated.load(Ordering::SeqCst) {
136                safe_lock_option!(replace: context, on_going_sub, sub);
137            }
138        } else if lock.completed {
139            drop(lock);
140            safe_lock_option_observer!(on_termination: observer, Termination::Completed);
141        } else {
142            lock.on_going_sub.take();
143        }
144    };
145    if let Some(lock) = lock {
146        implementation(lock);
147    } else {
148        context.lock_mut(implementation);
149    }
150}
151
152struct ConcatAllObserver<'sub, T, OR, OE1> {
153    observer: Shared<Mutable<Option<OR>>>,
154    context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
155    _marker: MarkerType<T>,
156}
157
158impl<'or, 'sub, T, E, OR, OE1> Observer<OE1, E> for ConcatAllObserver<'sub, T, OR, OE1>
159where
160    OR: Observer<T, E> + NecessarySendSync + 'or,
161    OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
162    'sub: 'or,
163{
164    fn on_next(&mut self, value: OE1) {
165        self.context.lock_mut(|mut lock| {
166            lock.pending_observables.push_back(value);
167            if lock.on_going_sub.is_none() {
168                subscribe_next(Some(lock), self.context.clone(), self.observer.clone());
169            }
170        });
171    }
172
173    fn on_termination(self, termination: Termination<E>) {
174        match termination {
175            Termination::Completed => {
176                self.context.lock_mut(|mut lock| {
177                    lock.completed = true;
178                    if lock.on_going_sub.is_none() && lock.pending_observables.is_empty() {
179                        drop(lock);
180                        safe_lock_option_observer!(on_termination: self.observer, termination);
181                    }
182                });
183            }
184            Termination::Error(_) => {
185                safe_lock_option_observer!(on_termination: self.observer, termination);
186            }
187        }
188    }
189}
190
191struct ConcatAllInnerObserver<'sub, OR, OE1> {
192    observer: Shared<Mutable<Option<OR>>>,
193    context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
194    terminated: Shared<AtomicBool>,
195}
196
197impl<'or, 'sub, T, E, OR, OE1> Observer<T, E> for ConcatAllInnerObserver<'sub, OR, OE1>
198where
199    OR: Observer<T, E> + NecessarySendSync + 'or,
200    OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
201    'sub: 'or,
202{
203    fn on_next(&mut self, value: T) {
204        safe_lock_option_observer!(on_next: self.observer, value);
205    }
206
207    fn on_termination(self, termination: Termination<E>) {
208        self.terminated.store(true, Ordering::SeqCst);
209        match termination {
210            Termination::Completed => subscribe_next(None, self.context, self.observer),
211            Termination::Error(_) => {
212                safe_lock_option_observer!(on_termination: self.observer, termination);
213            }
214        }
215    }
216}