rw-cell 2.0.3

Write data to cell from anything place your application without copy, lock and reading in one place
Documentation
//! Implements the pattern "single producer multiple consumer"
//!
//! # Example
//!
//!```
//! let mut tx = rw_cell::broadcast::Writer::new("Not good, but ok");
//!
//! assert_eq!(tx.read(), &"Not good, but ok");
//!
//! let mut rx1 = tx.subscribe();
//! let mut rx2 = tx.subscribe();
//!
//! assert_eq!(rx1.read_with_is_new(), (&"Not good, but ok", false));
//! assert_eq!(rx2.read_with_is_new(), (&"Not good, but ok", false));
//!
//! tx.share("Not good");
//! assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
//! assert_eq!(rx2.read_with_is_new(), (&"Not good", true));
//!
//! tx.share("ok");
//! assert_eq!(rx1.read(), &"ok");
//! assert_eq!(rx2.read(), &"ok");
//! ```

pub mod cloneable;

use std::sync::Arc;
use crate::option::OptionCell;

/// Distribute value between all readers
#[derive(Default)]
pub struct Writer<T> {
    cells: Vec<Arc<OptionCell<Arc<T>>>>,
    val: Arc<T>,
}

impl<T> Writer<T> {
    /// Create new Writer
    pub fn new(val: T) -> Writer<T> {
        Self {
            cells: vec![],
            val: Arc::new(val),
        }
    }

    /// Write val and distribute it between all readers
    ///
    /// # Example
    ///
    /// ```
    /// let mut tx = rw_cell::broadcast::Writer::new("Not good, but ok");
    /// let mut rx1 = tx.subscribe();
    /// let mut rx2 = tx.subscribe();
    ///
    /// tx.share("Not good");
    ///
    /// assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
    /// assert_eq!(rx2.read_with_is_new(), (&"Not good", true));
    /// ```
    pub fn share(&mut self, val: T) {
        self.val = Arc::new(val);

        self.cells.retain(|cell| {
            let is_single_ref = Arc::strong_count(cell) != 1;
            if is_single_ref {
                cell.replace(self.val.clone());
            }

            is_single_ref
        });
    }

    /// Return a reference to the value from the cell
    ///
    /// # Example
    ///
    /// ```
    /// let mut tx = rw_cell::broadcast::Writer::new("Not good, but ok");
    ///
    /// assert_eq!(tx.read(), &"Not good, but ok");
    /// ```
    pub fn read(&self) -> &T {
        &self.val
    }

    /// Return new [`Reader`] for read data from cell
    pub fn subscribe(&mut self) -> Reader<T> {
        let cell = Arc::new(OptionCell::new(self.val.clone()));
        self.cells.push(cell.clone());
        Reader::new(cell)
    }
}

/// Struct for read data from cell with non-copy and non-lock
pub struct Reader<T> {
    cell: Arc<OptionCell<Arc<T>>>,
    val: Arc<T>
}

impl<T> Reader<T> {
    fn new(cell: Arc<OptionCell<Arc<T>>>) -> Reader<T> {
        Self {
            val: cell.take().unwrap(),
            cell,
        }
    }

    /// Returns a tuple of value references and a boolean value, whether new or not
    ///
    /// # Examples
    ///
    /// ```
    /// let mut tx = rw_cell::broadcast::Writer::new("Not good, but ok");
    /// let mut rx = tx.subscribe();
    ///
    /// assert_eq!(rx.read_with_is_new(), (&"Not good, but ok", false));
    ///
    /// tx.share("Not good");
    ///
    /// assert_eq!(rx.read_with_is_new(), (&"Not good", true));
    pub fn read_with_is_new(&mut self) -> (&T, bool) {
        match self.cell.take() {
            None => (&*self.val, false),
            Some(val) => {
                self.val = val;
                (&*self.val, true)
            }
        }
    }

    /// Return a reference to the value from the cell
    ///
    /// # Examples
    ///
    /// ```
    /// let mut tx = rw_cell::broadcast::Writer::new("Not good, but ok");
    /// let mut rx = tx.subscribe();
    ///
    /// assert_eq!(rx.read(), &"Not good, but ok");
    ///
    /// tx.share("ok");
    ///
    /// assert_eq!(rx.read(), &"ok");
    pub fn read(&mut self) -> &T {
        match self.cell.take() {
            None => &self.val,
            Some(val) => {
                self.val = val;
                &self.val
            }
        }
    }
}


#[cfg(test)]
mod test {
    use std::thread::JoinHandle;
    use crate::broadcast::{Writer, Reader};

    #[test]
    fn test_remove_reader() {
        let mut d = Writer::new("Good");
        let mut _r1 = d.subscribe();
        let mut _r2 = d.subscribe();

        assert_eq!(d.cells.len(), 2);
        drop(_r1);

        d.share("Only one");

        assert_eq!(d.cells.len(), 1)
    }

    #[test]
    fn test_write_read() {
        let mut tx = Writer::new("Not good, but ok");
        let mut rx1 = tx.subscribe();
        let mut rx2 = tx.subscribe();

        assert_eq!(tx.read(), &"Not good, but ok");

        assert_eq!(rx1.read_with_is_new(), (&"Not good, but ok", false));
        assert_eq!(rx2.read_with_is_new(), (&"Not good, but ok", false));

        tx.share("Not good");
        assert_eq!(rx1.read_with_is_new(), (&"Not good", true));
        assert_eq!(rx2.read_with_is_new(), (&"Not good", true));

        tx.share("ok");
        assert_eq!(rx1.read(), &"ok");
        assert_eq!(rx2.read(), &"ok");
    }

    #[test]
    fn is_work() -> Result<(), Box<dyn std::any::Any + Send + 'static>> {
        let mut tx = Writer::new(vec!["Ukraine".to_string(); 1000]);

        let rx0 = tx.subscribe();
        let rx1 = tx.subscribe();
        let rx2 = tx.subscribe();
        let rx3 = tx.subscribe();

        std::thread::spawn(move || loop {
            for i in 0usize.. {
                match i % 6 {
                    0 => tx.share(vec!["Slovakia".to_string(); 1001]),
                    1 => tx.share(vec!["Estonia".to_string(); 1001]),
                    2 => tx.share(vec!["Czechia".to_string(); 1001]),
                    3 => tx.share(vec!["United Kingdom".to_string(); 1001]),
                    4 => tx.share(vec!["Lithuania".to_string(); 1001]),
                    5 => tx.share(vec!["Latvia".to_string(); 1001]),
                    val => println!("val: {:?}, Not good, but ok", val)
                }
            }
        });
        let count_iter = 1_000_000;
        let h0 = create_thread_read(rx0, count_iter);
        let h1 = create_thread_read(rx1, count_iter);
        let h2 = create_thread_read(rx2, count_iter);
        let h3 = create_thread_read(rx3, count_iter);

        let res0 = h0.join()?;
        let res1 = h1.join()?;
        let res2 = h2.join()?;
        let res3 = h3.join()?;

        println!("Slovakia:         {}", res0.0 + res1.0 + res2.0 + res3.0);
        println!("Estonia:          {}", res0.1 + res1.1 + res2.1 + res3.1);
        println!("Czechia:          {}", res0.2 + res1.2 + res2.2 + res3.2);
        println!("United Kingdom:   {}", res0.3 + res1.3 + res2.3 + res3.3);
        println!("Lithuania:        {}", res0.4 + res1.4 + res2.4 + res3.4);
        println!("Latvia:           {}", res0.5 + res1.5 + res2.5 + res3.5);
        println!("Ukraine:          {}", res0.6 + res1.6 + res2.6 + res3.6);

        Ok(())
    }

    fn create_thread_read(
        mut rw: Reader<Vec<String>>,
        count_iter: usize
    ) -> JoinHandle<(i32, i32, i32, i32, i32, i32, i32)> {
        let mut slovakia = 0;
        let mut estonia = 0;
        let mut czechia = 0;
        let mut united_kingdom = 0;
        let mut lithuania = 0;
        let mut latvia = 0;
        let mut ukraine = 0;

        std::thread::spawn(move || {
            for _ in 0..count_iter {
                match rw.read() {
                    val if val.first() == Some(&"Slovakia".to_string())         => slovakia += 1,
                    val if val.first() == Some(&"Estonia".to_string())          => estonia += 1,
                    val if val.first() == Some(&"Czechia".to_string())          => czechia += 1,
                    val if val.first() == Some(&"United Kingdom".to_string())   => united_kingdom += 1,
                    val if val.first() == Some(&"Lithuania".to_string())        => lithuania += 1,
                    val if val.first() == Some(&"Latvia".to_string())           => latvia += 1,
                    val if val.first() == Some(&"Ukraine".to_string())          => ukraine += 1,
                    val => println!("val: {:?}, Not good, but ok", val.first())
                }
            }
            (slovakia, estonia, czechia, united_kingdom, lithuania, latvia, ukraine)
        })
    }
}