rx_rust/operators/utility/do_before_termination.rs
1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::{Observable, observable_ext::ObservableExt},
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Invokes a callback when the source Observable terminates (either completes or errors), before the termination notification has been emitted to the downstream observer.
10/// See <https://reactivex.io/documentation/operators/do.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15/// observable::observable_ext::ObservableExt,
16/// observer::Termination,
17/// operators::{
18/// creating::from_iter::FromIter,
19/// utility::do_before_termination::DoBeforeTermination,
20/// },
21/// };
22/// use std::sync::{Arc, Mutex};
23///
24/// let mut terminations = Vec::new();
25/// let callback_terminations = Arc::new(Mutex::new(Vec::new()));
26/// let callback_terminations_observer = Arc::clone(&callback_terminations);
27///
28/// DoBeforeTermination::new(FromIter::new(vec![1, 2]), move |termination| {
29/// callback_terminations_observer
30/// .lock()
31/// .unwrap()
32/// .push(termination.clone());
33/// })
34/// .subscribe_with_callback(
35/// |_| {},
36/// |termination| terminations.push(termination),
37/// );
38///
39/// assert_eq!(
40/// &*callback_terminations.lock().unwrap(),
41/// &[Termination::Completed]
42/// );
43/// assert_eq!(terminations, vec![Termination::Completed]);
44/// ```
45#[derive(Educe)]
46#[educe(Debug, Clone)]
47pub struct DoBeforeTermination<OE, F> {
48 source: OE,
49 callback: F,
50}
51
52impl<OE, F> DoBeforeTermination<OE, F> {
53 pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
54 where
55 OE: Observable<'or, 'sub, T, E>,
56 F: FnOnce(&Termination<E>),
57 {
58 Self { source, callback }
59 }
60}
61
62impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for DoBeforeTermination<OE, F>
63where
64 T: 'or,
65 E: 'or,
66 OE: Observable<'or, 'sub, T, E>,
67 F: FnOnce(&Termination<E>) + NecessarySendSync + 'or,
68{
69 fn subscribe(
70 self,
71 observer: impl Observer<T, E> + NecessarySendSync + 'or,
72 ) -> Subscription<'sub> {
73 self.source
74 .hook_on_termination(move |observer, termination| {
75 (self.callback)(&termination);
76 observer.on_termination(termination)
77 })
78 .subscribe(observer)
79 }
80}