rw_cell/broadcast/
cloneable.rs

//! Broadcast cell whose readers can be cloned.
//!
//! This approach calls [`Mutex::lock`] only inside the [`Writer::share`],
//! [`Writer::subscribe`] and [`Reader::clone`] methods; reading a value does not take that lock.
3//!
4//! # Example
5//!
6//!```
7//! let (mut tx, rx) = rw_cell::broadcast::cloneable::new("Not good, but ok");
8//!
9//! assert_eq!(tx.read(), &"Not good, but ok");
10//!
11//! let mut rx1 = tx.subscribe();
12//! let mut rx2 = rx.clone();
13//!
14//! assert_eq!(rx1.read_with_is_new(), (&"Not good, but ok", false));
15//! assert_eq!(rx2.read_with_is_new(), (&"Not good, but ok", false));
16//!
17//! tx.share("Not good");
18//! assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
19//! assert_eq!(rx2.read_with_is_new(), (&"Not good", true));
20//!
21//! tx.share("ok");
22//! assert_eq!(rx1.read(), &"ok");
23//! assert_eq!(rx2.read(), &"ok");
24//! ```
25
26use std::sync::{Arc, Mutex};
27use crate::option::OptionCell;
28
/// Mutex-protected list with one [`Cell`] per subscribed reader, shared by the [`Writer`]
/// and every [`Reader`]
type Cells<T> = Arc<Mutex<Vec<Cell<T>>>>;
/// Slot of a single reader: the writer puts the newest `Arc<T>` into it and the reader takes it out
type Cell<T> = Arc<OptionCell<Arc<T>>>;
31
/// Writing half of the broadcast cell: distributes every shared value between all readers
pub struct Writer<T> {
    // Cells of all subscribed readers; a handle to this list is also given to each `Reader`
    cells: Cells<T>,
    // Most recently shared value, the one returned by `Writer::read`
    val: Arc<T>,
}
37
38impl<T> Writer<T> {
39    /// Create new distributor
40    fn new(val: T) -> Writer<T> {
41        Self {
42            cells: Arc::new(Mutex::new(vec![])),
43            val: Arc::new(val),
44        }
45    }
46
47    /// Write val and distribute it between all readers
48    ///
49    /// # Example
50    ///
51    /// ```
52    /// let (mut tx, rx) = rw_cell::broadcast::cloneable::new("Not good, but ok");
53    /// let mut rx1 = tx.subscribe();
54    /// let mut rx2 = rx.clone();
55    ///
56    /// tx.share("Not good");
57    ///
58    /// assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
59    /// assert_eq!(rx2.read_with_is_new(), (&"Not good", true));
60    /// ```
61    pub fn share(&mut self, val: T) {
62        let mut cells = self.cells
63            .lock()
64            .unwrap();
65
66        self.val = Arc::new(val);
67
68        cells.retain(|cell| {
69            let is_single_ref = Arc::strong_count(cell) != 1;
70            if is_single_ref {
71                cell.replace(self.val.clone());
72            }
73
74            is_single_ref
75        });
76    }
77
78    /// Return a reference to the value from the cell
79    ///
80    /// # Example
81    ///
82    /// ```
83    /// let (mut tx, rx) = rw_cell::broadcast::cloneable::new("Not good, but ok");
84    ///
85    /// assert_eq!(tx.read(), &"Not good, but ok");
86    /// ```
87    pub fn read(&self) -> &T {
88        &self.val
89    }
90
91    /// Return new [`Reader`] for read data from cell
92    pub fn subscribe(&mut self) -> Reader<T>  {
93        let cell = Arc::new(OptionCell::new(self.val.clone()));
94
95        self.cells
96            .lock()
97            .unwrap()
98            .push(cell.clone());
99        Reader::new(self.cells.clone(), cell)
100    }
101}
102
/// Reading half of the broadcast cell: reads the shared value without copying it and
/// without locking the list of cells
pub struct Reader<T> {
    // List of all readers' cells; `Clone` uses it to register the cell of the new reader
    cells: Cells<T>,
    // This reader's own cell, into which the writer puts new values
    cell: Cell<T>,
    // Last value taken out of `cell`; this is what the read methods hand out
    val: Arc<T>
}
109
110impl<T> Clone for Reader<T> {
111    fn clone(&self) -> Self {
112        let mut cells = self.cells
113            .lock()
114            .unwrap();
115
116        let self_cell_ptr = &raw const *self.cell;
117
118        let self_cell_idx = cells
119            .iter()
120            .enumerate()
121            .find_map(|(idx, cell)| (self_cell_ptr == &raw const **cell).then_some(idx))
122            .expect("Undefined self cell in cells");
123
124        let clone_cell = if let Some(val) = cells[self_cell_idx].take() {
125            cells[self_cell_idx].replace(val.clone());
126            Arc::new(OptionCell::new(val))
127        } else {
128            Arc::new(OptionCell::new(self.val.clone()))
129        };
130        cells.push(clone_cell.clone());
131
132        Reader::new(self.cells.clone(), clone_cell)
133    }
134}
135
136impl<T> Reader<T> {
137    fn new(cells: Cells<T>, cell: Cell<T>) -> Reader<T> {
138        Self {
139            val: cell.take().unwrap(),
140            cells,
141            cell,
142        }
143    }
144
145    /// Returns a tuple of value references and a boolean value, whether new or not
146    ///
147    /// # Examples
148    ///
149    /// ```
150    /// let (mut tx, mut rx) = rw_cell::broadcast::cloneable::new("Not good, but ok");
151    ///
152    /// assert_eq!(rx.read_with_is_new(), (&"Not good, but ok", false));
153    ///
154    /// tx.share("Not good");
155    ///
156    /// assert_eq!(rx.read_with_is_new(), (&"Not good", true));
157    pub fn read_with_is_new(&mut self) -> (&T, bool) {
158        match self.cell.take() {
159            None => (&*self.val, false),
160            Some(val) => {
161                self.val = val;
162                (&*self.val, true)
163            }
164        }
165    }
166
167    /// Return a reference to the value from the cell
168    ///
169    /// # Examples
170    ///
171    /// ```
172    /// let (mut tx, mut rx) = rw_cell::broadcast::cloneable::new("Not good, but ok");
173    ///
174    /// assert_eq!(rx.read(), &"Not good, but ok");
175    ///
176    /// tx.share("ok");
177    ///
178    /// assert_eq!(rx.read(), &"ok");
179    pub fn read(&mut self) -> &T {
180        match self.cell.take() {
181            None => &self.val,
182            Some(val) => {
183                self.val = val;
184                &self.val
185            }
186        }
187    }
188}
189
190/// Create new cell with [`Writer`] and [`Reader`]
191///
192/// # Examples
193///
194/// ```
195/// let (mut tx, mut rx) = rw_cell::broadcast::cloneable::new("Not good");
196/// assert_eq!(rx.read(), &"Not good");
197///
198/// tx.share("But ok");
199/// assert_eq!(rx.read_with_is_new(), (&"But ok", true));
200/// ```
201pub fn new<T>(val: T) -> (Writer<T>, Reader<T>) {
202    let mut tx = Writer::new(val);
203    let rx = tx.subscribe();
204    (tx, rx)
205}
206
207pub fn default<T>() -> (Writer<T>, Reader<T>)
208where
209    T: Default
210{
211    let mut tx = Writer::new(T::default());
212    let rx = tx.subscribe();
213    (tx, rx)
214}
215
216
#[cfg(test)]
mod test {
    use std::thread::JoinHandle;
    use crate::broadcast;
    use crate::broadcast::cloneable::{Writer, Reader};

    /// Values published in rotation by the writer thread of `is_work`
    const PUBLISHED: [&str; 6] = [
        "Slovakia",
        "Estonia",
        "Czechia",
        "United Kingdom",
        "Lithuania",
        "Latvia",
    ];

    /// The cell of a dropped reader is removed from the list on the next `share`
    #[test]
    fn test_remove_reader() {
        let (mut tx, rx) = broadcast::cloneable::new("Good");

        let rx1 = tx.subscribe();
        // Kept alive until the end of the test so that its cell stays registered.
        let _rx2 = rx.clone();

        assert_eq!(tx.cells.lock().unwrap().len(), 3);
        drop(rx1);

        tx.share("Only one");

        assert_eq!(tx.cells.lock().unwrap().len(), 2);
    }

    /// Clones (and clones of clones) all observe the value shared afterwards
    #[test]
    fn test_clone() {
        let (mut tx, mut rx0) = broadcast::cloneable::new("Good");
        let mut rx1 = rx0.clone();

        assert_eq!(rx1.read(), &"Good");

        let mut rx2 = rx0.clone();
        let mut rx3 = rx1.clone();

        tx.share("Ok");
        // Cloned while the update is still pending in `rx3`.
        let mut rx4 = rx3.clone();
        assert_eq!(rx0.read(), &"Ok");
        assert_eq!(rx1.read(), &"Ok");
        assert_eq!(rx2.read(), &"Ok");
        assert_eq!(rx3.read(), &"Ok");
        assert_eq!(rx4.read(), &"Ok");
    }

    /// Readers report an update exactly once and always return the latest value
    #[test]
    fn test_write_read() {
        let mut tx = Writer::new("Not good, but ok");
        let mut rx1 = tx.subscribe();
        let mut rx2 = tx.subscribe();

        assert_eq!(tx.read(), &"Not good, but ok");

        assert_eq!(rx1.read_with_is_new(), (&"Not good, but ok", false));
        assert_eq!(rx2.read_with_is_new(), (&"Not good, but ok", false));

        tx.share("Not good");
        assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
        assert_eq!(rx2.read_with_is_new(), (&"Not good", true));

        tx.share("ok");
        let mut rx3 = rx2.clone();
        assert_eq!(rx1.read(), &"ok");
        assert_eq!(rx2.read(), &"ok");
        assert_eq!(rx3.read(), &"ok");
    }

    /// Stress test: one thread keeps sharing values while four threads keep reading them
    #[test]
    fn is_work() -> Result<(), Box<dyn std::any::Any + Send + 'static>> {
        let (mut tx, rx0) = broadcast::cloneable::new(vec!["Ukraine".to_string(); 1000]);

        let rx1 = rx0.clone();
        let rx2 = tx.subscribe();
        let rx3 = tx.subscribe();

        // Detached on purpose: the writer never stops by itself and goes away
        // together with the test process.
        std::thread::spawn(move || {
            for country in PUBLISHED.iter().copied().cycle() {
                tx.share(vec![country.to_string(); 1001]);
            }
        });

        let count_iter = 1_000_000;
        let h0 = create_thread_read(rx0, count_iter);
        let h1 = create_thread_read(rx1, count_iter);
        let h2 = create_thread_read(rx2, count_iter);
        let h3 = create_thread_read(rx3, count_iter);

        let res0 = h0.join()?;
        let res1 = h1.join()?;
        let res2 = h2.join()?;
        let res3 = h3.join()?;

        println!("Slovakia:         {}", res0.0 + res1.0 + res2.0 + res3.0);
        println!("Estonia:          {}", res0.1 + res1.1 + res2.1 + res3.1);
        println!("Czechia:          {}", res0.2 + res1.2 + res2.2 + res3.2);
        println!("United Kingdom:   {}", res0.3 + res1.3 + res2.3 + res3.3);
        println!("Lithuania:        {}", res0.4 + res1.4 + res2.4 + res3.4);
        println!("Latvia:           {}", res0.5 + res1.5 + res2.5 + res3.5);
        println!("Ukraine:          {}", res0.6 + res1.6 + res2.6 + res3.6);

        Ok(())
    }

    /// Spawn a thread that reads `count_iter` times from `rw` and counts how often each
    /// value was seen, in the order: Slovakia, Estonia, Czechia, United Kingdom,
    /// Lithuania, Latvia, Ukraine
    fn create_thread_read(
        mut rw: Reader<Vec<String>>,
        count_iter: usize
    ) -> JoinHandle<(i32, i32, i32, i32, i32, i32, i32)> {
        std::thread::spawn(move || {
            let mut slovakia = 0;
            let mut estonia = 0;
            let mut czechia = 0;
            let mut united_kingdom = 0;
            let mut lithuania = 0;
            let mut latvia = 0;
            let mut ukraine = 0;

            for _ in 0..count_iter {
                // Compare as `&str` so that no `String` is allocated per iteration.
                match rw.read().first().map(String::as_str) {
                    Some("Slovakia") => slovakia += 1,
                    Some("Estonia") => estonia += 1,
                    Some("Czechia") => czechia += 1,
                    Some("United Kingdom") => united_kingdom += 1,
                    Some("Lithuania") => lithuania += 1,
                    Some("Latvia") => latvia += 1,
                    Some("Ukraine") => ukraine += 1,
                    other => println!("val: {:?}, Not good, but ok", other),
                }
            }

            (slovakia, estonia, czechia, united_kingdom, lithuania, latvia, ukraine)
        })
    }
}