rx_rust/observer/
boxed_observer.rs1use super::{Event, Observer, Termination};
2use crate::utils::types::NecessarySendSync;
3
4cfg_if::cfg_if! {
5 if #[cfg(feature = "single-threaded")] {
6 pub struct BoxedObserver<'or, T, E>(Box<dyn FnMut(Event<T, E>) + 'or>);
8 } else {
9 pub struct BoxedObserver<'or, T, E>(Box<dyn FnMut(Event<T, E>) + Send + Sync + 'or>);
11 }
12}
13
14impl<'or, T, E> BoxedObserver<'or, T, E> {
15 pub fn new(observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Self {
16 let mut observer = Some(observer);
17 Self(Box::new(move |event| match event {
18 Event::Next(value) => {
19 if let Some(observer) = &mut observer {
20 observer.on_next(value);
21 }
22 }
23 Event::Termination(termination) => {
24 if let Some(observer) = observer.take() {
25 observer.on_termination(termination);
26 }
27 }
28 }))
29 }
30}
31
32impl<T, E> Observer<T, E> for BoxedObserver<'_, T, E> {
33 fn on_next(&mut self, value: T) {
34 self.0(Event::Next(value));
35 }
36
37 fn on_termination(mut self, termination: Termination<E>) {
38 self.0(Event::Termination(termination));
39 }
40}
41
42impl<T, E> std::fmt::Debug for BoxedObserver<'_, T, E> {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.write_str(std::any::type_name::<Self>())
45 }
46}