another_rxrust/operators/
replay.rs

1use crate::prelude::*;
2use std::sync::{Arc, RwLock};
3
4#[derive(Clone)]
5pub struct Replay<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  subject: subjects::ReplaySubject<'a, Item>,
10  source: Observable<'a, Item>,
11  subscription: Arc<RwLock<Option<Subscription<'a>>>>,
12}
13
14impl<'a, Item> Replay<'a, Item>
15where
16  Item: Clone + Send + Sync,
17{
18  pub fn new(source: Observable<'a, Item>) -> Replay<'a, Item> {
19    let _self = Replay {
20      subject: subjects::ReplaySubject::new(),
21      source,
22      subscription: Arc::new(RwLock::new(None)),
23    };
24    _self.set_ref_count();
25    _self
26  }
27
28  pub fn observable(&self) -> Observable<'a, Item> {
29    self.subject.observable()
30  }
31
32  fn set_ref_count(&self) {
33    {
34      let subscription = Arc::clone(&self.subscription);
35      self.subject.set_on_unsubscribe(move |count| {
36        if count == 0 {
37          if let Some(sbsc) = &*subscription.read().unwrap() {
38            sbsc.unsubscribe();
39          }
40        }
41      });
42    }
43
44    let source = self.source.clone();
45    let subject = self.subject.clone();
46    let subscription = Arc::clone(&self.subscription);
47
48    self.subject.set_on_subscribe(move |count| {
49      if count == 1 {
50        // connect
51        let sbj_next = subject.clone();
52        let sbj_error = subject.clone();
53        let sbj_complete = subject.clone();
54
55        let mut subscription = subscription.write().unwrap();
56        if subscription.is_some() {
57          return;
58        }
59
60        *subscription = Some(source.subscribe(
61          move |x| {
62            sbj_next.next(x);
63          },
64          move |e| {
65            sbj_error.error(e);
66          },
67          move || {
68            sbj_complete.complete();
69          },
70        ));
71      }
72    });
73  }
74}
75
76impl<'a, Item> Observable<'a, Item>
77where
78  Item: Clone + Send + Sync,
79{
80  pub fn replay(&self) -> Replay<'a, Item> {
81    Replay::new(self.clone())
82  }
83}
84
85#[cfg(all(test, not(feature = "web")))]
86mod test {
87  use crate::prelude::*;
88  use crate::{print_complete, print_error, print_next_fmt};
89  use schedulers::new_thread_scheduler;
90  use std::{thread, time};
91
92  #[test]
93  fn basic() {
94    let o = observables::from_iter(0..10)
95      .tap(
96        print_next_fmt!("tap {}"),
97        print_error!(),
98        print_complete!(),
99      )
100      .replay();
101    let obs = o.observable();
102
103    println!("start #1");
104    let sbsc1 = obs.subscribe(
105      print_next_fmt!("#1 {}"),
106      print_error!(),
107      print_complete!(),
108    );
109
110    println!("start #2");
111    let sbsc2 = obs.subscribe(
112      print_next_fmt!("#2 {}"),
113      print_error!(),
114      print_complete!(),
115    );
116
117    println!("end #1");
118    sbsc1.unsubscribe();
119
120    println!("end #2");
121    sbsc2.unsubscribe();
122  }
123
124  #[test]
125  fn thread() {
126    let o = observables::interval(
127      time::Duration::from_millis(100),
128      new_thread_scheduler(),
129    )
130    .tap(
131      print_next_fmt!("tap {}"),
132      print_error!(),
133      print_complete!(),
134    )
135    .replay();
136    let obs = o.observable();
137
138    println!("start #1");
139    let sbsc1 = obs.subscribe(
140      print_next_fmt!("#1 {}"),
141      print_error!(),
142      print_complete!(),
143    );
144
145    thread::sleep(time::Duration::from_millis(500));
146
147    println!("start #2");
148    let sbsc2 = obs.subscribe(
149      print_next_fmt!("#2 {}"),
150      print_error!(),
151      print_complete!(),
152    );
153
154    thread::sleep(time::Duration::from_millis(500));
155
156    println!("end #1");
157    sbsc1.unsubscribe();
158
159    thread::sleep(time::Duration::from_millis(500));
160
161    println!("end #2");
162    sbsc2.unsubscribe();
163
164    println!("final wait start");
165    thread::sleep(time::Duration::from_millis(500));
166    println!("final wait end");
167  }
168}