1use std::sync::{Arc, Mutex};
27use crate::option::OptionCell;
28
29type Cells<T> = Arc<Mutex<Vec<Cell<T>>>>;
30type Cell<T> = Arc<OptionCell<Arc<T>>>;
31
32pub struct Writer<T> {
34 cells: Cells<T>,
35 val: Arc<T>,
36}
37
38impl<T> Writer<T> {
39 fn new(val: T) -> Writer<T> {
41 Self {
42 cells: Arc::new(Mutex::new(vec![])),
43 val: Arc::new(val),
44 }
45 }
46
47 pub fn share(&mut self, val: T) {
62 let mut cells = self.cells
63 .lock()
64 .unwrap();
65
66 self.val = Arc::new(val);
67
68 cells.retain(|cell| {
69 let is_single_ref = Arc::strong_count(cell) != 1;
70 if is_single_ref {
71 cell.replace(self.val.clone());
72 }
73
74 is_single_ref
75 });
76 }
77
78 pub fn read(&self) -> &T {
88 &self.val
89 }
90
91 pub fn subscribe(&mut self) -> Reader<T> {
93 let cell = Arc::new(OptionCell::new(self.val.clone()));
94
95 self.cells
96 .lock()
97 .unwrap()
98 .push(cell.clone());
99 Reader::new(self.cells.clone(), cell)
100 }
101}
102
103pub struct Reader<T> {
105 cells: Cells<T>,
106 cell: Cell<T>,
107 val: Arc<T>
108}
109
110impl<T> Clone for Reader<T> {
111 fn clone(&self) -> Self {
112 let mut cells = self.cells
113 .lock()
114 .unwrap();
115
116 let self_cell_ptr = &raw const *self.cell;
117
118 let self_cell_idx = cells
119 .iter()
120 .enumerate()
121 .find_map(|(idx, cell)| (self_cell_ptr == &raw const **cell).then_some(idx))
122 .expect("Undefined self cell in cells");
123
124 let clone_cell = if let Some(val) = cells[self_cell_idx].take() {
125 cells[self_cell_idx].replace(val.clone());
126 Arc::new(OptionCell::new(val))
127 } else {
128 Arc::new(OptionCell::new(self.val.clone()))
129 };
130 cells.push(clone_cell.clone());
131
132 Reader::new(self.cells.clone(), clone_cell)
133 }
134}
135
136impl<T> Reader<T> {
137 fn new(cells: Cells<T>, cell: Cell<T>) -> Reader<T> {
138 Self {
139 val: cell.take().unwrap(),
140 cells,
141 cell,
142 }
143 }
144
145 pub fn read_with_is_new(&mut self) -> (&T, bool) {
158 match self.cell.take() {
159 None => (&*self.val, false),
160 Some(val) => {
161 self.val = val;
162 (&*self.val, true)
163 }
164 }
165 }
166
167 pub fn read(&mut self) -> &T {
180 match self.cell.take() {
181 None => &self.val,
182 Some(val) => {
183 self.val = val;
184 &self.val
185 }
186 }
187 }
188}
189
190pub fn new<T>(val: T) -> (Writer<T>, Reader<T>) {
202 let mut tx = Writer::new(val);
203 let rx = tx.subscribe();
204 (tx, rx)
205}
206
207pub fn default<T>() -> (Writer<T>, Reader<T>)
208where
209 T: Default
210{
211 let mut tx = Writer::new(T::default());
212 let rx = tx.subscribe();
213 (tx, rx)
214}
215
216
217#[cfg(test)]
218mod test {
219 use std::thread::JoinHandle;
220 use crate::broadcast;
221 use crate::broadcast::cloneable::{Writer, Reader};
222
223 #[test]
224 fn test_remove_reader() {
225 let (mut tx, rx) = broadcast::cloneable::new("Good");
226
227 let _rx1 = tx.subscribe();
228 let _rx2 = rx.clone();
229
230 assert_eq!(tx.cells.lock().unwrap().len(), 3);
231 drop(_rx1);
232
233 tx.share("Only one");
234
235 assert_eq!(tx.cells.lock().unwrap().len(), 2);
236 }
237
238 #[test]
239 fn test_clone() {
240 let (mut tx, mut rx0) = broadcast::cloneable::new("Good");
241 let mut rx1 = rx0.clone();
242
243 assert_eq!(rx1.read(), &"Good");
244
245 let mut rx2 = rx0.clone();
246 let mut rx3 = rx1.clone();
247
248 tx.share("Ok");
249 let mut rx4 = rx3.clone();
250 assert_eq!(rx0.read(), &"Ok");
251 assert_eq!(rx1.read(), &"Ok");
252 assert_eq!(rx2.read(), &"Ok");
253 assert_eq!(rx3.read(), &"Ok");
254 assert_eq!(rx4.read(), &"Ok");
255 }
256
257 #[test]
258 fn test_write_read() {
259 let mut tx = Writer::new("Not good, but ok");
260 let mut rx1 = tx.subscribe();
261 let mut rx2 = tx.subscribe();
262
263 assert_eq!(tx.read(), &"Not good, but ok");
264
265 assert_eq!(rx1.read_with_is_new(), (&"Not good, but ok", false));
266 assert_eq!(rx2.read_with_is_new(), (&"Not good, but ok", false));
267
268 tx.share("Not good");
269 assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
270 assert_eq!(rx2.read_with_is_new(), (&"Not good", true));
271
272 tx.share("ok");
273 let mut rx3 = rx2.clone();
274 assert_eq!(rx1.read(), &"ok");
275 assert_eq!(rx2.read(), &"ok");
276 assert_eq!(rx3.read(), &"ok");
277 }
278
279 #[test]
280 fn is_work() -> Result<(), Box<dyn std::any::Any + Send + 'static>> {
281 let (mut tx, rx0) = broadcast::cloneable::new(vec!["Ukraine".to_string(); 1000]);
282
283 let rx1 = rx0.clone();
284 let rx2 = tx.subscribe();
285 let rx3 = tx.subscribe();
286
287 std::thread::spawn(move || loop {
288 for i in 0usize.. {
289 match i % 6 {
290 0 => tx.share(vec!["Slovakia".to_string(); 1001]),
291 1 => tx.share(vec!["Estonia".to_string(); 1001]),
292 2 => tx.share(vec!["Czechia".to_string(); 1001]),
293 3 => tx.share(vec!["United Kingdom".to_string(); 1001]),
294 4 => tx.share(vec!["Lithuania".to_string(); 1001]),
295 5 => tx.share(vec!["Latvia".to_string(); 1001]),
296 val => println!("val: {:?}, Not good, but ok", val)
297 }
298 }
299 });
300 let count_iter = 1_000_000;
301 let h0 = create_thread_read(rx0, count_iter);
302 let h1 = create_thread_read(rx1, count_iter);
303 let h2 = create_thread_read(rx2, count_iter);
304 let h3 = create_thread_read(rx3, count_iter);
305
306 let res0 = h0.join()?;
307 let res1 = h1.join()?;
308 let res2 = h2.join()?;
309 let res3 = h3.join()?;
310
311 println!("Slovakia: {}", res0.0 + res1.0 + res2.0 + res3.0);
312 println!("Estonia: {}", res0.1 + res1.1 + res2.1 + res3.1);
313 println!("Czechia: {}", res0.2 + res1.2 + res2.2 + res3.2);
314 println!("United Kingdom: {}", res0.3 + res1.3 + res2.3 + res3.3);
315 println!("Lithuania: {}", res0.4 + res1.4 + res2.4 + res3.4);
316 println!("Latvia: {}", res0.5 + res1.5 + res2.5 + res3.5);
317 println!("Ukraine: {}", res0.6 + res1.6 + res2.6 + res3.6);
318
319 Ok(())
320 }
321
322 fn create_thread_read(
323 mut rw: Reader<Vec<String>>,
324 count_iter: usize
325 ) -> JoinHandle<(i32, i32, i32, i32, i32, i32, i32)> {
326 let mut slovakia = 0;
327 let mut estonia = 0;
328 let mut czechia = 0;
329 let mut united_kingdom = 0;
330 let mut lithuania = 0;
331 let mut latvia = 0;
332 let mut ukraine = 0;
333
334 std::thread::spawn(move || {
335 for _ in 0..count_iter {
336 match rw.read() {
337 val if val.first() == Some(&"Slovakia".to_string()) => slovakia += 1,
338 val if val.first() == Some(&"Estonia".to_string()) => estonia += 1,
339 val if val.first() == Some(&"Czechia".to_string()) => czechia += 1,
340 val if val.first() == Some(&"United Kingdom".to_string()) => united_kingdom += 1,
341 val if val.first() == Some(&"Lithuania".to_string()) => lithuania += 1,
342 val if val.first() == Some(&"Latvia".to_string()) => latvia += 1,
343 val if val.first() == Some(&"Ukraine".to_string()) => ukraine += 1,
344 val => println!("val: {:?}, Not good, but ok", val.first())
345 }
346 }
347 (slovakia, estonia, czechia, united_kingdom, lithuania, latvia, ukraine)
348 })
349 }
350}