rx_rust/observable/
boxed_observable.rs

1use super::{Observable, Observer};
2use crate::{
3    disposable::subscription::Subscription, observer::boxed_observer::BoxedObserver,
4    utils::types::NecessarySend,
5};
6
7cfg_if::cfg_if! {
8    if #[cfg(feature = "single-threaded")] {
9        /// Type-erased observable for single-threaded builds to handle this problem <https://stackoverflow.com/q/46620790/9315497>
10        pub struct BoxedObservable<'or, 'sub, 'oe, T, E>(
11            Box<dyn FnOnce(BoxedObserver<'or, T, E>) -> Subscription<'sub> + 'oe>,
12        );
13    } else {
14        /// Type-erased observable for multi-threaded builds to handle this problem <https://stackoverflow.com/q/46620790/9315497>
15        pub struct BoxedObservable<'or, 'sub, 'oe, T, E>(
16            Box<dyn FnOnce(BoxedObserver<'or, T, E>) -> Subscription<'sub> + Send + 'oe>,
17        );
18    }
19}
20
21impl<'or, 'sub, 'oe, T, E> BoxedObservable<'or, 'sub, 'oe, T, E> {
22    pub fn new(observable: impl Observable<'or, 'sub, T, E> + NecessarySend + 'oe) -> Self
23    where
24        T: 'or,
25        E: 'or,
26    {
27        Self(Box::new(|observer| observable.subscribe(observer)))
28    }
29}
30
31impl<'or, 'sub, T, E> Observable<'or, 'sub, T, E> for BoxedObservable<'or, 'sub, '_, T, E> {
32    fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
33        self.0(BoxedObserver::new(observer))
34    }
35}