rx_rust/operators/others/
debug.rs

1use crate::disposable::callback_disposal::CallbackDisposal;
2use crate::utils::types::{NecessarySendSync, Shared};
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use educe::Educe;
9use std::fmt::Display;
10
11#[derive(Educe)]
12#[educe(Debug, Clone, PartialEq, Eq)]
13pub enum DebugEvent<'a, T, E> {
14    OnNext(&'a T),
15    OnTermination(&'a Termination<E>),
16    Subscribed,
17    Disposed,
18}
19
20/// Logs all items from the source Observable to the console, and re-emits them. This is useful for debugging.
21///
22/// # Examples
23/// ```rust
24/// use rx_rust::{
25///     observable::observable_ext::ObservableExt,
26///     observer::Termination,
27///     operators::{
28///         creating::from_iter::FromIter,
29///         others::debug::Debug,
30///     },
31/// };
32///
33/// let mut values = Vec::new();
34/// let mut terminations = Vec::new();
35///
36/// let observable = Debug::new_default_print(FromIter::new(vec![1, 2]), "trace");
37/// observable.subscribe_with_callback(
38///     |value| values.push(value),
39///     |termination| terminations.push(termination),
40/// );
41///
42/// assert_eq!(values, vec![1, 2]);
43/// assert_eq!(terminations, vec![Termination::Completed]);
44/// ```
45#[derive(Educe)]
46#[educe(Debug, Clone)]
47pub struct Debug<OE, C, F> {
48    source: OE,
49    context: C,
50    callback: Shared<F>,
51}
52
53impl<OE, C, F> Debug<OE, C, F> {
54    pub fn new<T, E>(source: OE, context: C, callback: F) -> Self
55    where
56        F: Fn(C, DebugEvent<'_, T, E>),
57    {
58        Self {
59            source,
60            context,
61            callback: Shared::new(callback),
62        }
63    }
64}
65
66pub type DefaultPrintType<C, T, E> = fn(C, DebugEvent<'_, T, E>);
67
68impl<T, E, OE, C> Debug<OE, C, DefaultPrintType<C, T, E>> {
69    pub fn new_default_print(source: OE, label: C) -> Self
70    where
71        C: Display,
72        T: std::fmt::Debug,
73        E: std::fmt::Debug,
74    {
75        Self {
76            source,
77            context: label,
78            callback: Shared::new(|label, event| match event {
79                DebugEvent::OnNext(value) => println!("[{}]: OnNext({:?})", label, value),
80                DebugEvent::OnTermination(termination) => {
81                    println!("[{}]: OnTermination({:?})", label, termination)
82                }
83                DebugEvent::Subscribed => println!("[{}]: Subscription", label),
84                DebugEvent::Disposed => println!("[{}]: Dispose", label),
85            }),
86        }
87    }
88}
89
90impl<'or, 'sub, T, E, OE, C, F> Observable<'or, 'sub, T, E> for Debug<OE, C, F>
91where
92    OE: Observable<'or, 'sub, T, E>,
93    C: Clone + NecessarySendSync + 'or + 'sub,
94    F: Fn(C, DebugEvent<'_, T, E>) + NecessarySendSync + 'or + 'sub,
95{
96    fn subscribe(
97        self,
98        observer: impl Observer<T, E> + NecessarySendSync + 'or,
99    ) -> Subscription<'sub> {
100        (self.callback)(self.context.clone(), DebugEvent::Subscribed);
101        let observer = DebugObserver {
102            observer,
103            context: self.context.clone(),
104            callback: self.callback.clone(),
105        };
106        self.source.subscribe(observer)
107            + CallbackDisposal::new(move || {
108                (self.callback)(self.context.clone(), DebugEvent::Disposed);
109            })
110    }
111}
112
113struct DebugObserver<OR, C, F> {
114    observer: OR,
115    context: C,
116    callback: Shared<F>,
117}
118
119impl<T, E, OR, C, F> Observer<T, E> for DebugObserver<OR, C, F>
120where
121    OR: Observer<T, E>,
122    C: Clone,
123    F: Fn(C, DebugEvent<'_, T, E>),
124{
125    fn on_next(&mut self, value: T) {
126        (self.callback)(self.context.clone(), DebugEvent::OnNext(&value));
127        self.observer.on_next(value);
128    }
129
130    fn on_termination(self, termination: Termination<E>) {
131        (self.callback)(
132            self.context.clone(),
133            DebugEvent::OnTermination(&termination),
134        );
135        self.observer.on_termination(termination);
136    }
137}