[][src]Trait rxrust::observable::Observable

pub trait Observable {
    type Item;
    type Err;
    fn first(self) -> TakeOp<Self>
    where
        Self: Sized
, { ... }
fn first_or(
        self,
        default: Self::Item
    ) -> FirstOrOp<TakeOp<Self>, Self::Item>
    where
        Self: Sized
, { ... }
fn last_or(self, default: Self::Item) -> LastOrOp<Self, Self::Item>
    where
        Self: Sized
, { ... }
fn last(self) -> LastOrOp<Self, Self::Item>
    where
        Self: Sized
, { ... }
fn map<B, Item, F>(self, f: F) -> MapOp<Self, F>
    where
        Self: Sized,
        F: Fn(B) -> Item
, { ... }
fn merge<S>(self, o: S) -> MergeOp<Self, S>
    where
        Self: Sized
, { ... }
fn filter<F>(self, filter: F) -> FilterOp<Self, F>
    where
        Self: Sized,
        F: Fn(&Self::Item) -> bool
, { ... }
fn box_it<O: IntoBox<Self>>(self) -> BoxOp<O>
    where
        Self: Sized
, { ... }
fn skip(self, count: u32) -> SkipOp<Self>
    where
        Self: Sized
, { ... }
fn skip_last(self, count: usize) -> SkipLastOp<Self>
    where
        Self: Sized
, { ... }
fn take(self, count: u32) -> TakeOp<Self>
    where
        Self: Sized
, { ... }
fn take_last(self, count: usize) -> TakeLastOp<Self>
    where
        Self: Sized
, { ... }
fn scan_initial<OutputItem, BinaryOp>(
        self,
        initial_value: OutputItem,
        binary_op: BinaryOp
    ) -> ScanOp<Self, BinaryOp, OutputItem>
    where
        Self: Sized,
        BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem
, { ... }
fn scan<OutputItem, BinaryOp>(
        self,
        binary_op: BinaryOp
    ) -> ScanOp<Self, BinaryOp, OutputItem>
    where
        Self: Sized,
        BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
        OutputItem: Default
, { ... }
fn reduce_initial<OutputItem, BinaryOp>(
        self,
        initial: OutputItem,
        binary_op: BinaryOp
    ) -> ReduceOp<Self, BinaryOp, OutputItem>
    where
        Self: Sized,
        BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
        OutputItem: Clone
, { ... }
fn reduce<OutputItem, BinaryOp>(
        self,
        binary_op: BinaryOp
    ) -> LastOrOp<ScanOp<Self, BinaryOp, OutputItem>, OutputItem>
    where
        Self: Sized,
        BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
        OutputItem: Default + Clone
, { ... }
fn max(self) -> MinMaxOp<Self, Self::Item>
    where
        Self: Sized,
        Self::Item: Copy + Send + PartialOrd<Self::Item>
, { ... }
fn min(self) -> MinMaxOp<Self, Self::Item>
    where
        Self: Sized,
        Self::Item: Copy + Send + PartialOrd<Self::Item>
, { ... }
fn sum(self) -> SumOp<Self, Self::Item>
    where
        Self: Sized,
        Self::Item: Copy + Default + Add<Self::Item, Output = Self::Item>
, { ... }
fn count(self) -> CountOp<Self, Self::Item>
    where
        Self: Sized
, { ... }
fn average(self) -> AverageOp<Self, Self::Item>
    where
        Self: Sized,
        Self::Item: Copy + Send + Default + Add<Self::Item, Output = Self::Item> + Mul<f64, Output = Self::Item>
, { ... }
fn publish<Subject: Default>(self) -> ConnectableObservable<Self, Subject>
    where
        Self: Sized
, { ... }
fn delay(self, dur: Duration) -> DelayOp<Self>
    where
        Self: Sized
, { ... }
fn delay_at(self, at: Instant) -> DelayOp<Self>
    where
        Self: Sized
, { ... }
fn subscribe_on<SD>(self, scheduler: SD) -> SubscribeOnOP<Self, SD>
    where
        Self: Sized
, { ... }
fn observe_on<'a, SD>(self, scheduler: SD) -> ObserveOnOp<'a, Self, SD>
    where
        Self: Sized
, { ... }
fn throttle_time(
        self,
        duration: Duration,
        edge: ThrottleEdge
    ) -> ThrottleTimeOp<Self>
    where
        Self: Sized
, { ... } }

Associated Types

type Item

type Err

Loading content...

Provided methods

fn first(self) -> TakeOp<Self> where
    Self: Sized

emit only the first item emitted by an Observable

fn first_or(self, default: Self::Item) -> FirstOrOp<TakeOp<Self>, Self::Item> where
    Self: Sized

Emit only the first item emitted by an Observable, or the given default item if there is none.

fn last_or(self, default: Self::Item) -> LastOrOp<Self, Self::Item> where
    Self: Sized

Emit only the last item emitted by a source observable, or the given default item.

Completes right after emitting the single item. Emits error when source observable emits it.

Examples

use rxrust::prelude::*;

observable::empty()
  .last_or(1234)
  .subscribe(|v| println!("{}", v));

// print log:
// 1234

fn last(self) -> LastOrOp<Self, Self::Item> where
    Self: Sized

Emits only the last item emitted by a source observable.

Completes right after emitting the single last item, or when source observable completed, being an empty one. Emits error when source observable emits it.

Examples

use rxrust::prelude::*;

observable::from_iter(0..100)
  .last()
  .subscribe(|v| println!("{}", v));

// print log:
// 99

fn map<B, Item, F>(self, f: F) -> MapOp<Self, F> where
    Self: Sized,
    F: Fn(B) -> Item, 

Creates a new stream which calls a closure on each element and uses its return as the value.

fn merge<S>(self, o: S) -> MergeOp<Self, S> where
    Self: Sized

combine two Observables into one by merging their emissions

Example

let numbers = Subject::new();
// create an even stream by filter
let even = numbers.clone().filter(|v| *v % 2 == 0);
// create an odd stream by filter
let odd = numbers.clone().filter(|v| *v % 2 != 0);

// merge odd and even stream again
let merged = even.merge(odd);

// attach observers
merged.subscribe(|v: &i32| println!("{} ", v));

fn filter<F>(self, filter: F) -> FilterOp<Self, F> where
    Self: Sized,
    F: Fn(&Self::Item) -> bool

Emit only those items from an Observable that pass a predicate test

Example

use rxrust:: prelude::*;

let mut coll = vec![];
let coll_clone = coll.clone();

observable::from_iter(0..10)
  .filter(|v| *v % 2 == 0)
  .subscribe(|v| { coll.push(v); });

// only even numbers received.
assert_eq!(coll, vec![0, 2, 4, 6, 8]);

fn box_it<O: IntoBox<Self>>(self) -> BoxOp<O> where
    Self: Sized

Box an observable into a safe object and convert it to a simple type BoxOp, which only cares about the Item and Err that the Observable emits.

Example

use rxrust::prelude::*;
use ops::box_it::LocalBoxOp;

let mut boxed: LocalBoxOp<'_, i32, ()> = observable::of(1)
  .map(|v| v).box_it();

// BoxOp can box any observable type
boxed = observable::empty().box_it();

boxed.subscribe(|_| {});

fn skip(self, count: u32) -> SkipOp<Self> where
    Self: Sized

Ignore the first count values emitted by the source Observable.

skip returns an Observable that ignores the first count values emitted by the source Observable. If the source emits fewer than count values then none of its values are emitted. After that, it completes, regardless of whether the source completes.

Example

Ignore the first 5 values of an Observable that emits 0 through 9


observable::from_iter(0..10).skip(5).subscribe(|v| println!("{}", v));
// print logs:
// 5
// 6
// 7
// 8
// 9

fn skip_last(self, count: usize) -> SkipLastOp<Self> where
    Self: Sized

Ignore the last count values emitted by the source Observable.

skip_last returns an Observable that ignores the last count values emitted by the source Observable. If the source emits fewer than count values then none of its values are emitted. It will not emit values until the source Observable completes.

Example

Skip the last 5 values of an Observable that emits 0 through 9


observable::from_iter(0..10)
  .skip_last(5)
  .subscribe(|v| println!("{}", v));

// print logs:
// 0
// 1
// 2
// 3
// 4

fn take(self, count: u32) -> TakeOp<Self> where
    Self: Sized

Emits only the first count values emitted by the source Observable.

take returns an Observable that emits only the first count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. After that, it completes, regardless if the source completes.

Example

Take the first 5 values of an Observable that emits 0 through 9


observable::from_iter(0..10).take(5).subscribe(|v| println!("{}", v));
// print logs:
// 0
// 1
// 2
// 3
// 4

fn take_last(self, count: usize) -> TakeLastOp<Self> where
    Self: Sized

Emits only the last count values emitted by the source Observable.

take_last returns an Observable that emits only the last count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. It will not emit values until the source Observable completes.

Example

Take the last 5 values of an Observable that emits 0 through 9


observable::from_iter(0..10)
  .take_last(5)
.subscribe(|v| println!("{}", v));
// print logs:
// 5
// 6
// 7
// 8
// 9

fn scan_initial<OutputItem, BinaryOp>(
    self,
    initial_value: OutputItem,
    binary_op: BinaryOp
) -> ScanOp<Self, BinaryOp, OutputItem> where
    Self: Sized,
    BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem, 

The Scan operator applies a function to the first item emitted by the source observable and then emits the result of that function as its own first emission. It also feeds the result of the function back into the function along with the second item emitted by the source observable in order to generate its second emission. It continues to feed back its own subsequent emissions along with the subsequent emissions from the source Observable in order to create the rest of its sequence.

Applies a binary operator closure to each item emitted from source observable and emits successive values.

Completes when source observable completes. Emits error when source observable emits it.

This version starts with a user-specified initial value for when the binary operator is called with the first item processed.

Arguments

  • initial_value - An initial value to start the successive accumulations from.
  • binary_op - A closure or function acting as a binary operator.

Examples

use rxrust::prelude::*;

observable::from_iter(vec![1, 1, 1, 1, 1])
  .scan_initial(100, |acc, v| acc + v)
  .subscribe(|v| println!("{}", v));

// print log:
// 101
// 102
// 103
// 104
// 105

fn scan<OutputItem, BinaryOp>(
    self,
    binary_op: BinaryOp
) -> ScanOp<Self, BinaryOp, OutputItem> where
    Self: Sized,
    BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
    OutputItem: Default

Works like scan_initial, but the first argument passed to binary_op starts from the value provided by the Default trait of OutputItem.

Arguments

  • binary_op - A closure or function acting as a binary operator.

fn reduce_initial<OutputItem, BinaryOp>(
    self,
    initial: OutputItem,
    binary_op: BinaryOp
) -> ReduceOp<Self, BinaryOp, OutputItem> where
    Self: Sized,
    BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
    OutputItem: Clone

Apply a function to each item emitted by an observable, sequentially, and emit the final value, after source observable completes.

Emits error when source observable emits it.

Arguments

  • initial - An initial value to start the successive reduction from.
  • binary_op - A closure acting as a binary (folding) operator.

Examples

use rxrust::prelude::*;

observable::from_iter(vec![1, 1, 1, 1, 1])
  .reduce_initial(100, |acc, v| acc + v)
  .subscribe(|v| println!("{}", v));

// print log:
// 105

fn reduce<OutputItem, BinaryOp>(
    self,
    binary_op: BinaryOp
) -> LastOrOp<ScanOp<Self, BinaryOp, OutputItem>, OutputItem> where
    Self: Sized,
    BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
    OutputItem: Default + Clone

Works like reduce_initial, but the first argument passed to binary_op starts from the value provided by the Default trait of OutputItem.

Arguments

  • binary_op - A closure acting as a binary operator.

fn max(self) -> MinMaxOp<Self, Self::Item> where
    Self: Sized,
    Self::Item: Copy + Send + PartialOrd<Self::Item>, 

Emits the item from the source observable that had the maximum value.

Emits error when source observable emits it.

Examples

use rxrust::prelude::*;

observable::from_iter(vec![3., 4., 7., 5., 6.])
  .max()
  .subscribe(|v| println!("{}", v));

// print log:
// 7

fn min(self) -> MinMaxOp<Self, Self::Item> where
    Self: Sized,
    Self::Item: Copy + Send + PartialOrd<Self::Item>, 

Emits the item from the source observable that had the minimum value.

Emits error when source observable emits it.

Examples

use rxrust::prelude::*;

observable::from_iter(vec![3., 4., 7., 5., 6.])
  .min()
  .subscribe(|v| println!("{}", v));

// print log:
// 3

fn sum(self) -> SumOp<Self, Self::Item> where
    Self: Sized,
    Self::Item: Copy + Default + Add<Self::Item, Output = Self::Item>, 

Calculates the sum of numbers emitted by a source observable and emits this sum when source completes.

Emits zero when source completed as an empty sequence. Emits error when source observable emits it.

Examples

use rxrust::prelude::*;

observable::from_iter(vec![1, 1, 1, 1, 1])
  .sum()
  .subscribe(|v| println!("{}", v));

// print log:
// 5

fn count(self) -> CountOp<Self, Self::Item> where
    Self: Sized

Emits the number of items emitted by a source observable when this source completes.

The output type of this operator is fixed to usize.

Emits zero when source completed as an empty sequence. Emits error when source observable emits it.

Examples

use rxrust::prelude::*;

observable::from_iter(vec!['1', '7', '3', '0', '4'])
  .count()
  .subscribe(|v| println!("{}", v));

// print log:
// 5

fn average(self) -> AverageOp<Self, Self::Item> where
    Self: Sized,
    Self::Item: Copy + Send + Default + Add<Self::Item, Output = Self::Item> + Mul<f64, Output = Self::Item>, 

Calculates the average of numbers emitted by a source observable and emits this average when source completes.

Emits zero when source completed as an empty sequence. Emits error when source observable emits it.

Examples

use rxrust::prelude::*;

observable::from_iter(vec![3., 4., 5., 6., 7.])
  .average()
  .subscribe(|v| println!("{}", v));

// print log:
// 5

fn publish<Subject: Default>(self) -> ConnectableObservable<Self, Subject> where
    Self: Sized

Returns a ConnectableObservable. A ConnectableObservable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items.

fn delay(self, dur: Duration) -> DelayOp<Self> where
    Self: Sized

Delays the emission of items from the source Observable by a given timeout or until a given Instant.

fn delay_at(self, at: Instant) -> DelayOp<Self> where
    Self: Sized

fn subscribe_on<SD>(self, scheduler: SD) -> SubscribeOnOP<Self, SD> where
    Self: Sized

Specify the Scheduler on which an Observable will operate

With subscribe_on you can decide what type of scheduler a specific Observable will be using when it is subscribed to.

Schedulers control the speed and order of emissions to observers from an Observable stream.

Example

Given the following code:

use rxrust::prelude::*;

let a = observable::from_iter(1..5);
let b = observable::from_iter(5..10);
a.merge(b).subscribe(|v| print!("{} ", v));

Both Observable a and b will emit their values directly and synchronously once they are subscribed to. This will result in the output of 1 2 3 4 5 6 7 8 9.

But if we instead use the subscribe_on operator declaring that we want to use the new thread scheduler for values emitted by Observable a:

use rxrust::prelude::*;
use rxrust::scheduler::Schedulers;
use std::thread;

let a = observable::from_iter(1..5).subscribe_on(Schedulers::NewThread);
let b = observable::from_iter(5..10);
a.merge(b).to_shared().subscribe(|v|{
  let handle = thread::current();
  print!("{}({:?}) ", v, handle.id())
});

The output will instead be `1(thread 1) 2(thread 1) 3(thread 1) 4(thread 1) 5(thread 2) 6(thread 2) 7(thread 2) 8(thread 2) 9(thread 2)`. The reason for this is that Observable `b` emits its values directly like before, but the emissions from `a` are scheduled on a new thread because we are now using the `NewThread` Scheduler for that specific Observable.

fn observe_on<'a, SD>(self, scheduler: SD) -> ObserveOnOp<'a, Self, SD> where
    Self: Sized

Re-emits all notifications from source Observable with specified scheduler.

ObserveOn is an operator that accepts a scheduler as the parameter, which will be used to reschedule notifications emitted by the source Observable.

fn throttle_time(
    self,
    duration: Duration,
    edge: ThrottleEdge
) -> ThrottleTimeOp<Self> where
    Self: Sized

Emits a value from the source Observable, then ignores subsequent source values for the given duration, then repeats this process.

Example

use rxrust::{ prelude::*, ops::throttle_time::ThrottleEdge };
use std::time::Duration;

observable::interval(Duration::from_millis(1))
  .to_shared()
  .throttle_time(Duration::from_millis(9), ThrottleEdge::Leading)
  .to_shared()
  .subscribe(move |v| println!("{}", v));
Loading content...

Implementors

impl<'a, Item, Err> Observable for LocalSubject<'a, Item, Err>[src]

type Item = Item

type Err = Err

impl<'a, Item, Err, S> Observable for RefCount<InnerLocalRefCount<'a, S, Item, Err>, LocalConnectableObservable<'a, S, Item, Err>> where
    S: LocalObservable<'a, Item = Item, Err = Err>, 
[src]

type Item = Item

type Err = Err

impl<'a, Item, S, F> Observable for FilterMapOp<S, F> where
    S: Observable,
    F: FnMut(S::Item) -> Option<Item>, 
[src]

type Item = Item

type Err = S::Err

impl<'a, Item: 'a, Err: 'a> Observable for LocalBoxOp<'a, Item, Err>[src]

type Item = Item

type Err = Err

impl<'a, MI, ME, Item, Err, MapItem, MapErr> Observable for MutRefSubject<'a, Item, Err, MI, ME> where
    MI: Fn(Item) -> MapItem + 'a,
    ME: Fn(Err) -> MapErr + 'a, 
[src]

type Item = MapItem

type Err = MapErr

impl<'a, S, SD> Observable for ObserveOnOp<'a, S, SD> where
    S: Observable
[src]

type Item = S::Item

type Err = S::Err

impl<Emit> Observable for ObservableBase<Emit> where
    Emit: Emitter
[src]

type Item = Emit::Item

type Err = Emit::Err

impl<Item, Err> Observable for SharedBoxOp<Item, Err>[src]

type Item = Item

type Err = Err

impl<Item, Err> Observable for SharedSubject<Item, Err>[src]

type Item = Item

type Err = Err

impl<Item, Err, S> Observable for RefCount<InnerSharedRefCount<S, Item, Err>, SharedConnectableObservable<S, Item, Err>> where
    S: SharedObservable<Item = Item, Err = Err>, 
[src]

type Item = Item

type Err = Err

impl<Item, S> Observable for LastOrOp<S, Item> where
    S: Observable<Item = Item>, 
[src]

type Item = Item

type Err = S::Err

impl<Item, S, M> Observable for MapOp<S, M> where
    S: Observable,
    M: FnMut(S::Item) -> Item, 
[src]

type Item = Item

type Err = S::Err

impl<OutputItem, Source, BinaryOp> Observable for ScanOp<Source, BinaryOp, OutputItem> where
    Source: Observable,
    OutputItem: Clone,
    BinaryOp: FnMut(OutputItem, Source::Item) -> OutputItem, 
[src]

type Item = OutputItem

type Err = Source::Err

impl<S> Observable for DelayOp<S> where
    S: Observable
[src]

type Item = S::Item

type Err = S::Err

impl<S> Observable for SkipOp<S> where
    S: Observable
[src]

type Item = S::Item

type Err = S::Err

impl<S> Observable for SkipLastOp<S> where
    S: Observable
[src]

type Item = S::Item

type Err = S::Err

impl<S> Observable for TakeOp<S> where
    S: Observable
[src]

type Item = S::Item

type Err = S::Err

impl<S> Observable for TakeLastOp<S> where
    S: Observable
[src]

type Item = S::Item

type Err = S::Err

impl<S> Observable for ThrottleTimeOp<S> where
    S: Observable
[src]

type Item = S::Item

type Err = S::Err

impl<S> Observable for Shared<S> where
    S: Observable
[src]

type Item = S::Item

type Err = S::Err

impl<S, F> Observable for FilterOp<S, F> where
    S: Observable,
    F: FnMut(&S::Item) -> bool
[src]

type Item = S::Item

type Err = S::Err

impl<S, Item> Observable for FirstOrOp<S, Item> where
    S: Observable<Item = Item>, 
[src]

type Item = Item

type Err = S::Err

impl<S, SD> Observable for SubscribeOnOP<S, SD> where
    S: Observable
[src]

type Item = S::Item

type Err = S::Err

impl<S1, S2> Observable for MergeOp<S1, S2> where
    S1: Observable,
    S2: Observable<Item = S1::Item, Err = S1::Err>, 
[src]

type Item = S1::Item

type Err = S1::Err

impl<Source, Subject> Observable for ConnectableObservable<Source, Subject> where
    Source: Observable
[src]

type Item = Source::Item

type Err = Source::Err

Loading content...