rx_rust/operators/others/
hook_on_next.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8
9#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct HookOnNext<OE, F> {
39 source: OE,
40 callback: F,
41}
42
43impl<OE, F> HookOnNext<OE, F> {
44 pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
45 where
46 OE: Observable<'or, 'sub, T, E>,
47 F: FnMut(&mut dyn Observer<T, E>, T),
48 {
49 Self { source, callback }
50 }
51}
52
53impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for HookOnNext<OE, F>
54where
55 OE: Observable<'or, 'sub, T, E>,
56 F: FnMut(&mut dyn Observer<T, E>, T) + NecessarySendSync + 'or,
57{
58 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
59 let observer = HookOnNextObserver {
60 observer,
61 callback: self.callback,
62 };
63 self.source.subscribe(observer)
64 }
65}
66
67struct HookOnNextObserver<OR, F> {
68 observer: OR,
69 callback: F,
70}
71
72impl<T, E, OR, F> Observer<T, E> for HookOnNextObserver<OR, F>
73where
74 OR: Observer<T, E>,
75 F: FnMut(&mut dyn Observer<T, E>, T),
76{
77 fn on_next(&mut self, value: T) {
78 (self.callback)(&mut self.observer, value);
79 }
80
81 fn on_termination(self, termination: Termination<E>) {
82 self.observer.on_termination(termination);
83 }
84}