rx_rust/operators/others/hook_on_subscription.rs
1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, boxed_observer::BoxedObserver},
6};
7use educe::Educe;
8
9/// Invokes a callback when the Observable is subscribed to.
10///
11/// # Examples
12/// ```rust
13/// use rx_rust::{
14/// observable::observable_ext::ObservableExt,
15/// observer::Termination,
16/// operators::{
17/// creating::from_iter::FromIter,
18/// others::hook_on_subscription::HookOnSubscription,
19/// },
20/// };
21/// use rx_rust::observable::Observable;
22/// use std::cell::Cell;
23/// use std::rc::Rc;
24///
25/// let mut values = Vec::new();
26/// let mut terminations = Vec::new();
27/// let subscribed = Rc::new(Cell::new(false));
28/// let subscribed_flag = Rc::clone(&subscribed);
29///
30/// let observable = HookOnSubscription::new(FromIter::new(vec![1, 2]), move |source, observer| {
31/// subscribed_flag.set(true);
32/// source.subscribe(observer)
33/// });
34/// observable.subscribe_with_callback(
35/// |value| values.push(value),
36/// |termination| terminations.push(termination),
37/// );
38///
39/// assert!(subscribed.get());
40/// assert_eq!(values, vec![1, 2]);
41/// assert_eq!(terminations, vec![Termination::Completed]);
42/// ```
43#[derive(Educe)]
44#[educe(Debug, Clone)]
45pub struct HookOnSubscription<OE, F> {
46 source: OE,
47 callback: F,
48}
49
50impl<OE, F> HookOnSubscription<OE, F> {
51 pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
52 where
53 OE: Observable<'or, 'sub, T, E>,
54 F: FnOnce(OE, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
55 {
56 Self { source, callback }
57 }
58}
59
60impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for HookOnSubscription<OE, F>
61where
62 OE: Observable<'or, 'sub, T, E>,
63 F: FnOnce(OE, BoxedObserver<'or, T, E>) -> Subscription<'sub>,
64{
65 fn subscribe(
66 self,
67 observer: impl Observer<T, E> + NecessarySendSync + 'or,
68 ) -> Subscription<'sub> {
69 (self.callback)(self.source, BoxedObserver::new(observer))
70 }
71}