rx_rust/operators/utility/
dematerialize.rs1use crate::utils::types::NecessarySendSync;
2use crate::{
3 disposable::subscription::Subscription,
4 observable::Observable,
5 observer::{Event, Observer, Termination},
6 utils::unsub_after_termination::subscribe_unsub_after_termination,
7};
8use educe::Educe;
9use std::convert::Infallible;
10
11#[derive(Educe)]
41#[educe(Debug, Clone)]
42pub struct Dematerialize<OE>(OE);
43
44impl<OE> Dematerialize<OE> {
45 pub fn new(source: OE) -> Self {
46 Self(source)
47 }
48}
49
50impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, T, E> for Dematerialize<OE>
51where
52 OE: Observable<'or, 'sub, Event<T, E>, Infallible>,
53 'sub: 'or,
54{
55 fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
56 subscribe_unsub_after_termination(observer, |observer| {
57 self.0.subscribe(DematerializeObserver(Some(observer)))
58 })
59 }
60}
61
62struct DematerializeObserver<OR>(Option<OR>);
63
64impl<T, E, OR> Observer<Event<T, E>, Infallible> for DematerializeObserver<OR>
65where
66 OR: Observer<T, E>,
67{
68 fn on_next(&mut self, value: Event<T, E>) {
69 match value {
70 Event::Next(value) => {
71 if let Some(observer) = self.0.as_mut() {
72 observer.on_next(value);
73 }
74 }
75 Event::Termination(termination) => {
76 if let Some(observer) = self.0.take() {
77 observer.on_termination(termination);
78 }
79 }
80 }
81 }
82
83 fn on_termination(mut self, termination: Termination<Infallible>) {
84 match termination {
85 Termination::Completed => {
86 if let Some(observer) = self.0.take() {
87 observer.on_termination(Termination::Completed);
88 }
89 }
90 Termination::Error(_) => unreachable!(),
91 }
92 }
93}