rx_rust/operators/others/
hook_on_next.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Invokes a callback for each item emitted by the source Observable.
10///
11/// # Examples
12/// ```rust
13/// use rx_rust::{
14///     observable::observable_ext::ObservableExt,
15///     observer::Termination,
16///     operators::{
17///         creating::from_iter::FromIter,
18///         others::hook_on_next::HookOnNext,
19///     },
20/// };
21///
22/// let mut values = Vec::new();
23/// let mut terminations = Vec::new();
24///
25/// let observable = HookOnNext::new(FromIter::new(vec![1, 2]), |observer, value| {
26///     observer.on_next(value * 10);
27/// });
28/// observable.subscribe_with_callback(
29///     |value| values.push(value),
30///     |termination| terminations.push(termination),
31/// );
32///
33/// assert_eq!(values, vec![10, 20]);
34/// assert_eq!(terminations, vec![Termination::Completed]);
35/// ```
36#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct HookOnNext<OE, F> {
39    source: OE,
40    callback: F,
41}
42
43impl<OE, F> HookOnNext<OE, F> {
44    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
45    where
46        OE: Observable<'or, 'sub, T, E>,
47        F: FnMut(&mut dyn Observer<T, E>, T),
48    {
49        Self { source, callback }
50    }
51}
52
53impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for HookOnNext<OE, F>
54where
55    OE: Observable<'or, 'sub, T, E>,
56    F: FnMut(&mut dyn Observer<T, E>, T) + NecessarySendSync + 'or,
57{
58    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
59        let observer = HookOnNextObserver {
60            observer,
61            callback: self.callback,
62        };
63        self.source.subscribe(observer)
64    }
65}
66
/// Observer adapter placed between the source and the downstream observer.
///
/// Incoming items are given to `callback` together with `observer`; termination is
/// delivered to `observer` directly.
struct HookOnNextObserver<OR, F> {
    /// Downstream observer supplied at subscription time.
    observer: OR,
    /// Hook called with `observer` and each incoming item.
    callback: F,
}
71
72impl<T, E, OR, F> Observer<T, E> for HookOnNextObserver<OR, F>
73where
74    OR: Observer<T, E>,
75    F: FnMut(&mut dyn Observer<T, E>, T),
76{
77    fn on_next(&mut self, value: T) {
78        (self.callback)(&mut self.observer, value);
79    }
80
81    fn on_termination(self, termination: Termination<E>) {
82        self.observer.on_termination(termination);
83    }
84}