rx_rust/operators/utility/
dematerialize.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Event, Observer, Termination},
6    utils::unsub_after_termination::subscribe_unsub_after_termination,
7};
8use educe::Educe;
9use std::convert::Infallible;
10
11/// Converts an Observable that emits `Event` objects into a "live" Observable that emits the items and notifications embedded in those `Event` objects.
12/// See <https://reactivex.io/documentation/operators/materialize-dematerialize.html>
13///
14/// # Examples
15/// ```rust
16/// use rx_rust::{
17///     observable::observable_ext::ObservableExt,
18///     observer::Termination,
19///     operators::{
20///         creating::from_iter::FromIter,
21///         utility::{
22///             dematerialize::Dematerialize,
23///             materialize::Materialize,
24///         },
25///     },
26/// };
27///
28/// let mut values = Vec::new();
29/// let mut terminations = Vec::new();
30///
31/// Dematerialize::new(Materialize::new(FromIter::new(vec![1, 2])))
32///     .subscribe_with_callback(
33///         |value| values.push(value),
34///         |termination| terminations.push(termination),
35///     );
36///
37/// assert_eq!(values, vec![1, 2]);
38/// assert_eq!(terminations, vec![Termination::Completed]);
39/// ```
40#[derive(Educe)]
41#[educe(Debug, Clone)]
42pub struct Dematerialize<OE>(OE);
43
44impl<OE> Dematerialize<OE> {
45    pub fn new(source: OE) -> Self {
46        Self(source)
47    }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Dematerialize<OE>
51where
52    OE: Observable<'or, 'sub, Event<T, E>, Infallible>,
53    'sub: 'or,
54{
55    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
56        subscribe_unsub_after_termination(observer, |observer| {
57            self.0.subscribe(DematerializeObserver(Some(observer)))
58        })
59    }
60}
61
/// Observer adapter that unpacks `Event` values for a wrapped downstream observer.
///
/// The downstream observer lives in an `Option` so it can be moved out when a termination is
/// forwarded; an empty slot means the downstream observer has already been terminated.
struct DematerializeObserver<OR>(Option<OR>);
63
64impl<T, E, OR> Observer<Event<T, E>, Infallible> for DematerializeObserver<OR>
65where
66    OR: Observer<T, E>,
67{
68    fn on_next(&mut self, value: Event<T, E>) {
69        match value {
70            Event::Next(value) => {
71                if let Some(observer) = self.0.as_mut() {
72                    observer.on_next(value);
73                }
74            }
75            Event::Termination(termination) => {
76                if let Some(observer) = self.0.take() {
77                    observer.on_termination(termination);
78                }
79            }
80        }
81    }
82
83    fn on_termination(mut self, termination: Termination<Infallible>) {
84        match termination {
85            Termination::Completed => {
86                if let Some(observer) = self.0.take() {
87                    observer.on_termination(Termination::Completed);
88                }
89            }
90            Termination::Error(_) => unreachable!(),
91        }
92    }
93}