another_rxrust/operators/
ref_count.rs

1use crate::prelude::*;
2use std::sync::{Arc, RwLock};
3use subject::Subject;
4
5#[derive(Clone)]
6pub struct RefCount<'a, Item>
7where
8  Item: Clone + Send + Sync,
9{
10  subject: subjects::Subject<'a, Item>,
11  source: Observable<'a, Item>,
12  subscription: Arc<RwLock<Option<Subscription<'a>>>>,
13}
14
15impl<'a, Item> RefCount<'a, Item>
16where
17  Item: Clone + Send + Sync,
18{
19  pub fn new(source: Observable<'a, Item>) -> RefCount<'a, Item> {
20    let _self = RefCount {
21      subject: Subject::<Item>::new(),
22      source,
23      subscription: Arc::new(RwLock::new(None)),
24    };
25    _self.set_ref_count();
26    _self
27  }
28
29  pub fn observable(&self) -> Observable<'a, Item> {
30    self.subject.observable()
31  }
32
33  fn set_ref_count(&self) {
34    {
35      let subscription = Arc::clone(&self.subscription);
36      self.subject.set_on_unsubscribe(move |count| {
37        if count == 0 {
38          if let Some(sbsc) = &*subscription.read().unwrap() {
39            sbsc.unsubscribe();
40          }
41        }
42      });
43    }
44
45    let source = self.source.clone();
46    let subject = self.subject.clone();
47    let subscription = Arc::clone(&self.subscription);
48
49    self.subject.set_on_subscribe(move |count| {
50      if count == 1 {
51        // connect
52        let sbj_next = subject.clone();
53        let sbj_error = subject.clone();
54        let sbj_complete = subject.clone();
55
56        let mut subscription = subscription.write().unwrap();
57        if subscription.is_some() {
58          return;
59        }
60
61        *subscription = Some(source.subscribe(
62          move |x| {
63            sbj_next.next(x);
64          },
65          move |e| {
66            sbj_error.error(e);
67          },
68          move || {
69            sbj_complete.complete();
70          },
71        ));
72      }
73    });
74  }
75}
76
77impl<'a, Item> Observable<'a, Item>
78where
79  Item: Clone + Send + Sync,
80{
81  pub fn ref_count(&self) -> RefCount<'a, Item> {
82    RefCount::new(self.clone())
83  }
84}
85
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use crate::{print_complete, print_error, print_next_fmt};
  use schedulers::new_thread_scheduler;
  use std::{thread, time};

  /// Two subscribers attach to a ref-counted `from_iter` source and are then
  /// released one after the other.
  #[test]
  fn basic() {
    let shared = observables::from_iter(0..10)
      .tap(
        print_next_fmt!("tap {}"),
        print_error!(),
        print_complete!(),
      )
      .ref_count();
    let stream = shared.observable();

    println!("start #1");
    let first = stream.subscribe(
      print_next_fmt!("#1 {}"),
      print_error!(),
      print_complete!(),
    );

    println!("start #2");
    let second = stream.subscribe(
      print_next_fmt!("#2 {}"),
      print_error!(),
      print_complete!(),
    );

    println!("end #1");
    first.unsubscribe();

    println!("end #2");
    second.unsubscribe();
  }

  /// Same scenario against an `interval` source running on a new-thread
  /// scheduler, with pauses between each step.
  #[test]
  fn thread() {
    let pause = time::Duration::from_millis(500);
    let final_wait = time::Duration::from_millis(1000);

    let shared = observables::interval(
      time::Duration::from_millis(100),
      new_thread_scheduler(),
    )
    .tap(
      print_next_fmt!("tap {}"),
      print_error!(),
      print_complete!(),
    )
    .ref_count();
    let stream = shared.observable();

    println!("start #1");
    let first = stream.subscribe(
      print_next_fmt!("#1 {}"),
      print_error!(),
      print_complete!(),
    );

    thread::sleep(pause);

    println!("start #2");
    let second = stream.subscribe(
      print_next_fmt!("#2 {}"),
      print_error!(),
      print_complete!(),
    );

    thread::sleep(pause);

    println!("end #1");
    first.unsubscribe();

    thread::sleep(pause);

    println!("end #2");
    second.unsubscribe();

    // Leave time for any late output after the last subscriber is gone.
    println!("final wait start");
    thread::sleep(final_wait);
    println!("final wait end");
  }
}