another-rxrust 0.0.46

A different implementation than `rxRust` for easier use of `ReactiveX` in `Rust`.
Documentation
use crate::{internals::function_wrapper::FunctionWrapper, prelude::*};
use std::{
  collections::HashMap,
  sync::{Arc, RwLock},
};

#[derive(Clone)]
pub struct Subject<'a, Item>
where
  Item: Clone + Send + Sync,
{
  observers: Arc<RwLock<HashMap<i32, Observer<'a, Item>>>>,
  serial: Arc<RwLock<i32>>,
  on_subscribe: Arc<RwLock<Option<FunctionWrapper<'a, usize, ()>>>>,
  on_unsubscribe: Arc<RwLock<Option<FunctionWrapper<'a, usize, ()>>>>,
}

impl<'a, Item> Subject<'a, Item>
where
  Item: Clone + Send + Sync,
{
  pub fn new() -> Subject<'a, Item> {
    Subject {
      observers: Arc::new(RwLock::new(HashMap::new())),
      serial: Arc::new(RwLock::new(0)),
      on_subscribe: Arc::new(RwLock::new(None)),
      on_unsubscribe: Arc::new(RwLock::new(None)),
    }
  }

  fn fetch_observers(&self) -> Vec<Observer<'a, Item>> {
    let binding = self.observers.read().unwrap();
    let x = binding.iter().map(|x| x.1.clone());
    Vec::from_iter(x)
  }

  pub fn next(&self, item: Item) {
    self
      .fetch_observers()
      .into_iter()
      .for_each(move |x| x.next(item.clone()));
  }
  pub fn error(&self, err: RxError) {
    let obs = self.fetch_observers();
    self.observers.write().unwrap().clear();
    obs.into_iter().for_each(move |x| x.error(err.clone()));
  }
  pub fn complete(&self) {
    let obs = self.fetch_observers();
    self.observers.write().unwrap().clear();
    obs.into_iter().for_each(|x| x.complete());
  }

  pub fn observable(&self) -> Observable<'a, Item> {
    let observers = Arc::clone(&self.observers);
    let serial = Arc::clone(&self.serial);

    let on_subscribe = Arc::clone(&self.on_subscribe);
    let on_unsubscribe = Arc::clone(&self.on_unsubscribe);

    Observable::create(move |s| {
      let serial = {
        let mut serial = serial.write().unwrap();
        *serial += 1;
        *serial
      };
      {
        let observers = Arc::clone(&observers);
        let on_unsubscribe = Arc::clone(&on_unsubscribe);
        s.set_on_unsubscribe(move || {
          let len = {
            let mut observers = observers.write().unwrap();
            observers.remove(&serial);
            observers.len()
          };
          if let Some(on_unsubscribe) = &*on_unsubscribe.read().unwrap() {
            on_unsubscribe.call(len);
          }
        });
      }
      let len = {
        let mut observers = observers.write().unwrap();
        observers.insert(serial, s);
        observers.len()
      };
      if let Some(on_subscribe) = &*on_subscribe.read().unwrap() {
        on_subscribe.call(len);
      }
    })
  }

  pub(crate) fn set_on_subscribe<F>(&self, f: F)
  where
    F: Fn(usize) + Send + Sync + 'a,
  {
    *self.on_subscribe.write().unwrap() = Some(FunctionWrapper::new(f));
  }

  pub(crate) fn set_on_unsubscribe<F>(&self, f: F)
  where
    F: Fn(usize) + Send + Sync + 'a,
  {
    *self.on_unsubscribe.write().unwrap() = Some(FunctionWrapper::new(f));
  }
}

#[cfg(test)]
mod tset {
  use crate::prelude::*;
  use std::{thread, time};

  #[test]
  fn basic() {
    let sbj = subjects::Subject::new();

    sbj.observable().subscribe(
      |x| println!("#1 next {}", x),
      |e| println!("#1 error {:?}", e),
      || println!("#1 complete"),
    );

    sbj.next(1);
    sbj.next(2);
    sbj.next(3);
    sbj.complete();
  }

  #[test]
  fn double() {
    let sbj = subjects::Subject::new();

    sbj.set_on_subscribe(|x| println!("on_subscribe {}", x));
    sbj.set_on_unsubscribe(|x| println!("on_unsubscribe {}", x));

    let binding = sbj.observable();
    let sbsc1 = binding.subscribe(
      |x| println!("#1 next {}", x),
      |e| println!("#1 error {:?}", e),
      || println!("#1 complete"),
    );

    sbj.next(1);
    sbj.next(2);
    sbj.next(3);

    sbj.observable().subscribe(
      |x| println!("#2 next {}", x),
      |e| println!("#2 error {:?}", e),
      || println!("#2 complete"),
    );

    sbj.next(4);
    sbj.next(5);
    sbj.next(6);

    sbsc1.unsubscribe();

    sbj.next(7);
    sbj.next(8);
    sbj.next(9);

    sbj.complete();
  }

  #[test]
  fn thread() {
    let sbj = subjects::Subject::new();

    let sbj_thread = sbj.clone();
    let th = thread::spawn(move || {
      for n in 0..10 {
        thread::sleep(time::Duration::from_millis(100));
        sbj_thread.next(n);
      }
      sbj_thread.complete();
    });

    let binding = sbj.observable();
    let sbsc1 = binding.subscribe(
      |x| println!("#1 next {}", x),
      |e| println!("#1 error {:?}", e),
      || println!("#1 complete"),
    );

    thread::sleep(time::Duration::from_millis(300));

    sbj.observable().subscribe(
      |x| println!("#2 next {}", x),
      |e| println!("#2 error {:?}", e),
      || println!("#2 complete"),
    );

    thread::sleep(time::Duration::from_millis(300));
    sbsc1.unsubscribe();

    th.join().ok();
  }
}