another-rxrust 0.0.46

A different implementation than `rxRust` for easier use of `ReactiveX` in `Rust`.
Documentation
use crate::prelude::*;
use subject::Subject;

#[derive(Clone)]
pub struct Publish<'a, Item>
where
  Item: Clone + Send + Sync,
{
  sbj: subjects::Subject<'a, Item>,
  source: Observable<'a, Item>,
}

impl<'a, Item> Publish<'a, Item>
where
  Item: Clone + Send + Sync,
{
  pub fn new(source: Observable<'a, Item>) -> Publish<'a, Item> {
    Publish { sbj: Subject::<Item>::new(), source }
  }

  pub fn observable(&self) -> Observable<'a, Item> {
    self.sbj.observable()
  }

  pub fn connect(&self) -> Subscription<'a> {
    let sbj_next = self.sbj.clone();
    let sbj_error = self.sbj.clone();
    let sbj_complete = self.sbj.clone();

    self.source.subscribe(
      move |x| {
        sbj_next.next(x);
      },
      move |e| {
        sbj_error.error(e);
      },
      move || {
        sbj_complete.complete();
      },
    )
  }
}

impl<'a, Item> Observable<'a, Item>
where
  Item: Clone + Send + Sync,
{
  pub fn publish(&self) -> publish::Publish<'a, Item> {
    Publish::new(self.clone())
  }
}

#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use crate::{print_complete, print_error, print_next_fmt};
  use schedulers::new_thread_scheduler;
  use std::{thread, time};

  #[test]
  fn basic() {
    let o = observables::interval(
      time::Duration::from_millis(100),
      new_thread_scheduler(),
    )
    .tap(
      print_next_fmt!("tap {}"),
      print_error!(),
      print_complete!(),
    )
    .publish();
    let obs = o.observable();

    println!("start #1");
    let sbsc1 = obs.subscribe(
      print_next_fmt!("#1 {}"),
      print_error!(),
      print_complete!(),
    );

    println!("connect");
    let breaker = o.connect();
    thread::sleep(time::Duration::from_millis(500));

    println!("start #1");
    let sbsc2 = obs.subscribe(
      print_next_fmt!("#2 {}"),
      print_error!(),
      print_complete!(),
    );

    thread::sleep(time::Duration::from_millis(500));

    println!("end #1");
    sbsc1.unsubscribe();
    thread::sleep(time::Duration::from_millis(500));

    println!("end #2");
    sbsc2.unsubscribe();
    thread::sleep(time::Duration::from_millis(500));

    println!("braker");
    breaker.unsubscribe();
    thread::sleep(time::Duration::from_millis(500));
  }

  #[test]
  fn first_connect() {
    let o = observables::interval(
      time::Duration::from_millis(100),
      new_thread_scheduler(),
    )
    .tap(
      print_next_fmt!("tap {}"),
      print_error!(),
      print_complete!(),
    )
    .publish();
    let obs = o.observable();

    println!("connect");
    let breaker = o.connect();
    thread::sleep(time::Duration::from_millis(500));

    println!("start #1");
    let sbsc1 = obs.subscribe(
      print_next_fmt!("#1 {}"),
      print_error!(),
      print_complete!(),
    );

    println!("start #1");
    let sbsc2 = obs.subscribe(
      print_next_fmt!("#2 {}"),
      print_error!(),
      print_complete!(),
    );

    thread::sleep(time::Duration::from_millis(500));

    println!("end #1");
    sbsc1.unsubscribe();
    thread::sleep(time::Duration::from_millis(500));

    println!("end #2");
    sbsc2.unsubscribe();
    thread::sleep(time::Duration::from_millis(500));

    println!("braker");
    breaker.unsubscribe();
    thread::sleep(time::Duration::from_millis(500));
  }
}