rxrust 0.15.0

A Rust implementation of Reactive Extensions.
Documentation
use crate::prelude::*;
use crate::subject::{LocalSubject, SharedSubject};
use ops::ref_count::{RefCount, RefCountCreator};

#[derive(Clone, Default)]
pub struct ConnectableObservable<Source, Subject> {
  pub(crate) source: Source,
  pub(crate) subject: Subject,
}

impl<Source, Subject: Default> ConnectableObservable<Source, Subject> {
  pub fn new(source: Source) -> Self {
    ConnectableObservable {
      source,
      subject: Subject::default(),
    }
  }
}
crate::observable_proxy_impl!(ConnectableObservable, Source, Subject);

pub type LocalConnectableObservable<'a, S, Item, Err> =
  ConnectableObservable<S, LocalSubject<'a, Item, Err>>;

pub type SharedConnectableObservable<S, Item, Err> =
  ConnectableObservable<S, SharedSubject<Item, Err>>;

#[doc(hidden)]
macro_rules! observable_impl {
    ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
  type Unsub = $subscription;
  #[inline(always)]
  fn actual_subscribe<O>(
    self,
    subscriber: Subscriber<O, $subscription>,
  ) -> Self::Unsub
  where O: Observer<Item=Self::Item, Err= Self::Err> + $($marker +)* $lf {
    self.subject.actual_subscribe(subscriber)
  }
}
}

impl<'a, S, Item, Err> LocalObservable<'a>
  for LocalConnectableObservable<'a, S, Item, Err>
where
  S: LocalObservable<'a, Item = Item, Err = Err>,
{
  observable_impl!(LocalSubscription, 'a);
}

impl<S, Item, Err> SharedObservable
  for SharedConnectableObservable<S, Item, Err>
where
  S: SharedObservable<Item = Item, Err = Err>,
  S: SharedObservable<Item = Item, Err = Err>,
{
  observable_impl!(SharedSubscription, Send + Sync + 'static);
}

impl<Source, Subject> ConnectableObservable<Source, Subject>
where
  Source: Clone,
{
  #[inline]
  pub fn ref_count<Inner: RefCountCreator<Connectable = Self>>(
    self,
  ) -> RefCount<Inner, Self> {
    Inner::new(self)
  }
}

impl<'a, S, Item, Err> LocalConnectableObservable<'a, S, Item, Err>
where
  S: LocalObservable<'a, Item = Item, Err = Err>,
  Item: Clone + 'a,
  Err: Clone + 'a,
{
  pub fn connect(self) -> S::Unsub {
    self.source.actual_subscribe(Subscriber {
      observer: self.subject.observers,
      subscription: self.subject.subscription,
    })
  }
}

impl<S, Item, Err> SharedConnectableObservable<S, Item, Err>
where
  S: SharedObservable<Item = Item, Err = Err>,
  Item: Clone + Send + Sync + 'static,
  Err: Clone + Send + Sync + 'static,
{
  pub fn connect(self) -> S::Unsub {
    self.source.actual_subscribe(Subscriber {
      observer: self.subject.observers,
      subscription: self.subject.subscription,
    })
  }
}

#[cfg(test)]
mod test {
  use super::*;

  #[test]
  fn smoke() {
    let o = observable::of(100);
    let connected = ConnectableObservable::new(o);
    let mut first = 0;
    let mut second = 0;
    connected.clone().subscribe(|v| first = v);
    connected.clone().subscribe(|v| second = v);

    connected.connect();
    assert_eq!(first, 100);
    assert_eq!(second, 100);
  }

  #[test]
  fn fork_and_shared() {
    let o = observable::of(100);
    let connected = ConnectableObservable::new(o);
    connected.clone().into_shared().subscribe(|_| {});
    connected.clone().into_shared().subscribe(|_| {});

    connected.connect();
  }
  #[test]
  fn publish_smoke() {
    let p = observable::of(100).publish();
    let mut first = 0;
    let mut second = 0;
    p.clone().subscribe(|v| first = v);
    p.clone().subscribe(|v| second = v);

    p.connect();
    assert_eq!(first, 100);
    assert_eq!(second, 100);
  }

  #[test]
  fn bench() { do_bench(); }

  benchmark_group!(do_bench, bench_connectable);

  fn bench_connectable(b: &mut bencher::Bencher) { b.iter(smoke); }
}