rx_rust/operators/others/
debug.rs

1use crate::disposable::callback_disposal::CallbackDisposal;
2use crate::utils::types::{NecessarySendSync, Shared};
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use educe::Educe;
9use std::fmt::Display;
10
11pub enum DebugEvent<'a, T, E> {
12    OnNext(&'a T),
13    OnTermination(&'a Termination<E>),
14    Subscribed,
15    Disposed,
16}
17
18/// Logs all items from the source Observable to the console, and re-emits them. This is useful for debugging.
19///
20/// # Examples
21/// ```rust
22/// use rx_rust::{
23///     observable::observable_ext::ObservableExt,
24///     observer::Termination,
25///     operators::{
26///         creating::from_iter::FromIter,
27///         others::debug::Debug,
28///     },
29/// };
30///
31/// let mut values = Vec::new();
32/// let mut terminations = Vec::new();
33///
34/// let observable = Debug::new_default_print(FromIter::new(vec![1, 2]), "trace");
35/// observable.subscribe_with_callback(
36///     |value| values.push(value),
37///     |termination| terminations.push(termination),
38/// );
39///
40/// assert_eq!(values, vec![1, 2]);
41/// assert_eq!(terminations, vec![Termination::Completed]);
42/// ```
43#[derive(Educe)]
44#[educe(Debug, Clone)]
45pub struct Debug<OE, L, F> {
46    source: OE,
47    label: L,
48    callback: Shared<F>,
49}
50
51impl<OE, L, F> Debug<OE, L, F> {
52    pub fn new(source: OE, label: L, callback: F) -> Self {
53        Self {
54            source,
55            label,
56            callback: Shared::new(callback),
57        }
58    }
59}
60
61pub type DefaultPrintType<L, T, E> = fn(L, DebugEvent<'_, T, E>);
62
63impl<T, E, OE, L> Debug<OE, L, DefaultPrintType<L, T, E>> {
64    pub fn new_default_print(source: OE, label: L) -> Self
65    where
66        L: Display,
67        T: std::fmt::Debug,
68        E: std::fmt::Debug,
69    {
70        Self {
71            source,
72            label,
73            callback: Shared::new(|label, event| match event {
74                DebugEvent::OnNext(value) => println!("[{}]: OnNext({:?})", label, value),
75                DebugEvent::OnTermination(termination) => {
76                    println!("[{}]: OnTermination({:?})", label, termination)
77                }
78                DebugEvent::Subscribed => println!("[{}]: Subscription", label),
79                DebugEvent::Disposed => println!("[{}]: Dispose", label),
80            }),
81        }
82    }
83}
84
85impl<'or, 'sub, T, E, OE, L, F> Observable<'or, 'sub, T, E> for Debug<OE, L, F>
86where
87    OE: Observable<'or, 'sub, T, E>,
88    L: Clone + NecessarySendSync + 'or + 'sub,
89    F: Fn(L, DebugEvent<'_, T, E>) + NecessarySendSync + 'or + 'sub,
90{
91    fn subscribe(
92        self,
93        observer: impl Observer<T, E> + NecessarySendSync + 'or,
94    ) -> Subscription<'sub> {
95        (self.callback)(self.label.clone(), DebugEvent::Subscribed);
96        let observer = DebugObserver {
97            observer,
98            label: self.label.clone(),
99            callback: self.callback.clone(),
100        };
101        self.source.subscribe(observer)
102            + CallbackDisposal::new(move || {
103                (self.callback)(self.label.clone(), DebugEvent::Disposed);
104            })
105    }
106}
107
108struct DebugObserver<OR, L, F> {
109    observer: OR,
110    label: L,
111    callback: Shared<F>,
112}
113
114impl<T, E, OR, L, F> Observer<T, E> for DebugObserver<OR, L, F>
115where
116    OR: Observer<T, E>,
117    L: Clone,
118    F: Fn(L, DebugEvent<'_, T, E>),
119{
120    fn on_next(&mut self, value: T) {
121        (self.callback)(self.label.clone(), DebugEvent::OnNext(&value));
122        self.observer.on_next(value);
123    }
124
125    fn on_termination(self, termination: Termination<E>) {
126        (self.callback)(self.label.clone(), DebugEvent::OnTermination(&termination));
127        self.observer.on_termination(termination);
128    }
129}