1use crate::prelude::*;
2use std::{
3 cell::RefCell,
4 rc::Rc,
5 sync::{Arc, Mutex},
6};
7
/// Operator value that pairs an upstream observable with a finalizer callback.
///
/// The observable impls on this type subscribe to `source` and share `func`
/// between an observer wrapper and a teardown subscription.
#[derive(Clone)]
pub struct FinalizeOp<Source, Finalizer> {
  /// Upstream observable that gets subscribed to.
  pub(crate) source: Source,
  /// Callback supplied by the caller of the operator.
  pub(crate) func: Finalizer,
}
13
14impl<S, F> Observable for FinalizeOp<S, F>
15where
16 S: Observable,
17 F: FnMut(),
18{
19 type Item = S::Item;
20 type Err = S::Err;
21}
22
23impl<'a, S, F> LocalObservable<'a> for FinalizeOp<S, F>
24where
25 S: LocalObservable<'a>,
26 F: FnMut() + 'static,
27{
28 type Unsub = S::Unsub;
29
30 fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
31 self,
32 subscriber: Subscriber<O, LocalSubscription>,
33 ) -> Self::Unsub {
34 let subscription = subscriber.subscription.clone();
35 let func = Rc::new(RefCell::new(Some(self.func)));
36 subscription.add(FinalizerSubscription {
37 is_closed: false,
38 func: func.clone(),
39 });
40 self.source.actual_subscribe(Subscriber {
41 observer: FinalizerObserver {
42 observer: subscriber.observer,
43 func,
44 },
45 subscription,
46 })
47 }
48}
49
50impl<S, F> SharedObservable for FinalizeOp<S, F>
51where
52 S: SharedObservable,
53 F: FnMut() + Send + Sync + 'static,
54 S::Unsub: Send + Sync,
55{
56 type Unsub = S::Unsub;
57
58 fn actual_subscribe<
59 O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
60 >(
61 self,
62 subscriber: Subscriber<O, SharedSubscription>,
63 ) -> Self::Unsub {
64 let subscription = subscriber.subscription.clone();
65 let func = Arc::new(Mutex::new(Some(self.func)));
66 subscription.add(FinalizerSubscription {
67 is_closed: false,
68 func: func.clone(),
69 });
70 self.source.actual_subscribe(Subscriber {
71 observer: FinalizerObserver {
72 observer: subscriber.observer,
73 func,
74 },
75 subscription,
76 })
77 }
78}
79
/// Observer wrapper that carries the finalizer slot next to the wrapped
/// observer.
struct FinalizerObserver<Inner, Slot> {
  /// The wrapped observer that receives the forwarded events.
  observer: Inner,
  /// Storage holding the finalizer callback.
  func: Slot,
}
84
/// Teardown hook that carries the finalizer slot and a closed flag.
struct FinalizerSubscription<Slot> {
  /// Set to `true` once `unsubscribe` has been called on this hook.
  is_closed: bool,
  /// Storage holding the finalizer callback.
  func: Slot,
}
89
90impl<Target> SubscriptionLike
91 for FinalizerSubscription<Arc<Mutex<Option<Target>>>>
92where
93 Target: FnMut(),
94{
95 fn unsubscribe(&mut self) {
96 self.is_closed = true;
97 if let Some(mut func) = (self.func.lock().unwrap()).take() {
98 func()
99 }
100 }
101
102 #[inline]
103 fn is_closed(&self) -> bool { self.is_closed }
104}
105
106impl<Target> SubscriptionLike
107 for FinalizerSubscription<Rc<RefCell<Option<Target>>>>
108where
109 Target: FnMut(),
110{
111 fn unsubscribe(&mut self) {
112 self.is_closed = true;
113 if let Some(mut func) = (self.func.borrow_mut()).take() {
114 func()
115 }
116 }
117
118 #[inline]
119 fn is_closed(&self) -> bool { self.is_closed }
120}
121
122impl<Target> SubscriptionLike for FinalizerSubscription<Box<Option<Target>>>
123where
124 Target: FnMut(),
125{
126 fn unsubscribe(&mut self) {
127 self.is_closed = true;
128 if let Some(mut func) = (self.func).take() {
129 func()
130 }
131 }
132
133 #[inline]
134 fn is_closed(&self) -> bool { self.is_closed }
135}
136
137impl<Item, Err, O, Target> Observer
138 for FinalizerObserver<O, Arc<Mutex<Option<Target>>>>
139where
140 O: Observer<Item = Item, Err = Err>,
141 Target: FnMut(),
142{
143 type Item = Item;
144 type Err = Err;
145 #[inline]
146 fn next(&mut self, value: Item) { self.observer.next(value); }
147
148 fn error(&mut self, err: Err) {
149 self.observer.error(err);
150 if let Some(mut func) = (self.func.lock().unwrap()).take() {
151 func()
152 }
153 }
154
155 fn complete(&mut self) {
156 self.observer.complete();
157 if let Some(mut func) = (self.func.lock().unwrap()).take() {
158 func()
159 }
160 }
161
162 #[inline]
163 fn is_stopped(&self) -> bool { self.observer.is_stopped() }
164}
165
166impl<Item, Err, O, Target> Observer
167 for FinalizerObserver<O, Rc<RefCell<Option<Target>>>>
168where
169 O: Observer<Item = Item, Err = Err>,
170 Target: FnMut(),
171{
172 type Item = Item;
173 type Err = Err;
174 #[inline]
175 fn next(&mut self, value: Item) { self.observer.next(value); }
176
177 fn error(&mut self, err: Err) {
178 self.observer.error(err);
179 if let Some(mut func) = (self.func.borrow_mut()).take() {
180 func()
181 }
182 }
183
184 fn complete(&mut self) {
185 self.observer.complete();
186 if let Some(mut func) = (self.func.borrow_mut()).take() {
187 func()
188 }
189 }
190
191 #[inline]
192 fn is_stopped(&self) -> bool { self.observer.is_stopped() }
193}
194
195impl<Item, Err, O, Target> Observer
196 for FinalizerObserver<O, Box<Option<Target>>>
197where
198 O: Observer<Item = Item, Err = Err>,
199 Target: FnMut(),
200{
201 type Item = Item;
202 type Err = Err;
203 #[inline]
204 fn next(&mut self, value: Item) { self.observer.next(value); }
205
206 fn error(&mut self, err: Err) {
207 self.observer.error(err);
208 if let Some(mut func) = (self.func).take() {
209 func()
210 }
211 }
212
213 fn complete(&mut self) {
214 self.observer.complete();
215 if let Some(mut func) = (self.func).take() {
216 func()
217 }
218 }
219
220 #[inline]
221 fn is_stopped(&self) -> bool { self.observer.is_stopped() }
222}
223
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::{
    cell::Cell,
    rc::Rc,
    sync::{
      atomic::{AtomicBool, Ordering},
      Arc,
    },
  };

  /// With `observable::of`, both flags are already set when `subscribe`
  /// returns.
  #[test]
  fn finalize_on_complete_simple() {
    // Given
    let did_finalize = Rc::new(Cell::new(false));
    let mut saw_value = false;
    let source = observable::of(1);
    // When
    let finalize_flag = Rc::clone(&did_finalize);
    source
      .finalize(move || finalize_flag.set(true))
      .subscribe(|_| saw_value = true);
    // Then
    assert!(did_finalize.get());
    assert!(saw_value);
  }

  /// Completing the subject runs the finalizer.
  #[test]
  fn finalize_on_complete_subject() {
    // Given
    let did_finalize = Rc::new(Cell::new(false));
    let saw_value = Rc::new(Cell::new(false));
    let mut subject = LocalSubject::new();
    // When
    let finalize_flag = Rc::clone(&did_finalize);
    let value_flag = Rc::clone(&saw_value);
    subject
      .clone()
      .finalize(move || finalize_flag.set(true))
      .subscribe(move |_| value_flag.set(true));
    subject.next(1);
    subject.next(2);
    subject.complete();
    // Then
    assert!(did_finalize.get());
    assert!(saw_value.get());
  }

  /// Unsubscribing without a terminal event runs the finalizer.
  #[test]
  fn finalize_on_unsubscribe() {
    // Given
    let did_finalize = Rc::new(Cell::new(false));
    let saw_value = Rc::new(Cell::new(false));
    let mut subject = LocalSubject::new();
    // When
    let finalize_flag = Rc::clone(&did_finalize);
    let value_flag = Rc::clone(&saw_value);
    let mut handle = subject
      .clone()
      .finalize(move || finalize_flag.set(true))
      .subscribe(move |_| value_flag.set(true));
    subject.next(1);
    subject.next(2);
    handle.unsubscribe();
    // Then
    assert!(did_finalize.get());
    assert!(saw_value.get());
  }

  /// An error from the subject runs the finalizer as well as the error
  /// callback.
  #[test]
  fn finalize_on_error() {
    // Given
    let did_finalize = Rc::new(Cell::new(false));
    let saw_value = Rc::new(Cell::new(false));
    let saw_error = Rc::new(Cell::new(false));
    let mut subject: LocalSubject<i32, &'static str> = LocalSubject::new();
    // When
    let finalize_flag = Rc::clone(&did_finalize);
    let value_flag = Rc::clone(&saw_value);
    let error_flag = Rc::clone(&saw_error);
    subject
      .clone()
      .finalize(move || finalize_flag.set(true))
      .subscribe_err(
        move |_| value_flag.set(true),
        move |_| error_flag.set(true),
      );
    subject.next(1);
    subject.next(2);
    subject.error("oops");
    // Then
    assert!(did_finalize.get());
    assert!(saw_error.get());
    assert!(saw_value.get());
  }

  /// Error, completion and unsubscribe in a row still run the finalizer a
  /// single time.
  #[test]
  fn finalize_only_once() {
    // Given
    let finalize_count = Rc::new(Cell::new(0));
    let mut subject: LocalSubject<i32, &'static str> = LocalSubject::new();
    // When
    let counter = Rc::clone(&finalize_count);
    let mut handle = subject
      .clone()
      .finalize(move || {
        let seen = counter.get();
        counter.set(seen + 1)
      })
      .subscribe_err(|_| (), |_| ());
    subject.next(1);
    subject.next(2);
    subject.error("oops");
    subject.complete();
    handle.unsubscribe();
    // Then
    assert_eq!(finalize_count.get(), 1);
  }

  /// Shared variant: unsubscribing runs the finalizer.
  #[test]
  fn finalize_shared() {
    // Given
    let did_finalize = Arc::new(AtomicBool::new(false));
    let mut subject = SharedSubject::new();
    // When
    let finalize_flag = Arc::clone(&did_finalize);
    let mut handle = subject
      .clone()
      .into_shared()
      .finalize(move || finalize_flag.store(true, Ordering::Relaxed))
      .into_shared()
      .subscribe(|_| ());
    subject.next(1);
    subject.next(2);
    handle.unsubscribe();
    // Then
    assert!(did_finalize.load(Ordering::Relaxed));
  }

  #[test]
  fn bench() { do_bench(); }

  benchmark_group!(do_bench, bench_finalize);

  fn bench_finalize(b: &mut bencher::Bencher) {
    b.iter(finalize_on_complete_simple);
  }
}