rxrust 0.7.2

A Rust implementation of Reactive Extensions.
Documentation
use crate::prelude::*;
use crate::subject::{LocalSubject, SharedSubject};

pub struct ConnectableObservable<Source, Subject> {
  source: Source,
  subject: Subject,
}

pub trait Connect {
  type Unsub;
  fn connect(self) -> Self::Unsub;
}

impl<S, O, U, Sub> RawSubscribable<Sub>
  for ConnectableObservable<S, Subject<O, U>>
where
  S: RawSubscribable<Subscriber<O, U>>,
  Subject<O, U>: RawSubscribable<Sub>,
{
  type Unsub = <Subject<O, U> as RawSubscribable<Sub>>::Unsub;

  #[inline(always)]
  fn raw_subscribe(self, subscriber: Sub) -> Self::Unsub {
    self.subject.raw_subscribe(subscriber)
  }
}

impl<'a, Item, Err, S> ConnectableObservable<S, LocalSubject<'a, Item, Err>> {
  pub fn local(observable: S) -> Self {
    Self {
      source: observable,
      subject: Subject::local(),
    }
  }
}

impl<Source, Item, Err> ConnectableObservable<Source, SharedSubject<Item, Err>>
where
  Source: IntoShared,
{
  pub fn shared(
    observable: Source,
  ) -> ConnectableObservable<Source::Shared, SharedSubject<Item, Err>> {
    ConnectableObservable {
      source: observable.to_shared(),
      subject: Subject::shared(),
    }
  }
}

impl<Source, O, U> Connect for ConnectableObservable<Source, Subject<O, U>>
where
  Source: RawSubscribable<Subscriber<O, U>>,
{
  type Unsub = Source::Unsub;
  fn connect(self) -> Self::Unsub {
    self.source.raw_subscribe(Subscriber {
      observer: self.subject.observers,
      subscription: self.subject.subscription,
    })
  }
}

impl<Source, Subject> IntoShared for ConnectableObservable<Source, Subject>
where
  Source: IntoShared,
  Subject: IntoShared,
{
  type Shared = ConnectableObservable<Source::Shared, Subject::Shared>;
  fn to_shared(self) -> Self::Shared {
    ConnectableObservable {
      source: self.source.to_shared(),
      subject: self.subject.to_shared(),
    }
  }
}

impl<Source, Subject> Fork for ConnectableObservable<Source, Subject>
where
  Subject: Fork,
{
  type Output = Subject::Output;
  #[inline(always)]
  fn fork(&self) -> Self::Output { self.subject.fork() }
}

#[cfg(test)]
mod test {
  use super::*;

  #[test]
  fn smoke() {
    let o = observable::of(100);
    let connected = ConnectableObservable::local(o);
    let mut first = 0;
    let mut second = 0;
    connected.fork().subscribe(|v| first = v);
    connected.fork().subscribe(|v| second = v);

    connected.connect();
    assert_eq!(first, 100);
    assert_eq!(second, 100);
  }

  #[test]
  fn fork_and_shared() {
    let o = observable::of(100);
    let connected = ConnectableObservable::local(o).to_shared();
    connected.fork().subscribe(|_| {});
    connected.fork().subscribe(|_| {});

    connected.connect();
  }
}