rx_rust/operators/utility/
do_before_disposal.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::{Observable, observable_ext::ObservableExt},
5    observer::Observer,
6};
7use educe::Educe;
8
9/// Invokes a callback when the subscription is disposed, before the disposal logic is executed.
10/// See <https://reactivex.io/documentation/operators/do.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     operators::{
17///         creating::from_iter::FromIter,
18///         utility::do_before_disposal::DoBeforeDisposal,
19///     },
20/// };
21/// use std::sync::{Arc, Mutex};
22///
23/// let disposed = Arc::new(Mutex::new(false));
24/// let disposed_observer = Arc::clone(&disposed);
25///
26/// let subscription = DoBeforeDisposal::new(
27///     FromIter::new(vec![1, 2]),
28///     move || *disposed_observer.lock().unwrap() = true,
29/// )
30/// .subscribe_with_callback(|_| {}, |_| {});
31///
32/// drop(subscription);
33///
34/// assert!(*disposed.lock().unwrap());
35/// ```
36#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct DoBeforeDisposal<OE, F> {
39    source: OE,
40    callback: F,
41}
42
43impl<OE, F> DoBeforeDisposal<OE, F> {
44    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
45    where
46        OE: Observable<'or, 'sub, T, E>,
47        F: FnOnce(),
48    {
49        Self { source, callback }
50    }
51}
52
53impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for DoBeforeDisposal<OE, F>
54where
55    T: 'or,
56    E: 'or,
57    OE: Observable<'or, 'sub, T, E>,
58    F: FnOnce() + NecessarySendSync + 'sub,
59{
60    fn subscribe(
61        self,
62        observer: impl Observer<T, E> + NecessarySendSync + 'or,
63    ) -> Subscription<'sub> {
64        self.source
65            .hook_on_subscription(move |observable, observer| {
66                Subscription::new_with_disposal_callback(self.callback)
67                    + observable.subscribe(observer)
68            })
69            .subscribe(observer)
70    }
71}