rx_rust/operators/connectable/
ref_count.rs

1use super::connectable_observable::ConnectableObservable;
2use crate::disposable::Disposable;
3use crate::disposable::subscription::Subscription;
4use crate::observable::Observable;
5use crate::observer::Observer;
6use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
7use educe::Educe;
8
9enum State<'sub> {
10    Initialized,
11    Subscribed(usize, Subscription<'sub>),
12    Unsubscribed,
13}
14
15/// Makes a `ConnectableObservable` behave like an ordinary `Observable` that automatically connects and disconnects.
16/// See <https://reactivex.io/documentation/operators/refcount.html>
17///
18/// # Examples
19/// ```rust
20/// use rx_rust::{
21///     observable::observable_ext::ObservableExt,
22///     observer::Termination,
23///     operators::{
24///         connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
25///         creating::from_iter::FromIter,
26///     },
27///     subject::publish_subject::PublishSubject,
28/// };
29/// use std::{convert::Infallible, sync::{Arc, Mutex}};
30///
31/// let values = Arc::new(Mutex::new(Vec::new()));
32/// let terminations = Arc::new(Mutex::new(Vec::new()));
33///
34/// let subject: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
35/// let connectable =
36///     ConnectableObservable::new(FromIter::new(vec![1, 2]), subject.clone());
37/// let observable: RefCount<'_, _, _> = connectable.ref_count();
38/// let values_observer = Arc::clone(&values);
39/// let terminations_observer = Arc::clone(&terminations);
40///
41/// let subscription = observable.clone().subscribe_with_callback(
42///     move |value| values_observer.lock().unwrap().push(value),
43///     move |termination| terminations_observer
44///         .lock()
45///         .unwrap()
46///         .push(termination),
47/// );
48///
49/// drop(subscription);
50///
51/// assert_eq!(&*values.lock().unwrap(), &[1, 2]);
52/// assert_eq!(
53///     &*terminations.lock().unwrap(),
54///     &[Termination::Completed]
55/// );
56/// ```
57#[derive(Educe)]
58#[educe(Debug, Clone)]
59pub struct RefCount<'sub, OE, S> {
60    source: ConnectableObservable<OE, S>,
61    state: Shared<Mutable<State<'sub>>>,
62}
63
64impl<OE, S> RefCount<'_, OE, S> {
65    pub fn new(source: ConnectableObservable<OE, S>) -> Self {
66        Self {
67            source,
68            state: Shared::new(Mutable::new(State::Initialized)),
69        }
70    }
71}
72
73impl<'or, 'sub, T, E, OE, S> Observable<'or, 'sub, T, E> for RefCount<'sub, OE, S>
74where
75    OE: Observable<'or, 'sub, T, E>,
76    S: Observable<'or, 'sub, T, E> + Observer<T, E> + Clone + NecessarySendSync + 'or,
77{
78    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
79        let sub = self.source.clone().subscribe(observer);
80        self.state.lock_mut(|mut lock| {
81            match &mut *lock {
82                State::Initialized => {
83                    *lock = State::Subscribed(1, self.source.connect());
84                }
85                State::Subscribed(count, _) => *count += 1,
86                State::Unsubscribed => panic!("Already Unsubscribed"),
87            };
88        });
89        sub + self.state
90    }
91}
92
93impl Disposable for Shared<Mutable<State<'_>>> {
94    fn dispose(self) {
95        self.lock_mut(|mut lock| match &mut *lock {
96            State::Initialized => unreachable!(),
97            State::Subscribed(count, _) => {
98                *count -= 1;
99                if *count == 0 {
100                    let state = std::mem::replace(&mut *lock, State::Unsubscribed);
101                    drop(lock);
102                    match state {
103                        State::Initialized => unreachable!(),
104                        State::Subscribed(_, subscription) => {
105                            subscription.dispose();
106                        }
107                        State::Unsubscribed => unreachable!(),
108                    }
109                }
110            }
111            State::Unsubscribed => unreachable!(),
112        });
113    }
114}