rx_rust/operators/utility/
materialize.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Event, Observer, Termination},
6};
7use educe::Educe;
8use std::convert::Infallible;
9
10#[derive(Educe)]
43#[educe(Debug, Clone)]
44pub struct Materialize<OE>(OE);
45
46impl<OE> Materialize<OE> {
47 pub fn new(source: OE) -> Self {
48 Self(source)
49 }
50}
51
52impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, Event<T, E>, Infallible> for Materialize<OE>
53where
54 OE: Observable<'or, 'sub, T, E>,
55{
56 fn subscribe(
57 self,
58 observer: impl Observer<Event<T, E>, Infallible> + NecessarySendSync + 'or,
59 ) -> Subscription<'sub> {
60 self.0.subscribe(MaterializeObserver(observer))
61 }
62}
63
/// Observer wrapper that forwards the source's notifications to the inner observer `OR`
/// as `Event` items.
struct MaterializeObserver<OR>(OR);
65
66impl<T, E, OR> Observer<T, E> for MaterializeObserver<OR>
67where
68 OR: Observer<Event<T, E>, Infallible>,
69{
70 fn on_next(&mut self, value: T) {
71 self.0.on_next(Event::Next(value));
72 }
73
74 fn on_termination(mut self, termination: Termination<E>) {
75 self.0.on_next(Event::Termination(termination));
76 self.0.on_termination(Termination::Completed);
77 }
78}