rx_rust/operators/connectable/
ref_count.rs

1use super::connectable_observable::ConnectableObservable;
2use crate::disposable::Disposable;
3use crate::disposable::subscription::Subscription;
4use crate::observable::Observable;
5use crate::observer::Observer;
6use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
7use educe::Educe;
8
/// Connection lifecycle shared by every clone of a `RefCount`.
enum State<'sub> {
    /// Initial state set by `RefCount::new`: nothing has subscribed yet and the
    /// source has not been connected.
    Initialized,
    /// The source is connected. Holds the number of outstanding subscriptions
    /// and the subscription returned by `connect()`.
    Subscribed(usize, Subscription<'sub>),
    /// The subscription count dropped back to zero and the connection was
    /// disposed. Subscribing in this state panics.
    Unsubscribed,
}
14
/// Makes a `ConnectableObservable` behave like an ordinary `Observable` that automatically connects and disconnects.
///
/// The first subscription connects the source. Each further subscription only
/// increments a counter, and disposing a subscription decrements it. When the
/// counter reaches zero the connection is disposed.
///
/// Subscribing again after the connection has been disposed panics.
///
/// See <https://reactivex.io/documentation/operators/refcount.html>
///
/// # Examples
/// ```rust
/// use rx_rust::{
///     observable::observable_ext::ObservableExt,
///     observer::Termination,
///     operators::{
///         connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
///         creating::from_iter::FromIter,
///     },
///     subject::publish_subject::PublishSubject,
/// };
/// use std::{convert::Infallible, sync::{Arc, Mutex}};
///
/// let values = Arc::new(Mutex::new(Vec::new()));
/// let terminations = Arc::new(Mutex::new(Vec::new()));
///
/// let subject: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
/// let connectable =
///     ConnectableObservable::new(FromIter::new(vec![1, 2]), subject.clone());
/// let observable: RefCount<'_, _, _> = connectable.ref_count();
/// let values_observer = Arc::clone(&values);
/// let terminations_observer = Arc::clone(&terminations);
///
/// let subscription = observable.clone().subscribe_with_callback(
///     move |value| values_observer.lock().unwrap().push(value),
///     move |termination| terminations_observer
///         .lock()
///         .unwrap()
///         .push(termination),
/// );
///
/// drop(subscription);
///
/// assert_eq!(&*values.lock().unwrap(), &[1, 2]);
/// assert_eq!(
///     &*terminations.lock().unwrap(),
///     &[Termination::Completed]
/// );
/// ```
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct RefCount<'sub, OE, S> {
    // The connectable observable that is subscribed to and connected on demand.
    source: ConnectableObservable<OE, S>,
    // Subscription counter plus the live connection, behind a lock.
    // NOTE(review): presumably `Shared` is a reference-counted pointer, so clones
    // of this struct share one counter — confirm in `utils::types`.
    state: Shared<Mutable<State<'sub>>>,
}
63
64impl<OE, S> RefCount<'_, OE, S> {
65    pub fn new(source: ConnectableObservable<OE, S>) -> Self {
66        Self {
67            source,
68            state: Shared::new(Mutable::new(State::Initialized)),
69        }
70    }
71}
72
73impl<'or, 'sub, T, E, OE, S> Observable<'or, 'sub, T, E> for RefCount<'sub, OE, S>
74where
75    OE: Observable<'or, 'sub, T, E>,
76    S: Observable<'or, 'sub, T, E> + Observer<T, E> + Clone + NecessarySendSync + 'or,
77{
78    fn subscribe(
79        self,
80        observer: impl Observer<T, E> + NecessarySendSync + 'or,
81    ) -> Subscription<'sub> {
82        let sub = self.source.clone().subscribe(observer);
83        self.state.lock_mut(|mut lock| {
84            match &mut *lock {
85                State::Initialized => {
86                    *lock = State::Subscribed(1, self.source.connect());
87                }
88                State::Subscribed(count, _) => *count += 1,
89                State::Unsubscribed => panic!("Already Unsubscribed"),
90            };
91        });
92        sub + self.state
93    }
94}
95
96impl Disposable for Shared<Mutable<State<'_>>> {
97    fn dispose(self) {
98        self.lock_mut(|mut lock| match &mut *lock {
99            State::Initialized => unreachable!(),
100            State::Subscribed(count, _) => {
101                *count -= 1;
102                if *count == 0 {
103                    let state = std::mem::replace(&mut *lock, State::Unsubscribed);
104                    drop(lock);
105                    match state {
106                        State::Initialized => unreachable!(),
107                        State::Subscribed(_, subscription) => {
108                            subscription.dispose();
109                        }
110                        State::Unsubscribed => unreachable!(),
111                    }
112                }
113            }
114            State::Unsubscribed => unreachable!(),
115        });
116    }
117}