rx_rust/operators/others/
debug.rs1use crate::disposable::callback_disposal::CallbackDisposal;
2use crate::utils::types::{NecessarySendSync, Shared};
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use educe::Educe;
9use std::fmt::Display;
10
11pub enum DebugEvent<'a, T, E> {
12 OnNext(&'a T),
13 OnTermination(&'a Termination<E>),
14 Subscribed,
15 Disposed,
16}
17
18#[derive(Educe)]
44#[educe(Debug, Clone)]
45pub struct Debug<OE, C, F> {
46 source: OE,
47 context: C,
48 callback: Shared<F>,
49}
50
51impl<OE, C, F> Debug<OE, C, F> {
52 pub fn new<T, E>(source: OE, context: C, callback: F) -> Self
53 where
54 F: Fn(C, DebugEvent<'_, T, E>),
55 {
56 Self {
57 source,
58 context,
59 callback: Shared::new(callback),
60 }
61 }
62}
63
64pub type DefaultPrintType<C, T, E> = fn(C, DebugEvent<'_, T, E>);
65
66impl<T, E, OE, C> Debug<OE, C, DefaultPrintType<C, T, E>> {
67 pub fn new_default_print(source: OE, label: C) -> Self
68 where
69 C: Display,
70 T: std::fmt::Debug,
71 E: std::fmt::Debug,
72 {
73 Self {
74 source,
75 context: label,
76 callback: Shared::new(|label, event| match event {
77 DebugEvent::OnNext(value) => println!("[{}]: OnNext({:?})", label, value),
78 DebugEvent::OnTermination(termination) => {
79 println!("[{}]: OnTermination({:?})", label, termination)
80 }
81 DebugEvent::Subscribed => println!("[{}]: Subscription", label),
82 DebugEvent::Disposed => println!("[{}]: Dispose", label),
83 }),
84 }
85 }
86}
87
88impl<'or, 'sub, T, E, OE, C, F> Observable<'or, 'sub, T, E> for Debug<OE, C, F>
89where
90 OE: Observable<'or, 'sub, T, E>,
91 C: Clone + NecessarySendSync + 'or + 'sub,
92 F: Fn(C, DebugEvent<'_, T, E>) + NecessarySendSync + 'or + 'sub,
93{
94 fn subscribe(
95 self,
96 observer: impl Observer<T, E> + NecessarySendSync + 'or,
97 ) -> Subscription<'sub> {
98 (self.callback)(self.context.clone(), DebugEvent::Subscribed);
99 let observer = DebugObserver {
100 observer,
101 context: self.context.clone(),
102 callback: self.callback.clone(),
103 };
104 self.source.subscribe(observer)
105 + CallbackDisposal::new(move || {
106 (self.callback)(self.context.clone(), DebugEvent::Disposed);
107 })
108 }
109}
110
111struct DebugObserver<OR, C, F> {
112 observer: OR,
113 context: C,
114 callback: Shared<F>,
115}
116
117impl<T, E, OR, C, F> Observer<T, E> for DebugObserver<OR, C, F>
118where
119 OR: Observer<T, E>,
120 C: Clone,
121 F: Fn(C, DebugEvent<'_, T, E>),
122{
123 fn on_next(&mut self, value: T) {
124 (self.callback)(self.context.clone(), DebugEvent::OnNext(&value));
125 self.observer.on_next(value);
126 }
127
128 fn on_termination(self, termination: Termination<E>) {
129 (self.callback)(
130 self.context.clone(),
131 DebugEvent::OnTermination(&termination),
132 );
133 self.observer.on_termination(termination);
134 }
135}