rw_cell/mpsc.rs
1//!Implements the pattern "multiple producer single consumer"
2//!
3//! # Example
4//!
5//!```
6//!
7//! let (w0, mut r) = rw_cell::mpsc::new("Not good, but ok");
8//! let w1 = r.writer();
9//! let w2 = w0.clone();
10//!
11//! assert_eq!(r.read_with_is_new(), (&"Not good, but ok", false));
12//!
13//! w0.write("Not good");
14//! assert_eq!(r.read_with_is_new(), (&"Not good", true));
15//!
16//! w1.write("ok");
17//! assert_eq!(r.read(), &"ok");
18//!
19//! w2.write("But!!!");
20//! assert_eq!(r.read(), &"But!!!");
21//! ```
22
23use std::sync::Arc;
24use crate::option::OptionCell;
25
26/// Struct for write data in cell with non-copy and non-lock
27#[derive(Clone)]
28pub struct Writer<T> {
29 cell: Arc<OptionCell<T>>
30}
31
32impl<T> Writer<T> {
33 fn new(cell: Arc<OptionCell<T>>) -> Self {
34 Self {
35 cell,
36 }
37 }
38
39 /// Write value in cell
40 ///
41 /// # Examples
42 ///
43 /// ```
44 /// let (w, mut r) = rw_cell::mpsc::new("Not good");
45 /// w.write("Good");
46 ///
47 /// assert_eq!(r.read(), &"Good");
48 /// ```
49 pub fn write(&self, val: T) {
50 self.cell.replace(val);
51 }
52}
53
54/// Struct for read data from cell with non-copy and non-lock
55pub struct Reader<T> {
56 cell: Arc<OptionCell<T>>,
57 val: T
58}
59
60impl<T> Reader<T> {
61 fn new(cell: Arc<OptionCell<T>>) -> Self {
62
63 Self {
64 val: cell.take().unwrap(),
65 cell,
66 }
67 }
68
69 /// Return [`Writer`] for write data in cell
70 pub fn writer(&self) -> Writer<T> {
71 Writer::new(self.cell.clone())
72 }
73
74 /// Returns a tuple of value references and a boolean value, whether new or not
75 ///
76 /// # Examples
77 ///
78 /// ```
79 /// let (w, mut r) = rw_cell::mpsc::new("Not good");
80 /// assert_eq!(r.read_with_is_new(), (&"Not good", false));
81 ///
82 /// w.write("But ok");
83 /// assert_eq!(r.read_with_is_new(), (&"But ok", true));
84 /// ```
85 pub fn read_with_is_new(&mut self) -> (&T, bool) {
86 match self.cell.take() {
87 None => (&self.val, false),
88 Some(val) => {
89 self.val = val;
90 (&self.val, true)
91 }
92 }
93 }
94
95 /// Return a reference to the value from the cell
96 pub fn read(&mut self) -> &T {
97 match self.cell.take() {
98 None => &self.val,
99 Some(val) => {
100 self.val = val;
101 &self.val
102 }
103 }
104 }
105}
106
107/// Create new cell with [`Writer`] and [`Reader`]
108///
109/// # Examples
110///
111/// ```
112/// let (w, mut r) = rw_cell::mpsc::new("Not good");
113/// assert_eq!(r.read(), &"Not good");
114///
115/// w.write("But ok");
116/// assert_eq!(r.read_with_is_new(), (&"But ok", true));
117/// ```
118pub fn new<T>(val: T) -> (Writer<T>, Reader<T>) {
119 let cell = Arc::new(OptionCell::new(val));
120 (Writer::new(cell.clone()), Reader::new(cell))
121}
122
123pub fn default<T>() -> (Writer<T>, Reader<T>)
124 where
125 T: Default
126{
127 let cell = Arc::new(OptionCell::new(T::default()));
128 (Writer::new(cell.clone()), Reader::new(cell))
129}
130
131
132#[cfg(test)]
133mod test {
134 use crate::mpsc;
135
136 #[test]
137 fn test_read() {
138 let (w, mut r) = mpsc::new(vec!["fffff"; 1000]);
139 assert_eq!(r.read(), &vec!["fffff"; 1000]);
140 w.write(vec!["Not good, but ok"]);
141 assert_eq!(r.read(), &vec!["Not good, but ok"]);
142 }
143
144 #[test]
145 fn test_read_with_is_new() {
146 let (w, mut r) = mpsc::new(vec!["fffff"; 1000]);
147 assert_eq!(r.read_with_is_new(), (&vec!["fffff"; 1000], false));
148 w.write(vec!["Not good"]);
149 assert_eq!(r.read_with_is_new(), (&vec!["Not good"], true));
150 }
151
152 #[test]
153 fn is_work() {
154 let (w0, mut r) = mpsc::new(vec!["Ukraine"; 1000]);
155
156 let w1 = r.writer();
157 let w2 = r.writer();
158 let w3 = r.writer();
159 let w4 = r.writer();
160 let w5 = r.writer();
161
162 std::thread::spawn(move || loop {
163 w0.write(vec!["Slovakia"; 1001])
164 });
165
166 std::thread::spawn(move || loop {
167 w1.write(vec!["Estonia"; 1002])
168 });
169
170 std::thread::spawn(move || loop {
171 w2.write(vec!["Czechia"; 1003])
172 });
173
174 std::thread::spawn(move || loop {
175 w3.write(vec!["United Kingdom"; 1004])
176 });
177
178 std::thread::spawn(move || loop {
179 w4.write(vec!["Lithuania"; 1004])
180 });
181
182 std::thread::spawn(move || loop {
183 w5.write(vec!["Latvia"; 1004])
184 });
185
186 let mut slovakia = 0;
187 let mut estonia = 0;
188 let mut czechia = 0;
189 let mut united_kingdom = 0;
190 let mut lithuania = 0;
191 let mut latvia = 0;
192 let mut ukraine = 0;
193
194 for _ in 0..10000000usize {
195 match r.read() {
196 val if val.first() == Some(&"Slovakia") => slovakia += 1,
197 val if val.first() == Some(&"Estonia") => estonia += 1,
198 val if val.first() == Some(&"Czechia") => czechia += 1,
199 val if val.first() == Some(&"United Kingdom") => united_kingdom += 1,
200 val if val.first() == Some(&"Lithuania") => lithuania += 1,
201 val if val.first() == Some(&"Latvia") => latvia += 1,
202 val if val.first() == Some(&"Ukraine") => ukraine += 1,
203 val => println!("val: {:?}, Not good, but ok", val.first())
204 }
205 }
206
207 println!("count Slovakia: {}", slovakia);
208 println!("count Estonia: {}", estonia);
209 println!("count Czechia: {}", czechia);
210 println!("count United Kingdom: {}", united_kingdom);
211 println!("count Lithuania: {}", lithuania);
212 println!("count Latvia: {}", latvia);
213 println!("count Ukraine: {}", ukraine);
214 }
215
216 // #[test]
217 // fn test() {
218 // let read_count = 1_000_000usize;
219 //
220 // let rw_cell_avg_time = test_rw_cell(read_count);
221 // let rw_lock_avg_time = test_rwlock(read_count);
222 //
223 // println!("rw_cell_avg_time: {rw_cell_avg_time}");
224 // println!("rw_lock_avg_time: {rw_lock_avg_time}");
225 // }
226 //
227 // fn test_rwlock(read_count: usize) -> u128 {
228 // let lock_cell0 = Arc::new(RwLock::new(vec!["fffff"; 1000]));
229 //
230 // let lock_cell1 = lock_cell0.clone();
231 // let lock_cell2 = lock_cell0.clone();
232 // let lock_cell3 = lock_cell0.clone();
233 // let lock_cell4 = lock_cell0.clone();
234 // let lock_cell5 = lock_cell0.clone();
235 // let lock_cell6 = lock_cell0.clone();
236 //
237 // std::thread::spawn(move || loop {
238 // *lock_cell0.write().unwrap() = vec!["fffff"; 1001];
239 // });
240 //
241 // std::thread::spawn(move || loop {
242 // *lock_cell1.write().unwrap() = vec!["fffff"; 1002]
243 // });
244 //
245 // std::thread::spawn(move || loop {
246 // *lock_cell2.write().unwrap() = vec!["fffff"; 1003]
247 // });
248 //
249 // std::thread::spawn(move || loop {
250 // *lock_cell3.write().unwrap() = vec!["fffff"; 1004]
251 // });
252 //
253 // std::thread::spawn(move || loop {
254 // *lock_cell4.write().unwrap() = vec!["fffff"; 1005]
255 // });
256 //
257 // std::thread::spawn(move || loop {
258 // *lock_cell5.write().unwrap() = vec!["fffff"; 1006]
259 // });
260 //
261 // let mut times = vec![];
262 //
263 // for _ in 0..read_count {
264 // let st = Instant::now();
265 // lock_cell6.read().unwrap().len();
266 // times.push(st.elapsed().as_nanos());
267 // }
268 // times.into_iter().sum::<u128>() / read_count as u128
269 // }
270 //
271 // fn test_rw_cell(read_count: usize) -> u128 {
272 // let (w0, mut r) = mwsr::cell(vec!["fffff"; 1000]);
273 //
274 // let w1 = w0.clone();
275 // let w2 = w0.clone();
276 // let w3 = w0.clone();
277 // let w4 = w0.clone();
278 // let w5 = w0.clone();
279 //
280 // std::thread::spawn(move || loop {
281 // w0.write(vec!["fffff"; 1001])
282 // });
283 //
284 // std::thread::spawn(move || loop {
285 // w1.write(vec!["fffff"; 1002])
286 // });
287 //
288 // std::thread::spawn(move || loop {
289 // w2.write(vec!["fffff"; 1003])
290 // });
291 //
292 // std::thread::spawn(move || loop {
293 // w3.write(vec!["fffff"; 1004])
294 // });
295 //
296 // std::thread::spawn(move || loop {
297 // w4.write(vec!["fffff"; 1005])
298 // });
299 //
300 // std::thread::spawn(move || loop {
301 // w5.write(vec!["fffff"; 1006])
302 // });
303 //
304 // let mut times = vec![];
305 //
306 // for _ in 0..read_count {
307 // let st = Instant::now();
308 // r.read().len();
309 // times.push(st.elapsed().as_nanos());
310 // }
311 //
312 // times.into_iter().sum::<u128>() / read_count as u128
313 // }
314}