rx_rust/operators/combining/
concat.rs1use crate::disposable::subscription::Subscription;
2use crate::safe_lock_option;
3use crate::utils::types::{Mutable, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use educe::Educe;
9
10#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct Concat<OE1, OE2> {
40 source_1: OE1,
41 source_2: OE2,
42}
43
44impl<OE1, OE2> Concat<OE1, OE2> {
45 pub fn new<'or, 'sub, T, E>(source_1: OE1, source_2: OE2) -> Self
46 where
47 OE1: Observable<'or, 'sub, T, E>,
48 OE2: Observable<'or, 'sub, T, E>,
49 {
50 Self { source_1, source_2 }
51 }
52}
53
54impl<'or, 'sub, T, E, OE1, OE2> Observable<'or, 'sub, T, E> for Concat<OE1, OE2>
55where
56 OE1: Observable<'or, 'sub, T, E>,
57 OE2: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
58 'sub: 'or,
59{
60 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
61 let sub_2 = Shared::new(Mutable::new(None));
62 let onserver = ConcatObserver {
63 observer,
64 source_2: self.source_2,
65 sub_2: sub_2.clone(),
66 };
67 self.source_1.subscribe(onserver) + sub_2
68 }
69}
70
71struct ConcatObserver<'sub, OR, OE2> {
72 observer: OR,
73 source_2: OE2,
74 sub_2: Shared<Mutable<Option<Subscription<'sub>>>>,
75}
76
77impl<'or, 'sub, T, E, OR, OE2> Observer<T, E> for ConcatObserver<'sub, OR, OE2>
78where
79 OR: Observer<T, E> + NecessarySendSync + 'or,
80 OE2: Observable<'or, 'sub, T, E>,
81{
82 fn on_next(&mut self, value: T) {
83 self.observer.on_next(value);
84 }
85
86 fn on_termination(self, termination: Termination<E>) {
87 match termination {
88 Termination::Completed => {
89 let sub = self.source_2.subscribe(self.observer);
90 safe_lock_option!(replace: self.sub_2, sub);
91 }
92 Termination::Error(_) => {
93 self.observer.on_termination(termination);
94 }
95 }
96 }
97}