another-rxrust 0.0.24

another implementation of `rxRust`.
Documentation

another-rxrust

Why not rxRust?

rxRust is a great Rust implementation of ReactiveX. However, when rxRust combines observable in a slightly complicated way, rust peculiar difficulties are exposed.

Therefore, I implemented ReactiveX in a different way than rxRust, and created another-rxrust that can be easily described even if the following observable is combined in a complicated manner.

This implementation is not a panacea. Rust sacrifices memory efficiency, speed, and much more. If you want performance, I think you should use rxRust.

use crate::prelude::*;
use anyhow::anyhow;
use std::{thread, time};

fn main() {
  fn ob() -> Observable<'static, i32> {
    Observable::create(|s| {
      s.next(100);
      s.next(200);
      s.complete();
    })
  }

  observables::from_iter(1..10)
    .observe_on(schedulers::new_thread_scheduler())
    .flat_map(|x| match x {
      1 => observables::empty(),
      2 => observables::just(x),
      3 => ob().map(move |y| (y + x)),
      4 => observables::error(RxError::new(anyhow!("err"))),
      _ => observables::never(),
    })
    .map(|x| format!("{}", x))
    .on_error_resume_next(|e| ob().map(move |x| format!("resume {:} {}", e.error, x)))
    .subscribe(
      |x| {
        println!("next {}", x);
      },
      |e| {
        println!("error {:}", e.error);
      },
      || {
        println!("complete");
      },
    );

  thread::sleep(time::Duration::from_millis(600));
}

// next 2
// next 103
// next 203
// next resume err 100
// next resume err 200
// complete

Implementation policy

Based on the problems of rxRust, another-rxrust has the following implementation policy.

  • It is assumed that the values and functions that can be emitted may be shared between threads.
  • Value to emit should be Clone + Send + Sync only.
  • Use move to emit values ​​as much as possible.
  • Functions should be Fn() + Send + Sync only.
  • Default errors use std::any. If features=["anyhow"] use anyhow::Error.
  • Prioritize flexibility over memory efficiency and execution speed.

Usage

default

[dependencies]
another-rxrust = {}

use anyhow::Error

[dependencies]
another-rxrust = {features=["anyhow"]}

Implementation status

Quoted from ReactiveX.

Creating Observables

Operators that originate new Observables.

  • Create — create an Observable from scratch by calling observer methods programmatically
  • Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
  • Empty/Never/Throw — create Observables that have very precise and limited behavior
    • Throw - error
  • From — convert some other object or data structure into an Observable
    • from_iter
  • Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
  • Just — convert an object or a set of objects into an Observable that emits that or those objects
  • Range — create an Observable that emits a range of sequential integers
  • Repeat — create an Observable that emits a particular item or sequence of items repeatedly
  • Start — create an Observable that emits the return value of a function
  • Timer — create an Observable that emits a single item after a given delay

Transforming Observables

Operators that transform items that are emitted by an Observable.

  • Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
  • FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
  • GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
  • Map — transform the items emitted by an Observable by applying a function to each item
  • Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
  • Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time

Filtering Observables

Operators that selectively emit items from a source Observable.

  • Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
  • Distinct — suppress duplicate items emitted by an Observable
    • distinct_until_changed
  • ElementAt — emit only item n emitted by an Observable
  • Filter — emit only those items from an Observable that pass a predicate test
  • First — emit only the first item, or the first item that meets a condition, from an Observable
  • IgnoreElements — do not emit any items from an Observable but mirror its termination notification
  • Last — emit only the last item emitted by an Observable
  • Sample — emit the most recent item emitted by an Observable within periodic time intervals
  • Skip — suppress the first n items emitted by an Observable
  • SkipLast — suppress the last n items emitted by an Observable
  • Take — emit only the first n items emitted by an Observable
  • TakeLast — emit only the last n items emitted by an Observable

Combining Observables

Operators that work with multiple source Observables to create a single Observable

  • And/Then/When — combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries
  • CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
  • Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
  • Merge — combine multiple Observables into one by merging their emissions
  • StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
  • Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
  • Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

Error Handling Operators

Operators that help to recover from error notifications from an Observable

  • Catch — recover from an onError notification by continuing the sequence without error
    • on_error_resume_next
  • Retry — if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error
    • retry
    • retry_when

Observable Utility Operators

A toolbox of useful Operators for working with Observables

  • Delay — shift the emissions from an Observable forward in time by a particular amount
  • Do — register an action to take upon a variety of Observable lifecycle events
    • tap
  • Materialize/Dematerialize — represent both the items emitted and the notifications sent as emitted items, or reverse this process
  • ObserveOn — specify the scheduler on which an observer will observe this Observable
  • Serialize — force an Observable to make serialized calls and to be well-behaved
  • Subscribe — operate upon the emissions and notifications from an Observable
  • SubscribeOn — specify the scheduler an Observable should use when it is subscribed to
  • TimeInterval — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
  • Timeout — mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
  • Timestamp — attach a timestamp to each item emitted by an Observable
  • Using — create a disposable resource that has the same lifespan as the Observable

Conditional and Boolean Operators

Operators that evaluate one or more Observables or items emitted by Observables

  • All — determine whether all items emitted by an Observable meet some criteria
  • Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
  • Contains — determine whether an Observable emits a particular item or not
  • DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
  • SequenceEqual — determine whether two Observables emit the same sequence of items
  • SkipUntil — discard items emitted by an Observable until a second Observable emits an item
  • SkipWhile — discard items emitted by an Observable until a specified condition becomes false
  • TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
  • TakeWhile — discard items emitted by an Observable after a specified condition becomes false

Mathematical and Aggregate Operators

Operators that operate on the entire sequence of items emitted by an Observable

  • Average — calculates the average of numbers emitted by an Observable and emits this average
  • Concat — emit the emissions from two or more Observables without interleaving them
  • Count — count the number of items emitted by the source Observable and emit only this value
  • Max — determine, and emit, the maximum-valued item emitted by an Observable
  • Min — determine, and emit, the minimum-valued item emitted by an Observable
  • Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
  • Sum — calculate the sum of numbers emitted by an Observable and emit this sum

Connectable Observable Operators

Specialty Observables that have more precisely-controlled subscription dynamics

  • Connect — instruct a connectable Observable to begin emitting items to its subscribers
  • Publish — convert an ordinary Observable into a connectable Observable
  • RefCount — make a Connectable Observable behave like an ordinary Observable
  • Replay — ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items

Operators to Convert Observables

  • To — convert an Observable into another object or data structure

Subjects

Schedulers

  • Default - scheduler to run on the current thread.
  • NewThread - scheduler that creates a new thread and executes it there.

Others

  • pipe - for custom operators.

Utilities

  • subscribe macros
    • junk_next!()
    • junk_error!()
    • junk_complete!()
    • print_next!()
    • print_error!()
    • print_complete!()
    • print_next_fmt!()
    • panic_error!()