rx_rust/operators/others/
hook_on_termination.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination, boxed_observer::BoxedObserver},
6};
7use educe::Educe;
8
9/// Invokes a callback when the source Observable terminates.
10///
11/// # Examples
12/// ```rust
13/// use rx_rust::{
14///     observable::observable_ext::ObservableExt,
15///     observer::Termination,
16///     operators::{
17///         creating::from_iter::FromIter,
18///         others::hook_on_termination::HookOnTermination,
19///     },
20/// };
21/// use rx_rust::observer::Observer;
22/// use std::cell::Cell;
23/// use std::rc::Rc;
24///
25/// let mut values = Vec::new();
26/// let mut terminations = Vec::new();
27///
28/// let observable =
29///     HookOnTermination::new(FromIter::new(vec![1]), move |observer, termination| {
30///         // Do whatever you want here
31///         observer.on_termination(termination);
32///     });
33/// observable.subscribe_with_callback(
34///     |value| values.push(value),
35///     |termination| terminations.push(termination),
36/// );
37///
38/// assert_eq!(values, vec![1]);
39/// assert_eq!(terminations, vec![Termination::Completed]);
40/// ```
41#[derive(Educe)]
42#[educe(Debug, Clone)]
43pub struct HookOnTermination<OE, F> {
44    source: OE,
45    callback: F,
46}
47
48impl<OE, F> HookOnTermination<OE, F> {
49    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
50    where
51        OE: Observable<'or, 'sub, T, E>,
52        F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
53    {
54        Self { source, callback }
55    }
56}
57
58impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for HookOnTermination<OE, F>
59where
60    T: 'or,
61    E: 'or,
62    OE: Observable<'or, 'sub, T, E>,
63    F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>) + NecessarySendSync + 'or,
64{
65    fn subscribe(
66        self,
67        observer: impl Observer<T, E> + NecessarySendSync + 'or,
68    ) -> Subscription<'sub> {
69        let observer = HookOnTerminationObserver {
70            observer: BoxedObserver::new(observer),
71            callback: self.callback,
72        };
73        self.source.subscribe(observer)
74    }
75}
76
/// Observer that `HookOnTermination` installs on its source: it pairs the
/// downstream observer with the termination callback.
struct HookOnTerminationObserver<OR, F> {
    /// Downstream observer; receives every forwarded value.
    observer: OR,
    /// Called with `observer` and the termination when the source terminates.
    callback: F,
}
81
82impl<T, E, OR, F> Observer<T, E> for HookOnTerminationObserver<OR, F>
83where
84    OR: Observer<T, E>,
85    F: FnOnce(OR, Termination<E>),
86{
87    fn on_next(&mut self, value: T) {
88        self.observer.on_next(value);
89    }
90
91    fn on_termination(self, termination: Termination<E>) {
92        (self.callback)(self.observer, termination);
93    }
94}