rxrust 0.15.0

A Rust implementation of Reactive Extensions.
Documentation
use crate::prelude::*;
use std::{
  cell::RefCell,
  rc::Rc,
  sync::{Arc, Mutex},
  time::{Duration, Instant},
};

#[derive(Clone)]
pub struct DebounceOp<S, SD> {
  pub(crate) source: S,
  pub(crate) scheduler: SD,
  pub(crate) duration: Duration,
}

observable_proxy_impl!(DebounceOp, S, SD);

impl<Item, Err, S, SD, Unsub> LocalObservable<'static> for DebounceOp<S, SD>
where
  S: LocalObservable<'static, Item = Item, Err = Err, Unsub = Unsub>,
  Unsub: SubscriptionLike + 'static,
  Item: Clone + 'static,
  SD: LocalScheduler + 'static,
{
  type Unsub = Unsub;

  fn actual_subscribe<
    O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
  >(
    self,
    subscriber: Subscriber<O, LocalSubscription>,
  ) -> Self::Unsub {
    let Self {
      source,
      scheduler,
      duration,
    } = self;

    source.actual_subscribe(Subscriber {
      observer: LocalDebounceObserver(Rc::new(RefCell::new(
        DebounceObserver {
          observer: subscriber.observer,
          delay: duration,
          scheduler,
          trailing_value: None,
          last_updated: None,
        },
      ))),
      subscription: subscriber.subscription,
    })
  }
}
impl<S, SD> SharedObservable for DebounceOp<S, SD>
where
  S: SharedObservable,
  S::Item: Clone + Send + 'static,
  SD: SharedScheduler + Send + 'static,
{
  type Unsub = S::Unsub;
  fn actual_subscribe<
    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
  >(
    self,
    subscriber: Subscriber<O, SharedSubscription>,
  ) -> S::Unsub {
    let Self {
      source,
      duration,
      scheduler,
    } = self;
    let Subscriber {
      observer,
      subscription,
    } = subscriber;
    source.actual_subscribe(Subscriber {
      observer: SharedDebounceObserver(Arc::new(Mutex::new(
        DebounceObserver {
          observer,
          scheduler,
          trailing_value: None,
          delay: duration,
          last_updated: None,
        },
      ))),
      subscription,
    })
  }
}

struct DebounceObserver<O, S, Item> {
  observer: O,
  scheduler: S,
  delay: Duration,
  trailing_value: Option<Item>,
  last_updated: Option<Instant>,
}

struct SharedDebounceObserver<O, S, Item>(
  Arc<Mutex<DebounceObserver<O, S, Item>>>,
);

struct LocalDebounceObserver<O, S, Item>(
  Rc<RefCell<DebounceObserver<O, S, Item>>>,
);

impl<O, S> Observer for SharedDebounceObserver<O, S, O::Item>
where
  O: Observer + Send + 'static,
  S: SharedScheduler + Send + 'static,
  O::Item: Clone + Send + 'static,
{
  type Item = O::Item;
  type Err = O::Err;
  fn next(&mut self, value: Self::Item) {
    let c_inner = self.0.clone();
    let mut inner = self.0.lock().unwrap();
    let updated = Some(Instant::now());
    inner.last_updated = updated;
    inner.trailing_value = Some(value);
    let delay = inner.delay;
    inner.scheduler.schedule(
      move |last| {
        let mut inner = c_inner.lock().unwrap();
        if let Some(value) = inner.trailing_value.clone() {
          if inner.last_updated == last {
            inner.observer.next(value);
            inner.trailing_value = None;
          }
        }
      },
      Some(delay),
      inner.last_updated,
    );
  }
  fn error(&mut self, err: Self::Err) {
    let mut inner = self.0.lock().unwrap();
    inner.observer.error(err)
  }
  fn complete(&mut self) {
    let mut inner = self.0.lock().unwrap();
    if let Some(value) = inner.trailing_value.take() {
      inner.observer.next(value);
    }
    inner.observer.complete();
  }
  fn is_stopped(&self) -> bool {
    let inner = self.0.lock().unwrap();
    inner.observer.is_stopped()
  }
}

impl<O, S> Observer for LocalDebounceObserver<O, S, O::Item>
where
  O: Observer + 'static,
  S: LocalScheduler + 'static,
  O::Item: Clone + 'static,
{
  type Item = O::Item;
  type Err = O::Err;
  fn next(&mut self, value: Self::Item) {
    let c_inner = self.0.clone();
    let mut inner = self.0.borrow_mut();
    let updated = Some(Instant::now());
    inner.last_updated = updated;
    inner.trailing_value = Some(value);
    let delay = inner.delay;
    inner.scheduler.schedule(
      move |last| {
        let mut inner = c_inner.borrow_mut();
        if let Some(value) = inner.trailing_value.clone() {
          if inner.last_updated == last {
            inner.observer.next(value);
            inner.trailing_value = None;
          }
        }
      },
      Some(delay),
      inner.last_updated,
    );
  }
  fn error(&mut self, err: Self::Err) {
    let mut inner = self.0.borrow_mut();
    inner.observer.error(err)
  }
  fn complete(&mut self) {
    let mut inner = self.0.borrow_mut();
    if let Some(value) = inner.trailing_value.take() {
      inner.observer.next(value);
    }
    inner.observer.complete();
  }
  fn is_stopped(&self) -> bool {
    let inner = self.0.borrow();
    inner.observer.is_stopped()
  }
}

#[cfg(test)]
mod tests {
  use super::*;
  use futures::executor::LocalPool;
  #[test]
  fn smoke_last() {
    let x = Rc::new(RefCell::new(vec![]));
    let x_c = x.clone();
    let mut pool = LocalPool::new();
    let interval =
      observable::interval(Duration::from_millis(2), pool.spawner());
    let spawner = pool.spawner();
    let debounce_subscribe = || {
      let x = x.clone();
      interval
        .clone()
        .take(10)
        .debounce(Duration::from_millis(3), spawner.clone())
        .subscribe(move |v| x.borrow_mut().push(v))
    };
    let mut sub = debounce_subscribe();
    pool.run();
    sub.unsubscribe();
    assert_eq!(&*x_c.borrow(), &[9]);
  }

  #[test]
  fn smoke_every() {
    let x = Rc::new(RefCell::new(vec![]));
    let x_c = x.clone();
    let mut pool = LocalPool::new();
    let interval =
      observable::interval(Duration::from_millis(3), pool.spawner());
    let spawner = pool.spawner();
    let debounce_subscribe = || {
      let x = x.clone();
      interval
        .clone()
        .take(10)
        .debounce(Duration::from_millis(2), spawner.clone())
        .subscribe(move |v| x.borrow_mut().push(v))
    };
    let mut sub = debounce_subscribe();
    pool.run();
    sub.unsubscribe();
    assert_eq!(&*x_c.borrow(), &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
  }

  #[test]
  fn fork_and_shared() {
    use futures::executor::ThreadPool;
    let scheduler = ThreadPool::new().unwrap();
    observable::from_iter(0..10)
      .debounce(Duration::from_nanos(1), scheduler)
      .into_shared()
      .into_shared()
      .subscribe(|_| {});
  }
}