rx_rust/operators/others/
debug.rs

1use crate::disposable::callback_disposal::CallbackDisposal;
2use crate::utils::types::NecessarySend;
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use educe::Educe;
9use std::fmt::Display;
10
11/// Logs all items from the source Observable to the console, and re-emits them. This is useful for debugging.
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16///     observable::observable_ext::ObservableExt,
17///     observer::Termination,
18///     operators::{
19///         creating::from_iter::FromIter,
20///         others::debug::Debug,
21///     },
22/// };
23///
24/// let mut values = Vec::new();
25/// let mut terminations = Vec::new();
26///
27/// let observable = Debug::new(FromIter::new(vec![1, 2]), "trace");
28/// observable.subscribe_with_callback(
29///     |value| values.push(value),
30///     |termination| terminations.push(termination),
31/// );
32///
33/// assert_eq!(values, vec![1, 2]);
34/// assert_eq!(terminations, vec![Termination::Completed]);
35/// ```
36#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct Debug<OE, D> {
39    source: OE,
40    label: D,
41}
42
43impl<OE, D> Debug<OE, D> {
44    pub fn new(source: OE, label: D) -> Self {
45        Self { source, label }
46    }
47}
48
49impl<'or, 'sub, T, E, OE, D> Observable<'or, 'sub, T, E> for Debug<OE, D>
50where
51    T: std::fmt::Debug,
52    E: std::fmt::Debug,
53    OE: Observable<'or, 'sub, T, E>,
54    D: Display + Clone + NecessarySend + 'or + 'sub,
55{
56    fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
57        let observer = DebugObserver {
58            observer,
59            label: self.label.clone(),
60        };
61        println!("[{}] subscribe", self.label);
62        self.source.subscribe(observer)
63            + CallbackDisposal::new(move || println!("[{}] dispose", self.label))
64    }
65}
66
67struct DebugObserver<OR, D> {
68    observer: OR,
69    label: D,
70}
71
72impl<T, E, OR, D> Observer<T, E> for DebugObserver<OR, D>
73where
74    T: std::fmt::Debug,
75    E: std::fmt::Debug,
76    OR: Observer<T, E>,
77    D: Display,
78{
79    fn on_next(&mut self, value: T) {
80        println!("[{}] on_next: {:?}", self.label, value);
81        self.observer.on_next(value);
82    }
83
84    fn on_termination(self, termination: Termination<E>) {
85        println!("[{}] on_termination: {:?}", self.label, termination);
86        self.observer.on_termination(termination);
87    }
88}