// rx_rust/operators/others/hook_on_termination.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination, boxed_observer::BoxedObserver},
6};
7use educe::Educe;
8
9/// Invokes a callback when the source Observable terminates.
10///
11/// # Examples
12/// ```rust
13/// use rx_rust::{
14///     observable::observable_ext::ObservableExt,
15///     observer::Termination,
16///     operators::{
17///         creating::from_iter::FromIter,
18///         others::hook_on_termination::HookOnTermination,
19///     },
20/// };
21/// use rx_rust::observer::Observer;
22/// use std::cell::Cell;
23/// use std::rc::Rc;
24///
25/// let mut values = Vec::new();
26/// let mut terminations = Vec::new();
27///
28/// let observable =
29///     HookOnTermination::new(FromIter::new(vec![1]), move |observer, termination| {
30///         // Do whatever you want here
31///         observer.on_termination(termination);
32///     });
33/// observable.subscribe_with_callback(
34///     |value| values.push(value),
35///     |termination| terminations.push(termination),
36/// );
37///
38/// assert_eq!(values, vec![1]);
39/// assert_eq!(terminations, vec![Termination::Completed]);
40/// ```
41#[derive(Educe)]
42#[educe(Debug, Clone)]
43pub struct HookOnTermination<OE, F> {
44    source: OE,
45    callback: F,
46}
47
48impl<OE, F> HookOnTermination<OE, F> {
49    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
50    where
51        OE: Observable<'or, 'sub, T, E>,
52        F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
53    {
54        Self { source, callback }
55    }
56}
57
58impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for HookOnTermination<OE, F>
59where
60    T: 'or,
61    E: 'or,
62    OE: Observable<'or, 'sub, T, E>,
63    F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>) + NecessarySendSync + 'or,
64{
65    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
66        let observer = HookOnTerminationObserver {
67            observer: BoxedObserver::new(observer),
68            callback: self.callback,
69        };
70        self.source.subscribe(observer)
71    }
72}
73
/// Observer placed between the source and the downstream observer.
///
/// It keeps the downstream observer and the user callback together until the
/// source terminates.
struct HookOnTerminationObserver<OR, F> {
    /// Downstream observer that receives every emitted value.
    observer: OR,
    /// Termination hook; consumed on the single termination call.
    callback: F,
}
78
79impl<T, E, OR, F> Observer<T, E> for HookOnTerminationObserver<OR, F>
80where
81    OR: Observer<T, E>,
82    F: FnOnce(OR, Termination<E>),
83{
84    fn on_next(&mut self, value: T) {
85        self.observer.on_next(value);
86    }
87
88    fn on_termination(self, termination: Termination<E>) {
89        (self.callback)(self.observer, termination);
90    }
91}