rx_rust/operators/utility/
materialize.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Event, Observer, Termination},
6};
7use educe::Educe;
8use std::convert::Infallible;
9
10/// Converts an Observable into an Observable that emits `Event` objects, each of which wraps a notification from the source Observable.
11/// See <https://reactivex.io/documentation/operators/materialize-dematerialize.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16///     observable::observable_ext::ObservableExt,
17///     observer::{Event, Termination},
18///     operators::{
19///         creating::from_iter::FromIter,
20///         utility::materialize::Materialize,
21///     },
22/// };
23///
24/// let mut events = Vec::new();
25/// let mut terminations = Vec::new();
26///
27/// Materialize::new(FromIter::new(vec![1, 2])).subscribe_with_callback(
28///     |event| events.push(event),
29///     |termination| terminations.push(termination),
30/// );
31///
32/// assert_eq!(
33///     events,
34///     vec![
35///         Event::Next(1),
36///         Event::Next(2),
37///         Event::Termination(Termination::Completed)
38///     ]
39/// );
40/// assert_eq!(terminations, vec![Termination::Completed]);
41/// ```
42#[derive(Educe)]
43#[educe(Debug, Clone)]
44pub struct Materialize<OE>(OE);
45
46impl<OE> Materialize<OE> {
47    pub fn new(source: OE) -> Self {
48        Self(source)
49    }
50}
51
52impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, Event<T, E>, Infallible> for Materialize<OE>
53where
54    OE: Observable<'or, 'sub, T, E>,
55{
56    fn subscribe(
57        self,
58        observer: impl Observer<Event<T, E>, Infallible> + NecessarySendSync + 'or,
59    ) -> Subscription<'sub> {
60        self.0.subscribe(MaterializeObserver(observer))
61    }
62}
63
64struct MaterializeObserver<OR>(OR);
65
66impl<T, E, OR> Observer<T, E> for MaterializeObserver<OR>
67where
68    OR: Observer<Event<T, E>, Infallible>,
69{
70    fn on_next(&mut self, value: T) {
71        self.0.on_next(Event::Next(value));
72    }
73
74    fn on_termination(mut self, termination: Termination<E>) {
75        self.0.on_next(Event::Termination(termination));
76        self.0.on_termination(Termination::Completed);
77    }
78}