rx_rust/operators/others/
hook_on_subscription.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, boxed_observer::BoxedObserver},
6};
7use educe::Educe;
8
9/// Invokes a callback when the Observable is subscribed to.
10///
11/// # Examples
12/// ```rust
13/// use rx_rust::{
14///     observable::observable_ext::ObservableExt,
15///     observer::Termination,
16///     operators::{
17///         creating::from_iter::FromIter,
18///         others::hook_on_subscription::HookOnSubscription,
19///     },
20/// };
21/// use rx_rust::observable::Observable;
22/// use std::cell::Cell;
23/// use std::rc::Rc;
24///
25/// let mut values = Vec::new();
26/// let mut terminations = Vec::new();
27/// let subscribed = Rc::new(Cell::new(false));
28/// let subscribed_flag = Rc::clone(&subscribed);
29///
30/// let observable = HookOnSubscription::new(FromIter::new(vec![1, 2]), move |source, observer| {
31///     subscribed_flag.set(true);
32///     source.subscribe(observer)
33/// });
34/// observable.subscribe_with_callback(
35///     |value| values.push(value),
36///     |termination| terminations.push(termination),
37/// );
38///
39/// assert!(subscribed.get());
40/// assert_eq!(values, vec![1, 2]);
41/// assert_eq!(terminations, vec![Termination::Completed]);
42/// ```
43#[derive(Educe)]
44#[educe(Debug, Clone)]
45pub struct HookOnSubscription<OE, F> {
46    source: OE,
47    callback: F,
48}
49
50impl<OE, F> HookOnSubscription<OE, F> {
51    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
52    where
53        OE: Observable<'or, 'sub, T, E>,
54        F: FnOnce(OE, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
55    {
56        Self { source, callback }
57    }
58}
59
60impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for HookOnSubscription<OE, F>
61where
62    OE: Observable<'or, 'sub, T, E>,
63    F: FnOnce(OE, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
64{
65    fn subscribe(
66        self,
67        observer: impl Observer<T, E> + NecessarySendSync + 'or,
68    ) -> Subscription<'sub> {
69        (self.callback)(self.source, BoxedObserver::new(observer))
70    }
71}