rw_cell/broadcast/
mod.rs

1//! Implements the pattern "single producer multiple consumer"
2//!
3//! # Example
4//!
5//!```
6//! let mut tx = rw_cell::broadcast::Writer::new("Not good, but ok");
7//!
8//! assert_eq!(tx.read(), &"Not good, but ok");
9//!
10//! let mut rx1 = tx.subscribe();
11//! let mut rx2 = tx.subscribe();
12//!
13//! assert_eq!(rx1.read_with_is_new(), (&"Not good, but ok", false));
14//! assert_eq!(rx2.read_with_is_new(), (&"Not good, but ok", false));
15//!
16//! tx.share("Not good");
17//! assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
18//! assert_eq!(rx2.read_with_is_new(), (&"Not good", true));
19//!
20//! tx.share("ok");
21//! assert_eq!(rx1.read(), &"ok");
22//! assert_eq!(rx2.read(), &"ok");
23//! ```
24
25pub mod cloneable;
26
27use std::sync::Arc;
28use crate::option::OptionCell;
29
30/// Distribute value between all readers
31#[derive(Default)]
32pub struct Writer<T> {
33    cells: Vec<Arc<OptionCell<Arc<T>>>>,
34    val: Arc<T>,
35}
36
37impl<T> Writer<T> {
38    /// Create new Writer
39    pub fn new(val: T) -> Writer<T> {
40        Self {
41            cells: vec![],
42            val: Arc::new(val),
43        }
44    }
45
46    /// Write val and distribute it between all readers
47    ///
48    /// # Example
49    ///
50    /// ```
51    /// let mut tx = rw_cell::broadcast::Writer::new("Not good, but ok");
52    /// let mut rx1 = tx.subscribe();
53    /// let mut rx2 = tx.subscribe();
54    ///
55    /// tx.share("Not good");
56    ///
57    /// assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
58    /// assert_eq!(rx2.read_with_is_new(), (&"Not good", true));
59    /// ```
60    pub fn share(&mut self, val: T) {
61        self.val = Arc::new(val);
62
63        self.cells.retain(|cell| {
64            let is_single_ref = Arc::strong_count(cell) != 1;
65            if is_single_ref {
66                cell.replace(self.val.clone());
67            }
68
69            is_single_ref
70        });
71    }
72
73    /// Return a reference to the value from the cell
74    ///
75    /// # Example
76    ///
77    /// ```
78    /// let mut tx = rw_cell::broadcast::Writer::new("Not good, but ok");
79    ///
80    /// assert_eq!(tx.read(), &"Not good, but ok");
81    /// ```
82    pub fn read(&self) -> &T {
83        &self.val
84    }
85
86    /// Return new [`Reader`] for read data from cell
87    pub fn subscribe(&mut self) -> Reader<T> {
88        let cell = Arc::new(OptionCell::new(self.val.clone()));
89        self.cells.push(cell.clone());
90        Reader::new(cell)
91    }
92}
93
94/// Struct for read data from cell with non-copy and non-lock
95pub struct Reader<T> {
96    cell: Arc<OptionCell<Arc<T>>>,
97    val: Arc<T>
98}
99
100impl<T> Reader<T> {
101    fn new(cell: Arc<OptionCell<Arc<T>>>) -> Reader<T> {
102        Self {
103            val: cell.take().unwrap(),
104            cell,
105        }
106    }
107
108    /// Returns a tuple of value references and a boolean value, whether new or not
109    ///
110    /// # Examples
111    ///
112    /// ```
113    /// let mut tx = rw_cell::broadcast::Writer::new("Not good, but ok");
114    /// let mut rx = tx.subscribe();
115    ///
116    /// assert_eq!(rx.read_with_is_new(), (&"Not good, but ok", false));
117    ///
118    /// tx.share("Not good");
119    ///
120    /// assert_eq!(rx.read_with_is_new(), (&"Not good", true));
121    pub fn read_with_is_new(&mut self) -> (&T, bool) {
122        match self.cell.take() {
123            None => (&*self.val, false),
124            Some(val) => {
125                self.val = val;
126                (&*self.val, true)
127            }
128        }
129    }
130
131    /// Return a reference to the value from the cell
132    ///
133    /// # Examples
134    ///
135    /// ```
136    /// let mut tx = rw_cell::broadcast::Writer::new("Not good, but ok");
137    /// let mut rx = tx.subscribe();
138    ///
139    /// assert_eq!(rx.read(), &"Not good, but ok");
140    ///
141    /// tx.share("ok");
142    ///
143    /// assert_eq!(rx.read(), &"ok");
144    pub fn read(&mut self) -> &T {
145        match self.cell.take() {
146            None => &self.val,
147            Some(val) => {
148                self.val = val;
149                &self.val
150            }
151        }
152    }
153}
154
155
#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread::JoinHandle;
    use crate::broadcast::{Writer, Reader};

    /// Every value a reader can observe in `is_work`. The first
    /// `SHARED_COUNT` entries are shared round-robin by the writer thread;
    /// the last one is the value the writer starts with.
    const COUNTRIES: [&str; 7] = [
        "Slovakia",
        "Estonia",
        "Czechia",
        "United Kingdom",
        "Lithuania",
        "Latvia",
        "Ukraine",
    ];

    /// Number of leading `COUNTRIES` entries the writer thread cycles through.
    const SHARED_COUNT: usize = 6;

    /// Per-country observation counters, indexed like `COUNTRIES`.
    type Counts = [usize; 7];

    /// A cell whose reader was dropped is removed on the next `share`.
    #[test]
    fn test_remove_reader() {
        let mut d = Writer::new("Good");
        let r1 = d.subscribe();
        let _r2 = d.subscribe();

        assert_eq!(d.cells.len(), 2);
        drop(r1);

        d.share("Only one");

        assert_eq!(d.cells.len(), 1)
    }

    /// Readers see each shared value, and it is flagged as new only once.
    #[test]
    fn test_write_read() {
        let mut tx = Writer::new("Not good, but ok");
        let mut rx1 = tx.subscribe();
        let mut rx2 = tx.subscribe();

        assert_eq!(tx.read(), &"Not good, but ok");

        assert_eq!(rx1.read_with_is_new(), (&"Not good, but ok", false));
        assert_eq!(rx2.read_with_is_new(), (&"Not good, but ok", false));

        tx.share("Not good");
        assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
        assert_eq!(rx2.read_with_is_new(), (&"Not good", true));

        tx.share("ok");
        assert_eq!(rx1.read(), &"ok");
        assert_eq!(rx2.read(), &"ok");
    }

    /// Stress test: one thread keeps sharing values while four threads read.
    /// Every read must yield one of the known values.
    #[test]
    fn is_work() -> Result<(), Box<dyn std::any::Any + Send + 'static>> {
        let mut tx = Writer::new(vec![COUNTRIES[SHARED_COUNT].to_string(); 1000]);
        let subscribers: Vec<Reader<Vec<String>>> = (0..4).map(|_| tx.subscribe()).collect();
        let reader_count = subscribers.len();
        let count_iter = 1_000_000;

        // The flag guards no other data, so relaxed ordering is sufficient.
        let stop = Arc::new(AtomicBool::new(false));
        let writer = {
            let stop = Arc::clone(&stop);
            std::thread::spawn(move || {
                for name in COUNTRIES[..SHARED_COUNT].iter().cycle() {
                    if stop.load(Ordering::Relaxed) {
                        break;
                    }
                    tx.share(vec![name.to_string(); 1001]);
                }
            })
        };

        let readers: Vec<JoinHandle<Counts>> = subscribers
            .into_iter()
            .map(|rx| create_thread_read(rx, count_iter))
            .collect();

        // Join every reader before stopping the writer, and stop the writer
        // before propagating any reader failure, so no thread is left spinning.
        let results: Vec<_> = readers.into_iter().map(|handle| handle.join()).collect();
        stop.store(true, Ordering::Relaxed);
        writer.join()?;

        let mut totals: Counts = [0; 7];
        for result in results {
            let counts = result?;
            for (total, count) in totals.iter_mut().zip(counts.iter()) {
                *total += *count;
            }
        }

        for (name, total) in COUNTRIES.iter().zip(totals.iter()) {
            println!("{:<18}{}", format!("{}:", name), total);
        }

        // A read that matched none of the known values would be missing here.
        assert_eq!(totals.iter().sum::<usize>(), reader_count * count_iter);

        Ok(())
    }

    /// Spawns a thread that reads from `rw` `count_iter` times and counts how
    /// often each country was observed as the first element.
    fn create_thread_read(
        mut rw: Reader<Vec<String>>,
        count_iter: usize
    ) -> JoinHandle<Counts> {
        std::thread::spawn(move || {
            let mut counts: Counts = [0; 7];
            for _ in 0..count_iter {
                // Compare as `&str` to avoid allocating a `String` per check.
                let first = rw.read().first().map(String::as_str);
                match COUNTRIES.iter().position(|&name| Some(name) == first) {
                    Some(slot) => counts[slot] += 1,
                    None => println!("val: {:?}, Not good, but ok", first),
                }
            }
            counts
        })
    }
}