rxrust 1.0.0-alpha.5

A Rust implementation of Reactive Extensions.
Documentation
use super::box_it::LocalBoxOp;
#[cfg(not(all(target_arch = "wasm32")))]
use super::box_it::SharedBoxOp;
use crate::prelude::*;
#[cfg(not(all(target_arch = "wasm32")))]
use std::sync::{Arc, Mutex};
use std::{cell::RefCell, collections::VecDeque, rc::Rc};

pub struct MergeAllOp<S> {
  pub concurrent: usize,
  pub source: S,
}

impl<S> Observable for MergeAllOp<S>
where
  S: Observable,
  S::Item: Observable,
{
  type Item = <S::Item as Observable>::Item;
  type Err = S::Err;
}

impl<'a, S, Item> LocalObservable<'a> for MergeAllOp<S>
where
  S: LocalObservable<'a, Item = Item>,
  Item: LocalObservable<'a, Err = S::Err> + 'a,
  Item::Unsub: 'static,
  S::Unsub: 'static,
{
  type Unsub = LocalSubscription;
  fn actual_subscribe<O>(self, observer: O) -> Self::Unsub
  where
    O: Observer<Item = Self::Item, Err = Self::Err> + 'a,
  {
    let subscription = LocalSubscription::default();
    let c_subscription = subscription.clone();
    let s =
      self
        .source
        .map(|v| v.box_it())
        .actual_subscribe(Rc::new(RefCell::new(LocalMergeAllObserver {
          observer,
          subscribed: 0,
          concurrent: self.concurrent,
          subscription,
          buffer: <_>::default(),
          completed: false,
        })));
    c_subscription.add(s);
    c_subscription
  }
}

pub struct LocalMergeAllObserver<'a, O: Observer> {
  observer: O,
  subscribed: usize,
  concurrent: usize,
  subscription: LocalSubscription,
  completed: bool,
  buffer: VecDeque<LocalBoxOp<'a, O::Item, O::Err>>,
}

impl<'a, O> Observer for Rc<RefCell<LocalMergeAllObserver<'a, O>>>
where
  O: Observer + 'a,
{
  type Item = LocalBoxOp<'a, O::Item, O::Err>;
  type Err = O::Err;

  fn next(&mut self, value: Self::Item) {
    let mut inner = self.borrow_mut();
    if inner.subscribed < inner.concurrent {
      inner
        .subscription
        .add(value.actual_subscribe(LocalInnerObserver(self.clone())));
      inner.subscribed += 1;
    } else {
      inner.buffer.push_back(value);
    }
  }

  fn error(&mut self, err: Self::Err) {
    let mut inner = self.borrow_mut();
    inner.completed = true;
    inner.observer.error(err);
    inner.subscription.unsubscribe();
  }

  fn complete(&mut self) {
    let mut inner = self.borrow_mut();
    inner.completed = true;
    if inner.subscribed == 0 && inner.buffer.is_empty() {
      inner.observer.complete()
    }
  }
}

struct LocalInnerObserver<'a, O: Observer>(
  Rc<RefCell<LocalMergeAllObserver<'a, O>>>,
);

impl<'a, O> Observer for LocalInnerObserver<'a, O>
where
  O: Observer + 'a,
{
  type Item = O::Item;
  type Err = O::Err;
  #[inline]
  fn next(&mut self, value: Self::Item) {
    self.0.borrow_mut().observer.next(value);
  }

  fn error(&mut self, err: Self::Err) {
    let mut inner = self.0.borrow_mut();
    inner.subscribed -= 1;
    inner.observer.error(err);
    inner.subscription.unsubscribe();
  }

  fn complete(&mut self) {
    let mut inner = self.0.borrow_mut();

    if let Some(o) = inner.buffer.pop_front() {
      inner
        .subscription
        .add(o.actual_subscribe(LocalInnerObserver(self.0.clone())));
    } else {
      inner.subscribed -= 1;
      if inner.completed && inner.subscribed == 0 {
        inner.observer.complete();
        inner.subscription.unsubscribe();
      }
    }
  }
}

#[cfg(not(all(target_arch = "wasm32")))]
impl<S> SharedObservable for MergeAllOp<S>
where
  S: SharedObservable,
  S::Err: Send + Sync + 'static,
  S::Item: SharedObservable<Err = S::Err> + Send + Sync + 'static,
  <S::Item as SharedObservable>::Unsub: Send + Sync + 'static,
  Self::Item: Send + Sync + 'static,
  S::Unsub: 'static,
{
  type Unsub = SharedSubscription;

  fn actual_subscribe<O>(self, observer: O) -> Self::Unsub
  where
    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
  {
    let subscription = SharedSubscription::default();
    let c_subscription = subscription.clone();
    let s =
      self
        .source
        .map(|v| v.box_it())
        .actual_subscribe(Arc::new(Mutex::new(SharedMergeAllObserver {
          observer,
          subscribed: 0,
          concurrent: self.concurrent,
          subscription,
          buffer: <_>::default(),
          completed: false,
        })));
    c_subscription.add(s);
    c_subscription
  }
}

#[cfg(not(all(target_arch = "wasm32")))]
pub struct SharedMergeAllObserver<O: Observer> {
  observer: O,
  subscribed: usize,
  concurrent: usize,
  subscription: SharedSubscription,
  completed: bool,
  buffer: VecDeque<SharedBoxOp<O::Item, O::Err>>,
}

#[cfg(not(all(target_arch = "wasm32")))]
impl<O> Observer for Arc<Mutex<SharedMergeAllObserver<O>>>
where
  O: Observer + Send + Sync + 'static,
{
  type Item = SharedBoxOp<O::Item, O::Err>;
  type Err = O::Err;

  #[allow(clippy::mut_mutex_lock)]
  fn next(&mut self, value: Self::Item) {
    let mut inner = self.lock().unwrap();
    if inner.subscribed < inner.concurrent {
      inner
        .subscription
        .add(value.actual_subscribe(SharedInnerObserver(self.clone())));
      inner.subscribed += 1;
    } else {
      inner.buffer.push_back(value);
    }
  }

  #[allow(clippy::mut_mutex_lock)]
  fn error(&mut self, err: Self::Err) {
    let mut inner = self.lock().unwrap();
    inner.completed = true;
    inner.observer.error(err);
    inner.subscription.unsubscribe();
  }

  #[allow(clippy::mut_mutex_lock)]
  fn complete(&mut self) {
    let mut inner = self.lock().unwrap();
    inner.completed = true;
    if inner.subscribed == 0 && inner.buffer.is_empty() {
      inner.observer.complete()
    }
  }
}

#[cfg(not(all(target_arch = "wasm32")))]
struct SharedInnerObserver<O: Observer>(Arc<Mutex<SharedMergeAllObserver<O>>>);

#[cfg(not(all(target_arch = "wasm32")))]
impl<O> Observer for SharedInnerObserver<O>
where
  O: Observer + Send + Sync + 'static,
{
  type Item = O::Item;
  type Err = O::Err;
  #[inline]
  fn next(&mut self, value: Self::Item) {
    self.0.lock().unwrap().observer.next(value);
  }

  fn error(&mut self, err: Self::Err) {
    let mut inner = self.0.lock().unwrap();
    inner.subscribed -= 1;
    inner.observer.error(err);
    inner.subscription.unsubscribe();
  }

  fn complete(&mut self) {
    let mut inner = self.0.lock().unwrap();

    if let Some(o) = inner.buffer.pop_front() {
      inner
        .subscription
        .add(o.actual_subscribe(SharedInnerObserver(self.0.clone())));
    } else {
      inner.subscribed -= 1;
      if inner.completed && inner.subscribed == 0 {
        inner.observer.complete();
        inner.subscription.unsubscribe();
      }
    }
  }
}

#[cfg(test)]
mod test {
  use super::*;
  #[cfg(not(target_arch = "wasm32"))]
  use crate::observable::SubscribeBlocking;
  use futures::executor::LocalPool;
  #[cfg(not(target_arch = "wasm32"))]
  use futures::executor::ThreadPool;
  use std::time::Duration;

  #[test]
  fn local() {
    let values = Rc::new(RefCell::new(vec![]));
    let c_values = values.clone();

    let mut local = LocalPool::new();
    observable::from_iter(
      (0..3)
        .map(|_| interval(Duration::from_millis(1), local.spawner()).take(5)),
    )
    .merge_all(2)
    .subscribe(move |i| values.borrow_mut().push(i));
    local.run();

    assert_eq!(
      &*c_values.borrow(),
      &[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 0, 1, 2, 3, 4]
    );
  }

  #[test]
  fn fix_inner_unsubscribe() {
    let values = Rc::new(RefCell::new(vec![]));
    let c_values = values.clone();
    let mut subject = LocalSubject::default();

    let mut subscription = observable::of(subject.clone())
      .merge_all(1)
      .subscribe(move |v| values.borrow_mut().push(v));
    subscription.unsubscribe();

    subject.next(1);

    assert_eq!(&*c_values.borrow(), &[]);
  }

  #[cfg(not(all(target_arch = "wasm32")))]
  #[test]
  fn shared() {
    let values = Arc::new(Mutex::new(vec![]));
    let c_values = values.clone();

    let pool = ThreadPool::new().unwrap();
    observable::from_iter(
      (0..3).map(|_| interval(Duration::from_millis(1), pool.clone()).take(5)),
    )
    .merge_all(2)
    .into_shared()
    .subscribe_blocking(move |i| values.lock().unwrap().push(i));

    assert_eq!(
      &*c_values.lock().unwrap(),
      &[0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 0, 1, 2, 3, 4]
    );
  }
}