// rxrust 0.15.0
//
// A Rust implementation of Reactive Extensions.
// Documentation
use crate::prelude::*;
use std::cell::RefCell;
use std::rc::Rc;

/// Reference-counted, interior-mutable list of boxed publishers of type `P`.
/// `Rc`/`RefCell` make this a single-threaded container.
type RcPublishers<P> = Rc<RefCell<Vec<Box<P>>>>;
/// `Subject` specialised to an `RcPublishers` store and a `LocalSubscription`.
type _LocalSubject<P> = Subject<RcPublishers<P>, LocalSubscription>;
/// `BehaviorSubject` specialised the same way as `_LocalSubject`; `Item` is
/// forwarded as the subject's third type parameter.
type _LocalBehaviorSubject<P, Item> =
  BehaviorSubject<RcPublishers<P>, LocalSubscription, Item>;

/// Local subject whose publishers receive `Item` and `Err` by value.
pub type LocalSubject<'a, Item, Err> =
  _LocalSubject<dyn Publisher<Item = Item, Err = Err> + 'a>;

/// Local subject whose publishers receive items as `&'a Item` and errors by
/// value.
pub type LocalSubjectRef<'a, Item, Err> =
  _LocalSubject<dyn Publisher<Item = &'a Item, Err = Err> + 'a>;

/// Local subject whose publishers receive items by value and errors as
/// `&'a Err`.
pub type LocalSubjectErrRef<'a, Item, Err> =
  _LocalSubject<dyn Publisher<Item = Item, Err = &'a Err> + 'a>;

/// Local subject whose publishers receive both items and errors by reference
/// (`&'a Item`, `&'a Err`).
pub type LocalSubjectRefAll<'a, Item, Err> =
  _LocalSubject<dyn Publisher<Item = &'a Item, Err = &'a Err> + 'a>;

/// Local behavior subject whose publishers receive `Item` and `Err` by value.
pub type LocalBehaviorSubject<'a, Item, Err> =
  _LocalBehaviorSubject<dyn Publisher<Item = Item, Err = Err> + 'a, Item>;

/// Local behavior subject whose publishers receive items as `&'a Item`.
/// Note that the subject's own `Item` parameter is still the owned `Item`.
pub type LocalBehaviorSubjectRef<'a, Item, Err> =
  _LocalBehaviorSubject<dyn Publisher<Item = &'a Item, Err = Err> + 'a, Item>;

/// Local behavior subject whose publishers receive errors as `&'a Err` and
/// items by value.
pub type LocalBehaviorSubjectErrRef<'a, Item, Err> =
  _LocalBehaviorSubject<dyn Publisher<Item = Item, Err = &'a Err> + 'a, Item>;

/// Local behavior subject whose publishers receive both items and errors by
/// reference. The subject's own `Item` parameter is still the owned `Item`.
pub type LocalBehaviorSubjectRefAll<'a, Item, Err> = _LocalBehaviorSubject<
  dyn Publisher<Item = &'a Item, Err = &'a Err> + 'a,
  Item,
>;

impl<'a, Item, Err> LocalSubject<'a, Item, Err> {
  /// Creates a subject by delegating to its `Default` implementation.
  #[inline]
  pub fn new() -> Self
  where
    Self: Default,
  {
    <Self as Default>::default()
  }

  /// Returns the number of entries currently held in the observer list.
  #[inline]
  pub fn subscribed_size(&self) -> usize {
    let registered = self.observers.observers.borrow();
    registered.len()
  }
}

/// A `LocalSubject` is itself observable, exposing the same `Item` and `Err`
/// types that its publishers accept.
impl<'a, Item, Err> Observable for LocalSubject<'a, Item, Err> {
  type Item = Item;
  type Err = Err;
}

impl<'a, Item, Err> LocalObservable<'a> for LocalSubject<'a, Item, Err> {
  type Unsub = LocalSubscription;

  /// Registers `subscriber` with this subject.
  ///
  /// A clone of the subscriber's subscription is added to the subject's own
  /// subscription, the subscriber is boxed and appended to the observer list,
  /// and the subscriber's subscription handle is returned to the caller.
  fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
    self,
    subscriber: Subscriber<O, LocalSubscription>,
  ) -> LocalSubscription {
    // Take the handle before `subscriber` is moved into the box below.
    let handle = subscriber.subscription.clone();
    self.subscription.add(handle.clone());

    let mut registered = self.observers.observers.borrow_mut();
    registered.push(Box::new(subscriber));
    drop(registered);

    handle
  }
}

impl<'a, Item, Err> LocalBehaviorSubject<'a, Item, Err> {
  /// Creates a behavior subject holding `value` as its stored value.
  ///
  /// The observer list and the subscription start from their `Default`
  /// values. No `Self: Default` bound is required: the subject is assembled
  /// field by field and `value` is supplied by the caller, so `Item` itself
  /// does not need to implement `Default`.
  #[inline]
  pub fn new(value: Item) -> Self {
    LocalBehaviorSubject {
      observers: Default::default(),
      subscription: Default::default(),
      value,
    }
  }

  /// Returns the number of entries currently held in the observer list.
  #[inline]
  pub fn subscribed_size(&self) -> usize {
    self.observers.observers.borrow().len()
  }
}

/// A `LocalBehaviorSubject` is itself observable, exposing the same `Item`
/// and `Err` types that its publishers accept.
impl<'a, Item, Err> Observable for LocalBehaviorSubject<'a, Item, Err> {
  type Item = Item;
  type Err = Err;
}

impl<'a, Item, Err> LocalObservable<'a>
  for LocalBehaviorSubject<'a, Item, Err>
{
  type Unsub = LocalSubscription;

  /// Registers `subscriber` and immediately hands it the stored value.
  ///
  /// The subscriber is boxed and appended to the observer list. Unless its
  /// subscription already reports closed, the subject's `value` is then
  /// passed to the just-added observer via `next`. The subscriber's
  /// subscription handle is returned in either case.
  fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
    self,
    subscriber: Subscriber<O, LocalSubscription>,
  ) -> LocalSubscription {
    // Take the handle before `subscriber` is moved into the box below.
    let handle = subscriber.subscription.clone();
    self.subscription.add(handle.clone());

    self
      .observers
      .observers
      .borrow_mut()
      .push(Box::new(subscriber));

    // Nothing to deliver to a subscriber that is already closed.
    if handle.is_closed() {
      return handle;
    }

    // The list cannot be empty here: an entry was pushed just above.
    self
      .observers
      .observers
      .borrow_mut()
      .last_mut()
      .unwrap()
      .next(self.value);

    handle
  }
}

#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// A value pushed through a `LocalSubject` reaches a subscribed closure,
  /// and the subject reports one subscriber.
  #[test]
  fn smoke() {
    let mut test_code = 1;
    {
      // Inner scope: the closure below mutably captures `test_code`, so the
      // subject is confined to this block and `test_code` is only read again
      // after the block ends.
      let mut subject = LocalSubject::new();
      subject.clone().subscribe(|v| {
        test_code = v;
      });
      subject.next(2);

      assert_eq!(subject.subscribed_size(), 1);
    }
    assert_eq!(test_code, 2);
  }

  /// Exercises the reference-emitting aliases: items by reference, errors by
  /// reference, and both by reference.
  #[test]
  fn emit_ref() {
    let mut check = 0;

    // Items delivered as references.
    {
      let mut subject = LocalSubjectRef::new();
      subject.clone().subscribe(|v| {
        check = *v;
      });
      subject.next(&1);
    }
    assert_eq!(check, 1);

    // Errors delivered as references; the item type is pinned to `()`.
    {
      let mut subject = LocalSubjectErrRef::new();
      subject
        .clone()
        .subscribe_err(|_: ()| {}, |err| check = *err);
      subject.error(&2);
    }
    assert_eq!(check, 2);

    // Both by reference: item path; the error type is pinned to `&()`.
    {
      let mut subject = LocalSubjectRefAll::new();
      subject.clone().subscribe_err(|v| check = *v, |_: &()| {});
      subject.next(&1);
    }
    assert_eq!(check, 1);

    // Both by reference: error path; the item type is pinned to `&()`.
    {
      let mut subject = LocalSubjectRefAll::new();
      subject
        .clone()
        .subscribe_err(|_: &()| {}, |err| check = *err);
      subject.error(&2);
    }
    assert_eq!(check, 2);
  }
}