rxrust 1.0.0-alpha.5

A Rust implementation of Reactive Extensions.
Documentation
use crate::prelude::*;

#[derive(Clone)]
pub struct ContainsOp<S, Item> {
  pub(crate) source: S,
  pub(crate) target: Item,
}

impl<S, Item> Observable for ContainsOp<S, Item>
where
  S: Observable<Item = Item>,
{
  type Item = bool;
  type Err = S::Err;
}

#[doc(hidden)]
macro_rules! observable_impl {
    ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
  fn actual_subscribe<O>(
    self,
    observer: O,
  ) -> Self::Unsub
  where O: Observer<Item=bool,Err= Self::Err> + $($marker +)* $lf {
    self.source.actual_subscribe(ContainsObserver{
      observer,
      target: self.target,
      done:false,
    })
  }
}
}

impl<'a, Item, S> LocalObservable<'a> for ContainsOp<S, Item>
where
  S: LocalObservable<'a, Item = Item>,
  Item: 'a + Clone + Eq,
{
  type Unsub = S::Unsub;
  observable_impl!(LocalSubscription,'a);
}

impl<Item, S> SharedObservable for ContainsOp<S, Item>
where
  S: SharedObservable<Item = Item>,
  Item: Send + Sync + 'static + Clone + Eq,
{
  type Unsub = S::Unsub;
  observable_impl!(SharedSubscription, Send + Sync + 'static);
}

pub struct ContainsObserver<S, T> {
  observer: S,
  target: T,
  done: bool,
}

impl<O, Item, Err> Observer for ContainsObserver<O, Item>
where
  O: Observer<Item = bool, Err = Err>,
  Item: Clone + Eq,
{
  type Item = Item;
  type Err = Err;
  fn next(&mut self, value: Item) {
    if !self.done && self.target == value {
      self.observer.next(true);
      self.done = true;
      self.observer.complete();
    }
  }

  fn error(&mut self, err: Self::Err) {
    self.observer.error(err)
  }

  fn complete(&mut self) {
    if !self.done {
      self.observer.next(false);
      self.observer.complete();
    }
  }
}

#[cfg(test)]
mod test {
  use crate::prelude::*;
  #[test]
  fn contains_smoke() {
    observable::from_iter(0..10)
      .contains(4)
      .subscribe(|b| assert!(b));
    observable::from_iter(0..10)
      .contains(99)
      .subscribe(|b| assert!(!b));
    observable::empty().contains(1).subscribe(|b| assert!(!b));
  }

  #[cfg(not(target_arch = "wasm32"))]
  #[test]
  fn contains_shared() {
    observable::from_iter(0..10)
      .contains(4)
      .into_shared()
      .subscribe(|b| assert!(b));
  }

  #[test]
  fn bench() {
    do_bench();
  }

  benchmark_group!(do_bench, bench_contains);

  fn bench_contains(b: &mut bencher::Bencher) {
    b.iter(contains_smoke);
  }
}