rx_rust/operators/utility/
do_after_subscription.rs

1use crate::utils::types::NecessarySend;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::{Observable, observable_ext::ObservableExt},
5    observer::Observer,
6};
7use educe::Educe;
8
9/// Invokes a callback when the Observable is subscribed to, after the subscription has been established.
10/// See <https://reactivex.io/documentation/operators/do.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     operators::{
17///         creating::from_iter::FromIter,
18///         utility::do_after_subscription::DoAfterSubscription,
19///     },
20/// };
21/// use std::sync::{Arc, Mutex};
22///
23/// let called = Arc::new(Mutex::new(false));
24/// let called_observer = Arc::clone(&called);
25///
26/// DoAfterSubscription::new(FromIter::new(vec![1]), move || {
27///     *called_observer.lock().unwrap() = true;
28/// })
29/// .subscribe_with_callback(|_| {}, |_| {});
30///
31/// assert!(*called.lock().unwrap());
32/// ```
33#[derive(Educe)]
34#[educe(Debug, Clone)]
35pub struct DoAfterSubscription<OE, F> {
36    source: OE,
37    callback: F,
38}
39
40impl<OE, F> DoAfterSubscription<OE, F> {
41    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
42    where
43        OE: Observable<'or, 'sub, T, E>,
44        F: FnOnce(),
45    {
46        Self { source, callback }
47    }
48}
49
50impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for DoAfterSubscription<OE, F>
51where
52    T: 'or,
53    E: 'or,
54    OE: Observable<'or, 'sub, T, E>,
55    F: FnOnce(),
56{
57    fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
58        self.source
59            .hook_on_subscription(move |observable, observer| {
60                let sub = observable.subscribe(observer);
61                (self.callback)();
62                sub
63            })
64            .subscribe(observer)
65    }
66}