1pub mod cloneable;
26
27use std::sync::Arc;
28use crate::option::OptionCell;
29
30#[derive(Default)]
32pub struct Writer<T> {
33 cells: Vec<Arc<OptionCell<Arc<T>>>>,
34 val: Arc<T>,
35}
36
37impl<T> Writer<T> {
38 pub fn new(val: T) -> Writer<T> {
40 Self {
41 cells: vec![],
42 val: Arc::new(val),
43 }
44 }
45
46 pub fn share(&mut self, val: T) {
61 self.val = Arc::new(val);
62
63 self.cells.retain(|cell| {
64 let is_single_ref = Arc::strong_count(cell) != 1;
65 if is_single_ref {
66 cell.replace(self.val.clone());
67 }
68
69 is_single_ref
70 });
71 }
72
73 pub fn read(&self) -> &T {
83 &self.val
84 }
85
86 pub fn subscribe(&mut self) -> Reader<T> {
88 let cell = Arc::new(OptionCell::new(self.val.clone()));
89 self.cells.push(cell.clone());
90 Reader::new(cell)
91 }
92}
93
94pub struct Reader<T> {
96 cell: Arc<OptionCell<Arc<T>>>,
97 val: Arc<T>
98}
99
100impl<T> Reader<T> {
101 fn new(cell: Arc<OptionCell<Arc<T>>>) -> Reader<T> {
102 Self {
103 val: cell.take().unwrap(),
104 cell,
105 }
106 }
107
108 pub fn read_with_is_new(&mut self) -> (&T, bool) {
122 match self.cell.take() {
123 None => (&*self.val, false),
124 Some(val) => {
125 self.val = val;
126 (&*self.val, true)
127 }
128 }
129 }
130
131 pub fn read(&mut self) -> &T {
145 match self.cell.take() {
146 None => &self.val,
147 Some(val) => {
148 self.val = val;
149 &self.val
150 }
151 }
152 }
153}
154
155
156#[cfg(test)]
157mod test {
158 use std::thread::JoinHandle;
159 use crate::broadcast::{Writer, Reader};
160
161 #[test]
162 fn test_remove_reader() {
163 let mut d = Writer::new("Good");
164 let mut _r1 = d.subscribe();
165 let mut _r2 = d.subscribe();
166
167 assert_eq!(d.cells.len(), 2);
168 drop(_r1);
169
170 d.share("Only one");
171
172 assert_eq!(d.cells.len(), 1)
173 }
174
175 #[test]
176 fn test_write_read() {
177 let mut tx = Writer::new("Not good, but ok");
178 let mut rx1 = tx.subscribe();
179 let mut rx2 = tx.subscribe();
180
181 assert_eq!(tx.read(), &"Not good, but ok");
182
183 assert_eq!(rx1.read_with_is_new(), (&"Not good, but ok", false));
184 assert_eq!(rx2.read_with_is_new(), (&"Not good, but ok", false));
185
186 tx.share("Not good");
187 assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
188 assert_eq!(rx2.read_with_is_new(), (&"Not good", true));
189
190 tx.share("ok");
191 assert_eq!(rx1.read(), &"ok");
192 assert_eq!(rx2.read(), &"ok");
193 }
194
195 #[test]
196 fn is_work() -> Result<(), Box<dyn std::any::Any + Send + 'static>> {
197 let mut tx = Writer::new(vec!["Ukraine".to_string(); 1000]);
198
199 let rx0 = tx.subscribe();
200 let rx1 = tx.subscribe();
201 let rx2 = tx.subscribe();
202 let rx3 = tx.subscribe();
203
204 std::thread::spawn(move || loop {
205 for i in 0usize.. {
206 match i % 6 {
207 0 => tx.share(vec!["Slovakia".to_string(); 1001]),
208 1 => tx.share(vec!["Estonia".to_string(); 1001]),
209 2 => tx.share(vec!["Czechia".to_string(); 1001]),
210 3 => tx.share(vec!["United Kingdom".to_string(); 1001]),
211 4 => tx.share(vec!["Lithuania".to_string(); 1001]),
212 5 => tx.share(vec!["Latvia".to_string(); 1001]),
213 val => println!("val: {:?}, Not good, but ok", val)
214 }
215 }
216 });
217 let count_iter = 1_000_000;
218 let h0 = create_thread_read(rx0, count_iter);
219 let h1 = create_thread_read(rx1, count_iter);
220 let h2 = create_thread_read(rx2, count_iter);
221 let h3 = create_thread_read(rx3, count_iter);
222
223 let res0 = h0.join()?;
224 let res1 = h1.join()?;
225 let res2 = h2.join()?;
226 let res3 = h3.join()?;
227
228 println!("Slovakia: {}", res0.0 + res1.0 + res2.0 + res3.0);
229 println!("Estonia: {}", res0.1 + res1.1 + res2.1 + res3.1);
230 println!("Czechia: {}", res0.2 + res1.2 + res2.2 + res3.2);
231 println!("United Kingdom: {}", res0.3 + res1.3 + res2.3 + res3.3);
232 println!("Lithuania: {}", res0.4 + res1.4 + res2.4 + res3.4);
233 println!("Latvia: {}", res0.5 + res1.5 + res2.5 + res3.5);
234 println!("Ukraine: {}", res0.6 + res1.6 + res2.6 + res3.6);
235
236 Ok(())
237 }
238
239 fn create_thread_read(
240 mut rw: Reader<Vec<String>>,
241 count_iter: usize
242 ) -> JoinHandle<(i32, i32, i32, i32, i32, i32, i32)> {
243 let mut slovakia = 0;
244 let mut estonia = 0;
245 let mut czechia = 0;
246 let mut united_kingdom = 0;
247 let mut lithuania = 0;
248 let mut latvia = 0;
249 let mut ukraine = 0;
250
251 std::thread::spawn(move || {
252 for _ in 0..count_iter {
253 match rw.read() {
254 val if val.first() == Some(&"Slovakia".to_string()) => slovakia += 1,
255 val if val.first() == Some(&"Estonia".to_string()) => estonia += 1,
256 val if val.first() == Some(&"Czechia".to_string()) => czechia += 1,
257 val if val.first() == Some(&"United Kingdom".to_string()) => united_kingdom += 1,
258 val if val.first() == Some(&"Lithuania".to_string()) => lithuania += 1,
259 val if val.first() == Some(&"Latvia".to_string()) => latvia += 1,
260 val if val.first() == Some(&"Ukraine".to_string()) => ukraine += 1,
261 val => println!("val: {:?}, Not good, but ok", val.first())
262 }
263 }
264 (slovakia, estonia, czechia, united_kingdom, lithuania, latvia, ukraine)
265 })
266 }
267}