// rx_rust/operators/others/hook_on_termination.rs
use crate::utils::types::NecessarySendSync;
use crate::{
    disposable::subscription::Subscription,
    observable::Observable,
    observer::{Observer, Termination, boxed_observer::BoxedObserver},
};
use educe::Educe;

9#[derive(Educe)]
42#[educe(Debug, Clone)]
43pub struct HookOnTermination<OE, F> {
44 source: OE,
45 callback: F,
46}
47
48impl<OE, F> HookOnTermination<OE, F> {
49 pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
50 where
51 OE: Observable<'or, 'sub, T, E>,
52 F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
53 {
54 Self { source, callback }
55 }
56}
57
58impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for HookOnTermination<OE, F>
59where
60 T: 'or,
61 E: 'or,
62 OE: Observable<'or, 'sub, T, E>,
63 F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>) + NecessarySendSync + 'or,
64{
65 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
66 let observer = HookOnTerminationObserver {
67 observer: BoxedObserver::new(observer),
68 callback: self.callback,
69 };
70 self.source.subscribe(observer)
71 }
72}
73
/// Observer installed on the source by `HookOnTermination::subscribe`.
///
/// It relays every value to `observer` and, on termination, gives `observer` away to
/// `callback` instead of terminating it directly.
struct HookOnTerminationObserver<OR, F> {
    /// Downstream observer that receives the forwarded values.
    observer: OR,
    /// Hook that consumes the downstream observer once the source terminates.
    callback: F,
}

79impl<T, E, OR, F> Observer<T, E> for HookOnTerminationObserver<OR, F>
80where
81 OR: Observer<T, E>,
82 F: FnOnce(OR, Termination<E>),
83{
84 fn on_next(&mut self, value: T) {
85 self.observer.on_next(value);
86 }
87
88 fn on_termination(self, termination: Termination<E>) {
89 (self.callback)(self.observer, termination);
90 }
91}