1use crate::disposable::Disposable;
2use crate::disposable::subscription::Subscription;
3use crate::utils::types::{MutGuard, Mutable, MutableHelper, NecessarySendSync, Shared};
4use crate::{
5 observable::Observable,
6 observer::{Observer, Termination},
7 operators::creating::from_iter::FromIter,
8 utils::{types::MarkerType, unsub_after_termination::subscribe_unsub_after_termination},
9};
10use crate::{safe_lock_option, safe_lock_option_disposable, safe_lock_option_observer};
11use educe::Educe;
12use std::{
13 collections::VecDeque,
14 marker::PhantomData,
15 sync::atomic::{AtomicBool, Ordering},
16};
17
18#[derive(Educe)]
48#[educe(Debug, Clone)]
49pub struct ConcatAll<OE, OE1> {
50 source: OE,
51 _marker: MarkerType<OE1>,
52}
53
54impl<OE, OE1> ConcatAll<OE, OE1> {
55 pub fn new<'or, 'sub, T, E>(source: OE) -> Self
56 where
57 OE: Observable<'or, 'sub, OE1, E>,
58 OE1: Observable<'or, 'sub, T, E>,
59 {
60 Self {
61 source,
62 _marker: PhantomData,
63 }
64 }
65}
66
67impl<OE1, I> ConcatAll<FromIter<I>, OE1> {
68 pub fn new_from_iter<'or, 'sub, T, E>(into_iterator: I) -> Self
69 where
70 I: IntoIterator<Item = OE1>,
71 OE1: Observable<'or, 'sub, T, E>,
72 {
73 Self {
74 source: FromIter::new(into_iterator),
75 _marker: PhantomData,
76 }
77 }
78}
79
80impl<'or, 'sub, T, E, OE, OE1> Observable<'or, 'sub, T, E> for ConcatAll<OE, OE1>
81where
82 T: 'or,
83 OE: Observable<'or, 'sub, OE1, E>,
84 OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'sub,
85 'sub: 'or,
86{
87 fn subscribe(
88 self,
89 observer: impl Observer<T, E> + NecessarySendSync + 'or,
90 ) -> Subscription<'sub> {
91 subscribe_unsub_after_termination(observer, |observer| {
92 let context = Shared::new(Mutable::new(ConcatAllContext {
93 pending_observables: VecDeque::new(),
94 on_going_sub: None,
95 completed: false,
96 }));
97 let observer = ConcatAllObserver {
98 observer: Shared::new(Mutable::new(Some(observer))),
99 context: context.clone(),
100 _marker: PhantomData,
101 };
102 self.source.subscribe(observer) + context
103 })
104 }
105}
106
107struct ConcatAllContext<'sub, OE1> {
108 pending_observables: VecDeque<OE1>,
109 on_going_sub: Option<Subscription<'sub>>,
110 completed: bool,
111}
112
113impl<OE1> Disposable for Shared<Mutable<ConcatAllContext<'_, OE1>>> {
114 fn dispose(self) {
115 safe_lock_option_disposable!(dispose: self, on_going_sub);
116 }
117}
118
119fn subscribe_next<'or, 'sub, T, E, OR, OE1>(
120 lock: Option<MutGuard<'_, ConcatAllContext<'sub, OE1>>>,
121 context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
122 observer: Shared<Mutable<Option<OR>>>,
123) where
124 OR: Observer<T, E> + NecessarySendSync + 'or,
125 OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
126 'sub: 'or,
127{
128 let implementation = |mut lock: MutGuard<'_, ConcatAllContext<'sub, OE1>>| {
129 if let Some(observable) = lock.pending_observables.pop_front() {
130 drop(lock);
131 let terminated = Shared::new(AtomicBool::new(false));
132 let observer = ConcatAllInnerObserver {
133 observer: observer.clone(),
134 context: context.clone(),
135 terminated: terminated.clone(),
136 };
137 let sub = observable.subscribe(observer);
138 if !terminated.load(Ordering::SeqCst) {
139 safe_lock_option!(replace: context, on_going_sub, sub);
140 }
141 } else if lock.completed {
142 drop(lock);
143 safe_lock_option_observer!(on_termination: observer, Termination::Completed);
144 } else {
145 lock.on_going_sub.take();
146 }
147 };
148 if let Some(lock) = lock {
149 implementation(lock);
150 } else {
151 context.lock_mut(implementation);
152 }
153}
154
155struct ConcatAllObserver<'sub, T, OR, OE1> {
156 observer: Shared<Mutable<Option<OR>>>,
157 context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
158 _marker: MarkerType<T>,
159}
160
161impl<'or, 'sub, T, E, OR, OE1> Observer<OE1, E> for ConcatAllObserver<'sub, T, OR, OE1>
162where
163 OR: Observer<T, E> + NecessarySendSync + 'or,
164 OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
165 'sub: 'or,
166{
167 fn on_next(&mut self, value: OE1) {
168 self.context.lock_mut(|mut lock| {
169 lock.pending_observables.push_back(value);
170 if lock.on_going_sub.is_none() {
171 subscribe_next(Some(lock), self.context.clone(), self.observer.clone());
172 }
173 });
174 }
175
176 fn on_termination(self, termination: Termination<E>) {
177 match termination {
178 Termination::Completed => {
179 self.context.lock_mut(|mut lock| {
180 lock.completed = true;
181 if lock.on_going_sub.is_none() && lock.pending_observables.is_empty() {
182 drop(lock);
183 safe_lock_option_observer!(on_termination: self.observer, termination);
184 }
185 });
186 }
187 Termination::Error(_) => {
188 safe_lock_option_observer!(on_termination: self.observer, termination);
189 }
190 }
191 }
192}
193
194struct ConcatAllInnerObserver<'sub, OR, OE1> {
195 observer: Shared<Mutable<Option<OR>>>,
196 context: Shared<Mutable<ConcatAllContext<'sub, OE1>>>,
197 terminated: Shared<AtomicBool>,
198}
199
200impl<'or, 'sub, T, E, OR, OE1> Observer<T, E> for ConcatAllInnerObserver<'sub, OR, OE1>
201where
202 OR: Observer<T, E> + NecessarySendSync + 'or,
203 OE1: Observable<'or, 'sub, T, E> + NecessarySendSync + 'or,
204 'sub: 'or,
205{
206 fn on_next(&mut self, value: T) {
207 safe_lock_option_observer!(on_next: self.observer, value);
208 }
209
210 fn on_termination(self, termination: Termination<E>) {
211 self.terminated.store(true, Ordering::SeqCst);
212 match termination {
213 Termination::Completed => subscribe_next(None, self.context, self.observer),
214 Termination::Error(_) => {
215 safe_lock_option_observer!(on_termination: self.observer, termination);
216 }
217 }
218 }
219}