rx_rust/utils/
unsub_after_termination.rs1use crate::{
2 disposable::{Disposable, subscription::Subscription},
3 observer::{Observer, Termination},
4 safe_lock,
5 utils::types::{Mutable, MutableHelper, Shared},
6};
7
8enum SubState<'sub> {
9 Initialized,
10 Subscribed(Subscription<'sub>),
11 Unsubscribed,
12}
13
14impl Disposable for Shared<Mutable<SubState<'_>>> {
15 fn dispose(self) {
16 match safe_lock!(mem_replace: self, SubState::Unsubscribed) {
17 SubState::Initialized => {}
18 SubState::Subscribed(subscription) => subscription.dispose(),
19 SubState::Unsubscribed => {}
20 }
21 }
22}
23
24pub fn subscribe_unsub_after_termination<'sub, OR, F>(
26 observer: OR,
27 builder: F,
28) -> Subscription<'sub>
29where
30 F: FnOnce(UnsubAfterTerminationObserver<'sub, OR>) -> Subscription<'sub>,
31{
32 let sub_state = Shared::new(Mutable::new(SubState::Initialized));
33 let observer = UnsubAfterTerminationObserver {
34 observer,
35 sub_state: sub_state.clone(),
36 };
37 let sub = builder(observer);
38
39 sub_state.lock_mut(|mut lock| match &*lock {
40 SubState::Initialized => *lock = SubState::Subscribed(sub),
41 SubState::Subscribed(_) => unreachable!(),
42 SubState::Unsubscribed => sub.dispose(),
43 });
44
45 Subscription::new_with_disposal(sub_state)
46}
47
48pub struct UnsubAfterTerminationObserver<'sub, OR> {
49 observer: OR,
50 sub_state: Shared<Mutable<SubState<'sub>>>,
51}
52
53impl<T, E, OR> Observer<T, E> for UnsubAfterTerminationObserver<'_, OR>
54where
55 OR: Observer<T, E>,
56{
57 fn on_next(&mut self, value: T) {
58 self.observer.on_next(value);
59 }
60
61 fn on_termination(self, termination: Termination<E>) {
62 self.observer.on_termination(termination);
63 self.sub_state.dispose();
64 }
65}