1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use RefCount;
use crateObservable;
use cratesafe_lock_option;
use crate;
use crate::;
use Educe;
/// Represents an Observable that waits until its `connect()` method is called before it begins emitting items to its Observers.
/// See <https://reactivex.io/documentation/operators/connect.html>
///
/// # Examples
/// ```rust
/// use rx_rust::{
///     observable::observable_ext::ObservableExt,
///     observer::Termination,
///     operators::{
///         connectable::connectable_observable::ConnectableObservable,
///         creating::from_iter::FromIter,
///     },
///     subject::publish_subject::PublishSubject,
/// };
///
/// use std::{convert::Infallible, sync::{Arc, Mutex}};
///
/// let values_1 = Arc::new(Mutex::new(Vec::new()));
/// let values_2 = Arc::new(Mutex::new(Vec::new()));
/// let terminations = Arc::new(Mutex::new(Vec::new()));
///
/// let subject: PublishSubject<'_, i32, Infallible> = PublishSubject::default();
/// let connectable =
///     ConnectableObservable::new(FromIter::new(vec![1, 2]), subject.clone());
/// let values_1_observer = Arc::clone(&values_1);
/// let values_2_observer = Arc::clone(&values_2);
/// let terminations_observer = Arc::clone(&terminations);
///
/// let subscription_1 = connectable.clone().subscribe_with_callback(
///     move |value| values_1_observer.lock().unwrap().push(value),
///     |_| {},
/// );
/// let subscription_2 = connectable.clone().subscribe_with_callback(
///     move |value| values_2_observer.lock().unwrap().push(value),
///     move |termination| terminations_observer
///         .lock()
///         .unwrap()
///         .push(termination),
/// );
///
/// let connection = connectable.connect();
/// drop(connection);
/// drop(subscription_1);
/// drop(subscription_2);
///
/// assert_eq!(&*values_1.lock().unwrap(), &[1, 2]);
/// assert_eq!(&*values_2.lock().unwrap(), &[1, 2]);
/// assert_eq!(
///     &*terminations.lock().unwrap(),
///     &[Termination::Completed]
/// );
/// ```