1use crate::disposable::Disposable;
2use crate::disposable::subscription::Subscription;
3use crate::utils::types::{MutGuard, Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7 operators::creating::from_iter::FromIter,
8 utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
9};
10use crate::{safe_lock_option, safe_lock_option_disposable, safe_lock_option_observer};
11use educe::Educe;
12use std::{
13 collections::VecDeque,
14 marker::PhantomData,
15 sync::atomic::{AtomicBool, Ordering},
16};
17
18#[derive(Educe)]
48#[educe(Debug, Clone)]
49pub struct ConcatAll<OE, OE1> {
50 source: OE,
51 _marker: MarkerType<OE1>,
52}
53
54impl<OE, OE1> ConcatAll<OE, OE1> {
55 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
56 where
57 OE: Observable<'or, 'sub, OE1, E>,
58 OE1: Observable<'or, 'sub, T, E>,
59 {
60 Self {
61 source,
62 _marker: PhantomData,
63 }
64 }
65}
66
67impl<OE1, I> ConcatAll<FromIter<I>, OE1> {
68 pub fn new_from_iter<'or, 'sub, T, E>(into_iterator: I) -> Self
69 where
70 I: IntoIterator<Item = OE1>,
71 OE1: Observable<'or, 'sub, T, E>,
72 {
73 Self {
74 source: FromIter::new(into_iterator),
75 _marker: PhantomData,
76 }
77 }
78}
79
80impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for ConcatAll<OE, OE1>
81where
82 T: 'or,
83 OE: Observable<'or, 'sub, OE1, E>,
84 OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'sub,
85 'sub: 'or,
86{
87 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
88 subscribe_unsub_after_termination(observer, |observer| {
89 let context = Shared::new(Mutable::new(ConcatAllContext {
90 pending_observables: VecDeque::new(),
91 on_going_sub: None,
92 completed: false,
93 }));
94 let observer = ConcatAllObserver {
95 observer: Shared::new(Mutable::new(Some(observer))),
96 context: context.clone(),
97 _marker: PhantomData,
98 };
99 self.source.subscribe(observer) + context
100 })
101 }
102}
103
104struct ConcatAllContext<'sub, OE1> {
105 pending_observables: VecDeque<OE1>,
106 on_going_sub: Option<Subscription<'sub>>,
107 completed: bool,
108}
109
110impl<OE1> Disposable for Shared<Mutable<ConcatAllContext<'_, OE1>>> {
111 fn dispose(self) {
112 safe_lock_option_disposable!(dispose: self, on_going_sub);
113 }
114}
115
116fn subscribe_next<'or, 'sub, T, E, OR, OE1>(
117 lock: Option<MutGuard<'_, ConcatAllContext<'sub, OE1>>>,
118 context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
119 observer: Shared<Mutable<Option<OR>>>,
120) where
121 OR: Observer<T, E> + NecessarySendSync + 'or,
122 OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
123 'sub: 'or,
124{
125 let implementation = |mut lock: MutGuard<'_, ConcatAllContext<'sub, OE1>>| {
126 if let Some(observable) = lock.pending_observables.pop_front() {
127 drop(lock);
128 let terminated = Shared::new(AtomicBool::new(false));
129 let observer = ConcatAllInnerObserver {
130 observer: observer.clone(),
131 context: context.clone(),
132 terminated: terminated.clone(),
133 };
134 let sub = observable.subscribe(observer);
135 if !terminated.load(Ordering::SeqCst) {
136 safe_lock_option!(replace: context, on_going_sub, sub);
137 }
138 } else if lock.completed {
139 drop(lock);
140 safe_lock_option_observer!(on_termination: observer, Termination::Completed);
141 } else {
142 lock.on_going_sub.take();
143 }
144 };
145 if let Some(lock) = lock {
146 implementation(lock);
147 } else {
148 context.lock_mut(implementation);
149 }
150}
151
152struct ConcatAllObserver<'sub, T, OR, OE1> {
153 observer: Shared<Mutable<Option<OR>>>,
154 context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
155 _marker: MarkerType<T>,
156}
157
158impl<'or, 'sub, T, E, OR, OE1> Observer<OE1, E> for ConcatAllObserver<'sub, T, OR, OE1>
159where
160 OR: Observer<T, E> + NecessarySendSync + 'or,
161 OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
162 'sub: 'or,
163{
164 fn on_next(&mut self, value: OE1) {
165 self.context.lock_mut(|mut lock| {
166 lock.pending_observables.push_back(value);
167 if lock.on_going_sub.is_none() {
168 subscribe_next(Some(lock), self.context.clone(), self.observer.clone());
169 }
170 });
171 }
172
173 fn on_termination(self, termination: Termination<E>) {
174 match termination {
175 Termination::Completed => {
176 self.context.lock_mut(|mut lock| {
177 lock.completed = true;
178 if lock.on_going_sub.is_none() && lock.pending_observables.is_empty() {
179 drop(lock);
180 safe_lock_option_observer!(on_termination: self.observer, termination);
181 }
182 });
183 }
184 Termination::Error(_) => {
185 safe_lock_option_observer!(on_termination: self.observer, termination);
186 }
187 }
188 }
189}
190
191struct ConcatAllInnerObserver<'sub, OR, OE1> {
192 observer: Shared<Mutable<Option<OR>>>,
193 context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
194 terminated: Shared<AtomicBool>,
195}
196
197impl<'or, 'sub, T, E, OR, OE1> Observer<T, E> for ConcatAllInnerObserver<'sub, OR, OE1>
198where
199 OR: Observer<T, E> + NecessarySendSync + 'or,
200 OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
201 'sub: 'or,
202{
203 fn on_next(&mut self, value: T) {
204 safe_lock_option_observer!(on_next: self.observer, value);
205 }
206
207 fn on_termination(self, termination: Termination<E>) {
208 self.terminated.store(true, Ordering::SeqCst);
209 match termination {
210 Termination::Completed => subscribe_next(None, self.context, self.observer),
211 Termination::Error(_) => {
212 safe_lock_option_observer!(on_termination: self.observer, termination);
213 }
214 }
215 }
216}