// rxrust 1.0.0-alpha.5
//
// A Rust implementation of Reactive Extensions.
// Documentation
use crate::{impl_local_shared_both, prelude::*};

/// Operator state pairing a `source` with a list of `values` that are
/// delivered to an observer ahead of anything the source itself emits.
#[derive(Clone)]
pub struct StartWithOp<S, B> {
  /// The wrapped upstream; it is subscribed only after `values` were emitted.
  pub(crate) source: S,
  /// Values pushed to the observer, in `Vec` order, before subscribing to
  /// `source`.
  pub(crate) values: Vec<B>,
}

/// `StartWithOp` exposes exactly the item and error types of the source it
/// wraps.
impl<S: Observable, B> Observable for StartWithOp<S, B> {
  type Err = S::Err;
  type Item = S::Item;
}

// Subscription logic for `StartWithOp`, written once and handed to
// `impl_local_shared_both!`.
// NOTE(review): judging by the macro's name and the `@ctx` markers, it
// presumably expands this into both a local and a shared (thread-safe)
// observable impl — confirm against the macro definition.
impl_local_shared_both! {
  impl<S, B> StartWithOp<S, B>;
  // The subscription handle is whatever the source's subscription handle is.
  type Unsub = S::Unsub;
  macro method($self: ident, $observer: ident, $ctx: ident) {
    // Deliver the prepended values first, in `Vec` order; the loop consumes
    // `values` by value.
    for val in $self.values {
      $observer.next(val);
    }

    // Then hand the same observer to the source; the result of that call is
    // the result of the whole method.
    $self.source.actual_subscribe($observer)
  }
  where
    // The source's item type must be exactly `B`, the prepended values' type.
    S: @ctx::Observable<Item=B>,
    // Presumably bounds that apply only to the shared expansion.
    // NOTE(review): the body above moves each value and never calls `clone`
    // or `into`, and `Item=B` already makes `Into<S::Item>` trivial — these
    // bounds look stricter than needed unless the macro expansion relies on
    // them; confirm before relaxing.
    @ctx::shared_only(
      B: Clone + Into<S::Item> + Send + Sync + 'static,
      S::Item: 'static,
    )
    // Presumably bounds that apply only to the local expansion.
    // NOTE(review): `'o` is not declared in this invocation — presumably a
    // lifetime introduced by the macro; confirm.
    @ctx::local_only(
      B: Clone + Into<S::Item> + 'o,
      S::Item: Clone + 'o,
    )
}

#[cfg(test)]
mod test {
  use crate::of_sequence;
  use crate::prelude::*;
  use crate::test_scheduler::ManualScheduler;
  use std::cell::RefCell;
  use std::rc::Rc;
  use std::time::Duration;

  /// Integers given to `start_with` are observed ahead of the source's values.
  #[test]
  fn simple_integer() {
    let mut seen = String::new();

    {
      let source = of_sequence!(1, 2, 3);
      let prefixed = source.start_with(vec![-1, 0]);

      prefixed.subscribe(|n| seen.push_str(&n.to_string()));
    }

    assert_eq!(seen, "-10123");
  }

  /// Same contract as `simple_integer`, exercised with string slices.
  #[test]
  fn simple_string() {
    let mut sentence = String::new();

    {
      let source = of_sequence!(" World!", " Goodbye", " World!");
      let prefixed = source.start_with(vec!["Hello"]);

      prefixed.subscribe(|piece| sentence.push_str(piece));
    }

    assert_eq!(sentence, "Hello World! Goodbye World!");
  }

  /// The prepended value must already be recorded when the scheduler has only
  /// been advanced by 10ms, i.e. less than the 100ms interval period.
  #[test]
  fn should_start_on_subscription() {
    let scheduler = ManualScheduler::now();
    let received = Rc::new(RefCell::new(vec![]));
    let period = Duration::from_millis(100);
    let ticks = observable::interval(period, scheduler.clone());

    {
      // The observer closure owns its own handle to the shared buffer.
      let sink = Rc::clone(&received);
      ticks
        .start_with(vec![0])
        .subscribe(move |v| sink.borrow_mut().push(v));
    }

    scheduler.advance_and_run(Duration::from_millis(10), 1);
    assert_eq!(received.borrow().len(), 1);
    assert_eq!(received.borrow().as_ref(), vec![0]);
  }

  /// Benchmark body: repeatedly runs the `simple_integer` scenario.
  fn bench_start_with(b: &mut bencher::Bencher) {
    b.iter(simple_integer);
  }

  benchmark_group!(do_bench, bench_start_with);

  /// Runs the benchmark group as part of the ordinary test suite.
  #[test]
  fn bench() {
    do_bench();
  }
}