rxrust 1.0.0-alpha.5

A Rust implementation of Reactive Extensions.
use crate::prelude::*;

/// Subscription interface for observables that are stored behind a `Box`
/// and driven by observers without `Send + Sync` requirements.
///
/// The receiver is `Box<Self>`, so subscribing consumes the boxed value; the
/// trait is used as a `dyn BoxObservable` trait object by `LocalBoxOp`.
pub trait BoxObservable<'a> {
  /// Item type of the observers this observable accepts.
  type Item;
  /// Error type of the observers this observable accepts.
  type Err;
  /// Subscribes the boxed `observer` (which must be valid for `'a`) and
  /// returns the resulting subscription as a boxed `SubscriptionLike`.
  fn box_subscribe(
    self: Box<Self>,
    observer: Box<dyn Observer<Item = Self::Item, Err = Self::Err> + 'a>,
  ) -> Box<dyn SubscriptionLike>;

/// Counterpart of `BoxObservable` whose observer and returned subscription
/// are both required to be `Send + Sync`.
///
/// Used as a `dyn SharedBoxObservable` trait object by `SharedBoxOp`.
pub trait SharedBoxObservable {
  /// Item type of the observers this observable accepts.
  type Item;
  /// Error type of the observers this observable accepts.
  type Err;
  /// Subscribes the boxed `Send + Sync` observer, consuming the boxed
  /// observable, and returns a boxed `SubscriptionLike + Send + Sync`.
  fn box_subscribe(
    self: Box<Self>,
    observer: Box<
      dyn Observer<Item = Self::Item, Err = Self::Err> + Send + Sync,
  ) -> Box<dyn SubscriptionLike + Send + Sync>;

// Generates the items shared by the blanket `BoxObservable` and
// `SharedBoxObservable` impls: the `Item`/`Err` associated types (forwarded
// from `$source`) and the `box_subscribe` method.
//
// Arguments:
// - `$subscription`: a type (the call sites pass `LocalSubscription` or
//   `SharedSubscription`).
// - `$source`: identifier of the wrapped observable type.
// - `$marker +`: zero or more marker idents (e.g. `Send + Sync +`) that are
//   added to both the observer bound and the returned subscription bound.
// - `$lf`: lifetime bound applied to the boxed observer.
macro_rules! box_observable_impl {
    ($subscription:ty, $source:ident, $($marker:ident +)* $lf: lifetime) => {
  type Item = $source::Item;
  type Err = $source::Err;
  fn box_subscribe(
    self: Box<Self>,
    observer: Box<
                dyn Observer<Item=Self::Item,Err= Self::Err>
                + $($marker +)* $lf
  ) -> Box<dyn SubscriptionLike + $($marker +)*>  {

/// Blanket implementation of `BoxObservable<'a>` for every `T` bounded as
/// `LocalObservable<'a> + 'a` whose `Unsub` type is `'static`.
impl<'a, T> BoxObservable<'a> for T
  T: LocalObservable<'a> + 'a,
  T::Unsub: 'static,
  // Associated types and `box_subscribe` are produced by the macro, with no
  // marker traits and `'a` as the observer lifetime.
  box_observable_impl!(LocalSubscription, T, 'a);

/// Blanket implementation of `SharedBoxObservable` for every
/// `SharedObservable` whose `Unsub`, `Item` and `Err` types are all
/// `Send + Sync + 'static`.
impl<T> SharedBoxObservable for T
  T: SharedObservable,
  T::Unsub: Send + Sync + 'static,
  T::Item: Send + Sync + 'static,
  T::Err: Send + Sync + 'static,
  // Associated types and `box_subscribe` are produced by the macro, with
  // `Send + Sync` markers and a `'static` observer lifetime.
  box_observable_impl!(SharedSubscription, T, Send + Sync + 'static);

/// Newtype wrapper around a boxed observable. `T` is the box itself; the
/// `LocalBoxOp`, `LocalCloneBoxOp`, `SharedBoxOp` and `SharedCloneBoxOp`
/// aliases name the concrete instantiations.
pub struct BoxOp<T>(T);

/// `BoxOp<T>` is cloneable whenever the wrapped `T` is `Clone`.
impl<T: Clone> Clone for BoxOp<T> {
  fn clone(&self) -> Self {

/// `BoxOp` over a boxed `BoxObservable` trait object valid for `'a`.
pub type LocalBoxOp<'a, Item, Err> =
  BoxOp<Box<dyn BoxObservable<'a, Item = Item, Err = Err> + 'a>>;
/// `BoxOp` over a boxed `BoxClone` trait object valid for `'a`.
pub type LocalCloneBoxOp<'a, Item, Err> =
  BoxOp<Box<dyn BoxClone<'a, Item = Item, Err = Err> + 'a>>;
/// `BoxOp` over a boxed `SharedBoxObservable` trait object that is
/// `Send + Sync`.
pub type SharedBoxOp<Item, Err> =
  BoxOp<Box<dyn SharedBoxObservable<Item = Item, Err = Err> + Send + Sync>>;
/// `BoxOp` over a boxed `SharedBoxClone` trait object.
// NOTE(review): unlike `SharedBoxOp`, this trait object carries no explicit
// `+ Send + Sync` bound - confirm whether that is intentional.
pub type SharedCloneBoxOp<Item, Err> =
  BoxOp<Box<dyn SharedBoxClone<Item = Item, Err = Err>>>;

// Generates the `actual_subscribe` method used by the `LocalObservable` /
// `SharedObservable` impls of the `BoxOp` aliases.
//
// Arguments:
// - `$subscription`: a type (the call sites pass `LocalSubscription` or
//   `SharedSubscription`).
// - `$marker +`: zero or more marker idents added to the observer bound.
// - `$lf`: lifetime bound required of the observer type `O`.
macro_rules! observable_impl {
    ($subscription:ty, $($marker:ident +)* $lf: lifetime) => {
  fn actual_subscribe<O>(
    observer: O
  ) -> Self::Unsub
  where O: Observer<Item=Self::Item,Err= Self::Err> + $($marker +)* $lf {

/// `LocalBoxOp` exposes its `Item` and `Err` type parameters as the
/// `Observable` associated types.
impl<'a, Item, Err> Observable for LocalBoxOp<'a, Item, Err> {
  type Item = Item;
  type Err = Err;
/// `LocalObservable<'a>` for `LocalBoxOp`; the subscription handle is a
/// boxed `SubscriptionLike`.
impl<'a, Item, Err> LocalObservable<'a> for LocalBoxOp<'a, Item, Err> {
  type Unsub = Box<dyn SubscriptionLike>;
  // `actual_subscribe` is generated by the macro with an `'a` observer bound.
  observable_impl!(LocalSubscription, 'a);

/// `SharedBoxOp` exposes its `Item` and `Err` type parameters as the
/// `Observable` associated types.
impl<Item, Err> Observable for SharedBoxOp<Item, Err> {
  type Item = Item;
  type Err = Err;

/// `SharedObservable` for `SharedBoxOp`; the subscription handle is a boxed
/// `SubscriptionLike + Send + Sync`.
impl<Item, Err> SharedObservable for SharedBoxOp<Item, Err> {
  type Unsub = Box<dyn SubscriptionLike + Send + Sync>;
  // `actual_subscribe` is generated by the macro and requires the observer
  // to be `Send + Sync + 'static`.
  observable_impl!(SharedSubscription, Send + Sync + 'static);

/// `LocalCloneBoxOp` exposes its `Item` and `Err` type parameters as the
/// `Observable` associated types.
impl<'a, Item, Err> Observable for LocalCloneBoxOp<'a, Item, Err> {
  type Item = Item;
  type Err = Err;
/// `LocalObservable<'a>` for `LocalCloneBoxOp`; the subscription handle is a
/// boxed `SubscriptionLike` bounded by `'a`.
// NOTE(review): the `Unsub` type here carries `+ 'a`, whereas the
// `LocalBoxOp` impl uses `Box<dyn SubscriptionLike>` without it - confirm
// the difference is intentional.
impl<'a, Item, Err> LocalObservable<'a> for LocalCloneBoxOp<'a, Item, Err> {
  type Unsub = Box<dyn SubscriptionLike + 'a>;
  // `actual_subscribe` is generated by the macro with an `'a` observer bound.
  observable_impl!(LocalSubscription, 'a);

/// `SharedCloneBoxOp` exposes its `Item` and `Err` type parameters as the
/// `Observable` associated types.
impl<Item, Err> Observable for SharedCloneBoxOp<Item, Err> {
  type Item = Item;
  type Err = Err;
/// `SharedObservable` for `SharedCloneBoxOp`; the subscription handle is a
/// boxed `SubscriptionLike + Send + Sync`.
impl<Item, Err> SharedObservable for SharedCloneBoxOp<Item, Err> {
  type Unsub = Box<dyn SubscriptionLike + Send + Sync>;
  // `actual_subscribe` is generated by the macro and requires the observer
  // to be `Send + Sync + 'static`.
  observable_impl!(SharedSubscription, Send + Sync + 'static);

/// Conversion of an observable `T` into a `BoxOp` wrapping the implementing
/// box type (`Self`).
///
/// FIXME: `IntoBox` should use an associated type instead of a generic
/// parameter once Rust specialization is supported and works for associated
/// types. We could then provide different specialized versions for the same
/// type, and type inference would work fine.
pub trait IntoBox<T> {
  /// Converts `origin` into a `BoxOp<Self>`.
  fn box_it(origin: T) -> BoxOp<Self>
    Self: Sized;

/// Lets a `T` bounded as `LocalObservable<'a> + 'a` (with `T::Unsub: 'static`)
/// be converted into a `BoxOp` over `Box<dyn BoxObservable<'a, ..> + 'a>`,
/// keeping `T`'s `Item` and `Err` types.
impl<'a, T> IntoBox<T>
  for Box<dyn BoxObservable<'a, Item = T::Item, Err = T::Err> + 'a>
  T: LocalObservable<'a> + 'a,
  T::Unsub: 'static,
  fn box_it(origin: T) -> BoxOp<Self> {

/// Lets a `SharedObservable + Send + Sync + 'static` type `T` be converted
/// into a `BoxOp` over `Box<dyn SharedBoxObservable<..> + Send + Sync>`.
/// `T::Item` and `T::Err` must be `Send + Sync + 'static`, and `T::Unsub`
/// must be `Send + Sync`.
impl<T> IntoBox<T>
  for Box<dyn SharedBoxObservable<Item = T::Item, Err = T::Err> + Send + Sync>
  T: SharedObservable + Send + Sync + 'static,
  T::Item: Send + Sync + 'static,
  T::Err: Send + Sync + 'static,
  T::Unsub: Send + Sync,
  fn box_it(origin: T) -> BoxOp<Self> {

// Support for cloning boxed observables.
/// Extension of `BoxObservable<'a>` that can produce a boxed `BoxClone`
/// trait object with the same `Item` and `Err` types.
pub trait BoxClone<'a>: BoxObservable<'a> {
  /// Returns a boxed `BoxClone` trait object valid for `'a`.
  // NOTE(review): presumably a clone of the receiver - the parameter list
  // is not visible here; confirm.
  fn box_clone(
  ) -> Box<dyn BoxClone<'a, Item = Self::Item, Err = Self::Err> + 'a>;

/// Blanket implementation of `BoxClone<'a>` for every `T` bounded as
/// `BoxObservable<'a> + Clone + 'a`.
impl<'a, T> BoxClone<'a> for T
  T: BoxObservable<'a> + Clone + 'a,
  fn box_clone(
  ) -> Box<dyn BoxClone<'a, Item = Self::Item, Err = Self::Err> + 'a> {

/// Makes the boxed `BoxClone` trait object itself `Clone`, which is what
/// the `T: Clone` bound on `BoxOp`'s `Clone` impl requires for
/// `LocalCloneBoxOp`.
impl<'a, Item, Err> Clone
  for Box<dyn BoxClone<'a, Item = Item, Err = Err> + 'a>
  fn clone(&self) -> Self {

/// Lets a `T` bounded as `LocalObservable<'a> + Clone + 'a` (with
/// `T::Unsub: 'static`) be converted into a `BoxOp` over
/// `Box<dyn BoxClone<'a, ..> + 'a>`, keeping `T`'s `Item` and `Err` types.
impl<'a, T> IntoBox<T>
  for Box<dyn BoxClone<'a, Item = T::Item, Err = T::Err> + 'a>
  T: LocalObservable<'a> + Clone + 'a,
  T::Unsub: 'static,
  fn box_it(origin: T) -> BoxOp<Self> {

/// Extension of `SharedBoxObservable` that can produce a boxed
/// `SharedBoxClone` trait object with the same `Item` and `Err` types.
pub trait SharedBoxClone: SharedBoxObservable {
  /// Returns a boxed `SharedBoxClone` trait object.
  // NOTE(review): presumably a clone of the receiver - the parameter list
  // is not visible here; confirm.
  fn box_clone(
  ) -> Box<dyn SharedBoxClone<Item = Self::Item, Err = Self::Err>>;

/// Blanket implementation of `SharedBoxClone` for every `T` bounded as
/// `SharedBoxObservable + Clone + 'static`.
impl<T> SharedBoxClone for T
  T: SharedBoxObservable + Clone + 'static,
  fn box_clone(
  ) -> Box<dyn SharedBoxClone<Item = Self::Item, Err = Self::Err>> {

/// Makes the boxed `SharedBoxClone` trait object itself `Clone`, which is
/// what the `T: Clone` bound on `BoxOp`'s `Clone` impl requires for
/// `SharedCloneBoxOp`.
impl<Item, Err> Clone for Box<dyn SharedBoxClone<Item = Item, Err = Err>> {
  fn clone(&self) -> Self {

/// Lets a `T` bounded as `SharedBoxObservable + Clone + 'static` be
/// converted into a `BoxOp` over `Box<dyn SharedBoxClone<..>>`, keeping
/// `T`'s `Item` and `Err` types.
impl<T> IntoBox<T> for Box<dyn SharedBoxClone<Item = T::Item, Err = T::Err>>
  T: SharedBoxObservable + Clone + 'static,
  fn box_it(origin: T) -> BoxOp<Self> {

// Tests and benchmark wiring for the boxed observable wrappers; compiled
// only when the target architecture is not `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
mod test {
  use crate::prelude::*;
  use bencher::Bencher;
  use ops::box_it::{BoxClone, SharedBoxClone};
  use ops::box_it::{LocalBoxOp, SharedBoxOp};

  // Boxes `observable::of(100)` as a `LocalBoxOp`, records the emitted value,
  // then reassigns the same variable with a boxed `observable::empty()`
  // (whose callback must never run) and checks the recorded value is 100.
  fn box_observable() {
    let mut test = 0;
    let mut boxed: LocalBoxOp<'_, i32, ()> = observable::of(100).box_it();
    boxed.subscribe(|v| test = v);

    boxed = observable::empty().box_it();
    boxed.subscribe(|_| unreachable!());
    assert_eq!(test, 100);

  // Same scenario as `box_observable`, but with `SharedBoxOp` and
  // subscribing through `into_shared()`.
  fn shared_box_observable() {
    let mut boxed: SharedBoxOp<i32, ()> = observable::of(100).box_it();
    boxed.into_shared().subscribe(|_| {});

    boxed = observable::empty().box_it();
    boxed.into_shared().subscribe(|_| unreachable!());

  // Calls `box_it` with an explicit `Box<dyn BoxClone<..>>` target type and
  // subscribes to the result.
  fn box_clone() {
      .box_it::<Box<dyn BoxClone<Item = _, Err = _>>>()
      .subscribe(|_| {});

  // Calls `box_it` with an explicit `Box<dyn SharedBoxClone<..>>` target
  // type and subscribes to the result.
  fn shared_box_clone() {
      .box_it::<Box<dyn SharedBoxClone<Item = _, Err = _>>>()
      .subscribe(|_| {});

  fn bench() {

  // Registers `bench_box_clone` in the `do_bench` benchmark group.
  benchmark_group!(do_bench, bench_box_clone);

  fn bench_box_clone(b: &mut Bencher) {