rx_rust/operators/connectable/
connectable_observable.rs

1use super::ref_count::RefCount;
2use crate::observable::Observable;
3use crate::safe_lock_option;
4use crate::utils::types::{Mutable, NecessarySendSync, Shared};
5use crate::{disposable::subscription::Subscription, observer::Observer};
6use educe::Educe;
7
8/// Represents an Observable that waits until its `connect()` method is called before it begins emitting items to its Observers.
9/// See <https://reactivex.io/documentation/operators/connect.html>
10///
11/// # Examples
12/// ```rust
13/// use rx_rust::{
14///     observable::observable_ext::ObservableExt,
15///     observer::Termination,
16///     operators::{
17///         connectable::connectable_observable::ConnectableObservable,
18///         creating::from_iter::FromIter,
19///     },
20///     subject::publish_subject::PublishSubject,
21/// };
22///
23/// use std::{convert::Infallible, sync::{Arc, Mutex}};
24///
25/// let values_1 = Arc::new(Mutex::new(Vec::new()));
26/// let values_2 = Arc::new(Mutex::new(Vec::new()));
27/// let terminations = Arc::new(Mutex::new(Vec::new()));
28///
29/// let subject: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
30/// let connectable =
31///     ConnectableObservable::new(FromIter::new(vec![1, 2]), subject.clone());
32/// let values_1_observer = Arc::clone(&values_1);
33/// let values_2_observer = Arc::clone(&values_2);
34/// let terminations_observer = Arc::clone(&terminations);
35///
36/// let subscription_1 = connectable.clone().subscribe_with_callback(
37///     move |value| values_1_observer.lock().unwrap().push(value),
38///     |_| {},
39/// );
40/// let subscription_2 = connectable.clone().subscribe_with_callback(
41///     move |value| values_2_observer.lock().unwrap().push(value),
42///     move |termination| terminations_observer
43///         .lock()
44///         .unwrap()
45///         .push(termination),
46/// );
47///
48/// let connection = connectable.connect();
49/// drop(connection);
50/// drop(subscription_1);
51/// drop(subscription_2);
52///
53/// assert_eq!(&*values_1.lock().unwrap(), &[1, 2]);
54/// assert_eq!(&*values_2.lock().unwrap(), &[1, 2]);
55/// assert_eq!(
56///     &*terminations.lock().unwrap(),
57///     &[Termination::Completed]
58/// );
59/// ```
60#[derive(Educe)]
61#[educe(Debug, Clone)]
62pub struct ConnectableObservable<OE, S> {
63    source: Shared<Mutable<Option<OE>>>,
64    subject: S,
65}
66
67impl<OE, S> ConnectableObservable<OE, S> {
68    pub fn new(source: OE, subject: S) -> Self {
69        Self {
70            source: Shared::new(Mutable::new(Some(source))),
71            subject,
72        }
73    }
74
75    pub fn connect<'or, 'sub, T, E>(self) -> Subscription<'sub>
76    where
77        OE: Observable<'or, 'sub, T, E>,
78        S: Observer<T, E> + NecessarySendSync + 'or,
79    {
80        safe_lock_option!(take: self.source)
81            .expect("Already connected")
82            .subscribe(self.subject)
83    }
84
85    pub fn ref_count<'sub>(self) -> RefCount<'sub, OE, S> {
86        RefCount::new(self)
87    }
88}
89
90impl<'or, 'sub, T, E, OE, S> Observable<'or, 'sub, T, E> for ConnectableObservable<OE, S>
91where
92    S: Observable<'or, 'sub, T, E>,
93{
94    fn subscribe(
95        self,
96        observer: impl Observer<T, E> + NecessarySendSync + 'or,
97    ) -> Subscription<'sub> {
98        self.subject.subscribe(observer)
99    }
100}