rx_rust/operators/others/
debug.rs

1use crate::disposable::callback_disposal::CallbackDisposal;
2use crate::utils::types::{NecessarySendSync, Shared};
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use educe::Educe;
9use std::fmt::Display;
10
11pub enum DebugEvent<'a, T, E> {
12    OnNext(&'a T),
13    OnTermination(&'a Termination<E>),
14    Subscribed,
15    Disposed,
16}
17
18/// Logs all items from the source Observable to the console, and re-emits them. This is useful for debugging.
19///
20/// # Examples
21/// ```rust
22/// use rx_rust::{
23///     observable::observable_ext::ObservableExt,
24///     observer::Termination,
25///     operators::{
26///         creating::from_iter::FromIter,
27///         others::debug::Debug,
28///     },
29/// };
30///
31/// let mut values = Vec::new();
32/// let mut terminations = Vec::new();
33///
34/// let observable = Debug::new_default_print(FromIter::new(vec![1, 2]), "trace");
35/// observable.subscribe_with_callback(
36///     |value| values.push(value),
37///     |termination| terminations.push(termination),
38/// );
39///
40/// assert_eq!(values, vec![1, 2]);
41/// assert_eq!(terminations, vec![Termination::Completed]);
42/// ```
43#[derive(Educe)]
44#[educe(Debug, Clone)]
45pub struct Debug<OE, C, F> {
46    source: OE,
47    context: C,
48    callback: Shared<F>,
49}
50
51impl<OE, C, F> Debug<OE, C, F> {
52    pub fn new<T, E>(source: OE, context: C, callback: F) -> Self
53    where
54        F: Fn(C, DebugEvent<'_, T, E>),
55    {
56        Self {
57            source,
58            context,
59            callback: Shared::new(callback),
60        }
61    }
62}
63
64pub type DefaultPrintType<C, T, E> = fn(C, DebugEvent<'_, T, E>);
65
66impl<T, E, OE, C> Debug<OE, C, DefaultPrintType<C, T, E>> {
67    pub fn new_default_print(source: OE, label: C) -> Self
68    where
69        C: Display,
70        T: std::fmt::Debug,
71        E: std::fmt::Debug,
72    {
73        Self {
74            source,
75            context: label,
76            callback: Shared::new(|label, event| match event {
77                DebugEvent::OnNext(value) => println!("[{}]: OnNext({:?})", label, value),
78                DebugEvent::OnTermination(termination) => {
79                    println!("[{}]: OnTermination({:?})", label, termination)
80                }
81                DebugEvent::Subscribed => println!("[{}]: Subscription", label),
82                DebugEvent::Disposed => println!("[{}]: Dispose", label),
83            }),
84        }
85    }
86}
87
88impl<'or, 'sub, T, E, OE, C, F> Observable<'or, 'sub, T, E> for Debug<OE, C, F>
89where
90    OE: Observable<'or, 'sub, T, E>,
91    C: Clone + NecessarySendSync + 'or + 'sub,
92    F: Fn(C, DebugEvent<'_, T, E>) + NecessarySendSync + 'or + 'sub,
93{
94    fn subscribe(
95        self,
96        observer: impl Observer<T, E> + NecessarySendSync + 'or,
97    ) -> Subscription<'sub> {
98        (self.callback)(self.context.clone(), DebugEvent::Subscribed);
99        let observer = DebugObserver {
100            observer,
101            context: self.context.clone(),
102            callback: self.callback.clone(),
103        };
104        self.source.subscribe(observer)
105            + CallbackDisposal::new(move || {
106                (self.callback)(self.context.clone(), DebugEvent::Disposed);
107            })
108    }
109}
110
111struct DebugObserver<OR, C, F> {
112    observer: OR,
113    context: C,
114    callback: Shared<F>,
115}
116
117impl<T, E, OR, C, F> Observer<T, E> for DebugObserver<OR, C, F>
118where
119    OR: Observer<T, E>,
120    C: Clone,
121    F: Fn(C, DebugEvent<'_, T, E>),
122{
123    fn on_next(&mut self, value: T) {
124        (self.callback)(self.context.clone(), DebugEvent::OnNext(&value));
125        self.observer.on_next(value);
126    }
127
128    fn on_termination(self, termination: Termination<E>) {
129        (self.callback)(
130            self.context.clone(),
131            DebugEvent::OnTermination(&termination),
132        );
133        self.observer.on_termination(termination);
134    }
135}