// rxrust/ops/finalize.rs

1use crate::prelude::*;
2use std::{
3  cell::RefCell,
4  rc::Rc,
5  sync::{Arc, Mutex},
6};
7
/// Operator value pairing a `source` observable with a finalizer callback.
///
/// The subscribe implementations in this file wrap `func` in a shared
/// `Option` so that it is invoked at most once per subscription.
#[derive(Clone)]
pub struct FinalizeOp<S, F> {
  /// Upstream observable that is subscribed to.
  pub(crate) source: S,
  /// Callback to run when the subscription is finalized.
  pub(crate) func: F,
}
13
14impl<S, F> Observable for FinalizeOp<S, F>
15where
16  S: Observable,
17  F: FnMut(),
18{
19  type Item = S::Item;
20  type Err = S::Err;
21}
22
23impl<'a, S, F> LocalObservable<'a> for FinalizeOp<S, F>
24where
25  S: LocalObservable<'a>,
26  F: FnMut() + 'static,
27{
28  type Unsub = S::Unsub;
29
30  fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
31    self,
32    subscriber: Subscriber<O, LocalSubscription>,
33  ) -> Self::Unsub {
34    let subscription = subscriber.subscription.clone();
35    let func = Rc::new(RefCell::new(Some(self.func)));
36    subscription.add(FinalizerSubscription {
37      is_closed: false,
38      func: func.clone(),
39    });
40    self.source.actual_subscribe(Subscriber {
41      observer: FinalizerObserver {
42        observer: subscriber.observer,
43        func,
44      },
45      subscription,
46    })
47  }
48}
49
50impl<S, F> SharedObservable for FinalizeOp<S, F>
51where
52  S: SharedObservable,
53  F: FnMut() + Send + Sync + 'static,
54  S::Unsub: Send + Sync,
55{
56  type Unsub = S::Unsub;
57
58  fn actual_subscribe<
59    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
60  >(
61    self,
62    subscriber: Subscriber<O, SharedSubscription>,
63  ) -> Self::Unsub {
64    let subscription = subscriber.subscription.clone();
65    let func = Arc::new(Mutex::new(Some(self.func)));
66    subscription.add(FinalizerSubscription {
67      is_closed: false,
68      func: func.clone(),
69    });
70    self.source.actual_subscribe(Subscriber {
71      observer: FinalizerObserver {
72        observer: subscriber.observer,
73        func,
74      },
75      subscription,
76    })
77  }
78}
79
/// Observer wrapper used by `FinalizeOp`: holds the downstream `observer`
/// together with a handle (`func`) to the finalizer slot.
struct FinalizerObserver<O, F> {
  /// Downstream observer that receives every forwarded notification.
  observer: O,
  /// Container holding the not-yet-run finalizer (an `Option` behind
  /// `Arc<Mutex<_>>`, `Rc<RefCell<_>>` or `Box<_>` in this file).
  func: F,
}
84
/// Subscription entry that runs the finalizer when it is unsubscribed.
struct FinalizerSubscription<F> {
  /// Set to `true` by `unsubscribe`; reported by `is_closed`.
  is_closed: bool,
  /// Container holding the not-yet-run finalizer (an `Option` behind
  /// `Arc<Mutex<_>>`, `Rc<RefCell<_>>` or `Box<_>` in this file).
  func: F,
}
89
90impl<Target> SubscriptionLike
91  for FinalizerSubscription<Arc<Mutex<Option<Target>>>>
92where
93  Target: FnMut(),
94{
95  fn unsubscribe(&mut self) {
96    self.is_closed = true;
97    if let Some(mut func) = (self.func.lock().unwrap()).take() {
98      func()
99    }
100  }
101
102  #[inline]
103  fn is_closed(&self) -> bool { self.is_closed }
104}
105
106impl<Target> SubscriptionLike
107  for FinalizerSubscription<Rc<RefCell<Option<Target>>>>
108where
109  Target: FnMut(),
110{
111  fn unsubscribe(&mut self) {
112    self.is_closed = true;
113    if let Some(mut func) = (self.func.borrow_mut()).take() {
114      func()
115    }
116  }
117
118  #[inline]
119  fn is_closed(&self) -> bool { self.is_closed }
120}
121
122impl<Target> SubscriptionLike for FinalizerSubscription<Box<Option<Target>>>
123where
124  Target: FnMut(),
125{
126  fn unsubscribe(&mut self) {
127    self.is_closed = true;
128    if let Some(mut func) = (self.func).take() {
129      func()
130    }
131  }
132
133  #[inline]
134  fn is_closed(&self) -> bool { self.is_closed }
135}
136
137impl<Item, Err, O, Target> Observer
138  for FinalizerObserver<O, Arc<Mutex<Option<Target>>>>
139where
140  O: Observer<Item = Item, Err = Err>,
141  Target: FnMut(),
142{
143  type Item = Item;
144  type Err = Err;
145  #[inline]
146  fn next(&mut self, value: Item) { self.observer.next(value); }
147
148  fn error(&mut self, err: Err) {
149    self.observer.error(err);
150    if let Some(mut func) = (self.func.lock().unwrap()).take() {
151      func()
152    }
153  }
154
155  fn complete(&mut self) {
156    self.observer.complete();
157    if let Some(mut func) = (self.func.lock().unwrap()).take() {
158      func()
159    }
160  }
161
162  #[inline]
163  fn is_stopped(&self) -> bool { self.observer.is_stopped() }
164}
165
166impl<Item, Err, O, Target> Observer
167  for FinalizerObserver<O, Rc<RefCell<Option<Target>>>>
168where
169  O: Observer<Item = Item, Err = Err>,
170  Target: FnMut(),
171{
172  type Item = Item;
173  type Err = Err;
174  #[inline]
175  fn next(&mut self, value: Item) { self.observer.next(value); }
176
177  fn error(&mut self, err: Err) {
178    self.observer.error(err);
179    if let Some(mut func) = (self.func.borrow_mut()).take() {
180      func()
181    }
182  }
183
184  fn complete(&mut self) {
185    self.observer.complete();
186    if let Some(mut func) = (self.func.borrow_mut()).take() {
187      func()
188    }
189  }
190
191  #[inline]
192  fn is_stopped(&self) -> bool { self.observer.is_stopped() }
193}
194
195impl<Item, Err, O, Target> Observer
196  for FinalizerObserver<O, Box<Option<Target>>>
197where
198  O: Observer<Item = Item, Err = Err>,
199  Target: FnMut(),
200{
201  type Item = Item;
202  type Err = Err;
203  #[inline]
204  fn next(&mut self, value: Item) { self.observer.next(value); }
205
206  fn error(&mut self, err: Err) {
207    self.observer.error(err);
208    if let Some(mut func) = (self.func).take() {
209      func()
210    }
211  }
212
213  fn complete(&mut self) {
214    self.observer.complete();
215    if let Some(mut func) = (self.func).take() {
216      func()
217    }
218  }
219
220  #[inline]
221  fn is_stopped(&self) -> bool { self.observer.is_stopped() }
222}
223
/// Tests for the `finalize` operator: the callback must run on completion,
/// on error and on unsubscribe, and never more than once per subscription.
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::cell::Cell;
  use std::rc::Rc;
  use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
  };

  /// A source that completes on its own triggers the finalizer.
  #[test]
  fn finalize_on_complete_simple() {
    // Given
    let finalized = Rc::new(Cell::new(false));
    let mut nexted = false;
    let o = observable::of(1);
    // When
    let finalized_clone = finalized.clone();
    o.finalize(move || finalized_clone.set(true))
      .subscribe(|_| nexted = true);
    // Then
    assert!(finalized.get());
    assert!(nexted);
  }

  /// Completing a subject triggers the finalizer.
  #[test]
  fn finalize_on_complete_subject() {
    // Given
    let finalized = Rc::new(Cell::new(false));
    let nexted = Rc::new(Cell::new(false));
    let mut s = LocalSubject::new();
    // When
    let finalized_clone = finalized.clone();
    let nexted_clone = nexted.clone();
    s.clone()
      .finalize(move || finalized_clone.set(true))
      .subscribe(move |_| nexted_clone.set(true));
    s.next(1);
    s.next(2);
    s.complete();
    // Then
    assert!(finalized.get());
    assert!(nexted.get());
  }

  /// Unsubscribing before the source terminates triggers the finalizer.
  #[test]
  fn finalize_on_unsubscribe() {
    // Given
    let finalized = Rc::new(Cell::new(false));
    let nexted = Rc::new(Cell::new(false));
    let mut s = LocalSubject::new();
    // When
    let finalized_clone = finalized.clone();
    let nexted_clone = nexted.clone();
    let mut subscription = s
      .clone()
      .finalize(move || finalized_clone.set(true))
      .subscribe(move |_| nexted_clone.set(true));
    s.next(1);
    s.next(2);
    subscription.unsubscribe();
    // Then
    assert!(finalized.get());
    assert!(nexted.get());
  }

  /// An error from the source triggers the finalizer.
  #[test]
  fn finalize_on_error() {
    // Given
    let finalized = Rc::new(Cell::new(false));
    let nexted = Rc::new(Cell::new(false));
    let errored = Rc::new(Cell::new(false));
    let mut s: LocalSubject<i32, &'static str> = LocalSubject::new();
    // When
    let finalized_clone = finalized.clone();
    let nexted_clone = nexted.clone();
    let errored_clone = errored.clone();
    s.clone()
      .finalize(move || finalized_clone.set(true))
      .subscribe_err(
        move |_| nexted_clone.set(true),
        move |_| errored_clone.set(true),
      );
    s.next(1);
    s.next(2);
    s.error("oops");
    // Then
    assert!(finalized.get());
    assert!(errored.get());
    assert!(nexted.get());
  }

  /// Error, completion and unsubscribe in sequence run the finalizer once.
  #[test]
  fn finalize_only_once() {
    // Given
    let finalize_count = Rc::new(Cell::new(0));
    let mut s: LocalSubject<i32, &'static str> = LocalSubject::new();
    // When
    let finalized_clone = finalize_count.clone();
    let mut subscription = s
      .clone()
      .finalize(move || finalized_clone.set(finalized_clone.get() + 1))
      .subscribe_err(|_| (), |_| ());
    s.next(1);
    s.next(2);
    s.error("oops");
    s.complete();
    subscription.unsubscribe();
    // Then
    assert_eq!(finalize_count.get(), 1);
  }

  /// The shared (thread-safe) path runs the finalizer on unsubscribe.
  #[test]
  fn finalize_shared() {
    // Given
    let finalized = Arc::new(AtomicBool::new(false));
    let mut s = SharedSubject::new();
    // When
    let finalized_clone = finalized.clone();
    let mut subscription = s
      .clone()
      .into_shared()
      .finalize(move || finalized_clone.store(true, Ordering::Relaxed))
      .into_shared()
      .subscribe(|_| ());
    s.next(1);
    s.next(2);
    subscription.unsubscribe();
    // Then
    assert!(finalized.load(Ordering::Relaxed));
  }

  /// Runs the benchmark group once as a regular test.
  #[test]
  fn bench() { do_bench(); }

  benchmark_group!(do_bench, bench_finalize);

  fn bench_finalize(b: &mut bencher::Bencher) {
    b.iter(finalize_on_complete_simple);
  }
}