rx_rust/operators/combining/
concat.rs

1use crate::disposable::subscription::Subscription;
2use crate::safe_lock_option;
3use crate::utils::types::{Mutable, NecessarySendSync, Shared};
4use crate::{
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use educe::Educe;
9
10/// Concatenates multiple Observables to create an Observable that emits all of the values from the first, then all of the values from the second, and so on.
11/// See <https://reactivex.io/documentation/operators/concat.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16///     observable::observable_ext::ObservableExt,
17///     observer::Termination,
18///     operators::{
19///         combining::concat::Concat,
20///         creating::from_iter::FromIter,
21///     },
22/// };
23///
24/// let mut values = Vec::new();
25/// let mut terminations = Vec::new();
26///
27/// let observable =
28///     Concat::new(FromIter::new(vec![1, 2]), FromIter::new(vec![3, 4]));
29/// observable.subscribe_with_callback(
30///     |value| values.push(value),
31///     |termination| terminations.push(termination),
32/// );
33///
34/// assert_eq!(values, vec![1, 2, 3, 4]);
35/// assert_eq!(terminations, vec![Termination::Completed]);
36/// ```
37#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Concat<OE1, OE2> {
40    source_1: OE1,
41    source_2: OE2,
42}
43
44impl<OE1, OE2> Concat<OE1, OE2> {
45    pub fn new<'or, 'sub, T, E>(source_1: OE1, source_2: OE2) -> Self
46    where
47        OE1: Observable<'or, 'sub, T, E>,
48        OE2: Observable<'or, 'sub, T, E>,
49    {
50        Self { source_1, source_2 }
51    }
52}
53
54impl<'or, 'sub, T, E, OE1, OE2> Observable<'or, 'sub, T, E> for Concat<OE1, OE2>
55where
56    OE1: Observable<'or, 'sub, T, E>,
57    OE2: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
58    'sub: 'or,
59{
60    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
61        let sub_2 = Shared::new(Mutable::new(None));
62        let onserver = ConcatObserver {
63            observer,
64            source_2: self.source_2,
65            sub_2: sub_2.clone(),
66        };
67        self.source_1.subscribe(onserver) + sub_2
68    }
69}
70
71struct ConcatObserver<'sub, OR, OE2> {
72    observer: OR,
73    source_2: OE2,
74    sub_2: Shared<Mutable<Option<Subscription<'sub>>>>,
75}
76
77impl<'or, 'sub, T, E, OR, OE2> Observer<T, E> for ConcatObserver<'sub, OR, OE2>
78where
79    OR: Observer<T, E> + NecessarySendSync + 'or,
80    OE2: Observable<'or, 'sub, T, E>,
81{
82    fn on_next(&mut self, value: T) {
83        self.observer.on_next(value);
84    }
85
86    fn on_termination(self, termination: Termination<E>) {
87        match termination {
88            Termination::Completed => {
89                let sub = self.source_2.subscribe(self.observer);
90                safe_lock_option!(replace: self.sub_2, sub);
91            }
92            Termination::Error(_) => {
93                self.observer.on_termination(termination);
94            }
95        }
96    }
97}