rx_rust/operators/others/
hook_on_termination.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Observer, Termination, boxed_observer::BoxedObserver},
6};
7use educe::Educe;
8
9#[derive(Educe)]
42#[educe(Debug, Clone)]
43pub struct HookOnTermination<OE, F> {
44 source: OE,
45 callback: F,
46}
47
48impl<OE, F> HookOnTermination<OE, F> {
49 pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
50 where
51 OE: Observable<'or, 'sub, T, E>,
52 F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>),
53 {
54 Self { source, callback }
55 }
56}
57
58impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for HookOnTermination<OE, F>
59where
60 T: 'or,
61 E: 'or,
62 OE: Observable<'or, 'sub, T, E>,
63 F: FnOnce(BoxedObserver<'or, T, E>, Termination<E>) + NecessarySendSync + 'or,
64{
65 fn subscribe(
66 self,
67 observer: impl Observer<T, E> + NecessarySendSync + 'or,
68 ) -> Subscription<'sub> {
69 let observer = HookOnTerminationObserver {
70 observer: BoxedObserver::new(observer),
71 callback: self.callback,
72 };
73 self.source.subscribe(observer)
74 }
75}
76
77struct HookOnTerminationObserver<OR, F> {
78 observer: OR,
79 callback: F,
80}
81
82impl<T, E, OR, F> Observer<T, E> for HookOnTerminationObserver<OR, F>
83where
84 OR: Observer<T, E>,
85 F: FnOnce(OR, Termination<E>),
86{
87 fn on_next(&mut self, value: T) {
88 self.observer.on_next(value);
89 }
90
91 fn on_termination(self, termination: Termination<E>) {
92 (self.callback)(self.observer, termination);
93 }
94}