rxrust/observable/
connectable_observable.rs

1use crate::prelude::*;
2use crate::subject::{LocalSubject, SharedSubject};
3use ops::ref_count::{RefCount, RefCountCreator};
4
/// An observable built from a `source` observable and a `subject`.
///
/// In this module, subscribing goes through `subject`, while the `connect`
/// methods subscribe the subject's observers to `source`.
#[derive(Default, Clone)]
pub struct ConnectableObservable<Source, Subject> {
  /// The upstream observable that `connect` subscribes to.
  pub(crate) source: Source,
  /// The subject that receives subscribers via `actual_subscribe`.
  pub(crate) subject: Subject,
}
10
11impl<Source, Subject: Default> ConnectableObservable<Source, Subject> {
12  pub fn new(source: Source) -> Self {
13    ConnectableObservable {
14      source,
15      subject: Subject::default(),
16    }
17  }
18}
19crate::observable_proxy_impl!(ConnectableObservable, Source, Subject);
20
21pub type LocalConnectableObservable<'a, S, Item, Err> =
22  ConnectableObservable<S, LocalSubject<'a, Item, Err>>;
23
24pub type SharedConnectableObservable<S, Item, Err> =
25  ConnectableObservable<S, SharedSubject<Item, Err>>;
26
// Expands, inside a `LocalObservable` / `SharedObservable` impl block, to the
// `Unsub` associated type plus an `actual_subscribe` method that hands the
// subscriber straight to the inner `subject`.
//
// Arguments: the subscription type, then zero or more `Marker +` bounds
// followed by a lifetime. The bounds and the lifetime are applied to the
// observer type `O`.
#[doc(hidden)]
macro_rules! observable_impl {
  ($unsub:ty, $($bound:ident +)* $lifetime:lifetime) => {
    type Unsub = $unsub;

    #[inline(always)]
    fn actual_subscribe<O>(
      self,
      subscriber: Subscriber<O, $unsub>,
    ) -> Self::Unsub
    where
      O: Observer<Item = Self::Item, Err = Self::Err> + $($bound +)* $lifetime,
    {
      self.subject.actual_subscribe(subscriber)
    }
  };
}
41
42impl<'a, S, Item, Err> LocalObservable<'a>
43  for LocalConnectableObservable<'a, S, Item, Err>
44where
45  S: LocalObservable<'a, Item = Item, Err = Err>,
46{
47  observable_impl!(LocalSubscription, 'a);
48}
49
50impl<S, Item, Err> SharedObservable
51  for SharedConnectableObservable<S, Item, Err>
52where
53  S: SharedObservable<Item = Item, Err = Err>,
54  S: SharedObservable<Item = Item, Err = Err>,
55{
56  observable_impl!(SharedSubscription, Send + Sync + 'static);
57}
58
59impl<Source, Subject> ConnectableObservable<Source, Subject>
60where
61  Source: Clone,
62{
63  #[inline]
64  pub fn ref_count<Inner: RefCountCreator<Connectable = Self>>(
65    self,
66  ) -> RefCount<Inner, Self> {
67    Inner::new(self)
68  }
69}
70
71impl<'a, S, Item, Err> LocalConnectableObservable<'a, S, Item, Err>
72where
73  S: LocalObservable<'a, Item = Item, Err = Err>,
74  Item: Clone + 'a,
75  Err: Clone + 'a,
76{
77  pub fn connect(self) -> S::Unsub {
78    self.source.actual_subscribe(Subscriber {
79      observer: self.subject.observers,
80      subscription: self.subject.subscription,
81    })
82  }
83}
84
85impl<S, Item, Err> SharedConnectableObservable<S, Item, Err>
86where
87  S: SharedObservable<Item = Item, Err = Err>,
88  Item: Clone + Send + Sync + 'static,
89  Err: Clone + Send + Sync + 'static,
90{
91  pub fn connect(self) -> S::Unsub {
92    self.source.actual_subscribe(Subscriber {
93      observer: self.subject.observers,
94      subscription: self.subject.subscription,
95    })
96  }
97}
98
#[cfg(test)]
mod test {
  use super::*;

  // Two subscribers attached before `connect` must both observe the value.
  #[test]
  fn smoke() {
    let connectable = ConnectableObservable::new(observable::of(100));
    let mut first = 0;
    let mut second = 0;
    connectable.clone().subscribe(|value| first = value);
    connectable.clone().subscribe(|value| second = value);

    connectable.connect();
    assert_eq!(first, 100);
    assert_eq!(second, 100);
  }

  // Clones converted with `into_shared` can be subscribed to, and the
  // original can still be connected afterwards.
  #[test]
  fn fork_and_shared() {
    let connectable = ConnectableObservable::new(observable::of(100));
    connectable.clone().into_shared().subscribe(|_| {});
    connectable.clone().into_shared().subscribe(|_| {});

    connectable.connect();
  }

  // Same scenario as `smoke`, but the connectable comes from `publish()`.
  #[test]
  fn publish_smoke() {
    let published = observable::of(100).publish();
    let mut first = 0;
    let mut second = 0;
    published.clone().subscribe(|value| first = value);
    published.clone().subscribe(|value| second = value);

    published.connect();
    assert_eq!(first, 100);
    assert_eq!(second, 100);
  }

  // Runs the benchmark group as an ordinary test.
  #[test]
  fn bench() {
    do_bench();
  }

  benchmark_group!(do_bench, bench_connectable);

  // Benchmarks the `smoke` scenario.
  fn bench_connectable(b: &mut bencher::Bencher) {
    b.iter(smoke);
  }
}
143
144  fn bench_connectable(b: &mut bencher::Bencher) { b.iter(smoke); }
145}