another-rxrust 0.0.46

A different implementation than `rxRust` for easier use of `ReactiveX` in `Rust`.
Documentation
use crate::prelude::*;
use std::sync::{Arc, RwLock};

#[derive(Clone)]
pub struct BehaviorSubject<'a, Item>
where
  Item: Clone + Send + Sync,
{
  subject: Arc<subject::Subject<'a, Item>>,
  last_item: Arc<RwLock<Option<Item>>>,
  last_error: Arc<RwLock<Option<RxError>>>,
}

impl<'a, Item> BehaviorSubject<'a, Item>
where
  Item: Clone + Send + Sync,
{
  pub fn new(initial: Item) -> BehaviorSubject<'a, Item> {
    BehaviorSubject {
      subject: Arc::new(subjects::Subject::new()),
      last_item: Arc::new(RwLock::new(Some(initial))),
      last_error: Arc::new(RwLock::new(None)),
    }
  }

  pub fn next(&self, item: Item) {
    *self.last_item.write().unwrap() = Some(item.clone());
    self.subject.next(item);
  }
  pub fn error(&self, err: RxError) {
    *self.last_error.write().unwrap() = Some(err.clone());
    self.subject.error(err);
  }
  pub fn complete(&self) {
    *self.last_item.write().unwrap() = None;
    self.subject.complete();
  }
  pub fn observable(&self) -> Observable<'a, Item> {
    let last_item = Arc::clone(&self.last_item);
    let last_error = Arc::clone(&self.last_error);
    let subject = Arc::clone(&self.subject);

    Observable::create(move |s| {
      {
        let last_item = &*last_item.read().unwrap();
        let last_error = &*last_error.read().unwrap();

        if let Some(err) = last_error {
          s.error(err.clone());
          return;
        }
        if let Some(item) = last_item {
          s.next(item.clone());
        } else {
          s.complete();
          return;
        }
      }

      let sbsc = Arc::new(RwLock::new(None::<Subscription>));
      {
        let sbsc = Arc::clone(&sbsc);
        s.set_on_unsubscribe(move || {
          if let Some(sbsc) = &*sbsc.read().unwrap() {
            sbsc.unsubscribe();
          }
        });
      }

      let s_next = s.clone();
      let s_error = s.clone();
      let s_complete = s.clone();
      *sbsc.write().unwrap() = Some(subject.observable().subscribe(
        move |x| s_next.next(x),
        move |e| s_error.error(e),
        move || {
          s_complete.complete();
        },
      ));
    })
  }
}

#[cfg(test)]
mod tset {
  use crate::prelude::*;
  use std::{thread, time};

  #[test]
  fn basic() {
    let sbj = subjects::BehaviorSubject::<i32>::new(100);

    println!("start #1");
    sbj.observable().subscribe(
      |x| println!("#1 next {}", x),
      |e| println!("#1 error {:?}", e),
      || println!("#1 complete"),
    );

    sbj.next(1);
    sbj.next(2);

    println!("start #2");
    sbj.observable().subscribe(
      |x| println!("#2 next {}", x),
      |e| println!("#2 error {:?}", e),
      || println!("#2 complete"),
    );

    sbj.next(3);
    sbj.complete();

    println!("start #3");
    sbj.observable().subscribe(
      |x| println!("#3 next {}", x),
      |e| println!("#3 error {:?}", e),
      || println!("#3 complete"),
    );
  }

  #[test]
  fn double() {
    let sbj = subjects::BehaviorSubject::<i32>::new(100);

    println!("start #1");
    let sbsc1 = sbj.observable().subscribe(
      |x| println!("#1 next {}", x),
      |e| {
        println!(
          "#1 error {:?}",
          e.downcast_ref::<&str>()
        )
      },
      || println!("#1 complete"),
    );

    sbj.next(1);
    sbj.next(2);
    sbj.next(3);

    println!("start #2");
    sbj.observable().subscribe(
      |x| println!("#2 next {}", x),
      |e| {
        println!(
          "#2 error {:?}",
          e.downcast_ref::<&str>()
        )
      },
      || println!("#2 complete"),
    );

    sbj.next(4);
    sbj.next(5);
    sbj.next(6);

    sbsc1.unsubscribe();

    sbj.next(7);
    sbj.next(8);
    sbj.next(9);

    sbj.error(RxError::from_error("ERR!"));

    println!("start #3");
    sbj.observable().subscribe(
      |x| println!("#3 next {}", x),
      |e| {
        println!(
          "#3 error {:?}",
          e.downcast_ref::<&str>()
        )
      },
      || println!("#3 complete"),
    );
  }

  #[test]
  fn thread() {
    let sbj = subjects::BehaviorSubject::<i32>::new(100);

    let sbj_thread = sbj.clone();
    let th = thread::spawn(move || {
      for n in 0..10 {
        thread::sleep(time::Duration::from_millis(100));
        sbj_thread.next(n);
      }
      sbj_thread.complete();
    });

    println!("start #1");
    let sbsc1 = sbj.observable().subscribe(
      |x| println!("#1 next {}", x),
      |e| println!("#1 error {:?}", e),
      || println!("#1 complete"),
    );

    thread::sleep(time::Duration::from_millis(300));

    println!("start #2");
    sbj.observable().subscribe(
      |x| println!("#2 next {}", x),
      |e| println!("#2 error {:?}", e),
      || println!("#2 complete"),
    );

    thread::sleep(time::Duration::from_millis(300));

    println!("unsbscribe #1");
    sbsc1.unsubscribe();

    th.join().ok();
  }
}