rx_rust/operators/utility/
do_after_termination.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::{Observable, observable_ext::ObservableExt},
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Invokes a callback when the source Observable terminates (either completes or errors), after the termination notification has been emitted to the downstream observer.
10/// See <https://reactivex.io/documentation/operators/do.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     observer::Termination,
17///     operators::{
18///         creating::from_iter::FromIter,
19///         utility::do_after_termination::DoAfterTermination,
20///     },
21/// };
22/// use std::sync::{Arc, Mutex};
23///
24/// let mut terminations = Vec::new();
25/// let callback_terminations = Arc::new(Mutex::new(Vec::new()));
26/// let callback_terminations_observer = Arc::clone(&callback_terminations);
27///
28/// DoAfterTermination::new(FromIter::new(vec![1, 2]), move |termination| {
29///     callback_terminations_observer
30///         .lock()
31///         .unwrap()
32///         .push(termination);
33/// })
34/// .subscribe_with_callback(
35///     |_| {},
36///     |termination| terminations.push(termination),
37/// );
38///
39/// assert_eq!(terminations, vec![Termination::Completed]);
40/// assert_eq!(
41///     &*callback_terminations.lock().unwrap(),
42///     &[Termination::Completed]
43/// );
44/// ```
45#[derive(Educe)]
46#[educe(Debug, Clone)]
47pub struct DoAfterTermination<OE, F> {
48    source: OE,
49    callback: F,
50}
51
52impl<OE, F> DoAfterTermination<OE, F> {
53    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
54    where
55        OE: Observable<'or, 'sub, T, E>,
56        F: FnOnce(Termination<E>),
57    {
58        Self { source, callback }
59    }
60}
61
62impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for DoAfterTermination<OE, F>
63where
64    T: 'or,
65    E: Clone + 'or,
66    OE: Observable<'or, 'sub, T, E>,
67    F: FnOnce(Termination<E>) + NecessarySendSync + 'or,
68{
69    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
70        self.source
71            .hook_on_termination(move |observer, termination| {
72                observer.on_termination(termination.clone());
73                (self.callback)(termination);
74            })
75            .subscribe(observer)
76    }
77}