rx_rust/operators/others/
hook_on_next.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination},
6};
7use educe::Educe;
8
9#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct HookOnNext<OE, F> {
39 source: OE,
40 callback: F,
41}
42
43impl<OE, F> HookOnNext<OE, F> {
44 pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
45 where
46 OE: Observable<'or, 'sub, T, E>,
47 F: FnMut(&mut dyn Observer<T, E>, T),
48 {
49 Self { source, callback }
50 }
51}
52
53impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for HookOnNext<OE, F>
54where
55 OE: Observable<'or, 'sub, T, E>,
56 F: FnMut(&mut dyn Observer<T, E>, T) + NecessarySendSync + 'or,
57{
58 fn subscribe(
59 self,
60 observer: impl Observer<T, E> + NecessarySendSync + 'or,
61 ) -> Subscription<'sub> {
62 let observer = HookOnNextObserver {
63 observer,
64 callback: self.callback,
65 };
66 self.source.subscribe(observer)
67 }
68}
69
70struct HookOnNextObserver<OR, F> {
71 observer: OR,
72 callback: F,
73}
74
75impl<T, E, OR, F> Observer<T, E> for HookOnNextObserver<OR, F>
76where
77 OR: Observer<T, E>,
78 F: FnMut(&mut dyn Observer<T, E>, T),
79{
80 fn on_next(&mut self, value: T) {
81 (self.callback)(&mut self.observer, value);
82 }
83
84 fn on_termination(self, termination: Termination<E>) {
85 self.observer.on_termination(termination);
86 }
87}