rx_rust/operators/combining/
concat.rs1use crate::disposable::subscription::Subscription;
2use crate::safe_lock_option;
3use crate::utils::types::{Mutable, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use educe::Educe;
9
10#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Concat<OE1, OE2> {
40 source_1: OE1,
41 source_2: OE2,
42}
43
44impl<OE1, OE2> Concat<OE1, OE2> {
45 pub fn new<'or, 'sub, T, E>(source_1: OE1, source_2: OE2) -> Self
46 where
47 OE1: Observable<'or, 'sub, T, E>,
48 OE2: Observable<'or, 'sub, T, E>,
49 {
50 Self { source_1, source_2 }
51 }
52}
53
54impl<'or, 'sub, T, E, OE1, OE2> Observable<'or, 'sub, T, E> for Concat<OE1, OE2>
55where
56 OE1: Observable<'or, 'sub, T, E>,
57 OE2: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
58 'sub: 'or,
59{
60 fn subscribe(
61 self,
62 observer: impl Observer<T, E> + NecessarySendSync + 'or,
63 ) -> Subscription<'sub> {
64 let sub_2 = Shared::new(Mutable::new(None));
65 let onserver = ConcatObserver {
66 observer,
67 source_2: self.source_2,
68 sub_2: sub_2.clone(),
69 };
70 self.source_1.subscribe(onserver) + sub_2
71 }
72}
73
74struct ConcatObserver<'sub, OR, OE2> {
75 observer: OR,
76 source_2: OE2,
77 sub_2: Shared<Mutable<Option<Subscription<'sub>>>>,
78}
79
80impl<'or, 'sub, T, E, OR, OE2> Observer<T, E> for ConcatObserver<'sub, OR, OE2>
81where
82 OR: Observer<T, E> + NecessarySendSync + 'or,
83 OE2: Observable<'or, 'sub, T, E>,
84{
85 fn on_next(&mut self, value: T) {
86 self.observer.on_next(value);
87 }
88
89 fn on_termination(self, termination: Termination<E>) {
90 match termination {
91 Termination::Completed => {
92 let sub = self.source_2.subscribe(self.observer);
93 safe_lock_option!(replace: self.sub_2, sub);
94 }
95 Termination::Error(_) => {
96 self.observer.on_termination(termination);
97 }
98 }
99 }
100}