rx_rust/operators/combining/
concat.rs

1use crate::disposable::subscription::Subscription;
2use crate::safe_lock_option;
3use crate::utils::types::{Mutable, NecessarySendSync, Shared};
4use crate::{
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use educe::Educe;
9
10/// Concatenates multiple Observables to create an Observable that emits all of the values from the first, then all of the values from the second, and so on.
11/// See <https://reactivex.io/documentation/operators/concat.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16///     observable::observable_ext::ObservableExt,
17///     observer::Termination,
18///     operators::{
19///         combining::concat::Concat,
20///         creating::from_iter::FromIter,
21///     },
22/// };
23///
24/// let mut values = Vec::new();
25/// let mut terminations = Vec::new();
26///
27/// let observable =
28///     Concat::new(FromIter::new(vec![1, 2]), FromIter::new(vec![3, 4]));
29/// observable.subscribe_with_callback(
30///     |value| values.push(value),
31///     |termination| terminations.push(termination),
32/// );
33///
34/// assert_eq!(values, vec![1, 2, 3, 4]);
35/// assert_eq!(terminations, vec![Termination::Completed]);
36/// ```
37#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Concat<OE1, OE2> {
40    source_1: OE1,
41    source_2: OE2,
42}
43
44impl<OE1, OE2> Concat<OE1, OE2> {
45    pub fn new<'or, 'sub, T, E>(source_1: OE1, source_2: OE2) -> Self
46    where
47        OE1: Observable<'or, 'sub, T, E>,
48        OE2: Observable<'or, 'sub, T, E>,
49    {
50        Self { source_1, source_2 }
51    }
52}
53
54impl<'or, 'sub, T, E, OE1, OE2> Observable<'or, 'sub, T, E> for Concat<OE1, OE2>
55where
56    OE1: Observable<'or, 'sub, T, E>,
57    OE2: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
58    'sub: 'or,
59{
60    fn subscribe(
61        self,
62        observer: impl Observer<T, E> + NecessarySendSync + 'or,
63    ) -> Subscription<'sub> {
64        let sub_2 = Shared::new(Mutable::new(None));
65        let onserver = ConcatObserver {
66            observer,
67            source_2: self.source_2,
68            sub_2: sub_2.clone(),
69        };
70        self.source_1.subscribe(onserver) + sub_2
71    }
72}
73
74struct ConcatObserver<'sub, OR, OE2> {
75    observer: OR,
76    source_2: OE2,
77    sub_2: Shared<Mutable<Option<Subscription<'sub>>>>,
78}
79
80impl<'or, 'sub, T, E, OR, OE2> Observer<T, E> for ConcatObserver<'sub, OR, OE2>
81where
82    OR: Observer<T, E> + NecessarySendSync + 'or,
83    OE2: Observable<'or, 'sub, T, E>,
84{
85    fn on_next(&mut self, value: T) {
86        self.observer.on_next(value);
87    }
88
89    fn on_termination(self, termination: Termination<E>) {
90        match termination {
91            Termination::Completed => {
92                let sub = self.source_2.subscribe(self.observer);
93                safe_lock_option!(replace: self.sub_2, sub);
94            }
95            Termination::Error(_) => {
96                self.observer.on_termination(termination);
97            }
98        }
99    }
100}