another_rxrust/operators/
replay.rs1use crate::prelude::*;
2use std::sync::{Arc, RwLock};
3
4#[derive(Clone)]
5pub struct Replay<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 subject: subjects::ReplaySubject<'a, Item>,
10 source: Observable<'a, Item>,
11 subscription: Arc<RwLock<Option<Subscription<'a>>>>,
12}
13
14impl<'a, Item> Replay<'a, Item>
15where
16 Item: Clone + Send + Sync,
17{
18 pub fn new(source: Observable<'a, Item>) -> Replay<'a, Item> {
19 let _self = Replay {
20 subject: subjects::ReplaySubject::new(),
21 source,
22 subscription: Arc::new(RwLock::new(None)),
23 };
24 _self.set_ref_count();
25 _self
26 }
27
28 pub fn observable(&self) -> Observable<'a, Item> {
29 self.subject.observable()
30 }
31
32 fn set_ref_count(&self) {
33 {
34 let subscription = Arc::clone(&self.subscription);
35 self.subject.set_on_unsubscribe(move |count| {
36 if count == 0 {
37 if let Some(sbsc) = &*subscription.read().unwrap() {
38 sbsc.unsubscribe();
39 }
40 }
41 });
42 }
43
44 let source = self.source.clone();
45 let subject = self.subject.clone();
46 let subscription = Arc::clone(&self.subscription);
47
48 self.subject.set_on_subscribe(move |count| {
49 if count == 1 {
50 let sbj_next = subject.clone();
52 let sbj_error = subject.clone();
53 let sbj_complete = subject.clone();
54
55 let mut subscription = subscription.write().unwrap();
56 if subscription.is_some() {
57 return;
58 }
59
60 *subscription = Some(source.subscribe(
61 move |x| {
62 sbj_next.next(x);
63 },
64 move |e| {
65 sbj_error.error(e);
66 },
67 move || {
68 sbj_complete.complete();
69 },
70 ));
71 }
72 });
73 }
74}
75
76impl<'a, Item> Observable<'a, Item>
77where
78 Item: Clone + Send + Sync,
79{
80 pub fn replay(&self) -> Replay<'a, Item> {
81 Replay::new(self.clone())
82 }
83}
84
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use crate::{print_complete, print_error, print_next_fmt};
    use schedulers::new_thread_scheduler;
    use std::{thread, time};

    /// Blocks the calling thread for 500 ms between steps of the `thread` test.
    fn pause() {
        thread::sleep(time::Duration::from_millis(500));
    }

    /// Two observers on a replayed `from_iter(0..10)` source; output is
    /// printed for manual inspection rather than asserted.
    #[test]
    fn basic() {
        let replayed = observables::from_iter(0..10)
            .tap(
                print_next_fmt!("tap {}"),
                print_error!(),
                print_complete!(),
            )
            .replay();
        let shared = replayed.observable();

        println!("start #1");
        let first = shared.subscribe(
            print_next_fmt!("#1 {}"),
            print_error!(),
            print_complete!(),
        );

        println!("start #2");
        let second = shared.subscribe(
            print_next_fmt!("#2 {}"),
            print_error!(),
            print_complete!(),
        );

        println!("end #1");
        first.unsubscribe();

        println!("end #2");
        second.unsubscribe();
    }

    /// Same scenario with a 100 ms `interval` source on
    /// `new_thread_scheduler()`, with pauses between each step; output is
    /// printed for manual inspection rather than asserted.
    #[test]
    fn thread() {
        let replayed = observables::interval(
            time::Duration::from_millis(100),
            new_thread_scheduler(),
        )
        .tap(
            print_next_fmt!("tap {}"),
            print_error!(),
            print_complete!(),
        )
        .replay();
        let shared = replayed.observable();

        println!("start #1");
        let first = shared.subscribe(
            print_next_fmt!("#1 {}"),
            print_error!(),
            print_complete!(),
        );

        pause();

        println!("start #2");
        let second = shared.subscribe(
            print_next_fmt!("#2 {}"),
            print_error!(),
            print_complete!(),
        );

        pause();

        println!("end #1");
        first.unsubscribe();

        pause();

        println!("end #2");
        second.unsubscribe();

        println!("final wait start");
        pause();
        println!("final wait end");
    }
}