another_rxrust/operators/
ref_count.rs1use crate::prelude::*;
2use std::sync::{Arc, RwLock};
3use subject::Subject;
4
5#[derive(Clone)]
6pub struct RefCount<'a, Item>
7where
8 Item: Clone + Send + Sync,
9{
10 subject: subjects::Subject<'a, Item>,
11 source: Observable<'a, Item>,
12 subscription: Arc<RwLock<Option<Subscription<'a>>>>,
13}
14
15impl<'a, Item> RefCount<'a, Item>
16where
17 Item: Clone + Send + Sync,
18{
19 pub fn new(source: Observable<'a, Item>) -> RefCount<'a, Item> {
20 let _self = RefCount {
21 subject: Subject::<Item>::new(),
22 source,
23 subscription: Arc::new(RwLock::new(None)),
24 };
25 _self.set_ref_count();
26 _self
27 }
28
29 pub fn observable(&self) -> Observable<'a, Item> {
30 self.subject.observable()
31 }
32
33 fn set_ref_count(&self) {
34 {
35 let subscription = Arc::clone(&self.subscription);
36 self.subject.set_on_unsubscribe(move |count| {
37 if count == 0 {
38 if let Some(sbsc) = &*subscription.read().unwrap() {
39 sbsc.unsubscribe();
40 }
41 }
42 });
43 }
44
45 let source = self.source.clone();
46 let subject = self.subject.clone();
47 let subscription = Arc::clone(&self.subscription);
48
49 self.subject.set_on_subscribe(move |count| {
50 if count == 1 {
51 let sbj_next = subject.clone();
53 let sbj_error = subject.clone();
54 let sbj_complete = subject.clone();
55
56 let mut subscription = subscription.write().unwrap();
57 if subscription.is_some() {
58 return;
59 }
60
61 *subscription = Some(source.subscribe(
62 move |x| {
63 sbj_next.next(x);
64 },
65 move |e| {
66 sbj_error.error(e);
67 },
68 move || {
69 sbj_complete.complete();
70 },
71 ));
72 }
73 });
74 }
75}
76
77impl<'a, Item> Observable<'a, Item>
78where
79 Item: Clone + Send + Sync,
80{
81 pub fn ref_count(&self) -> RefCount<'a, Item> {
82 RefCount::new(self.clone())
83 }
84}
85
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use crate::{print_complete, print_error, print_next_fmt};
    use schedulers::new_thread_scheduler;
    use std::{thread, time};

    /// Subscribes two observers to a ref-counted `from_iter(0..10)` pipeline
    /// and then unsubscribes both. Progress is only printed; nothing is
    /// asserted.
    #[test]
    fn basic() {
        let ref_counted = observables::from_iter(0..10)
            .tap(
                print_next_fmt!("tap {}"),
                print_error!(),
                print_complete!(),
            )
            .ref_count();
        let shared = ref_counted.observable();

        println!("start #1");
        let first = shared.subscribe(
            print_next_fmt!("#1 {}"),
            print_error!(),
            print_complete!(),
        );

        println!("start #2");
        let second = shared.subscribe(
            print_next_fmt!("#2 {}"),
            print_error!(),
            print_complete!(),
        );

        println!("end #1");
        first.unsubscribe();

        println!("end #2");
        second.unsubscribe();
    }

    /// Same flow as `basic`, but over a 100 ms `interval` on a new-thread
    /// scheduler, with pauses between the steps. Progress is only printed;
    /// nothing is asserted.
    #[test]
    fn thread() {
        let ref_counted = observables::interval(
            time::Duration::from_millis(100),
            new_thread_scheduler(),
        )
        .tap(
            print_next_fmt!("tap {}"),
            print_error!(),
            print_complete!(),
        )
        .ref_count();
        let shared = ref_counted.observable();

        // Pause used between the individual steps below.
        let pause = time::Duration::from_millis(500);

        println!("start #1");
        let first = shared.subscribe(
            print_next_fmt!("#1 {}"),
            print_error!(),
            print_complete!(),
        );

        thread::sleep(pause);

        println!("start #2");
        let second = shared.subscribe(
            print_next_fmt!("#2 {}"),
            print_error!(),
            print_complete!(),
        );

        thread::sleep(pause);

        println!("end #1");
        first.unsubscribe();

        thread::sleep(pause);

        println!("end #2");
        second.unsubscribe();

        println!("final wait start");
        thread::sleep(time::Duration::from_secs(1));
        println!("final wait end");
    }
}