rx_rust/operators/utility/materialize.rs
1use crate::utils::types::NecessarySend;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Event, Observer, Termination},
6};
7use educe::Educe;
8use std::convert::Infallible;
9
10/// Converts an Observable into an Observable that emits `Event` objects, each of which wraps a notification from the source Observable.
11/// See <https://reactivex.io/documentation/operators/materialize-dematerialize.html>
12///
13/// # Examples
14/// ```rust
15/// use rx_rust::{
16/// observable::observable_ext::ObservableExt,
17/// observer::{Event, Termination},
18/// operators::{
19/// creating::from_iter::FromIter,
20/// utility::materialize::Materialize,
21/// },
22/// };
23///
24/// let mut events = Vec::new();
25/// let mut terminations = Vec::new();
26///
27/// Materialize::new(FromIter::new(vec![1, 2])).subscribe_with_callback(
28/// |event| events.push(event),
29/// |termination| terminations.push(termination),
30/// );
31///
32/// assert_eq!(
33/// events,
34/// vec![
35/// Event::Next(1),
36/// Event::Next(2),
37/// Event::Termination(Termination::Completed)
38/// ]
39/// );
40/// assert_eq!(terminations, vec![Termination::Completed]);
41/// ```
42#[derive(Educe)]
43#[educe(Debug, Clone)]
44pub struct Materialize<OE>(OE);
45
46impl<OE> Materialize<OE> {
47 pub fn new(source: OE) -> Self {
48 Self(source)
49 }
50}
51
52impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, Event<T, E>, Infallible> for Materialize<OE>
53where
54 OE: Observable<'or, 'sub, T, E>,
55{
56 fn subscribe(
57 self,
58 observer: impl Observer<Event<T, E>, Infallible> + NecessarySend + 'or,
59 ) -> Subscription<'sub> {
60 self.0.subscribe(MaterializeObserver(observer))
61 }
62}
63
64struct MaterializeObserver<OR>(OR);
65
66impl<T, E, OR> Observer<T, E> for MaterializeObserver<OR>
67where
68 OR: Observer<Event<T, E>, Infallible>,
69{
70 fn on_next(&mut self, value: T) {
71 self.0.on_next(Event::Next(value));
72 }
73
74 fn on_termination(mut self, termination: Termination<E>) {
75 self.0.on_next(Event::Termination(termination));
76 self.0.on_termination(Termination::Completed);
77 }
78}