rx_rust/utils/
unsub_after_termination.rs1use crate::{
2 disposable::{Disposable, subscription::Subscription},
3 observer::{Observer, Termination},
4 safe_lock,
5 utils::types::{Mutable, MutableHelper, Shared},
6};
7use educe::Educe;
8
9enum SubState<'sub> {
10 Initialized,
11 Subscribed(Subscription<'sub>),
12 Unsubscribed,
13}
14
15impl Disposable for Shared<Mutable<SubState<'_>>> {
16 fn dispose(self) {
17 match safe_lock!(mem_replace: self, SubState::Unsubscribed) {
18 SubState::Initialized => {}
19 SubState::Subscribed(subscription) => subscription.dispose(),
20 SubState::Unsubscribed => {}
21 }
22 }
23}
24
25pub fn subscribe_unsub_after_termination<'sub, OR, F>(
27 observer: OR,
28 builder: F,
29) -> Subscription<'sub>
30where
31 F: FnOnce(UnsubAfterTerminationObserver<'sub, OR>) -> Subscription<'sub>,
32{
33 let sub_state = Shared::new(Mutable::new(SubState::Initialized));
34 let observer = UnsubAfterTerminationObserver {
35 observer,
36 sub_state: sub_state.clone(),
37 };
38 let sub = builder(observer);
39
40 sub_state.lock_mut(|mut lock| match &*lock {
41 SubState::Initialized => *lock = SubState::Subscribed(sub),
42 SubState::Subscribed(_) => unreachable!(),
43 SubState::Unsubscribed => sub.dispose(),
44 });
45
46 Subscription::new_with_disposal(sub_state)
47}
48
49#[derive(Educe)]
50#[educe(Debug)]
51pub struct UnsubAfterTerminationObserver<'sub, OR> {
52 observer: OR,
53 sub_state: Shared<Mutable<SubState<'sub>>>,
54}
55
56impl<T, E, OR> Observer<T, E> for UnsubAfterTerminationObserver<'_, OR>
57where
58 OR: Observer<T, E>,
59{
60 fn on_next(&mut self, value: T) {
61 self.observer.on_next(value);
62 }
63
64 fn on_termination(self, termination: Termination<E>) {
65 self.observer.on_termination(termination);
66 self.sub_state.dispose();
67 }
68}