rx_rust/utils/
unsub_after_termination.rs

1use crate::{
2    disposable::{Disposable, subscription::Subscription},
3    observer::{Observer, Termination},
4    safe_lock,
5    utils::types::{Mutable, MutableHelper, Shared},
6};
7
8enum SubState<'sub> {
9    Initialized,
10    Subscribed(Subscription<'sub>),
11    Unsubscribed,
12}
13
14impl Disposable for Shared<Mutable<SubState<'_>>> {
15    fn dispose(self) {
16        match safe_lock!(mem_replace: self, SubState::Unsubscribed) {
17            SubState::Initialized => {}
18            SubState::Subscribed(subscription) => subscription.dispose(),
19            SubState::Unsubscribed => {}
20        }
21    }
22}
23
24/// Wraps subscription creation so that termination from the observer automatically disposes the inner subscription.
25pub fn subscribe_unsub_after_termination<'sub, OR, F>(
26    observer: OR,
27    builder: F,
28) -> Subscription<'sub>
29where
30    F: FnOnce(UnsubAfterTerminationObserver<'sub, OR>) -> Subscription<'sub>,
31{
32    let sub_state = Shared::new(Mutable::new(SubState::Initialized));
33    let observer = UnsubAfterTerminationObserver {
34        observer,
35        sub_state: sub_state.clone(),
36    };
37    let sub = builder(observer);
38
39    sub_state.lock_mut(|mut lock| match &*lock {
40        SubState::Initialized => *lock = SubState::Subscribed(sub),
41        SubState::Subscribed(_) => unreachable!(),
42        SubState::Unsubscribed => sub.dispose(),
43    });
44
45    Subscription::new_with_disposal(sub_state)
46}
47
48pub struct UnsubAfterTerminationObserver<'sub, OR> {
49    observer: OR,
50    sub_state: Shared<Mutable<SubState<'sub>>>,
51}
52
53impl<T, E, OR> Observer<T, E> for UnsubAfterTerminationObserver<'_, OR>
54where
55    OR: Observer<T, E>,
56{
57    fn on_next(&mut self, value: T) {
58        self.observer.on_next(value);
59    }
60
61    fn on_termination(self, termination: Termination<E>) {
62        self.observer.on_termination(termination);
63        self.sub_state.dispose();
64    }
65}