rx_rust/operators/utility/
do_before_termination.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::{Observable, observable_ext::ObservableExt},
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Invokes a callback when the source Observable terminates (either completes or errors), before the termination notification has been emitted to the downstream observer.
10/// See <https://reactivex.io/documentation/operators/do.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     observer::Termination,
17///     operators::{
18///         creating::from_iter::FromIter,
19///         utility::do_before_termination::DoBeforeTermination,
20///     },
21/// };
22/// use std::sync::{Arc, Mutex};
23///
24/// let mut terminations = Vec::new();
25/// let callback_terminations = Arc::new(Mutex::new(Vec::new()));
26/// let callback_terminations_observer = Arc::clone(&callback_terminations);
27///
28/// DoBeforeTermination::new(FromIter::new(vec![1, 2]), move |termination| {
29///     callback_terminations_observer
30///         .lock()
31///         .unwrap()
32///         .push(termination.clone());
33/// })
34/// .subscribe_with_callback(
35///     |_| {},
36///     |termination| terminations.push(termination),
37/// );
38///
39/// assert_eq!(
40///     &*callback_terminations.lock().unwrap(),
41///     &[Termination::Completed]
42/// );
43/// assert_eq!(terminations, vec![Termination::Completed]);
44/// ```
45#[derive(Educe)]
46#[educe(Debug, Clone)]
47pub struct DoBeforeTermination<OE, F> {
48    source: OE,
49    callback: F,
50}
51
52impl<OE, F> DoBeforeTermination<OE, F> {
53    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
54    where
55        OE: Observable<'or, 'sub, T, E>,
56        F: FnOnce(&Termination<E>),
57    {
58        Self { source, callback }
59    }
60}
61
62impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for DoBeforeTermination<OE, F>
63where
64    T: 'or,
65    E: 'or,
66    OE: Observable<'or, 'sub, T, E>,
67    F: FnOnce(&Termination<E>) + NecessarySendSync + 'or,
68{
69    fn subscribe(
70        self,
71        observer: impl Observer<T, E> + NecessarySendSync + 'or,
72    ) -> Subscription<'sub> {
73        self.source
74            .hook_on_termination(move |observer, termination| {
75                (self.callback)(&termination);
76                observer.on_termination(termination)
77            })
78            .subscribe(observer)
79    }
80}