rxrust/observable/
connectable_observable.rs1use crate::prelude::*;
2use crate::subject::{LocalSubject, SharedSubject};
3use ops::ref_count::{RefCount, RefCountCreator};
4
5#[derive(Clone, Default)]
6pub struct ConnectableObservable<Source, Subject> {
7 pub(crate) source: Source,
8 pub(crate) subject: Subject,
9}
10
11impl<Source, Subject: Default> ConnectableObservable<Source, Subject> {
12 pub fn new(source: Source) -> Self {
13 ConnectableObservable {
14 source,
15 subject: Subject::default(),
16 }
17 }
18}
19crate::observable_proxy_impl!(ConnectableObservable, Source, Subject);
20
21pub type LocalConnectableObservable<'a, S, Item, Err> =
22 ConnectableObservable<S, LocalSubject<'a, Item, Err>>;
23
24pub type SharedConnectableObservable<S, Item, Err> =
25 ConnectableObservable<S, SharedSubject<Item, Err>>;
26
27#[doc(hidden)]
28macro_rules! observable_impl {
29 ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
30 type Unsub = $subscription;
31 #[inline(always)]
32 fn actual_subscribe<O>(
33 self,
34 subscriber: Subscriber<O, $subscription>,
35 ) -> Self::Unsub
36 where O: Observer<Item=Self::Item, Err= Self::Err> + $($marker +)* $lf {
37 self.subject.actual_subscribe(subscriber)
38 }
39}
40}
41
42impl<'a, S, Item, Err> LocalObservable<'a>
43 for LocalConnectableObservable<'a, S, Item, Err>
44where
45 S: LocalObservable<'a, Item = Item, Err = Err>,
46{
47 observable_impl!(LocalSubscription, 'a);
48}
49
50impl<S, Item, Err> SharedObservable
51 for SharedConnectableObservable<S, Item, Err>
52where
53 S: SharedObservable<Item = Item, Err = Err>,
54 S: SharedObservable<Item = Item, Err = Err>,
55{
56 observable_impl!(SharedSubscription, Send + Sync + 'static);
57}
58
59impl<Source, Subject> ConnectableObservable<Source, Subject>
60where
61 Source: Clone,
62{
63 #[inline]
64 pub fn ref_count<Inner: RefCountCreator<Connectable = Self>>(
65 self,
66 ) -> RefCount<Inner, Self> {
67 Inner::new(self)
68 }
69}
70
71impl<'a, S, Item, Err> LocalConnectableObservable<'a, S, Item, Err>
72where
73 S: LocalObservable<'a, Item = Item, Err = Err>,
74 Item: Clone + 'a,
75 Err: Clone + 'a,
76{
77 pub fn connect(self) -> S::Unsub {
78 self.source.actual_subscribe(Subscriber {
79 observer: self.subject.observers,
80 subscription: self.subject.subscription,
81 })
82 }
83}
84
85impl<S, Item, Err> SharedConnectableObservable<S, Item, Err>
86where
87 S: SharedObservable<Item = Item, Err = Err>,
88 Item: Clone + Send + Sync + 'static,
89 Err: Clone + Send + Sync + 'static,
90{
91 pub fn connect(self) -> S::Unsub {
92 self.source.actual_subscribe(Subscriber {
93 observer: self.subject.observers,
94 subscription: self.subject.subscription,
95 })
96 }
97}
98
99#[cfg(test)]
100mod test {
101 use super::*;
102
103 #[test]
104 fn smoke() {
105 let o = observable::of(100);
106 let connected = ConnectableObservable::new(o);
107 let mut first = 0;
108 let mut second = 0;
109 connected.clone().subscribe(|v| first = v);
110 connected.clone().subscribe(|v| second = v);
111
112 connected.connect();
113 assert_eq!(first, 100);
114 assert_eq!(second, 100);
115 }
116
117 #[test]
118 fn fork_and_shared() {
119 let o = observable::of(100);
120 let connected = ConnectableObservable::new(o);
121 connected.clone().into_shared().subscribe(|_| {});
122 connected.clone().into_shared().subscribe(|_| {});
123
124 connected.connect();
125 }
126 #[test]
127 fn publish_smoke() {
128 let p = observable::of(100).publish();
129 let mut first = 0;
130 let mut second = 0;
131 p.clone().subscribe(|v| first = v);
132 p.clone().subscribe(|v| second = v);
133
134 p.connect();
135 assert_eq!(first, 100);
136 assert_eq!(second, 100);
137 }
138
139 #[test]
140 fn bench() { do_bench(); }
141
142 benchmark_group!(do_bench, bench_connectable);
143
144 fn bench_connectable(b: &mut bencher::Bencher) { b.iter(smoke); }
145}