rx_rust/observer/
boxed_observer.rs

1use super::{Event, Observer, Termination};
2use crate::utils::types::NecessarySendSync;
3
4cfg_if::cfg_if! {
5    if #[cfg(feature = "single-threaded")] {
6        /// Type-erased observer for single-threaded builds to handle this problem <https://stackoverflow.com/q/46620790/9315497>
7        pub struct BoxedObserver<'or, T, E>(Box<dyn FnMut(Event<T, E>) + 'or>);
8    } else {
9        /// Type-erased observer for multi-threaded builds to handle this problem <https://stackoverflow.com/q/46620790/9315497>
10        pub struct BoxedObserver<'or, T, E>(Box<dyn FnMut(Event<T, E>) + Send + Sync + 'or>);
11    }
12}
13
14impl<'or, T, E> BoxedObserver<'or, T, E> {
15    pub fn new(observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Self {
16        let mut observer = Some(observer);
17        Self(Box::new(move |event| match event {
18            Event::Next(value) => {
19                if let Some(observer) = &mut observer {
20                    observer.on_next(value);
21                }
22            }
23            Event::Termination(termination) => {
24                if let Some(observer) = observer.take() {
25                    observer.on_termination(termination);
26                }
27            }
28        }))
29    }
30}
31
32impl<T, E> Observer<T, E> for BoxedObserver<'_, T, E> {
33    fn on_next(&mut self, value: T) {
34        self.0(Event::Next(value));
35    }
36
37    fn on_termination(mut self, termination: Termination<E>) {
38        self.0(Event::Termination(termination));
39    }
40}
41
42impl<T, E> std::fmt::Debug for BoxedObserver<'_, T, E> {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        f.write_str(std::any::type_name::<Self>())
45    }
46}