pub mod cloneable;
use std::sync::Arc;
use crate::option::OptionCell;
#[derive(Default)]
pub struct Writer<T> {
cells: Vec<Arc<OptionCell<Arc<T>>>>,
val: Arc<T>,
}
impl<T> Writer<T> {
pub fn new(val: T) -> Writer<T> {
Self {
cells: vec![],
val: Arc::new(val),
}
}
pub fn share(&mut self, val: T) {
self.val = Arc::new(val);
self.cells.retain(|cell| {
let is_single_ref = Arc::strong_count(cell) != 1;
if is_single_ref {
cell.replace(self.val.clone());
}
is_single_ref
});
}
pub fn read(&self) -> &T {
&self.val
}
pub fn subscribe(&mut self) -> Reader<T> {
let cell = Arc::new(OptionCell::new(self.val.clone()));
self.cells.push(cell.clone());
Reader::new(cell)
}
}
pub struct Reader<T> {
cell: Arc<OptionCell<Arc<T>>>,
val: Arc<T>
}
impl<T> Reader<T> {
fn new(cell: Arc<OptionCell<Arc<T>>>) -> Reader<T> {
Self {
val: cell.take().unwrap(),
cell,
}
}
pub fn read_with_is_new(&mut self) -> (&T, bool) {
match self.cell.take() {
None => (&*self.val, false),
Some(val) => {
self.val = val;
(&*self.val, true)
}
}
}
pub fn read(&mut self) -> &T {
match self.cell.take() {
None => &self.val,
Some(val) => {
self.val = val;
&self.val
}
}
}
}
#[cfg(test)]
mod test {
use std::thread::JoinHandle;
use crate::broadcast::{Writer, Reader};
#[test]
fn test_remove_reader() {
let mut d = Writer::new("Good");
let mut _r1 = d.subscribe();
let mut _r2 = d.subscribe();
assert_eq!(d.cells.len(), 2);
drop(_r1);
d.share("Only one");
assert_eq!(d.cells.len(), 1)
}
#[test]
fn test_write_read() {
let mut tx = Writer::new("Not good, but ok");
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
assert_eq!(tx.read(), &"Not good, but ok");
assert_eq!(rx1.read_with_is_new(), (&"Not good, but ok", false));
assert_eq!(rx2.read_with_is_new(), (&"Not good, but ok", false));
tx.share("Not good");
assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
assert_eq!(rx2.read_with_is_new(), (&"Not good", true));
tx.share("ok");
assert_eq!(rx1.read(), &"ok");
assert_eq!(rx2.read(), &"ok");
}
#[test]
fn is_work() -> Result<(), Box<dyn std::any::Any + Send + 'static>> {
let mut tx = Writer::new(vec!["Ukraine".to_string(); 1000]);
let rx0 = tx.subscribe();
let rx1 = tx.subscribe();
let rx2 = tx.subscribe();
let rx3 = tx.subscribe();
std::thread::spawn(move || loop {
for i in 0usize.. {
match i % 6 {
0 => tx.share(vec!["Slovakia".to_string(); 1001]),
1 => tx.share(vec!["Estonia".to_string(); 1001]),
2 => tx.share(vec!["Czechia".to_string(); 1001]),
3 => tx.share(vec!["United Kingdom".to_string(); 1001]),
4 => tx.share(vec!["Lithuania".to_string(); 1001]),
5 => tx.share(vec!["Latvia".to_string(); 1001]),
val => println!("val: {:?}, Not good, but ok", val)
}
}
});
let count_iter = 1_000_000;
let h0 = create_thread_read(rx0, count_iter);
let h1 = create_thread_read(rx1, count_iter);
let h2 = create_thread_read(rx2, count_iter);
let h3 = create_thread_read(rx3, count_iter);
let res0 = h0.join()?;
let res1 = h1.join()?;
let res2 = h2.join()?;
let res3 = h3.join()?;
println!("Slovakia: {}", res0.0 + res1.0 + res2.0 + res3.0);
println!("Estonia: {}", res0.1 + res1.1 + res2.1 + res3.1);
println!("Czechia: {}", res0.2 + res1.2 + res2.2 + res3.2);
println!("United Kingdom: {}", res0.3 + res1.3 + res2.3 + res3.3);
println!("Lithuania: {}", res0.4 + res1.4 + res2.4 + res3.4);
println!("Latvia: {}", res0.5 + res1.5 + res2.5 + res3.5);
println!("Ukraine: {}", res0.6 + res1.6 + res2.6 + res3.6);
Ok(())
}
fn create_thread_read(
mut rw: Reader<Vec<String>>,
count_iter: usize
) -> JoinHandle<(i32, i32, i32, i32, i32, i32, i32)> {
let mut slovakia = 0;
let mut estonia = 0;
let mut czechia = 0;
let mut united_kingdom = 0;
let mut lithuania = 0;
let mut latvia = 0;
let mut ukraine = 0;
std::thread::spawn(move || {
for _ in 0..count_iter {
match rw.read() {
val if val.first() == Some(&"Slovakia".to_string()) => slovakia += 1,
val if val.first() == Some(&"Estonia".to_string()) => estonia += 1,
val if val.first() == Some(&"Czechia".to_string()) => czechia += 1,
val if val.first() == Some(&"United Kingdom".to_string()) => united_kingdom += 1,
val if val.first() == Some(&"Lithuania".to_string()) => lithuania += 1,
val if val.first() == Some(&"Latvia".to_string()) => latvia += 1,
val if val.first() == Some(&"Ukraine".to_string()) => ukraine += 1,
val => println!("val: {:?}, Not good, but ok", val.first())
}
}
(slovakia, estonia, czechia, united_kingdom, lithuania, latvia, ukraine)
})
}
}