pub trait ObservableExt<Item, Err>: Sized {
Show 85 methods // Provided methods fn first(self) -> TakeOp<Self> { ... } fn first_or(self, default: Item) -> DefaultIfEmptyOp<TakeOp<Self>, Item> { ... } fn last_or(self, default: Item) -> DefaultIfEmptyOp<LastOp<Self, Item>, Item> { ... } fn element_at(self, nth: usize) -> TakeOp<SkipOp<Self>> { ... } fn ignore_elements(self) -> FilterOp<Self, fn(_: &Item) -> bool> { ... } fn all<F>( self, pred: F ) -> DefaultIfEmptyOp<TakeOp<FilterOp<MapOp<Self, F, Item>, fn(_: &bool) -> bool>>, bool> where F: Fn(Item) -> bool { ... } fn contains(self, target: Item) -> ContainsOp<Self, Item> { ... } fn last(self) -> LastOp<Self, Item> { ... } fn finalize<F>(self, f: F) -> FinalizeOp<Self, F> where F: FnMut() { ... } fn finalize_threads<F>(self, f: F) -> FinalizeOpThreads<Self, F> where F: FnMut() { ... } fn flatten<'a, Item2, Err2>(self) -> MergeAllOp<'a, Self, Item> where Item: ObservableExt<Item2, Err2> { ... } fn flatten_threads<Item2, Err2>(self) -> MergeAllOpThreads<Self, Item> where Item: ObservableExt<Item2, Err2> { ... } fn flat_map<'a, V, Item2, F>(self, f: F) -> FlatMapOp<'a, Self, V, F, Item> where F: Fn(Item) -> V, MapOp<Self, F, Item>: ObservableExt<V, Err>, V: ObservableExt<Item2, Err> { ... } fn flat_map_threads<V, Item2, F>( self, f: F ) -> FlatMapOpThreads<Self, V, F, Item> where F: Fn(Item) -> V, MapOp<Self, F, Item>: ObservableExt<V, Err>, V: ObservableExt<Item2, Err> { ... } fn group_by<D, Key, Subject>(self, discr: D) -> GroupByOp<Self, D, Subject> where D: FnMut(&Item) -> Key, Key: Hash + Eq + Clone, Subject: Clone + Default + Observer<Item, Err> { ... } fn map<B, F>(self, f: F) -> MapOp<Self, F, Item> where F: FnMut(Item) -> B { ... } fn on_error_map<B, F>(self, f: F) -> OnErrorMapOp<Self, F, Err> where F: FnMut(Err) -> B { ... } fn map_to<B>(self, value: B) -> MapToOp<Self, B, Item> { ... } fn merge<S>(self, other: S) -> MergeOp<Self, S> where S: ObservableExt<Item, Err> { ... } fn merge_threads<S>(self, other: S) -> MergeOpThreads<Self, S> where S: ObservableExt<Item, Err> { ... } fn merge_all<'a, Item2>( self, concurrent: usize ) -> MergeAllOp<'a, Self, Item> where Item: ObservableExt<Item2, Err> { ... } fn merge_all_threads<Item2>( self, concurrent: usize ) -> MergeAllOpThreads<Self, Item> where Item: ObservableExt<Item2, Err> { ... } fn filter<F>(self, filter: F) -> FilterOp<Self, F> where F: Fn(&Item) -> bool { ... } fn filter_map<F, OutputItem>(self, f: F) -> FilterMapOp<Self, F, Item> where F: FnMut(Item) -> Option<OutputItem> { ... } fn skip(self, count: usize) -> SkipOp<Self> { ... } fn skip_until<NotifyItem, NotifyErr, Other>( self, notifier: Other ) -> SkipUntilOp<Self, Other, NotifyItem, NotifyErr> where Other: ObservableExt<NotifyItem, NotifyErr> { ... } fn skip_until_threads<NotifyItem, NotifyErr, Other>( self, notifier: Other ) -> SkipUntilOpThreads<Self, Other, NotifyItem, NotifyErr> where Other: ObservableExt<NotifyItem, NotifyErr> { ... } fn skip_while<F>(self, predicate: F) -> SkipWhileOp<Self, F> where F: FnMut(&Item) -> bool { ... } fn skip_last(self, count: usize) -> SkipLastOp<Self> { ... } fn take(self, count: usize) -> TakeOp<Self> { ... } fn take_until<Notify, NotifyItem, NotifyErr>( self, notifier: Notify ) -> TakeUntilOp<Self, Notify, NotifyItem, NotifyErr> { ... } fn take_until_threads<Notify, NotifyItem, NotifyErr>( self, notifier: Notify ) -> TakeUntilOpThreads<Self, Notify, NotifyItem, NotifyErr> { ... } fn take_while<F>(self, callback: F) -> TakeWhileOp<Self, F> where F: FnMut(&Item) -> bool { ... } fn take_while_inclusive<F>(self, callback: F) -> TakeWhileOp<Self, F> where F: FnMut(&Item) -> bool { ... } fn take_last(self, count: usize) -> TakeLastOp<Self> { ... } fn sample<Sample, SampleItem, SampleErr>( self, sampling: Sample ) -> SampleOp<Self, Sample, SampleItem> where Sample: ObservableExt<SampleItem, SampleErr> { ... } fn sample_threads<Sample, SampleItem, SampleErr>( self, sampling: Sample ) -> SampleOpThreads<Self, Sample, SampleItem> where Sample: ObservableExt<SampleItem, SampleErr> { ... } fn scan_initial<OutputItem, BinaryOp>( self, initial_value: OutputItem, binary_op: BinaryOp ) -> ScanOp<Self, BinaryOp, OutputItem, Item> where BinaryOp: Fn(OutputItem, Item) -> OutputItem, OutputItem: Clone { ... } fn scan<OutputItem, BinaryOp>( self, binary_op: BinaryOp ) -> ScanOp<Self, BinaryOp, OutputItem, Item> where BinaryOp: Fn(OutputItem, Item) -> OutputItem, OutputItem: Default + Clone { ... } fn reduce_initial<OutputItem, BinaryOp>( self, initial: OutputItem, binary_op: BinaryOp ) -> ReduceOp<Self, BinaryOp, OutputItem, Item> where BinaryOp: Fn(OutputItem, Item) -> OutputItem, OutputItem: Clone { ... } fn reduce<OutputItem, BinaryOp>( self, binary_op: BinaryOp ) -> ReduceOp<Self, BinaryOp, OutputItem, Item> where BinaryOp: Fn(OutputItem, Item) -> OutputItem, OutputItem: Default + Clone { ... } fn max(self) -> MinMaxOp<Self, Item> where Item: PartialOrd<Item> + Clone { ... } fn min(self) -> MinMaxOp<Self, Item> where Item: Clone + PartialOrd<Item> { ... } fn sum(self) -> SumOp<Self, Item> where Item: Clone + Default + Add<Item, Output = Item> { ... } fn count(self) -> CountOp<Self, Item> { ... } fn average(self) -> AverageOp<Self, Item> where Item: Clone + Default + Add<Item, Output = Item> + Mul<f64, Output = Item>, ScanOp<Self, fn(_: Accum<Item>, _: Item) -> Accum<Item>, Accum<Item>, Item>: ObservableExt<Accum<Item>, Err>, LastOp<ScanOp<Self, fn(_: Accum<Item>, _: Item) -> Accum<Item>, Accum<Item>, Item>, Accum<Item>>: ObservableExt<Accum<Item>, Err> { ... } fn publish<Subject: Default>(self) -> ConnectableObservable<Self, Subject> { ... } fn share<'a>(self) -> ShareOp<'a, Item, Err, Self> { ... } fn share_threads(self) -> ShareOpThreads<Item, Err, Self> { ... } fn delay<SD>(self, dur: Duration, scheduler: SD) -> DelayOp<Self, SD> { ... } fn delay_threads<SD>( self, dur: Duration, scheduler: SD ) -> DelayOpThreads<Self, SD> { ... } fn delay_at<SD>(self, at: Instant, scheduler: SD) -> DelayOp<Self, SD> { ... } fn delay_at_threads<SD>( self, at: Instant, scheduler: SD ) -> DelayOpThreads<Self, SD> { ... } fn delay_subscription<SD>( self, dur: Duration, scheduler: SD ) -> DelaySubscriptionOp<Self, SD> { ... } fn delay_subscription_at<SD>( self, at: Instant, scheduler: SD ) -> DelaySubscriptionOp<Self, SD> { ... } fn subscribe_on<SD>(self, scheduler: SD) -> SubscribeOnOP<Self, SD> { ... } fn observe_on<SD>(self, scheduler: SD) -> ObserveOnOp<Self, SD> { ... } fn observe_on_threads<SD>( self, scheduler: SD ) -> ObserveOnOpThreads<Self, SD> { ... } fn debounce<SD>( self, duration: Duration, scheduler: SD ) -> DebounceOp<Self, SD> { ... } fn throttle<SD, F>( self, duration_selector: F, edge: ThrottleEdge, scheduler: SD ) -> ThrottleOp<Self, SD, F> where F: Fn(&Item) -> Duration { ... } fn throttle_time<SD>( self, duration: Duration, edge: ThrottleEdge, scheduler: SD ) -> ThrottleOp<Self, SD, Box<dyn Fn(&Item) -> Duration + Send + Sync>> where Item: 'static { ... } fn distinct(self) -> DistinctOp<Self> { ... } fn distinct_key<F>(self, key: F) -> DistinctKeyOp<Self, F> { ... } fn distinct_until_changed(self) -> DistinctUntilChangedOp<Self> { ... } fn distinct_until_key_changed<F>( self, key: F ) -> DistinctUntilKeyChangedOp<Self, F> { ... } fn zip<Other, Item2>(self, other: Other) -> ZipOp<Self, Other> where Other: ObservableExt<Item2, Err> { ... } fn zip_threads<Other, Item2>(self, other: Other) -> ZipOpThreads<Self, Other> where Other: ObservableExt<Item2, Err> { ... } fn with_latest_from<From, OtherItem>( self, from: From ) -> WithLatestFromOp<Self, From> where From: ObservableExt<OtherItem, Err>, OtherItem: Clone { ... } fn with_latest_from_threads<From, OtherItem>( self, from: From ) -> WithLatestFromOpThreads<Self, From> where From: ObservableExt<OtherItem, Err>, OtherItem: Clone { ... } fn default_if_empty( self, default_value: Item ) -> DefaultIfEmptyOp<Self, Item> { ... } fn buffer_with_count(self, count: usize) -> BufferWithCountOp<Self> { ... } fn buffer_with_time<S>( self, time: Duration, scheduler: S ) -> BufferWithTimeOp<Self, S> { ... } fn buffer_with_count_and_time<S>( self, count: usize, time: Duration, scheduler: S ) -> BufferWithCountOrTimerOp<Self, S> { ... } fn combine_latest<Other, OtherItem, BinaryOp, OutputItem>( self, other: Other, binary_op: BinaryOp ) -> CombineLatestOp<Self, Other, BinaryOp> where Other: ObservableExt<OtherItem, Err>, BinaryOp: FnMut(Item, OtherItem) -> OutputItem { ... } fn combine_latest_threads<Other, OtherItem, BinaryOp, OutputItem>( self, other: Other, binary_op: BinaryOp ) -> CombineLatestOpThread<Self, Other, BinaryOp> where Other: ObservableExt<OtherItem, Err>, BinaryOp: FnMut(Item, OtherItem) -> OutputItem { ... } fn start_with<B>(self, values: Vec<B>) -> StartWithOp<Self, B> { ... } fn pairwise(self) -> PairwiseOp<Self> { ... } fn tap<F>(self, f: F) -> TapOp<Self, F> where F: FnMut(&Item) { ... } fn on_error<F>(self, f: F) -> OnErrorOp<Self, F, Err> where F: FnOnce(Err) { ... } fn on_complete<F>(self, f: F) -> OnCompleteOp<Self, F> where F: FnOnce() { ... } fn complete_status(self) -> (StatusOp<Self>, Arc<CompleteStatus>) { ... } fn collect<C>(self) -> CollectOp<Self, C> where C: IntoIterator + Extend<C::Item> + Default { ... } fn collect_into<C>(self, collection: C) -> CollectOp<Self, C> where C: IntoIterator + Extend<C::Item> { ... } fn to_future(self) -> ObservableFuture<Item, Err> where Self: Observable<Item, Err, ObservableFutureObserver<Item, Err>> { ... } fn to_stream(self) -> ObservableStream<Item, Err> where Self: Observable<Item, Err, ObservableStreamObserver<Item, Err>> { ... }
}

Provided Methods§

source

fn first(self) -> TakeOp<Self>

emit only the first item emitted by an Observable

source

fn first_or(self, default: Item) -> DefaultIfEmptyOp<TakeOp<Self>, Item>

emit only the first item emitted by an Observable

source

fn last_or(self, default: Item) -> DefaultIfEmptyOp<LastOp<Self, Item>, Item>

Emit only the last final item emitted by a source observable or a default item given.

Completes right after emitting the single item. Emits error when source observable emits it.

Examples
use rxrust::prelude::*;

observable::empty()
  .last_or(1234)
  .subscribe(|v| println!("{}", v));

// print log:
// 1234
source

fn element_at(self, nth: usize) -> TakeOp<SkipOp<Self>>

Emit only item n (0-indexed) emitted by an Observable

source

fn ignore_elements(self) -> FilterOp<Self, fn(_: &Item) -> bool>

Do not emit any items from an Observable but mirror its termination notification

source

fn all<F>( self, pred: F ) -> DefaultIfEmptyOp<TakeOp<FilterOp<MapOp<Self, F, Item>, fn(_: &bool) -> bool>>, bool>where F: Fn(Item) -> bool,

Determine whether all items emitted by an Observable meet some criteria

source

fn contains(self, target: Item) -> ContainsOp<Self, Item>

Determine whether an Observable emits a particular item or not

source

fn last(self) -> LastOp<Self, Item>

Emits only last final item emitted by a source observable.

Completes right after emitting the single last item, or when source observable completed, being an empty one. Emits error when source observable emits it.

Examples
use rxrust::prelude::*;

observable::from_iter(0..100)
  .last()
  .subscribe(|v| println!("{}", v));

// print log:
// 99
source

fn finalize<F>(self, f: F) -> FinalizeOp<Self, F>where F: FnMut(),

Call a function when observable completes, errors or is unsubscribed from.

source

fn finalize_threads<F>(self, f: F) -> FinalizeOpThreads<Self, F>where F: FnMut(),

A threads safe version of finalize

source

fn flatten<'a, Item2, Err2>(self) -> MergeAllOp<'a, Self, Item>where Item: ObservableExt<Item2, Err2>,

Creates an Observable that combines all the emissions from Observables that get emitted from an Observable.

Example
let mut source = Subject::default();
let numbers = Subject::default();
// create a even stream by filter
let even = numbers.clone().filter((|v| *v % 2 == 0) as fn(&i32) -> bool);
// create an odd stream by filter
let odd = numbers.clone().filter((|v| *v % 2 != 0) as fn(&i32) -> bool);

// merge odd and even stream again
let out = source.clone().flatten();

source.next(even);
source.next(odd);

// attach observers
out.subscribe(|v: i32| println!("{} ", v));
source

fn flatten_threads<Item2, Err2>(self) -> MergeAllOpThreads<Self, Item>where Item: ObservableExt<Item2, Err2>,

A threads safe version of flatten

source

fn flat_map<'a, V, Item2, F>(self, f: F) -> FlatMapOp<'a, Self, V, F, Item>where F: Fn(Item) -> V, MapOp<Self, F, Item>: ObservableExt<V, Err>, V: ObservableExt<Item2, Err>,

Applies given function to each item emitted by this Observable, where that function returns an Observable that itself emits items. It then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.

source

fn flat_map_threads<V, Item2, F>( self, f: F ) -> FlatMapOpThreads<Self, V, F, Item>where F: Fn(Item) -> V, MapOp<Self, F, Item>: ObservableExt<V, Err>, V: ObservableExt<Item2, Err>,

source

fn group_by<D, Key, Subject>(self, discr: D) -> GroupByOp<Self, D, Subject>where D: FnMut(&Item) -> Key, Key: Hash + Eq + Clone, Subject: Clone + Default + Observer<Item, Err>,

Groups items emitted by the source Observable into Observables. Each emitted Observable emits items matching the key returned by the discriminator function.

Example
use rxrust::prelude::*;

#[derive(Clone)]
struct Person {
  name: String,
  age: u32,
}

observable::from_iter([
  Person{ name: String::from("John"), age: 26 },
  Person{ name: String::from("Anne"), age: 28 },
  Person{ name: String::from("Gregory"), age: 24 },
  Person{ name: String::from("Alice"), age: 28 },
])
.group_by::<_,_,Subject<_,_>>(|person: &Person| person.age)
.flat_map(|group| group.reduce(|acc, person: Person| format!("{} {}", acc, person.name)))
.subscribe(|result| println!("{}", result));

// Prints:
//  John
//  Anne Alice
//  Gregory
source

fn map<B, F>(self, f: F) -> MapOp<Self, F, Item>where F: FnMut(Item) -> B,

Creates a new stream which calls a closure on each element and uses its return as the value.

source

fn on_error_map<B, F>(self, f: F) -> OnErrorMapOp<Self, F, Err>where F: FnMut(Err) -> B,

Creates a new stream which calls a closure on each error and uses its return as emitted error.

source

fn map_to<B>(self, value: B) -> MapToOp<Self, B, Item>

Maps emissions to a constant value.

source

fn merge<S>(self, other: S) -> MergeOp<Self, S>where S: ObservableExt<Item, Err>,

combine two Observables into one by merging their emissions

Example
let numbers = Subject::default();
// create a even stream by filter
let even = numbers.clone().filter(|v| *v % 2 == 0);
// create an odd stream by filter
let odd = numbers.clone().filter(|v| *v % 2 != 0);

// merge odd and even stream again
let merged = even.merge(odd);

// attach observers
merged.subscribe(|v: &i32| println!("{} ", v));
source

fn merge_threads<S>(self, other: S) -> MergeOpThreads<Self, S>where S: ObservableExt<Item, Err>,

A threads safe version of merge

source

fn merge_all<'a, Item2>(self, concurrent: usize) -> MergeAllOp<'a, Self, Item>where Item: ObservableExt<Item2, Err>,

Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.

Example
let mut local = LocalPool::new();
observable::from_iter(
  (0..3)
    .map(|_| interval(Duration::from_millis(1), local.spawner()).take(5)),
)
.merge_all(2)
.subscribe(move |i| println!("{}", i));
local.run();
source

fn merge_all_threads<Item2>( self, concurrent: usize ) -> MergeAllOpThreads<Self, Item>where Item: ObservableExt<Item2, Err>,

A threads safe version of merge_all

source

fn filter<F>(self, filter: F) -> FilterOp<Self, F>where F: Fn(&Item) -> bool,

Emit only those items from an Observable that pass a predicate test

Example
use rxrust:: prelude::*;

let mut coll = vec![];
let coll_clone = coll.clone();

observable::from_iter(0..10)
  .filter(|v| *v % 2 == 0)
  .subscribe(|v| { coll.push(v); });

// only even numbers received.
assert_eq!(coll, vec![0, 2, 4, 6, 8]);
source

fn filter_map<F, OutputItem>(self, f: F) -> FilterMapOp<Self, F, Item>where F: FnMut(Item) -> Option<OutputItem>,

The closure must return an Option. filter_map creates an iterator which calls this closure on each element. If the closure returns Some(element), then that element is returned. If the closure returns None, it will try again, and call the closure on the next element, seeing if it will return Some.

Why filter_map and not just filter and map? The key is in this part:

If the closure returns Some(element), then that element is returned.

In other words, it removes the Option layer automatically. If your mapping is already returning an Option and you want to skip over Nones, then filter_map is much, much nicer to use.

Examples
 let mut res: Vec<i32> = vec![];
  observable::from_iter(["1", "lol", "3", "NaN", "5"].iter())
  .filter_map(|s: &&str| s.parse().ok())
  .subscribe(|v| res.push(v));

assert_eq!(res, [1, 3, 5]);
source

fn skip(self, count: usize) -> SkipOp<Self>

Ignore the first count values emitted by the source Observable.

skip returns an Observable that ignore the first count values emitted by the source Observable. If the source emits fewer than count values then 0 of its values are emitted. After that, it completes, regardless if the source completes.

Example

Ignore the first 5 seconds of an infinite 1-second interval Observable


observable::from_iter(0..10).skip(5).subscribe(|v| println!("{}", v));
// print logs:
// 6
// 7
// 8
// 9
// 10
source

fn skip_until<NotifyItem, NotifyErr, Other>( self, notifier: Other ) -> SkipUntilOp<Self, Other, NotifyItem, NotifyErr>where Other: ObservableExt<NotifyItem, NotifyErr>,

Discard items emitted by an Observable until a second Observable emits an item

Example

Ignore the numbers in the 0-10 range until the Observer emits 5 and trigger the notify observable.


let mut items = vec![];
let notifier = Subject::<(), ()>::default();
let mut c_notifier = notifier.clone();
observable::from_iter(0..10)
  .tap(move |v| {
    if v == &5 {
      c_notifier.next(());
    }
  })
  .skip_until(notifier)
  .subscribe(|v| items.push(v));

assert_eq!((5..10).collect::<Vec<i32>>(), items);
source

fn skip_until_threads<NotifyItem, NotifyErr, Other>( self, notifier: Other ) -> SkipUntilOpThreads<Self, Other, NotifyItem, NotifyErr>where Other: ObservableExt<NotifyItem, NotifyErr>,

A threads safe version of skip_until_threads

source

fn skip_while<F>(self, predicate: F) -> SkipWhileOp<Self, F>where F: FnMut(&Item) -> bool,

Discard items emitted by an Observable until a specified condition becomes false

Example

Suppress the first 5 items of an infinite 1-second interval Observable


observable::from_iter(0..10)
  .skip_while(|v| v < &5)
  .subscribe(|v| println!("{}", v));

// print logs:
// 5
// 6
// 7
// 8
// 9
source

fn skip_last(self, count: usize) -> SkipLastOp<Self>

Ignore the last count values emitted by the source Observable.

skip_last returns an Observable that ignore the last count values emitted by the source Observable. If the source emits fewer than count values then 0 of its values are emitted. It will not emit values until source Observable complete.

Example

Skip the last 5 seconds of an infinite 1-second interval Observable


observable::from_iter(0..10)
  .skip_last(5)
  .subscribe(|v| println!("{}", v));

// print logs:
// 0
// 1
// 2
// 3
// 4
source

fn take(self, count: usize) -> TakeOp<Self>

Emits only the first count values emitted by the source Observable.

take returns an Observable that emits only the first count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. After that, it completes, regardless if the source completes.

Example

Take the first 5 seconds of an infinite 1-second interval Observable


observable::from_iter(0..10).take(5).subscribe(|v| println!("{}", v));
// print logs:
// 0
// 1
// 2
// 3
// 4
source

fn take_until<Notify, NotifyItem, NotifyErr>( self, notifier: Notify ) -> TakeUntilOp<Self, Notify, NotifyItem, NotifyErr>

Emits the values emitted by the source Observable until a notifier Observable emits a value.

take_until subscribes and begins mirroring the source Observable. It also monitors a second Observable, notifier that you provide. If the notifier emits a value, the output Observable stops mirroring the source Observable and completes. If the notifier doesn’t emit any value and completes then take_until will pass all values.

source

fn take_until_threads<Notify, NotifyItem, NotifyErr>( self, notifier: Notify ) -> TakeUntilOpThreads<Self, Notify, NotifyItem, NotifyErr>

source

fn take_while<F>(self, callback: F) -> TakeWhileOp<Self, F>where F: FnMut(&Item) -> bool,

Emits values while result of an callback is true.

take_while returns an Observable that emits values while result of an callback is true emitted by the source Observable. It will not emit values until source Observable complete.

Example

Take the first 5 seconds of an infinite 1-second interval Observable


observable::from_iter(0..10)
  .take_while(|v| v < &5)
.subscribe(|v| println!("{}", v));
// print logs:
// 0
// 1
// 2
// 3
// 4
source

fn take_while_inclusive<F>(self, callback: F) -> TakeWhileOp<Self, F>where F: FnMut(&Item) -> bool,

Emits values while result of an callback is true and the last one that causes the callback to return false.

Example

Take the first 5 seconds of an infinite 1-second interval Observable


observable::from_iter(0..10)
  .take_while_inclusive(|v| v < &4)
.subscribe(|v| println!("{}", v));
// print logs:
// 0
// 1
// 2
// 3
// 4
source

fn take_last(self, count: usize) -> TakeLastOp<Self>

Emits only the last count values emitted by the source Observable.

take_last returns an Observable that emits only the last count values emitted by the source Observable. If the source emits fewer than count values then all of its values are emitted. It will not emit values until source Observable complete.

Example

Take the last 5 seconds of an infinite 1-second interval Observable


observable::from_iter(0..10)
  .take_last(5)
.subscribe(|v| println!("{}", v));
// print logs:
// 5
// 6
// 7
// 8
// 9
source

fn sample<Sample, SampleItem, SampleErr>( self, sampling: Sample ) -> SampleOp<Self, Sample, SampleItem>where Sample: ObservableExt<SampleItem, SampleErr>,

Emits item it has most recently emitted since the previous sampling

It will emit values when sampling observable complete.

#Example Sampling every 5ms of an infinite 1ms interval Observable

use rxrust::prelude::*;
use std::time::Duration;
use futures::executor::LocalPool;

let mut local_scheduler = LocalPool::new();
let spawner = local_scheduler.spawner();
observable::interval(Duration::from_millis(2), spawner.clone())
  .sample(observable::interval(Duration::from_millis(5), spawner))
  .take(5)
  .subscribe(move |v| println!("{}", v));

local_scheduler.run();
// print logs:
// 1
// 4
// 6
// 9
// ...
source

fn sample_threads<Sample, SampleItem, SampleErr>( self, sampling: Sample ) -> SampleOpThreads<Self, Sample, SampleItem>where Sample: ObservableExt<SampleItem, SampleErr>,

A threads safe version of sample

source

fn scan_initial<OutputItem, BinaryOp>( self, initial_value: OutputItem, binary_op: BinaryOp ) -> ScanOp<Self, BinaryOp, OutputItem, Item>where BinaryOp: Fn(OutputItem, Item) -> OutputItem, OutputItem: Clone,

The Scan operator applies a function to the first item emitted by the source observable and then emits the result of that function as its own first emission. It also feeds the result of the function back into the function along with the second item emitted by the source observable in order to generate its second emission. It continues to feed back its own subsequent emissions along with the subsequent emissions from the source Observable in order to create the rest of its sequence.

Applies a binary operator closure to each item emitted from source observable and emits successive values.

Completes when source observable completes. Emits error when source observable emits it.

This version starts with an user-specified initial value for when the binary operator is called with the first item processed.

Arguments
  • initial_value - An initial value to start the successive accumulations from.
  • binary_op - A closure or function acting as a binary operator.
Examples
use rxrust::prelude::*;

observable::from_iter(vec![1, 1, 1, 1, 1])
  .scan_initial(100, |acc, v| acc + v)
  .subscribe(|v| println!("{}", v));

// print log:
// 101
// 102
// 103
// 104
// 105
source

fn scan<OutputItem, BinaryOp>( self, binary_op: BinaryOp ) -> ScanOp<Self, BinaryOp, OutputItem, Item>where BinaryOp: Fn(OutputItem, Item) -> OutputItem, OutputItem: Default + Clone,

Works like scan_initial but starts with a value defined by a Default trait for the first argument binary_op operator operates on.

Arguments
  • binary_op - A closure or function acting as a binary operator.
source

fn reduce_initial<OutputItem, BinaryOp>( self, initial: OutputItem, binary_op: BinaryOp ) -> ReduceOp<Self, BinaryOp, OutputItem, Item>where BinaryOp: Fn(OutputItem, Item) -> OutputItem, OutputItem: Clone,

Apply a function to each item emitted by an observable, sequentially, and emit the final value, after source observable completes.

Emits error when source observable emits it.

Arguments
  • initial - An initial value to start the successive reduction from.
  • binary_op - A closure acting as a binary (folding) operator.
Examples
use rxrust::prelude::*;

observable::from_iter(vec![1, 1, 1, 1, 1])
  .reduce_initial(100, |acc, v| acc + v)
  .subscribe(|v| println!("{}", v));

// print log:
// 105
source

fn reduce<OutputItem, BinaryOp>( self, binary_op: BinaryOp ) -> ReduceOp<Self, BinaryOp, OutputItem, Item>where BinaryOp: Fn(OutputItem, Item) -> OutputItem, OutputItem: Default + Clone,

Works like reduce_initial but starts with a value defined by a Default trait for the first argument f operator operates on.

Arguments
  • binary_op - A closure acting as a binary operator.
source

fn max(self) -> MinMaxOp<Self, Item>where Item: PartialOrd<Item> + Clone,

Emits the item from the source observable that had the maximum value.

Emits error when source observable emits it.

Examples
use rxrust::prelude::*;

observable::from_iter(vec![3., 4., 7., 5., 6.])
  .max()
  .subscribe(|v| println!("{}", v));

// print log:
// 7
source

fn min(self) -> MinMaxOp<Self, Item>where Item: Clone + PartialOrd<Item>,

Emits the item from the source observable that had the minimum value.

Emits error when source observable emits it.

Examples
use rxrust::prelude::*;

observable::from_iter(vec![3., 4., 7., 5., 6.])
  .min()
  .subscribe(|v| println!("{}", v));

// print log:
// 3
source

fn sum(self) -> SumOp<Self, Item>where Item: Clone + Default + Add<Item, Output = Item>,

Calculates the sum of numbers emitted by an source observable and emits this sum when source completes.

Emits zero when source completed as an and empty sequence. Emits error when source observable emits it.

Examples
use rxrust::prelude::*;

observable::from_iter(vec![1, 1, 1, 1, 1])
  .sum()
  .subscribe(|v| println!("{}", v));

// p rint log:
// 5
source

fn count(self) -> CountOp<Self, Item>

Emits the number of items emitted by a source observable when this source completes.

The output type of this operator is fixed to usize.

Emits zero when source completed as an and empty sequence. Emits error when source observable emits it.

Examples
use rxrust::prelude::*;

observable::from_iter(vec!['1', '7', '3', '0', '4'])
  .count()
  .subscribe(|v| println!("{}", v));

// print log:
// 5
source

fn average(self) -> AverageOp<Self, Item>where Item: Clone + Default + Add<Item, Output = Item> + Mul<f64, Output = Item>, ScanOp<Self, fn(_: Accum<Item>, _: Item) -> Accum<Item>, Accum<Item>, Item>: ObservableExt<Accum<Item>, Err>, LastOp<ScanOp<Self, fn(_: Accum<Item>, _: Item) -> Accum<Item>, Accum<Item>, Item>, Accum<Item>>: ObservableExt<Accum<Item>, Err>,

Calculates the sum of numbers emitted by an source observable and emits this sum when source completes.

Emits zero when source completed as an and empty sequence. Emits error when source observable emits it.

Examples
use rxrust::prelude::*;

observable::from_iter(vec![3., 4., 5., 6., 7.])
  .average()
  .subscribe(|v| println!("{}", v));

// print log:
// 5
source

fn publish<Subject: Default>(self) -> ConnectableObservable<Self, Subject>

Returns a ConnectableObservable. A ConnectableObservable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items.

source

fn share<'a>(self) -> ShareOp<'a, Item, Err, Self>

Returns a new Observable that multicast (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream hot. This is an alias for publish().ref_count()

source

fn share_threads(self) -> ShareOpThreads<Item, Err, Self>

source

fn delay<SD>(self, dur: Duration, scheduler: SD) -> DelayOp<Self, SD>

Delays the emission of items from the source Observable by a given timeout or until a given Instant.

source

fn delay_threads<SD>( self, dur: Duration, scheduler: SD ) -> DelayOpThreads<Self, SD>

A threads safe version of delay

source

fn delay_at<SD>(self, at: Instant, scheduler: SD) -> DelayOp<Self, SD>

source

fn delay_at_threads<SD>( self, at: Instant, scheduler: SD ) -> DelayOpThreads<Self, SD>

A threads safe version of delay_at

source

fn delay_subscription<SD>( self, dur: Duration, scheduler: SD ) -> DelaySubscriptionOp<Self, SD>

It’s similar to delay but rather than timeshifting the emissions from the source Observable, it timeshifts the moment of subscription to that Observable.

source

fn delay_subscription_at<SD>( self, at: Instant, scheduler: SD ) -> DelaySubscriptionOp<Self, SD>

source

fn subscribe_on<SD>(self, scheduler: SD) -> SubscribeOnOP<Self, SD>

Specify the Scheduler on which an Observable will operate

With SubscribeON you can decide what type of scheduler a specific Observable will be using when it is subscribed to.

Schedulers control the speed and order of emissions to observers from an Observable stream.

Example

Given the following code:

use rxrust::prelude::*;

let a = observable::from_iter(1..5);
let b = observable::from_iter(5..10);
a.merge(b).subscribe(|v| print!("{} ", v));

Both Observable a and b will emit their values directly and synchronously once they are subscribed to. This will result in the output of 1 2 3 4 5 6 7 8 9.

But if we instead use the subscribe_on operator declaring that we want to use the new thread scheduler for values emitted by Observable a:

use rxrust::prelude::*;
use std::thread;

let pool = FuturesThreadPoolScheduler::new().unwrap();
let a = observable::from_iter(1..5).subscribe_on(pool);
let b = observable::from_iter(5..10);
a.merge_threads(b).subscribe(|v|{
  let handle = thread::current();
  print!("{}({:?}) ", v, handle.id())
});

The output will instead by `1(thread 1) 2(thread 1) 3(thread 1) 4(thread

  1. 5(thread 2) 6(thread 2) 7(thread 2) 8(thread 2) 9(thread id2). The reason for this is that Observable bemits its values directly like before, but the emissions fromaare scheduled on a new thread because we are now using theNewThread` Scheduler for that specific Observable.
source

fn observe_on<SD>(self, scheduler: SD) -> ObserveOnOp<Self, SD>

Re-emits all notifications from source Observable with specified scheduler.

ObserveOn is an operator that accepts a scheduler as the parameter, which will be used to reschedule notifications emitted by the source Observable.

source

fn observe_on_threads<SD>(self, scheduler: SD) -> ObserveOnOpThreads<Self, SD>

A thread safe version of observe_on

source

fn debounce<SD>(self, duration: Duration, scheduler: SD) -> DebounceOp<Self, SD>

Emits a value from the source Observable only after a particular time span has passed without another source emission.

source

fn throttle<SD, F>( self, duration_selector: F, edge: ThrottleEdge, scheduler: SD ) -> ThrottleOp<Self, SD, F>where F: Fn(&Item) -> Duration,

Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.

#Example

use rxrust::{ prelude::*, ops::throttle::ThrottleEdge };
use std::time::Duration;

let mut local_pool = FuturesLocalSchedulerPool::new();
let scheduler = local_pool.spawner();
observable::interval(Duration::from_millis(1), scheduler.clone())
  .throttle(
    |val| -> Duration {
      if val % 2 == 0 {
        Duration::from_millis(7)
      } else {
        Duration::from_millis(5)
      }
    },
    ThrottleEdge::leading(), scheduler)
  .take(5)
  .subscribe(move |v| println!("{}", v));

local_pool.run();
source

fn throttle_time<SD>( self, duration: Duration, edge: ThrottleEdge, scheduler: SD ) -> ThrottleOp<Self, SD, Box<dyn Fn(&Item) -> Duration + Send + Sync>>where Item: 'static,

Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.

#Example

use rxrust::{ prelude::*, ops::throttle::ThrottleEdge };
use std::time::Duration;

let mut local_pool = FuturesLocalSchedulerPool::new();
let scheduler = local_pool.spawner();
observable::interval(Duration::from_millis(1), scheduler.clone())
  .throttle_time(
    Duration::from_millis(9), ThrottleEdge::leading(), scheduler)
  .take(5)
  .subscribe(move |v| println!("{}", v));

// wait task finish.
local_pool.run();
source

fn distinct(self) -> DistinctOp<Self>

Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.

source

fn distinct_key<F>(self, key: F) -> DistinctKeyOp<Self, F>

Variant of distinct that takes a key selector.

source

fn distinct_until_changed(self) -> DistinctUntilChangedOp<Self>

Only emit when the current value is different than the last

source

fn distinct_until_key_changed<F>( self, key: F ) -> DistinctUntilKeyChangedOp<Self, F>

Variant of distinct_until_changed that takes a key selector.

source

fn zip<Other, Item2>(self, other: Other) -> ZipOp<Self, Other>where Other: ObservableExt<Item2, Err>,

‘Zips up’ two observable into a single observable of pairs.

zip() returns a new observable that will emit over two other observables, returning a tuple where the first element comes from the first observable, and the second element comes from the second observable.

In other words, it zips two observables together, into a single one.

source

fn zip_threads<Other, Item2>(self, other: Other) -> ZipOpThreads<Self, Other>where Other: ObservableExt<Item2, Err>,

A threads safe version of zip

source

fn with_latest_from<From, OtherItem>( self, from: From ) -> WithLatestFromOp<Self, From>where From: ObservableExt<OtherItem, Err>, OtherItem: Clone,

Combines the source Observable with other Observables to create an Observable whose values are calculated from the latest values of each, only when the source emits.

Whenever the source Observable emits a value, it computes a formula using that value plus the latest values from other input Observables, then emits the output of that formula.

source

fn with_latest_from_threads<From, OtherItem>( self, from: From ) -> WithLatestFromOpThreads<Self, From>where From: ObservableExt<OtherItem, Err>, OtherItem: Clone,

source

fn default_if_empty(self, default_value: Item) -> DefaultIfEmptyOp<Self, Item>

Emits default value if Observable completed with empty result

#Example

use rxrust::prelude::*;

observable::empty()
  .default_if_empty(5)
  .subscribe(|v| println!("{}", v));

// Prints:
// 5
source

fn buffer_with_count(self, count: usize) -> BufferWithCountOp<Self>

Buffers emitted values of type T in a Vec and emits that Vec as soon as the buffer’s size equals the given count. On complete, if the buffer is not empty, it will be emitted. On error, the buffer will be discarded.

The operator never returns an empty buffer.

#Example

use rxrust::prelude::*;

observable::from_iter(0..6)
  .buffer_with_count(3)
  .subscribe(|vec| println!("{:?}", vec));

// Prints:
// [0, 1, 2]
// [3, 4, 5]
source

fn buffer_with_time<S>( self, time: Duration, scheduler: S ) -> BufferWithTimeOp<Self, S>

Buffers emitted values of type T in a Vec and emits that Vec periodically.

On complete, if the buffer is not empty, it will be emitted. On error, the buffer will be discarded.

The operator never returns an empty buffer.

#Example

use rxrust::prelude::*;
use std::time::Duration;

let pool = FuturesThreadPoolScheduler::new().unwrap();

observable::create(|mut subscriber: SubscriberThreads<_>| {
  subscriber.next(0);
  subscriber.next(1);
  std::thread::sleep(Duration::from_millis(100));
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
})
  .buffer_with_time(Duration::from_millis(50), pool)
  .subscribe(|vec| println!("{:?}", vec));

// Prints:
// [0, 1]
// [2, 3]
source

fn buffer_with_count_and_time<S>( self, count: usize, time: Duration, scheduler: S ) -> BufferWithCountOrTimerOp<Self, S>

Buffers emitted values of type T in a Vec and emits that Vec either if the buffer’s size equals count, or periodically. This operator combines the functionality of buffer_with_count and buffer_with_time.

#Example

use rxrust::prelude::*;
use std::time::Duration;

let pool = FuturesThreadPoolScheduler::new().unwrap();

observable::create(|mut subscriber: SubscriberThreads<_>| {
  subscriber.next(0);
  subscriber.next(1);
  subscriber.next(2);
  std::thread::sleep(Duration::from_millis(100));
  subscriber.next(3);
  subscriber.next(4);
  subscriber.complete();
})
  .buffer_with_count_and_time(2, Duration::from_millis(50), pool)
  .subscribe(|vec| println!("{:?}", vec));

// Prints:
// [0, 1]
// [2]
// [3, 4]
source

fn combine_latest<Other, OtherItem, BinaryOp, OutputItem>( self, other: Other, binary_op: BinaryOp ) -> CombineLatestOp<Self, Other, BinaryOp>where Other: ObservableExt<OtherItem, Err>, BinaryOp: FnMut(Item, OtherItem) -> OutputItem,

Emits item which is combining latest items from two observables.

combine_latest() merges two observables into one observable by applying a binary operator on the latest item of two observable whenever each of observables produces an element.

#Example

use rxrust::prelude::*;
use std::time::Duration;
use futures::executor::LocalPool;

let mut local_scheduler = LocalPool::new();
let spawner = local_scheduler.spawner();
observable::interval(Duration::from_millis(2), spawner.clone())
  .combine_latest(
    observable::interval(Duration::from_millis(3), spawner),
    |a, b| (a, b),
  )
  .take(5)
  .subscribe(move |v| println!("{}, {}", v.0, v.1));

local_scheduler.run();
// print logs:
// 0, 0
// 1, 0
// 2, 0
// 2, 1
// 3, 1
source

fn combine_latest_threads<Other, OtherItem, BinaryOp, OutputItem>( self, other: Other, binary_op: BinaryOp ) -> CombineLatestOpThread<Self, Other, BinaryOp>where Other: ObservableExt<OtherItem, Err>, BinaryOp: FnMut(Item, OtherItem) -> OutputItem,

source

fn start_with<B>(self, values: Vec<B>) -> StartWithOp<Self, B>

Returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.

source

fn pairwise(self) -> PairwiseOp<Self>

Groups pairs of consecutive emissions together and emits them as an pair of two values.

source

fn tap<F>(self, f: F) -> TapOp<Self, F>where F: FnMut(&Item),

Used to perform side-effects for notifications from the source observable

source

fn on_error<F>(self, f: F) -> OnErrorOp<Self, F, Err>where F: FnOnce(Err),

Process the error of the observable and the return observable can’t catch the error any more.

source

fn on_complete<F>(self, f: F) -> OnCompleteOp<Self, F>where F: FnOnce(),

source

fn complete_status(self) -> (StatusOp<Self>, Arc<CompleteStatus>)

Turn the observable to an new observable that will track its complete status. The second element of return tuple provide ability let you can query if it completed or error occur. You can also wait the observable finish.

source

fn collect<C>(self) -> CollectOp<Self, C>where C: IntoIterator + Extend<C::Item> + Default,

Collects all the items emitted by the observable into a collection.

Example
use rxrust::prelude::*;

let mut subject = Subject::default();
subject.clone()
  .collect::<Vec<_>>()
  .subscribe(|values| {
    println!("{values:?}");
});

subject.next(2);
subject.next(4);
subject.next(6);

// prints: [2,4,6]
source

fn collect_into<C>(self, collection: C) -> CollectOp<Self, C>where C: IntoIterator + Extend<C::Item>,

Collects all the items emitted by the observable into the given collection.

Example
use rxrust::prelude::*;

#[tokio::main]
async fn main() {
  let observable = observable::from_iter(['x', 'y', 'z']);
  let base = vec!['a', 'b', 'c'];
  let values = observable.collect_into::<Vec<_>>(base)
      .to_future()
      .await
      .unwrap()
      .ok();

  assert_eq!(values, Some(vec!['a', 'b', 'c', 'x', 'y', 'z']));
}
source

fn to_future(self) -> ObservableFuture<Item, Err> where Self: Observable<Item, Err, ObservableFutureObserver<Item, Err>>,

Converts this observable into a Future that resolves to Result<Result<Item, Err>, ObservableError>.

Error
  • ObservableError::Empty: If the observable emitted no values.
  • Observable::MultipleValues: If the observable emitted more than one value.
Example
use rxrust::prelude::*;

#[tokio::main]
async fn main() {
  let observable = observable::of(12);
  let value = observable.to_future().await.unwrap().ok();
  assert_eq!(value, Some(12));
}
source

fn to_stream(self) -> ObservableStream<Item, Err>where Self: Observable<Item, Err, ObservableStreamObserver<Item, Err>>,

Converts this observable into a stream that emits the values of the observable.

Example
use rxrust::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() {
  let observable = observable::from_iter([1,2,3]);
  let mut stream = observable.to_stream();
  let mut values = vec![];

  while let Some(Ok(x)) = stream.next().await {
    values.push(x);
  }

  assert_eq!(values, vec![1,2,3]);
}

Implementors§

source§

impl ObservableExt<(), Infallible> for NeverObservable

source§

impl<'a, Item, Err> ObservableExt<&mut Item, &mut Err> for MutRefItemErrSubject<'a, Item, Err>

source§

impl<'a, Item, Err> ObservableExt<&mut Item, Err> for MutRefItemSubject<'a, Item, Err>

source§

impl<'a, Item, Err> ObservableExt<Item, &mut Err> for MutRefErrSubject<'a, Item, Err>

source§

impl<'a, Item, Err> ObservableExt<Item, Err> for BoxOp<'a, Item, Err>

source§

impl<'a, Item, Err> ObservableExt<Item, Err> for CloneableBoxOp<'a, Item, Err>

source§

impl<'a, Item, Err> ObservableExt<Item, Err> for Subject<'a, Item, Err>

source§

impl<'a, ObservableItem, Item, Err, S> ObservableExt<Item, Err> for MergeAllOp<'a, S, ObservableItem>where S: ObservableExt<ObservableItem, Err>, ObservableItem: ObservableExt<Item, Err>,

source§

impl<'a, S, Item, Err> ObservableExt<Item, Err> for ShareOp<'a, Item, Err, S>where S: ObservableExt<Item, Err>,

source§

impl<'a, Source, Discr, Key, Item, Err> ObservableExt<KeyObservable<Key, Subject<'a, Item, Err>>, Err> for GroupByOp<Source, Discr, Subject<'a, Item, Err>>where Source: ObservableExt<Item, Err>,

source§

impl<A, B, ItemA, ItemB, Err> ObservableExt<(ItemA, ItemB), Err> for ZipOp<A, B>where A: ObservableExt<ItemA, Err>, B: ObservableExt<ItemB, Err>,

source§

impl<A, B, ItemA, ItemB, Err> ObservableExt<(ItemA, ItemB), Err> for ZipOpThreads<A, B>where A: ObservableExt<ItemA, Err>, B: ObservableExt<ItemB, Err>,

source§

impl<A, B, ItemA, ItemB, Err, BinaryOp> ObservableExt<(ItemA, ItemB), Err> for CombineLatestOp<A, B, BinaryOp>where A: ObservableExt<ItemA, Err>, B: ObservableExt<ItemB, Err>,

source§

impl<A, B, ItemA, ItemB, Err, BinaryOp> ObservableExt<(ItemA, ItemB), Err> for CombineLatestOpThread<A, B, BinaryOp>where A: ObservableExt<ItemA, Err>, B: ObservableExt<ItemB, Err>,

source§

impl<Err> ObservableExt<(), Err> for ThrowObservable<Err>

source§

impl<Err, S, C> ObservableExt<C, Err> for CollectOp<S, C>where C: IntoIterator + Extend<C::Item>, S: ObservableExt<C::Item, Err>,

source§

impl<F, Item, Err, O> ObservableExt<Item, Err> for ObservableFn<F, Subscriber<O>>where F: FnOnce(Subscriber<O>), O: Observer<Item, Err>,

source§

impl<F, Item, Err, O> ObservableExt<Item, Err> for ObservableFn<F, SubscriberThreads<O>>where F: FnOnce(SubscriberThreads<O>), O: Observer<Item, Err>,

source§

impl<InputItem, OutputItem, Err, S, BinaryOp> ObservableExt<OutputItem, Err> for ScanOp<S, BinaryOp, OutputItem, InputItem>where S: ObservableExt<InputItem, Err>,

source§

impl<Item1, Item2, Err, S, F> ObservableExt<Item1, Err> for MapOp<S, F, Item2>where S: ObservableExt<Item2, Err>, F: FnMut(Item2) -> Item1,

source§

impl<Item1, Item2, Err, Source, Sample> ObservableExt<Item1, Err> for SampleOp<Source, Sample, Item2>where Source: ObservableExt<Item1, Err>, Sample: ObservableExt<Item2, Err>,

source§

impl<Item1, Item2, Err, Source, Sample> ObservableExt<Item1, Err> for SampleOpThreads<Source, Sample, Item2>where Source: ObservableExt<Item1, Err>, Sample: ObservableExt<Item2, Err>,

source§

impl<Item> ObservableExt<Item, Infallible> for OfObservable<Item>

source§

impl<Item> ObservableExt<Item, Infallible> for OptionObservable<Item>

source§

impl<Item> ObservableExt<Item, Infallible> for EmptyObservable<Item>

source§

impl<Item, Err> ObservableExt<Item, Err> for BoxOpThreads<Item, Err>

source§

impl<Item, Err> ObservableExt<Item, Err> for CloneableBoxOpThreads<Item, Err>

source§

impl<Item, Err> ObservableExt<Item, Err> for SubjectThreads<Item, Err>

source§

impl<Item, Err> ObservableExt<Item, Err> for ResultObservable<Item, Err>

source§

impl<Item, Err, F, U> ObservableExt<Item, Err> for ObservableDeref<F>where F: FnOnce() -> U, U: ObservableExt<Item, Err>,

source§

impl<Item, Err, Key, Subject> ObservableExt<Item, Err> for KeyObservable<Key, Subject>where Subject: Observer<Item, Err>,

source§

impl<Item, Err, OutputErr, S, M> ObservableExt<Item, OutputErr> for OnErrorMapOp<S, M, Err>where S: ObservableExt<Item, Err>, M: FnMut(Err) -> OutputErr,

source§

impl<Item, Err, S> ObservableExt<bool, Err> for ContainsOp<S, Item>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S> ObservableExt<(Item, Item), Err> for PairwiseOp<S>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S> ObservableExt<Vec<Item, Global>, Err> for BufferWithCountOp<S>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S> ObservableExt<Item, Err> for DefaultIfEmptyOp<S, Item>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S> ObservableExt<Item, Err> for DistinctOp<S>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S> ObservableExt<Item, Err> for DistinctUntilChangedOp<S>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S> ObservableExt<Item, Err> for SkipLastOp<S>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S> ObservableExt<Item, Err> for StartWithOp<S, Item>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S> ObservableExt<Item, Err> for TakeOp<S>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S> ObservableExt<Item, Err> for TakeLastOp<S>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, B> ObservableExt<B, Err> for MapToOp<S, B, Item>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, F> ObservableExt<Item, Err> for DistinctKeyOp<S, F>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, F> ObservableExt<Item, Err> for DistinctUntilKeyChangedOp<S, F>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, F> ObservableExt<Item, Err> for FilterOp<S, F>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, F> ObservableExt<Item, Err> for FinalizeOp<S, F>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, F> ObservableExt<Item, Err> for FinalizeOpThreads<S, F>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, F> ObservableExt<Item, Err> for SkipWhileOp<S, F>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, M> ObservableExt<Item, Err> for TapOp<S, M>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, SD> ObservableExt<Item, Err> for BufferWithCountOrTimerOp<S, SD>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, SD> ObservableExt<Item, Err> for BufferWithTimeOp<S, SD>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, SD> ObservableExt<Item, Err> for DebounceOp<S, SD>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, SD> ObservableExt<Item, Err> for DelayOp<S, SD>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, SD> ObservableExt<Item, Err> for DelayOpThreads<S, SD>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, SD> ObservableExt<Item, Err> for DelaySubscriptionOp<S, SD>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, SD> ObservableExt<Item, Err> for ObserveOnOp<S, SD>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, SD> ObservableExt<Item, Err> for ObserveOnOpThreads<S, SD>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, S, SD, F> ObservableExt<Item, Err> for ThrottleOp<S, SD, F>where S: ObservableExt<Item, Err>,

source§

impl<Item, Err, Subject> ObservableExt<Item, Err> for BehaviorSubject<Item, Subject>where Subject: ObservableExt<Item, Err>,

source§

impl<Item, F> ObservableExt<Item, Infallible> for CallableObservable<F>where F: FnOnce() -> Item,

source§

impl<Item, S> ObservableExt<Item, Infallible> for TimerObservable<Item, S>

source§

impl<Item, S, Err> ObservableExt<Item, Err> for LastOp<S, Item>where S: ObservableExt<Item, Err>,

source§

impl<ObservableItem, Item, Err, S> ObservableExt<Item, Err> for MergeAllOpThreads<S, ObservableItem>where S: ObservableExt<ObservableItem, Err>, ObservableItem: ObservableExt<Item, Err>,

source§

impl<OutputItem, Item, Err, S, F> ObservableExt<OutputItem, Err> for FilterMapOp<S, F, Item>where S: ObservableExt<Item, Err>, F: FnMut(Item) -> Option<OutputItem>,

source§

impl<S1, S2, Item, Err> ObservableExt<Item, Err> for MergeOp<S1, S2>where S1: ObservableExt<Item, Err>, S2: ObservableExt<Item, Err>,

source§

impl<S1, S2, Item, Err> ObservableExt<Item, Err> for MergeOpThreads<S1, S2>where S1: ObservableExt<Item, Err>, S2: ObservableExt<Item, Err>,

source§

impl<S> ObservableExt<usize, Infallible> for IntervalObservable<S>

source§

impl<S, F, Item, Err> ObservableExt<Item, Infallible> for OnErrorOp<S, F, Err>where S: ObservableExt<Item, Err>,

source§

impl<S, F, Item, Err> ObservableExt<Item, Err> for OnCompleteOp<S, F>where S: ObservableExt<Item, Err>,

source§

impl<S, F, Item, Err> ObservableExt<Item, Err> for TakeWhileOp<S, F>where S: ObservableExt<Item, Err>,

source§

impl<S, Item, Err> ObservableExt<Item, Err> for StatusOp<S>where S: ObservableExt<Item, Err>,

source§

impl<S, Item, Err> ObservableExt<Item, Err> for ShareOpThreads<Item, Err, S>where S: ObservableExt<Item, Err>,

source§

impl<S, Item, Err> ObservableExt<Item, Err> for SkipOp<S>where S: ObservableExt<Item, Err>,

source§

impl<S, Item, Err, SD> ObservableExt<Item, Err> for SubscribeOnOP<S, SD>where S: ObservableExt<Item, Err>,

source§

impl<S, N, Item, Err, NotifyItem, NotifyErr> ObservableExt<Item, Err> for SkipUntilOp<S, N, NotifyItem, NotifyErr>where S: ObservableExt<Item, Err>, N: ObservableExt<NotifyItem, NotifyErr>,

source§

impl<S, N, Item, Err, NotifyItem, NotifyErr> ObservableExt<Item, Err> for SkipUntilOpThreads<S, N, NotifyItem, NotifyErr>where S: ObservableExt<Item, Err>, N: ObservableExt<NotifyItem, NotifyErr>,

source§

impl<S, N, Item, Err, NotifyItem, NotifyErr> ObservableExt<Item, Err> for TakeUntilOp<S, N, NotifyItem, NotifyErr>where S: ObservableExt<Item, Err>, N: ObservableExt<NotifyItem, NotifyErr>,

source§

impl<S, N, Item, Err, NotifyItem, NotifyErr> ObservableExt<Item, Err> for TakeUntilOpThreads<S, N, NotifyItem, NotifyErr>where S: ObservableExt<Item, Err>, N: ObservableExt<NotifyItem, NotifyErr>,

source§

impl<Source, Discr, Key, Item, Err> ObservableExt<KeyObservable<Key, SubjectThreads<Item, Err>>, Err> for GroupByOp<Source, Discr, SubjectThreads<Item, Err>>where Source: ObservableExt<Item, Err>,

source§

impl<Source, From, ItemA, ItemB, Err> ObservableExt<(ItemA, ItemB), Err> for WithLatestFromOp<Source, From>where Source: ObservableExt<ItemA, Err>, From: ObservableExt<ItemB, Err>,

source§

impl<Source, From, ItemA, ItemB, Err> ObservableExt<(ItemA, ItemB), Err> for WithLatestFromOpThreads<Source, From>where Source: ObservableExt<ItemA, Err>, From: ObservableExt<ItemB, Err>,