rxrust 0.7.2

A Rust implementation of Reactive Extensions.
Documentation
use crate::observer::{
  observer_complete_proxy_impl, observer_error_proxy_impl,
};
use crate::ops::SharedOp;
use crate::prelude::*;
/// Emits only the first `count` values emitted by the source Observable.
///
/// `take` returns an Observable that emits only the first `count` values
/// emitted by the source Observable. If the source emits fewer than `count`
/// values then all of its values are emitted. After that, it completes,
/// regardless if the source completes.
///
/// # Example
/// Take the first 5 seconds of an infinite 1-second interval Observable
///
/// ```
/// # use rxrust::{
///   ops::{Take}, prelude::*,
/// };
///
/// observable::from_iter(0..10).take(5).subscribe(|v| println!("{}", v));
///

/// // print logs:
/// // 0
/// // 1
/// // 2
/// // 3
/// // 4
/// ```
///
pub trait Take {
  fn take(self, count: u32) -> TakeOp<Self>
  where
    Self: Sized,
  {
    TakeOp {
      source: self,
      count,
    }
  }
}

impl<O> Take for O {}

pub struct TakeOp<S> {
  source: S,
  count: u32,
}

impl<S> IntoShared for TakeOp<S>
where
  S: IntoShared,
{
  type Shared = SharedOp<TakeOp<S::Shared>>;
  fn to_shared(self) -> Self::Shared {
    SharedOp(TakeOp {
      source: self.source.to_shared(),
      count: self.count,
    })
  }
}

impl<O, U, S> RawSubscribable<Subscriber<O, U>> for TakeOp<S>
where
  S: RawSubscribable<Subscriber<TakeObserver<O, U>, U>>,
  U: SubscriptionLike + Clone + 'static,
{
  type Unsub = S::Unsub;
  fn raw_subscribe(self, subscriber: Subscriber<O, U>) -> Self::Unsub {
    let subscriber = Subscriber {
      observer: TakeObserver {
        observer: subscriber.observer,
        subscription: subscriber.subscription.clone(),
        count: self.count,
        hits: 0,
      },
      subscription: subscriber.subscription,
    };
    self.source.raw_subscribe(subscriber)
  }
}

pub struct TakeObserver<O, S> {
  observer: O,
  subscription: S,
  count: u32,
  hits: u32,
}

impl<S, ST> IntoShared for TakeObserver<S, ST>
where
  S: IntoShared,
  ST: IntoShared,
{
  type Shared = TakeObserver<S::Shared, ST::Shared>;
  fn to_shared(self) -> Self::Shared {
    TakeObserver {
      observer: self.observer.to_shared(),
      subscription: self.subscription.to_shared(),
      count: self.count,
      hits: self.hits,
    }
  }
}

impl<Item, O, U> ObserverNext<Item> for TakeObserver<O, U>
where
  O: ObserverNext<Item> + ObserverComplete,
  U: SubscriptionLike,
{
  fn next(&mut self, value: Item) {
    if self.hits < self.count {
      self.hits += 1;
      self.observer.next(value);
      if self.hits == self.count {
        self.complete();
        self.subscription.unsubscribe();
      }
    }
  }
}

observer_error_proxy_impl!(TakeObserver<O, U>, O, observer, <O,U>);
observer_complete_proxy_impl!(TakeObserver<O, U>, O,  observer, <O,U>);

impl<S> Fork for TakeOp<S>
where
  S: Fork,
{
  type Output = TakeOp<S::Output>;
  fn fork(&self) -> Self::Output {
    TakeOp {
      source: self.source.fork(),
      count: self.count,
    }
  }
}

#[cfg(test)]
mod test {
  use super::Take;
  use crate::prelude::*;

  #[test]
  fn base_function() {
    let mut completed = false;
    let mut next_count = 0;

    observable::from_iter(0..100)
      .take(5)
      .subscribe_complete(|_| next_count += 1, || completed = true);

    assert_eq!(next_count, 5);
    assert_eq!(completed, true);
  }

  #[test]
  fn take_support_fork() {
    let mut nc1 = 0;
    let mut nc2 = 0;
    {
      let take5 = observable::from_iter(0..100).take(5);
      let f1 = take5.fork();
      let f2 = take5.fork();

      f1.take(5).fork().subscribe(|_| nc1 += 1);
      f2.take(5).fork().subscribe(|_| nc2 += 1);
    }
    assert_eq!(nc1, 5);
    assert_eq!(nc2, 5);
  }

  #[test]
  fn into_shared() {
    observable::from_iter(0..100)
      .take(5)
      .take(5)
      .to_shared()
      .subscribe(|_| {});
  }
}