// rx_rust/operators/others/debug.rs
use crate::disposable::callback_disposal::CallbackDisposal;
2use crate::utils::types::{NecessarySendSync, Shared};
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use educe::Educe;
9use std::fmt::Display;
10
11pub enum DebugEvent<'a, T, E> {
12 OnNext(&'a T),
13 OnTermination(&'a Termination<E>),
14 Subscribed,
15 Disposed,
16}
17
18#[derive(Educe)]
44#[educe(Debug, Clone)]
45pub struct Debug<OE, L, F> {
46 source: OE,
47 label: L,
48 callback: Shared<F>,
49}
50
51impl<OE, L, F> Debug<OE, L, F> {
52 pub fn new(source: OE, label: L, callback: F) -> Self {
53 Self {
54 source,
55 label,
56 callback: Shared::new(callback),
57 }
58 }
59}
60
61pub type DefaultPrintType<L, T, E> = fn(L, DebugEvent<'_, T, E>);
62
63impl<T, E, OE, L> Debug<OE, L, DefaultPrintType<L, T, E>> {
64 pub fn new_default_print(source: OE, label: L) -> Self
65 where
66 L: Display,
67 T: std::fmt::Debug,
68 E: std::fmt::Debug,
69 {
70 Self {
71 source,
72 label,
73 callback: Shared::new(|label, event| match event {
74 DebugEvent::OnNext(value) => println!("[{}]: OnNext({:?})", label, value),
75 DebugEvent::OnTermination(termination) => {
76 println!("[{}]: OnTermination({:?})", label, termination)
77 }
78 DebugEvent::Subscribed => println!("[{}]: Subscription", label),
79 DebugEvent::Disposed => println!("[{}]: Dispose", label),
80 }),
81 }
82 }
83}
84
85impl<'or, 'sub, T, E, OE, L, F> Observable<'or, 'sub, T, E> for Debug<OE, L, F>
86where
87 OE: Observable<'or, 'sub, T, E>,
88 L: Clone + NecessarySendSync + 'or + 'sub,
89 F: Fn(L, DebugEvent<'_, T, E>) + NecessarySendSync + 'or + 'sub,
90{
91 fn subscribe(
92 self,
93 observer: impl Observer<T, E> + NecessarySendSync + 'or,
94 ) -> Subscription<'sub> {
95 (self.callback)(self.label.clone(), DebugEvent::Subscribed);
96 let observer = DebugObserver {
97 observer,
98 label: self.label.clone(),
99 callback: self.callback.clone(),
100 };
101 self.source.subscribe(observer)
102 + CallbackDisposal::new(move || {
103 (self.callback)(self.label.clone(), DebugEvent::Disposed);
104 })
105 }
106}
107
108struct DebugObserver<OR, L, F> {
109 observer: OR,
110 label: L,
111 callback: Shared<F>,
112}
113
114impl<T, E, OR, L, F> Observer<T, E> for DebugObserver<OR, L, F>
115where
116 OR: Observer<T, E>,
117 L: Clone,
118 F: Fn(L, DebugEvent<'_, T, E>),
119{
120 fn on_next(&mut self, value: T) {
121 (self.callback)(self.label.clone(), DebugEvent::OnNext(&value));
122 self.observer.on_next(value);
123 }
124
125 fn on_termination(self, termination: Termination<E>) {
126 (self.callback)(self.label.clone(), DebugEvent::OnTermination(&termination));
127 self.observer.on_termination(termination);
128 }
129}