// rx_rust/observer/callback_observer.rs

1use super::{Observer, Termination};
2use crate::utils::types::NecessarySendSync;
3
4cfg_if::cfg_if! {
5    if #[cfg(feature = "single-threaded")] {
6        /// Observer implementation backed by user-provided callbacks.
7        pub struct CallbackObserver<'cb, T, E> {
8            on_next: Box<dyn FnMut(T) + 'cb>,
9            on_termination: Box<dyn FnOnce(Termination<E>) + 'cb>,
10        }
11    } else {
12        /// Observer implementation backed by user-provided callbacks.
13        pub struct CallbackObserver<'cb, T, E> {
14            on_next: Box<dyn FnMut(T) + Send + Sync + 'cb>,
15            on_termination: Box<dyn FnOnce(Termination<E>) + Send + Sync + 'cb>,
16        }
17    }
18}
19
20impl<'cb, T, E> CallbackObserver<'cb, T, E> {
21    pub fn new<FN, FT>(on_next: FN, on_termination: FT) -> Self
22    where
23        FN: FnMut(T) + NecessarySendSync + 'cb,
24        FT: FnOnce(Termination<E>) + NecessarySendSync + 'cb,
25    {
26        Self {
27            on_next: Box::new(on_next),
28            on_termination: Box::new(on_termination),
29        }
30    }
31}
32
33impl<T, E> Observer<T, E> for CallbackObserver<'_, T, E> {
34    fn on_next(&mut self, value: T) {
35        (self.on_next)(value);
36    }
37
38    fn on_termination(self, termination: Termination<E>) {
39        (self.on_termination)(termination);
40    }
41}
42
43impl<T, E> std::fmt::Debug for CallbackObserver<'_, T, E> {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.write_str(std::any::type_name::<Self>())
46    }
47}