rw_cell/
mpsc.rs

//! Implements the "multiple producer, single consumer" pattern.
2//!
3//! # Example
4//!
5//!```
6//!
7//! let (w0, mut r) = rw_cell::mpsc::new("Not good, but ok");
8//! let w1 = r.writer();
9//! let w2 = w0.clone();
10//!
11//! assert_eq!(r.read_with_is_new(), (&"Not good, but ok", false));
12//!
13//! w0.write("Not good");
14//! assert_eq!(r.read_with_is_new(), (&"Not good", true));
15//!
16//! w1.write("ok");
17//! assert_eq!(r.read(), &"ok");
18//!
19//! w2.write("But!!!");
20//! assert_eq!(r.read(), &"But!!!");
21//! ```
22
23use std::sync::Arc;
24use crate::option::OptionCell;
25
26/// Struct for write data in cell with non-copy and non-lock
27#[derive(Clone)]
28pub struct Writer<T> {
29    cell: Arc<OptionCell<T>>
30}
31
32impl<T> Writer<T> {
33    fn new(cell: Arc<OptionCell<T>>) -> Self {
34        Self {
35            cell,
36        }
37    }
38
39    /// Write value in cell
40    ///
41    /// # Examples
42    ///
43    /// ```
44    /// let (w, mut r) = rw_cell::mpsc::new("Not good");
45    /// w.write("Good");
46    ///
47    /// assert_eq!(r.read(), &"Good");
48    /// ```
49    pub fn write(&self, val: T) {
50        self.cell.replace(val);
51    }
52}
53
54/// Struct for read data from cell with non-copy and non-lock
55pub struct Reader<T> {
56    cell: Arc<OptionCell<T>>,
57    val: T
58}
59
60impl<T> Reader<T> {
61    fn new(cell: Arc<OptionCell<T>>) -> Self {
62
63        Self {
64            val: cell.take().unwrap(),
65            cell,
66        }
67    }
68
69    /// Return [`Writer`] for write data in cell
70    pub fn writer(&self) -> Writer<T> {
71        Writer::new(self.cell.clone())
72    }
73
74    /// Returns a tuple of value references and a boolean value, whether new or not
75    ///
76    /// # Examples
77    ///
78    /// ```
79    /// let (w, mut r) = rw_cell::mpsc::new("Not good");
80    /// assert_eq!(r.read_with_is_new(), (&"Not good", false));
81    ///
82    /// w.write("But ok");
83    /// assert_eq!(r.read_with_is_new(), (&"But ok", true));
84    /// ```
85    pub fn read_with_is_new(&mut self) -> (&T, bool) {
86        match self.cell.take() {
87            None => (&self.val, false),
88            Some(val) => {
89                self.val = val;
90                (&self.val, true)
91            }
92        }
93    }
94
95    /// Return a reference to the value from the cell
96    pub fn read(&mut self) -> &T {
97        match self.cell.take() {
98            None => &self.val,
99            Some(val) => {
100                self.val = val;
101                &self.val
102            }
103        }
104    }
105}
106
107/// Create new cell with [`Writer`] and [`Reader`]
108///
109/// # Examples
110///
111/// ```
112/// let (w, mut r) = rw_cell::mpsc::new("Not good");
113/// assert_eq!(r.read(), &"Not good");
114///
115/// w.write("But ok");
116/// assert_eq!(r.read_with_is_new(), (&"But ok", true));
117/// ```
118pub fn new<T>(val: T) -> (Writer<T>, Reader<T>) {
119    let cell = Arc::new(OptionCell::new(val));
120    (Writer::new(cell.clone()), Reader::new(cell))
121}
122
123pub fn default<T>() -> (Writer<T>, Reader<T>)
124    where
125        T: Default
126{
127    let cell = Arc::new(OptionCell::new(T::default()));
128    (Writer::new(cell.clone()), Reader::new(cell))
129}
130
131
#[cfg(test)]
mod test {
    use crate::mpsc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;

    /// A fresh reader yields the initial value; after a write it yields the written value.
    #[test]
    fn test_read() {
        let (w, mut r) = mpsc::new(vec!["fffff"; 1000]);
        assert_eq!(r.read(), &vec!["fffff"; 1000]);
        w.write(vec!["Not good, but ok"]);
        assert_eq!(r.read(), &vec!["Not good, but ok"]);
    }

    /// The "is new" flag is set only by the read that picks up a written value.
    #[test]
    fn test_read_with_is_new() {
        let (w, mut r) = mpsc::new(vec!["fffff"; 1000]);
        assert_eq!(r.read_with_is_new(), (&vec!["fffff"; 1000], false));
        w.write(vec!["Not good"]);
        assert_eq!(r.read_with_is_new(), (&vec!["Not good"], true));
        // Nothing was written in between, so the same value is no longer new.
        assert_eq!(r.read_with_is_new(), (&vec!["Not good"], false));
    }

    /// Six writer threads hammer the cell while the reader checks that every
    /// value it observes is exactly one of the values that was written.
    #[test]
    fn is_work() {
        const READS: usize = 10_000_000;
        // (label, vector length) of the value the cell starts with.
        const INITIAL: (&str, usize) = ("Ukraine", 1000);
        // (label, vector length) written by each writer thread.
        const WRITTEN: [(&str, usize); 6] = [
            ("Slovakia", 1001),
            ("Estonia", 1002),
            ("Czechia", 1003),
            ("United Kingdom", 1004),
            ("Lithuania", 1004),
            ("Latvia", 1004),
        ];

        let (w0, mut r) = mpsc::new(vec![INITIAL.0; INITIAL.1]);

        let mut writers = vec![w0];
        writers.extend((1..WRITTEN.len()).map(|_| r.writer()));

        // Shared stop flag so the writer threads terminate instead of spinning
        // for the rest of the test run.
        let stop = Arc::new(AtomicBool::new(false));
        let handles: Vec<_> = writers
            .into_iter()
            .zip(WRITTEN.iter().copied())
            .map(|(writer, (label, len))| {
                let stop = Arc::clone(&stop);
                thread::spawn(move || {
                    while !stop.load(Ordering::Relaxed) {
                        writer.write(vec![label; len]);
                    }
                })
            })
            .collect();

        // (label, expected length, times observed)
        let mut tally: Vec<(&str, usize, usize)> = WRITTEN
            .iter()
            .chain(std::iter::once(&INITIAL))
            .map(|&(label, len)| (label, len, 0))
            .collect();

        // A mismatch is recorded rather than asserted on the spot, so the
        // writer threads are always stopped and joined before the test fails.
        let mut failure = None;
        for _ in 0..READS {
            let val = r.read();
            let known = tally
                .iter_mut()
                .find(|entry| val.first() == Some(&entry.0));
            match known {
                Some(entry) if entry.1 == val.len() => entry.2 += 1,
                _ => {
                    failure = Some(format!(
                        "unexpected value: first = {:?}, len = {}",
                        val.first(),
                        val.len()
                    ));
                    break;
                }
            }
        }

        stop.store(true, Ordering::Relaxed);
        for handle in handles {
            handle.join().expect("writer thread panicked");
        }

        if let Some(message) = failure {
            panic!("{}", message);
        }

        for (label, _, count) in &tally {
            println!("count {:<16}{}", format!("{}:", label), count);
        }
        assert_eq!(tally.iter().map(|entry| entry.2).sum::<usize>(), READS);
    }
}