rx_rust/observer/
callback_observer.rs1use super::{Observer, Termination};
2use crate::utils::types::NecessarySend;
3
4cfg_if::cfg_if! {
5 if #[cfg(feature = "single-threaded")] {
6 pub struct CallbackObserver<'cb, T, E> {
8 on_next: Box<dyn FnMut(T) + 'cb>,
9 on_termination: Box<dyn FnOnce(Termination<E>) + 'cb>,
10 }
11 } else {
12 pub struct CallbackObserver<'cb, T, E> {
14 on_next: Box<dyn FnMut(T) + Send + 'cb>,
15 on_termination: Box<dyn FnOnce(Termination<E>) + Send + 'cb>,
16 }
17 }
18}
19
20impl<'cb, T, E> CallbackObserver<'cb, T, E> {
21 pub fn new<FN, FT>(on_next: FN, on_termination: FT) -> Self
22 where
23 FN: FnMut(T) + NecessarySend + 'cb,
24 FT: FnOnce(Termination<E>) + NecessarySend + 'cb,
25 {
26 Self {
27 on_next: Box::new(on_next),
28 on_termination: Box::new(on_termination),
29 }
30 }
31}
32
33impl<T, E> Observer<T, E> for CallbackObserver<'_, T, E> {
34 fn on_next(&mut self, value: T) {
35 (self.on_next)(value);
36 }
37
38 fn on_termination(self, termination: Termination<E>) {
39 (self.on_termination)(termination);
40 }
41}