rx_rust/operators/connectable/ref_count.rs
1use super::connectable_observable::ConnectableObservable;
2use crate::disposable::Disposable;
3use crate::disposable::subscription::Subscription;
4use crate::observable::Observable;
5use crate::observer::Observer;
6use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
7use educe::Educe;
8
9enum State<'sub> {
10 Initialized,
11 Subscribed(usize, Subscription<'sub>),
12 Unsubscribed,
13}
14
15/// Makes a `ConnectableObservable` behave like an ordinary `Observable` that automatically connects and disconnects.
16/// See <https://reactivex.io/documentation/operators/refcount.html>
17///
18/// # Examples
19/// ```rust
20/// use rx_rust::{
21/// observable::observable_ext::ObservableExt,
22/// observer::Termination,
23/// operators::{
24/// connectable::{connectable_observable::ConnectableObservable, ref_count::RefCount},
25/// creating::from_iter::FromIter,
26/// },
27/// subject::publish_subject::PublishSubject,
28/// };
29/// use std::{convert::Infallible, sync::{Arc, Mutex}};
30///
31/// let values = Arc::new(Mutex::new(Vec::new()));
32/// let terminations = Arc::new(Mutex::new(Vec::new()));
33///
34/// let subject: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
35/// let connectable =
36/// ConnectableObservable::new(FromIter::new(vec![1, 2]), subject.clone());
37/// let observable: RefCount<'_, _, _> = connectable.ref_count();
38/// let values_observer = Arc::clone(&values);
39/// let terminations_observer = Arc::clone(&terminations);
40///
41/// let subscription = observable.clone().subscribe_with_callback(
42/// move |value| values_observer.lock().unwrap().push(value),
43/// move |termination| terminations_observer
44/// .lock()
45/// .unwrap()
46/// .push(termination),
47/// );
48///
49/// drop(subscription);
50///
51/// assert_eq!(&*values.lock().unwrap(), &[1, 2]);
52/// assert_eq!(
53/// &*terminations.lock().unwrap(),
54/// &[Termination::Completed]
55/// );
56/// ```
57#[derive(Educe)]
58#[educe(Debug, Clone)]
59pub struct RefCount<'sub, OE, S> {
60 source: ConnectableObservable<OE, S>,
61 state: Shared<Mutable<State<'sub>>>,
62}
63
64impl<OE, S> RefCount<'_, OE, S> {
65 pub fn new(source: ConnectableObservable<OE, S>) -> Self {
66 Self {
67 source,
68 state: Shared::new(Mutable::new(State::Initialized)),
69 }
70 }
71}
72
73impl<'or, 'sub, T, E, OE, S> Observable<'or, 'sub, T, E> for RefCount<'sub, OE, S>
74where
75 OE: Observable<'or, 'sub, T, E>,
76 S: Observable<'or, 'sub, T, E> + Observer<T, E> + Clone + NecessarySendSync + 'or,
77{
78 fn subscribe(
79 self,
80 observer: impl Observer<T, E> + NecessarySendSync + 'or,
81 ) -> Subscription<'sub> {
82 let sub = self.source.clone().subscribe(observer);
83 self.state.lock_mut(|mut lock| {
84 match &mut *lock {
85 State::Initialized => {
86 *lock = State::Subscribed(1, self.source.connect());
87 }
88 State::Subscribed(count, _) => *count += 1,
89 State::Unsubscribed => panic!("Already Unsubscribed"),
90 };
91 });
92 sub + self.state
93 }
94}
95
96impl Disposable for Shared<Mutable<State<'_>>> {
97 fn dispose(self) {
98 self.lock_mut(|mut lock| match &mut *lock {
99 State::Initialized => unreachable!(),
100 State::Subscribed(count, _) => {
101 *count -= 1;
102 if *count == 0 {
103 let state = std::mem::replace(&mut *lock, State::Unsubscribed);
104 drop(lock);
105 match state {
106 State::Initialized => unreachable!(),
107 State::Subscribed(_, subscription) => {
108 subscription.dispose();
109 }
110 State::Unsubscribed => unreachable!(),
111 }
112 }
113 }
114 State::Unsubscribed => unreachable!(),
115 });
116 }
117}