rx_rust/operators/connectable/connectable_observable.rs
1use super::ref_count::RefCount;
2use crate::observable::Observable;
3use crate::safe_lock_option;
4use crate::utils::types::{Mutable, NecessarySendSync, Shared};
5use crate::{disposable::subscription::Subscription, observer::Observer};
6use educe::Educe;
7
8/// Represents an Observable that waits until its `connect()` method is called before it begins emitting items to its Observers.
9/// See <https://reactivex.io/documentation/operators/connect.html>
10///
11/// # Examples
12/// ```rust
13/// use rx_rust::{
14/// observable::observable_ext::ObservableExt,
15/// observer::Termination,
16/// operators::{
17/// connectable::connectable_observable::ConnectableObservable,
18/// creating::from_iter::FromIter,
19/// },
20/// subject::publish_subject::PublishSubject,
21/// };
22///
23/// use std::{convert::Infallible, sync::{Arc, Mutex}};
24///
25/// let values_1 = Arc::new(Mutex::new(Vec::new()));
26/// let values_2 = Arc::new(Mutex::new(Vec::new()));
27/// let terminations = Arc::new(Mutex::new(Vec::new()));
28///
29/// let subject: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
30/// let connectable =
31/// ConnectableObservable::new(FromIter::new(vec![1, 2]), subject.clone());
32/// let values_1_observer = Arc::clone(&values_1);
33/// let values_2_observer = Arc::clone(&values_2);
34/// let terminations_observer = Arc::clone(&terminations);
35///
36/// let subscription_1 = connectable.clone().subscribe_with_callback(
37/// move |value| values_1_observer.lock().unwrap().push(value),
38/// |_| {},
39/// );
40/// let subscription_2 = connectable.clone().subscribe_with_callback(
41/// move |value| values_2_observer.lock().unwrap().push(value),
42/// move |termination| terminations_observer
43/// .lock()
44/// .unwrap()
45/// .push(termination),
46/// );
47///
48/// let connection = connectable.connect();
49/// drop(connection);
50/// drop(subscription_1);
51/// drop(subscription_2);
52///
53/// assert_eq!(&*values_1.lock().unwrap(), &[1, 2]);
54/// assert_eq!(&*values_2.lock().unwrap(), &[1, 2]);
55/// assert_eq!(
56/// &*terminations.lock().unwrap(),
57/// &[Termination::Completed]
58/// );
59/// ```
60#[derive(Educe)]
61#[educe(Debug, Clone)]
62pub struct ConnectableObservable<OE, S> {
63 source: Shared<Mutable<Option<OE>>>,
64 subject: S,
65}
66
67impl<OE, S> ConnectableObservable<OE, S> {
68 pub fn new(source: OE, subject: S) -> Self {
69 Self {
70 source: Shared::new(Mutable::new(Some(source))),
71 subject,
72 }
73 }
74
75 pub fn connect<'or, 'sub, T, E>(self) -> Subscription<'sub>
76 where
77 OE: Observable<'or, 'sub, T, E>,
78 S: Observer<T, E> + NecessarySendSync + 'or,
79 {
80 safe_lock_option!(take: self.source)
81 .expect("Already connected")
82 .subscribe(self.subject)
83 }
84
85 pub fn ref_count<'sub>(self) -> RefCount<'sub, OE, S> {
86 RefCount::new(self)
87 }
88}
89
90impl<'or, 'sub, T, E, OE, S> Observable<'or, 'sub, T, E> for ConnectableObservable<OE, S>
91where
92 S: Observable<'or, 'sub, T, E>,
93{
94 fn subscribe(
95 self,
96 observer: impl Observer<T, E> + NecessarySendSync + 'or,
97 ) -> Subscription<'sub> {
98 self.subject.subscribe(observer)
99 }
100}