rx_rust/operators/others/
debug.rs1use crate::disposable::callback_disposal::CallbackDisposal;
2use crate::utils::types::{NecessarySendSync, Shared};
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use educe::Educe;
9use std::fmt::Display;
10
11#[derive(Educe)]
12#[educe(Debug, Clone, PartialEq, Eq)]
13pub enum DebugEvent<'a, T, E> {
14 OnNext(&'a T),
15 OnTermination(&'a Termination<E>),
16 Subscribed,
17 Disposed,
18}
19
20#[derive(Educe)]
46#[educe(Debug, Clone)]
47pub struct Debug<OE, C, F> {
48 source: OE,
49 context: C,
50 callback: Shared<F>,
51}
52
53impl<OE, C, F> Debug<OE, C, F> {
54 pub fn new<T, E>(source: OE, context: C, callback: F) -> Self
55 where
56 F: Fn(C, DebugEvent<'_, T, E>),
57 {
58 Self {
59 source,
60 context,
61 callback: Shared::new(callback),
62 }
63 }
64}
65
66pub type DefaultPrintType<C, T, E> = fn(C, DebugEvent<'_, T, E>);
67
68impl<T, E, OE, C> Debug<OE, C, DefaultPrintType<C, T, E>> {
69 pub fn new_default_print(source: OE, label: C) -> Self
70 where
71 C: Display,
72 T: std::fmt::Debug,
73 E: std::fmt::Debug,
74 {
75 Self {
76 source,
77 context: label,
78 callback: Shared::new(|label, event| match event {
79 DebugEvent::OnNext(value) => println!("[{}]: OnNext({:?})", label, value),
80 DebugEvent::OnTermination(termination) => {
81 println!("[{}]: OnTermination({:?})", label, termination)
82 }
83 DebugEvent::Subscribed => println!("[{}]: Subscription", label),
84 DebugEvent::Disposed => println!("[{}]: Dispose", label),
85 }),
86 }
87 }
88}
89
90impl<'or, 'sub, T, E, OE, C, F> Observable<'or, 'sub, T, E> for Debug<OE, C, F>
91where
92 OE: Observable<'or, 'sub, T, E>,
93 C: Clone + NecessarySendSync + 'or + 'sub,
94 F: Fn(C, DebugEvent<'_, T, E>) + NecessarySendSync + 'or + 'sub,
95{
96 fn subscribe(
97 self,
98 observer: impl Observer<T, E> + NecessarySendSync + 'or,
99 ) -> Subscription<'sub> {
100 (self.callback)(self.context.clone(), DebugEvent::Subscribed);
101 let observer = DebugObserver {
102 observer,
103 context: self.context.clone(),
104 callback: self.callback.clone(),
105 };
106 self.source.subscribe(observer)
107 + CallbackDisposal::new(move || {
108 (self.callback)(self.context.clone(), DebugEvent::Disposed);
109 })
110 }
111}
112
113struct DebugObserver<OR, C, F> {
114 observer: OR,
115 context: C,
116 callback: Shared<F>,
117}
118
119impl<T, E, OR, C, F> Observer<T, E> for DebugObserver<OR, C, F>
120where
121 OR: Observer<T, E>,
122 C: Clone,
123 F: Fn(C, DebugEvent<'_, T, E>),
124{
125 fn on_next(&mut self, value: T) {
126 (self.callback)(self.context.clone(), DebugEvent::OnNext(&value));
127 self.observer.on_next(value);
128 }
129
130 fn on_termination(self, termination: Termination<E>) {
131 (self.callback)(
132 self.context.clone(),
133 DebugEvent::OnTermination(&termination),
134 );
135 self.observer.on_termination(termination);
136 }
137}