rxrust 1.0.0-alpha.5

A Rust implementation of Reactive Extensions.
Documentation
use crate::{impl_helper::*, impl_local_shared_both, prelude::*};
use std::collections::VecDeque;
/// An Observable that combines from two other two Observables.
///
/// This struct is created by the zip method on [Observable](Observable::zip).
/// See its documentation for more.
#[derive(Clone)]
pub struct ZipOp<A, B> {
  pub(crate) a: A,
  pub(crate) b: B,
}

impl<A, B> Observable for ZipOp<A, B>
where
  A: Observable,
  B: Observable<Err = A::Err>,
{
  type Item = (A::Item, B::Item);
  type Err = A::Err;
}

impl_local_shared_both! {
  impl<A, B> ZipOp<A, B>;
  type Unsub = @ctx::RcMultiSubscription;
  macro method($self:ident, $observer: ident, $ctx: ident) {
    let sub = $ctx::RcMultiSubscription::default();
    let o_zip = ZipObserver::new($observer, sub.clone());
    let o_zip = $ctx::Rc::own(o_zip);
    sub.add(
      $self
        .a
        .actual_subscribe(AObserver(o_zip.clone(), TypeHint::new())),
    );

    sub.add($self.b.actual_subscribe(BObserver(o_zip, TypeHint::new())));
    sub
  }
  where
    A: @ctx::Observable,
    B: @ctx::Observable<Err=A::Err>,
    @ctx::shared_only(
      A::Item: Send + Sync + 'static,
      B::Item: Send + Sync + 'static,
    )
    @ctx::local_only(
      A::Item: 'o,
      B::Item: 'o,
    )
    A::Unsub: 'static @ctx::shared_only(+Send + Sync),
    B::Unsub:'static @ctx::shared_only(+Send + Sync)
}
enum ZipItem<A, B> {
  ItemA(A),
  ItemB(B),
}

struct ZipObserver<O, U, A, B> {
  observer: O,
  subscription: U,
  a: VecDeque<A>,
  b: VecDeque<B>,
  completed_one: bool,
}

impl<O, U, A, B> ZipObserver<O, U, A, B> {
  fn new(o: O, u: U) -> Self {
    ZipObserver {
      observer: o,
      subscription: u,
      a: VecDeque::default(),
      b: VecDeque::default(),
      completed_one: false,
    }
  }
}

impl<O, U, A, B, Err> Observer for ZipObserver<O, U, A, B>
where
  O: Observer<Item = (A, B), Err = Err>,
  U: SubscriptionLike,
{
  type Item = ZipItem<A, B>;
  type Err = Err;
  fn next(&mut self, value: ZipItem<A, B>) {
    match value {
      ZipItem::ItemA(v) => {
        if !self.b.is_empty() {
          self.observer.next((v, self.b.pop_front().unwrap()))
        } else {
          self.a.push_back(v);
        }
      }
      ZipItem::ItemB(v) => {
        if !self.a.is_empty() {
          self.observer.next((self.a.pop_front().unwrap(), v))
        } else {
          self.b.push_back(v)
        }
      }
    }
  }

  fn error(&mut self, err: Err) {
    self.observer.error(err);
    self.subscription.unsubscribe();
  }

  fn complete(&mut self) {
    if self.completed_one {
      self.subscription.unsubscribe();
      self.observer.complete();
    } else {
      self.completed_one = true;
    }
  }
}

struct AObserver<O, B>(O, TypeHint<B>);

impl<O, A, B, Err> Observer for AObserver<O, B>
where
  O: Observer<Item = ZipItem<A, B>, Err = Err>,
{
  type Item = A;
  type Err = Err;
  fn next(&mut self, value: A) {
    self.0.next(ZipItem::ItemA(value));
  }

  fn error(&mut self, err: Self::Err) {
    self.0.error(err)
  }

  fn complete(&mut self) {
    self.0.complete()
  }
}

struct BObserver<O, A>(O, TypeHint<A>);

impl<O, A, B, Err> Observer for BObserver<O, A>
where
  O: Observer<Item = ZipItem<A, B>, Err = Err>,
{
  type Item = B;
  type Err = Err;
  fn next(&mut self, value: B) {
    self.0.next(ZipItem::ItemB(value));
  }

  fn error(&mut self, err: Self::Err) {
    self.0.error(err)
  }

  fn complete(&mut self) {
    self.0.complete()
  }
}

#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::sync::atomic::{AtomicUsize, Ordering};
  use std::sync::Arc;

  #[test]
  fn smoke() {
    let zip = observable::from_iter(0..10).zip(observable::from_iter(0..10));
    let zipped_count = Arc::new(AtomicUsize::new(0));
    let zcc = zipped_count.clone();
    zip
      .clone()
      .count()
      .subscribe(|v| zipped_count.store(v, Ordering::Relaxed));
    let mut zipped_sum = 0;
    assert_eq!(zcc.load(Ordering::Relaxed), 10);
    zip.map(|(a, b)| a + b).sum().subscribe(|v| zipped_sum = v);
    assert_eq!(zipped_sum, 90);
  }

  #[test]
  fn complete() {
    let mut complete = false;
    {
      let mut s1 = LocalSubject::new();
      s1.clone()
        .zip(LocalSubject::new())
        .subscribe_complete(|((), ())| {}, || complete = true);

      s1.complete();
    }
    assert!(!complete);

    {
      let mut s1 = LocalSubject::new();
      let mut s2 = LocalSubject::new();
      s1.clone()
        .zip(s2.clone())
        .subscribe_complete(|((), ())| {}, || complete = true);

      s1.complete();
      s2.complete();
    }
    assert!(complete);
  }

  #[test]
  fn bench() {
    do_bench();
  }

  benchmark_group!(do_bench, bench_zip);

  fn bench_zip(b: &mut bencher::Bencher) {
    b.iter(smoke);
  }
}