rx_rust/utils/
unsub_after_termination.rs

1use crate::{
2    disposable::{Disposable, subscription::Subscription},
3    observer::{Observer, Termination},
4    safe_lock,
5    utils::types::{Mutable, MutableHelper, Shared},
6};
7use educe::Educe;
8
9enum SubState<'sub> {
10    Initialized,
11    Subscribed(Subscription<'sub>),
12    Unsubscribed,
13}
14
15impl Disposable for Shared<Mutable<SubState<'_>>> {
16    fn dispose(self) {
17        match safe_lock!(mem_replace: self, SubState::Unsubscribed) {
18            SubState::Initialized => {}
19            SubState::Subscribed(subscription) => subscription.dispose(),
20            SubState::Unsubscribed => {}
21        }
22    }
23}
24
25/// Wraps subscription creation so that termination from the observer automatically disposes the inner subscription.
26pub fn subscribe_unsub_after_termination<'sub, OR, F>(
27    observer: OR,
28    builder: F,
29) -> Subscription<'sub>
30where
31    F: FnOnce(UnsubAfterTerminationObserver<'sub, OR>) -> Subscription<'sub>,
32{
33    let sub_state = Shared::new(Mutable::new(SubState::Initialized));
34    let observer = UnsubAfterTerminationObserver {
35        observer,
36        sub_state: sub_state.clone(),
37    };
38    let sub = builder(observer);
39
40    sub_state.lock_mut(|mut lock| match &*lock {
41        SubState::Initialized => *lock = SubState::Subscribed(sub),
42        SubState::Subscribed(_) => unreachable!(),
43        SubState::Unsubscribed => sub.dispose(),
44    });
45
46    Subscription::new_with_disposal(sub_state)
47}
48
49#[derive(Educe)]
50#[educe(Debug)]
51pub struct UnsubAfterTerminationObserver<'sub, OR> {
52    observer: OR,
53    sub_state: Shared<Mutable<SubState<'sub>>>,
54}
55
56impl<T, E, OR> Observer<T, E> for UnsubAfterTerminationObserver<'_, OR>
57where
58    OR: Observer<T, E>,
59{
60    fn on_next(&mut self, value: T) {
61        self.observer.on_next(value);
62    }
63
64    fn on_termination(self, termination: Termination<E>) {
65        self.observer.on_termination(termination);
66        self.sub_state.dispose();
67    }
68}