rx_rust/operators/utility/
dematerialize.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Event, Observer, Termination},
6 utils::unsub_after_termination::subscribe_unsub_after_termination,
7};
8use educe::Educe;
9use std::convert::Infallible;
10
11#[derive(Educe)]
41#[educe(Debug, Clone)]
42pub struct Dematerialize<OE>(OE);
43
44impl<OE> Dematerialize<OE> {
45 pub fn new(source: OE) -> Self {
46 Self(source)
47 }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Dematerialize<OE>
51where
52 OE: Observable<'or, 'sub, Event<T, E>, Infallible>,
53 'sub: 'or,
54{
55 fn subscribe(
56 self,
57 observer: impl Observer<T, E> + NecessarySendSync + 'or,
58 ) -> Subscription<'sub> {
59 subscribe_unsub_after_termination(observer, |observer| {
60 self.0.subscribe(DematerializeObserver(Some(observer)))
61 })
62 }
63}
64
65struct DematerializeObserver<OR>(Option<OR>);
66
67impl<T, E, OR> Observer<Event<T, E>, Infallible> for DematerializeObserver<OR>
68where
69 OR: Observer<T, E>,
70{
71 fn on_next(&mut self, value: Event<T, E>) {
72 match value {
73 Event::Next(value) => {
74 if let Some(observer) = self.0.as_mut() {
75 observer.on_next(value);
76 }
77 }
78 Event::Termination(termination) => {
79 if let Some(observer) = self.0.take() {
80 observer.on_termination(termination);
81 }
82 }
83 }
84 }
85
86 fn on_termination(mut self, termination: Termination<Infallible>) {
87 match termination {
88 Termination::Completed => {
89 if let Some(observer) = self.0.take() {
90 observer.on_termination(Termination::Completed);
91 }
92 }
93 Termination::Error(_) => unreachable!(),
94 }
95 }
96}