rx_rust/operators/utility/
dematerialize.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Event, Observer, Termination},
6    utils::unsub_after_termination::subscribe_unsub_after_termination,
7};
8use educe::Educe;
9use std::convert::Infallible;
10
11/// Converts an Observable that emits `Event` objects into a "live" Observable that emits the items and notifications embedded in those `Event` objects.
12/// See <https://reactivex.io/documentation/operators/materialize-dematerialize.html>
13///
14/// # Examples
15/// ```rust
16/// use rx_rust::{
17///     observable::observable_ext::ObservableExt,
18///     observer::Termination,
19///     operators::{
20///         creating::from_iter::FromIter,
21///         utility::{
22///             dematerialize::Dematerialize,
23///             materialize::Materialize,
24///         },
25///     },
26/// };
27///
28/// let mut values = Vec::new();
29/// let mut terminations = Vec::new();
30///
31/// Dematerialize::new(Materialize::new(FromIter::new(vec![1, 2])))
32///     .subscribe_with_callback(
33///         |value| values.push(value),
34///         |termination| terminations.push(termination),
35///     );
36///
37/// assert_eq!(values, vec![1, 2]);
38/// assert_eq!(terminations, vec![Termination::Completed]);
39/// ```
40#[derive(Educe)]
41#[educe(Debug, Clone)]
42pub struct Dematerialize<OE>(OE);
43
44impl<OE> Dematerialize<OE> {
45    pub fn new(source: OE) -> Self {
46        Self(source)
47    }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Dematerialize<OE>
51where
52    OE: Observable<'or, 'sub, Event<T, E>, Infallible>,
53    'sub: 'or,
54{
55    fn subscribe(
56        self,
57        observer: impl Observer<T, E> + NecessarySendSync + 'or,
58    ) -> Subscription<'sub> {
59        subscribe_unsub_after_termination(observer, |observer| {
60            self.0.subscribe(DematerializeObserver(Some(observer)))
61        })
62    }
63}
64
65struct DematerializeObserver<OR>(Option<OR>);
66
67impl<T, E, OR> Observer<Event<T, E>, Infallible> for DematerializeObserver<OR>
68where
69    OR: Observer<T, E>,
70{
71    fn on_next(&mut self, value: Event<T, E>) {
72        match value {
73            Event::Next(value) => {
74                if let Some(observer) = self.0.as_mut() {
75                    observer.on_next(value);
76                }
77            }
78            Event::Termination(termination) => {
79                if let Some(observer) = self.0.take() {
80                    observer.on_termination(termination);
81                }
82            }
83        }
84    }
85
86    fn on_termination(mut self, termination: Termination<Infallible>) {
87        match termination {
88            Termination::Completed => {
89                if let Some(observer) = self.0.take() {
90                    observer.on_termination(Termination::Completed);
91                }
92            }
93            Termination::Error(_) => unreachable!(),
94        }
95    }
96}