rx_rust/operators/others/
debug.rs1use crate::disposable::callback_disposal::CallbackDisposal;
2use crate::utils::types::NecessarySend;
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use educe::Educe;
9use std::fmt::Display;
10
11#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct Debug<OE, D> {
39 source: OE,
40 label: D,
41}
42
43impl<OE, D> Debug<OE, D> {
44 pub fn new(source: OE, label: D) -> Self {
45 Self { source, label }
46 }
47}
48
49impl<'or, 'sub, T, E, OE, D> Observable<'or, 'sub, T, E> for Debug<OE, D>
50where
51 T: std::fmt::Debug,
52 E: std::fmt::Debug,
53 OE: Observable<'or, 'sub, T, E>,
54 D: Display + Clone + NecessarySend + 'or + 'sub,
55{
56 fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
57 let observer = DebugObserver {
58 observer,
59 label: self.label.clone(),
60 };
61 println!("[{}] subscribe", self.label);
62 self.source.subscribe(observer)
63 + CallbackDisposal::new(move || println!("[{}] dispose", self.label))
64 }
65}
66
/// Observer wrapper that pairs a downstream observer with a log label.
///
/// Its `Observer` implementation prints each event, prefixed with `[label]`,
/// before passing the event on to `observer`.
struct DebugObserver<OR, D> {
    /// Downstream observer that receives every event after it has been printed.
    observer: OR,
    /// Text printed inside square brackets in front of every log line.
    label: D,
}
71
72impl<T, E, OR, D> Observer<T, E> for DebugObserver<OR, D>
73where
74 T: std::fmt::Debug,
75 E: std::fmt::Debug,
76 OR: Observer<T, E>,
77 D: Display,
78{
79 fn on_next(&mut self, value: T) {
80 println!("[{}] on_next: {:?}", self.label, value);
81 self.observer.on_next(value);
82 }
83
84 fn on_termination(self, termination: Termination<E>) {
85 println!("[{}] on_termination: {:?}", self.label, termination);
86 self.observer.on_termination(termination);
87 }
88}