Trait rxrust::observable::Observable [−][src]
pub trait Observable: Sized { type Item; type Err;}Show methods
fn first(self) -> TakeOp<Self> { ... } fn first_or(self, default: Self::Item) -> DefaultIfEmptyOp<TakeOp<Self>> { ... } fn last_or(
self,
default: Self::Item
) -> DefaultIfEmptyOp<LastOp<Self, Self::Item>> { ... } fn element_at(self, nth: u32) -> TakeOp<SkipOp<Self>> { ... } fn ignore_elements(self) -> FilterOp<Self, fn(_: &Self::Item) -> bool> { ... } fn all<F>(
self,
pred: F
) -> DefaultIfEmptyOp<TakeOp<FilterOp<MapOp<Self, F>, fn(_: &bool) -> bool>>>
where
F: Fn(Self::Item) -> bool, { ... } fn contains(self, target: Self::Item) -> ContainsOp<Self, Self::Item> { ... } fn last(self) -> LastOp<Self, Self::Item> { ... } fn finalize<F>(self, f: F) -> FinalizeOp<Self, F>
where
F: FnMut(), { ... } fn flatten<Inner, A>(self) -> FlattenOp<Self, Inner>
where
Inner: Observable<Item = A, Err = Self::Err>, { ... } fn map<B, F>(self, f: F) -> MapOp<Self, F>
where
F: Fn(Self::Item) -> B, { ... } fn map_to<B>(self, value: B) -> MapToOp<Self, B> { ... } fn merge<S>(self, o: S) -> MergeOp<Self, S>
where
S: Observable<Item = Self::Item, Err = Self::Err>, { ... } fn filter<F>(self, filter: F) -> FilterOp<Self, F>
where
F: Fn(&Self::Item) -> bool, { ... } fn filter_map<F, SourceItem, Item>(self, f: F) -> FilterMapOp<Self, F>
where
F: FnMut(SourceItem) -> Option<Item>, { ... } fn box_it<O: IntoBox<Self>>(self) -> BoxOp<O>
where
BoxOp<O>: Observable<Item = Self::Item, Err = Self::Err>, { ... } fn skip(self, count: u32) -> SkipOp<Self> { ... } fn skip_while<F>(self, callback: F) -> SkipWhileOp<Self, F>
where
F: FnMut(&Self::Item) -> bool, { ... } fn skip_last(self, count: usize) -> SkipLastOp<Self> { ... } fn take(self, count: u32) -> TakeOp<Self> { ... } fn take_until<T>(self, notifier: T) -> TakeUntilOp<Self, T> { ... } fn take_while<F>(self, callback: F) -> TakeWhileOp<Self, F>
where
F: FnMut(&Self::Item) -> bool, { ... } fn take_last(self, count: usize) -> TakeLastOp<Self> { ... } fn sample<O>(self, sampling: O) -> SampleOp<Self, O>
where
O: Observable, { ... } fn scan_initial<OutputItem, BinaryOp>(
self,
initial_value: OutputItem,
binary_op: BinaryOp
) -> ScanOp<Self, BinaryOp, OutputItem>
where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Clone, { ... } fn scan<OutputItem, BinaryOp>(
self,
binary_op: BinaryOp
) -> ScanOp<Self, BinaryOp, OutputItem>
where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Default + Clone, { ... } fn reduce_initial<OutputItem, BinaryOp>(
self,
initial: OutputItem,
binary_op: BinaryOp
) -> ReduceOp<Self, BinaryOp, OutputItem>
where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Clone, { ... } fn reduce<OutputItem, BinaryOp>(
self,
binary_op: BinaryOp
) -> DefaultIfEmptyOp<LastOp<ScanOp<Self, BinaryOp, OutputItem>, OutputItem>>
where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Default + Clone, { ... } fn max(self) -> MinMaxOp<Self, Self::Item>
where
Self::Item: Clone + Send + PartialOrd<Self::Item>, { ... } fn min(self) -> MinMaxOp<Self, Self::Item>
where
Self::Item: Clone + Send + PartialOrd<Self::Item>, { ... } fn sum(self) -> SumOp<Self, Self::Item>
where
Self::Item: Clone + Default + Add<Self::Item, Output = Self::Item>, { ... } fn count(self) -> CountOp<Self, Self::Item> { ... } fn average(self) -> AverageOp<Self, Self::Item>
where
Self::Item: Clone + Send + Default + Add<Self::Item, Output = Self::Item> + Mul<f64, Output = Self::Item>, { ... } fn publish<Subject: Default>(self) -> ConnectableObservable<Self, Subject> { ... } fn share<Subject, Inner>(
self
) -> RefCount<Inner, ConnectableObservable<Self, Subject>>
where
Inner: RefCountCreator<Connectable = ConnectableObservable<Self, Subject>>,
Subject: Default,
Self: Clone, { ... } fn delay<SD>(self, dur: Duration, scheduler: SD) -> DelayOp<Self, SD> { ... } fn delay_at<SD>(self, at: Instant, scheduler: SD) -> DelayOp<Self, SD> { ... } fn subscribe_on<SD>(self, scheduler: SD) -> SubscribeOnOP<Self, SD> { ... } fn observe_on<SD>(self, scheduler: SD) -> ObserveOnOp<Self, SD> { ... } fn debounce<SD>(
self,
duration: Duration,
scheduler: SD
) -> DebounceOp<Self, SD> { ... } fn throttle_time<SD>(
self,
duration: Duration,
edge: ThrottleEdge,
scheduler: SD
) -> ThrottleTimeOp<Self, SD> { ... } fn distinct(self) -> DistinctOp<Self> { ... } fn zip<U>(self, other: U) -> ZipOp<Self, U>
where
U: Observable, { ... } fn default_if_empty(
self,
default_value: Self::Item
) -> DefaultIfEmptyOp<Self> { ... }
Associated Types
Loading content...Provided methods
fn first(self) -> TakeOp<Self>
[src]
emit only the first item emitted by an Observable
fn first_or(self, default: Self::Item) -> DefaultIfEmptyOp<TakeOp<Self>>
[src]
emit only the first item emitted by an Observable
fn last_or(
self,
default: Self::Item
) -> DefaultIfEmptyOp<LastOp<Self, Self::Item>>
[src]
self,
default: Self::Item
) -> DefaultIfEmptyOp<LastOp<Self, Self::Item>>
Emit only the last final item emitted by a source observable or a default item given.
Completes right after emitting the single item. Emits error when source observable emits it.
Examples
use rxrust::prelude::*; observable::empty() .last_or(1234) .subscribe(|v| println!("{}", v)); // print log: // 1234
fn element_at(self, nth: u32) -> TakeOp<SkipOp<Self>>
[src]
Emit only item n (0-indexed) emitted by an Observable
fn ignore_elements(self) -> FilterOp<Self, fn(_: &Self::Item) -> bool>
[src]
Do not emit any items from an Observable but mirror its termination notification
fn all<F>(
self,
pred: F
) -> DefaultIfEmptyOp<TakeOp<FilterOp<MapOp<Self, F>, fn(_: &bool) -> bool>>> where
F: Fn(Self::Item) -> bool,
[src]
self,
pred: F
) -> DefaultIfEmptyOp<TakeOp<FilterOp<MapOp<Self, F>, fn(_: &bool) -> bool>>> where
F: Fn(Self::Item) -> bool,
Determine whether all items emitted by an Observable meet some criteria
fn contains(self, target: Self::Item) -> ContainsOp<Self, Self::Item>
[src]
Determine whether an Observable emits a particular item or not
fn last(self) -> LastOp<Self, Self::Item>
[src]
Emits only last final item emitted by a source observable.
Completes right after emitting the single last item, or when source observable completed, being an empty one. Emits error when source observable emits it.
Examples
use rxrust::prelude::*; observable::from_iter(0..100) .last() .subscribe(|v| println!("{}", v)); // print log: // 99
fn finalize<F>(self, f: F) -> FinalizeOp<Self, F> where
F: FnMut(),
[src]
F: FnMut(),
Call a function when observable completes, errors or is unsubscribed from.
fn flatten<Inner, A>(self) -> FlattenOp<Self, Inner> where
Inner: Observable<Item = A, Err = Self::Err>,
[src]
Inner: Observable<Item = A, Err = Self::Err>,
Creates an Observable that combines all the emissions from Observables that get emitted from an Observable.
Example
let mut source = Subject::new(); let numbers = Subject::new(); // create a even stream by filter let even = numbers.clone().filter((|v| *v % 2 == 0) as fn(&i32) -> bool); // create an odd stream by filter let odd = numbers.clone().filter((|v| *v % 2 != 0) as fn(&i32) -> bool); // merge odd and even stream again let out = source.clone().flatten(); source.next(even); source.next(odd); // attach observers out.subscribe(|v: i32| println!("{} ", v));
fn map<B, F>(self, f: F) -> MapOp<Self, F> where
F: Fn(Self::Item) -> B,
[src]
F: Fn(Self::Item) -> B,
Creates a new stream which calls a closure on each element and uses its return as the value.
fn map_to<B>(self, value: B) -> MapToOp<Self, B>
[src]
Maps emissions to a constant value.
fn merge<S>(self, o: S) -> MergeOp<Self, S> where
S: Observable<Item = Self::Item, Err = Self::Err>,
[src]
S: Observable<Item = Self::Item, Err = Self::Err>,
combine two Observables into one by merging their emissions
Example
let numbers = Subject::new(); // create a even stream by filter let even = numbers.clone().filter(|v| *v % 2 == 0); // create an odd stream by filter let odd = numbers.clone().filter(|v| *v % 2 != 0); // merge odd and even stream again let merged = even.merge(odd); // attach observers merged.subscribe(|v: &i32| println!("{} ", v));
fn filter<F>(self, filter: F) -> FilterOp<Self, F> where
F: Fn(&Self::Item) -> bool,
[src]
F: Fn(&Self::Item) -> bool,
Emit only those items from an Observable that pass a predicate test
Example
use rxrust:: prelude::*; let mut coll = vec![]; let coll_clone = coll.clone(); observable::from_iter(0..10) .filter(|v| *v % 2 == 0) .subscribe(|v| { coll.push(v); }); // only even numbers received. assert_eq!(coll, vec![0, 2, 4, 6, 8]);
fn filter_map<F, SourceItem, Item>(self, f: F) -> FilterMapOp<Self, F> where
F: FnMut(SourceItem) -> Option<Item>,
[src]
F: FnMut(SourceItem) -> Option<Item>,
The closure must return an Option
Why filter_map and not just filter and map? The key is in this part:
If the closure returns Some(element), then that element is returned.
In other words, it removes the Option
Examples
let mut res: Vec<i32> = vec![]; observable::from_iter(["1", "lol", "3", "NaN", "5"].iter()) .filter_map(|s: &&str| s.parse().ok()) .subscribe(|v| res.push(v)); assert_eq!(res, [1, 3, 5]);
fn box_it<O: IntoBox<Self>>(self) -> BoxOp<O> where
BoxOp<O>: Observable<Item = Self::Item, Err = Self::Err>,
[src]
BoxOp<O>: Observable<Item = Self::Item, Err = Self::Err>,
box an observable to a safety object and convert it to a simple type
BoxOp
, which only care Item
and Err
Observable emitted.
Example
use rxrust::prelude::*; use ops::box_it::LocalBoxOp; let mut boxed: LocalBoxOp<'_, i32, ()> = observable::of(1) .map(|v| v).box_it(); // BoxOp can box any observable type boxed = observable::empty().box_it(); boxed.subscribe(|_| {});
fn skip(self, count: u32) -> SkipOp<Self>
[src]
Ignore the first count
values emitted by the source Observable.
skip
returns an Observable that ignore the first count
values
emitted by the source Observable. If the source emits fewer than count
values then 0 of its values are emitted. After that, it completes,
regardless if the source completes.
Example
Ignore the first 5 seconds of an infinite 1-second interval Observable
observable::from_iter(0..10).skip(5).subscribe(|v| println!("{}", v)); // print logs: // 6 // 7 // 8 // 9 // 10
fn skip_while<F>(self, callback: F) -> SkipWhileOp<Self, F> where
F: FnMut(&Self::Item) -> bool,
[src]
F: FnMut(&Self::Item) -> bool,
Ignore values while result of a callback is true.
skip_while
returns an Observable that ignores values while result of an
callback is true emitted by the source Observable.
Example
Suppress the first 5 items of an infinite 1-second interval Observable
observable::from_iter(0..10) .skip_while(|v| v < &5) .subscribe(|v| println!("{}", v)); // print logs: // 5 // 6 // 7 // 8 // 9
fn skip_last(self, count: usize) -> SkipLastOp<Self>
[src]
Ignore the last count
values emitted by the source Observable.
skip_last
returns an Observable that ignore the last count
values
emitted by the source Observable. If the source emits fewer than count
values then 0 of its values are emitted.
It will not emit values until source Observable complete.
Example
Skip the last 5 seconds of an infinite 1-second interval Observable
observable::from_iter(0..10) .skip_last(5) .subscribe(|v| println!("{}", v)); // print logs: // 0 // 1 // 2 // 3 // 4
fn take(self, count: u32) -> TakeOp<Self>
[src]
Emits only the first count
values emitted by the source Observable.
take
returns an Observable that emits only the first count
values
emitted by the source Observable. If the source emits fewer than count
values then all of its values are emitted. After that, it completes,
regardless if the source completes.
Example
Take the first 5 seconds of an infinite 1-second interval Observable
observable::from_iter(0..10).take(5).subscribe(|v| println!("{}", v)); // print logs: // 0 // 1 // 2 // 3 // 4
fn take_until<T>(self, notifier: T) -> TakeUntilOp<Self, T>
[src]
Emits the values emitted by the source Observable until a notifier
Observable emits a value.
take_until
subscribes and begins mirroring the source Observable. It
also monitors a second Observable, notifier
that you provide. If the
notifier
emits a value, the output Observable stops mirroring the source
Observable and completes. If the notifier
doesn’t emit any value and
completes then take_until
will pass all values.
fn take_while<F>(self, callback: F) -> TakeWhileOp<Self, F> where
F: FnMut(&Self::Item) -> bool,
[src]
F: FnMut(&Self::Item) -> bool,
Emits values while result of an callback is true.
take_while
returns an Observable that emits values while result of an
callback is true emitted by the source Observable.
It will not emit values until source Observable complete.
Example
Take the first 5 seconds of an infinite 1-second interval Observable
observable::from_iter(0..10) .take_while(|v| v < &5) .subscribe(|v| println!("{}", v)); // print logs: // 0 // 1 // 2 // 3 // 4
fn take_last(self, count: usize) -> TakeLastOp<Self>
[src]
Emits only the last count
values emitted by the source Observable.
take_last
returns an Observable that emits only the last count
values
emitted by the source Observable. If the source emits fewer than count
values then all of its values are emitted.
It will not emit values until source Observable complete.
Example
Take the last 5 seconds of an infinite 1-second interval Observable
observable::from_iter(0..10) .take_last(5) .subscribe(|v| println!("{}", v)); // print logs: // 5 // 6 // 7 // 8 // 9
fn sample<O>(self, sampling: O) -> SampleOp<Self, O> where
O: Observable,
[src]
O: Observable,
Emits item it has most recently emitted since the previous sampling
It will emit values when sampling observable complete.
#Example Sampling every 5ms of an infinite 1ms interval Observable
use rxrust::prelude::*; use std::time::Duration; use futures::executor::LocalPool; let mut local_scheduler = LocalPool::new(); let spawner = local_scheduler.spawner(); observable::interval(Duration::from_millis(2), spawner.clone()) .sample(observable::interval(Duration::from_millis(5), spawner)) .take(5) .subscribe(move |v| println!("{}", v)); local_scheduler.run(); // print logs: // 1 // 4 // 6 // 9 // ...
fn scan_initial<OutputItem, BinaryOp>(
self,
initial_value: OutputItem,
binary_op: BinaryOp
) -> ScanOp<Self, BinaryOp, OutputItem> where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Clone,
[src]
self,
initial_value: OutputItem,
binary_op: BinaryOp
) -> ScanOp<Self, BinaryOp, OutputItem> where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Clone,
The Scan operator applies a function to the first item emitted by the source observable and then emits the result of that function as its own first emission. It also feeds the result of the function back into the function along with the second item emitted by the source observable in order to generate its second emission. It continues to feed back its own subsequent emissions along with the subsequent emissions from the source Observable in order to create the rest of its sequence.
Applies a binary operator closure to each item emitted from source observable and emits successive values.
Completes when source observable completes. Emits error when source observable emits it.
This version starts with an user-specified initial value for when the binary operator is called with the first item processed.
Arguments
initial_value
- An initial value to start the successive accumulations from.binary_op
- A closure or function acting as a binary operator.
Examples
use rxrust::prelude::*; observable::from_iter(vec![1, 1, 1, 1, 1]) .scan_initial(100, |acc, v| acc + v) .subscribe(|v| println!("{}", v)); // print log: // 101 // 102 // 103 // 104 // 105
fn scan<OutputItem, BinaryOp>(
self,
binary_op: BinaryOp
) -> ScanOp<Self, BinaryOp, OutputItem> where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Default + Clone,
[src]
self,
binary_op: BinaryOp
) -> ScanOp<Self, BinaryOp, OutputItem> where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Default + Clone,
Works like scan_initial
but starts with a
value defined by a Default
trait for the first argument binary_op
operator operates on.
Arguments
binary_op
- A closure or function acting as a binary operator.
fn reduce_initial<OutputItem, BinaryOp>(
self,
initial: OutputItem,
binary_op: BinaryOp
) -> ReduceOp<Self, BinaryOp, OutputItem> where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Clone,
[src]
self,
initial: OutputItem,
binary_op: BinaryOp
) -> ReduceOp<Self, BinaryOp, OutputItem> where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Clone,
Apply a function to each item emitted by an observable, sequentially, and emit the final value, after source observable completes.
Emits error when source observable emits it.
Arguments
initial
- An initial value to start the successive reduction from.binary_op
- A closure acting as a binary (folding) operator.
Examples
use rxrust::prelude::*; observable::from_iter(vec![1, 1, 1, 1, 1]) .reduce_initial(100, |acc, v| acc + v) .subscribe(|v| println!("{}", v)); // print log: // 105
fn reduce<OutputItem, BinaryOp>(
self,
binary_op: BinaryOp
) -> DefaultIfEmptyOp<LastOp<ScanOp<Self, BinaryOp, OutputItem>, OutputItem>> where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Default + Clone,
[src]
self,
binary_op: BinaryOp
) -> DefaultIfEmptyOp<LastOp<ScanOp<Self, BinaryOp, OutputItem>, OutputItem>> where
BinaryOp: Fn(OutputItem, Self::Item) -> OutputItem,
OutputItem: Default + Clone,
Works like reduce_initial
but starts with
a value defined by a Default
trait for the first argument f
operator operates on.
Arguments
binary_op
- A closure acting as a binary operator.
fn max(self) -> MinMaxOp<Self, Self::Item> where
Self::Item: Clone + Send + PartialOrd<Self::Item>,
[src]
Self::Item: Clone + Send + PartialOrd<Self::Item>,
Emits the item from the source observable that had the maximum value.
Emits error when source observable emits it.
Examples
use rxrust::prelude::*; observable::from_iter(vec![3., 4., 7., 5., 6.]) .max() .subscribe(|v| println!("{}", v)); // print log: // 7
fn min(self) -> MinMaxOp<Self, Self::Item> where
Self::Item: Clone + Send + PartialOrd<Self::Item>,
[src]
Self::Item: Clone + Send + PartialOrd<Self::Item>,
Emits the item from the source observable that had the minimum value.
Emits error when source observable emits it.
Examples
use rxrust::prelude::*; observable::from_iter(vec![3., 4., 7., 5., 6.]) .min() .subscribe(|v| println!("{}", v)); // print log: // 3
fn sum(self) -> SumOp<Self, Self::Item> where
Self::Item: Clone + Default + Add<Self::Item, Output = Self::Item>,
[src]
Self::Item: Clone + Default + Add<Self::Item, Output = Self::Item>,
Calculates the sum of numbers emitted by an source observable and emits this sum when source completes.
Emits zero when source completed as an and empty sequence. Emits error when source observable emits it.
Examples
use rxrust::prelude::*; observable::from_iter(vec![1, 1, 1, 1, 1]) .sum() .subscribe(|v| println!("{}", v)); // p rint log: // 5
fn count(self) -> CountOp<Self, Self::Item>
[src]
Emits the number of items emitted by a source observable when this source completes.
The output type of this operator is fixed to usize
.
Emits zero when source completed as an and empty sequence. Emits error when source observable emits it.
Examples
use rxrust::prelude::*; observable::from_iter(vec!['1', '7', '3', '0', '4']) .count() .subscribe(|v| println!("{}", v)); // print log: // 5
fn average(self) -> AverageOp<Self, Self::Item> where
Self::Item: Clone + Send + Default + Add<Self::Item, Output = Self::Item> + Mul<f64, Output = Self::Item>,
[src]
Self::Item: Clone + Send + Default + Add<Self::Item, Output = Self::Item> + Mul<f64, Output = Self::Item>,
Calculates the sum of numbers emitted by an source observable and emits this sum when source completes.
Emits zero when source completed as an and empty sequence. Emits error when source observable emits it.
Examples
use rxrust::prelude::*; observable::from_iter(vec![3., 4., 5., 6., 7.]) .average() .subscribe(|v| println!("{}", v)); // print log: // 5
fn publish<Subject: Default>(self) -> ConnectableObservable<Self, Subject>
[src]
Returns a ConnectableObservable. A ConnectableObservable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items.
fn share<Subject, Inner>(
self
) -> RefCount<Inner, ConnectableObservable<Self, Subject>> where
Inner: RefCountCreator<Connectable = ConnectableObservable<Self, Subject>>,
Subject: Default,
Self: Clone,
[src]
self
) -> RefCount<Inner, ConnectableObservable<Self, Subject>> where
Inner: RefCountCreator<Connectable = ConnectableObservable<Self, Subject>>,
Subject: Default,
Self: Clone,
Returns a new Observable that multicast (shares) the original
Observable. As long as there is at least one Subscriber this
Observable will be subscribed and emitting data. When all subscribers
have unsubscribed it will unsubscribe from the source Observable.
Because the Observable is multicasting it makes the stream hot
.
This is an alias for publish().ref_count()
fn delay<SD>(self, dur: Duration, scheduler: SD) -> DelayOp<Self, SD>
[src]
Delays the emission of items from the source Observable by a given timeout
or until a given Instant
.
fn delay_at<SD>(self, at: Instant, scheduler: SD) -> DelayOp<Self, SD>
[src]
fn subscribe_on<SD>(self, scheduler: SD) -> SubscribeOnOP<Self, SD>
[src]
Specify the Scheduler on which an Observable will operate
With SubscribeON
you can decide what type of scheduler a specific
Observable will be using when it is subscribed to.
Schedulers control the speed and order of emissions to observers from an Observable stream.
Example
Given the following code:
use rxrust::prelude::*; let a = observable::from_iter(1..5); let b = observable::from_iter(5..10); a.merge(b).subscribe(|v| print!("{} ", v));
Both Observable a
and b
will emit their values directly and
synchronously once they are subscribed to.
This will result in the output of 1 2 3 4 5 6 7 8 9
.
But if we instead use the subscribe_on
operator declaring that we want
to use the new thread scheduler for values emitted by Observable a
:
use rxrust::prelude::*; use std::thread; use futures::executor::ThreadPool; let pool = ThreadPool::new().unwrap(); let a = observable::from_iter(1..5).subscribe_on(pool); let b = observable::from_iter(5..10); a.merge(b).into_shared().subscribe(|v|{ let handle = thread::current(); print!("{}({:?}) ", v, handle.id()) });
The output will instead by `1(thread 1) 2(thread 1) 3(thread 1) 4(thread
- 5(thread 2) 6(thread 2) 7(thread 2) 8(thread 2) 9(thread id2)
. The reason for this is that Observable
bemits its values directly like before, but the emissions from
aare scheduled on a new thread because we are now using the
NewThread` Scheduler for that specific Observable.
fn observe_on<SD>(self, scheduler: SD) -> ObserveOnOp<Self, SD>
[src]
Re-emits all notifications from source Observable with specified scheduler.
ObserveOn
is an operator that accepts a scheduler as the parameter,
which will be used to reschedule notifications emitted by the source
Observable.
fn debounce<SD>(self, duration: Duration, scheduler: SD) -> DebounceOp<Self, SD>
[src]
Emits a value from the source Observable only after a particular time span has passed without another source emission.
fn throttle_time<SD>(
self,
duration: Duration,
edge: ThrottleEdge,
scheduler: SD
) -> ThrottleTimeOp<Self, SD>
[src]
self,
duration: Duration,
edge: ThrottleEdge,
scheduler: SD
) -> ThrottleTimeOp<Self, SD>
Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.
#Example
use rxrust::{ prelude::*, ops::throttle_time::ThrottleEdge }; use std::time::Duration; use futures::executor::LocalPool; let mut local_scheduler = LocalPool::new(); let spawner = local_scheduler.spawner(); observable::interval(Duration::from_millis(1), spawner.clone()) .throttle_time(Duration::from_millis(9), ThrottleEdge::Leading, spawner) .take(5) .subscribe(move |v| println!("{}", v)); local_scheduler.run();
fn distinct(self) -> DistinctOp<Self>
[src]
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
fn zip<U>(self, other: U) -> ZipOp<Self, U> where
U: Observable,
[src]
U: Observable,
‘Zips up’ two observable into a single observable of pairs.
zip() returns a new observable that will emit over two other observables, returning a tuple where the first element comes from the first observable, and the second element comes from the second observable.
In other words, it zips two observables together, into a single one.
fn default_if_empty(self, default_value: Self::Item) -> DefaultIfEmptyOp<Self>
[src]
Emits default value if Observable completed with empty result
#Example
use rxrust::prelude::*; observable::empty() .default_if_empty(5) .subscribe(|v| println!("{}", v)); // Prints: // 5
Implementors
impl<'a, Item, Err> Observable for LocalBoxOp<'a, Item, Err>
[src]
impl<'a, Item, Err> Observable for LocalBoxOp<'a, Item, Err>
[src]impl<'a, Item, Err> Observable for LocalCloneBoxOp<'a, Item, Err>
[src]
impl<'a, Item, Err> Observable for LocalCloneBoxOp<'a, Item, Err>
[src]impl<'a, Item, Err> Observable for LocalSubject<'a, Item, Err>
[src]
impl<'a, Item, Err> Observable for LocalSubject<'a, Item, Err>
[src]impl<'a, Item, Err, S> Observable for LocalRefCount<'a, S, Item, Err> where
S: LocalObservable<'a, Item = Item, Err = Err>,
[src]
impl<'a, Item, Err, S> Observable for LocalRefCount<'a, S, Item, Err> where
S: LocalObservable<'a, Item = Item, Err = Err>,
[src]impl<'a, Item, S, F> Observable for FilterMapOp<S, F> where
S: Observable,
F: FnMut(S::Item) -> Option<Item>,
[src]
impl<'a, Item, S, F> Observable for FilterMapOp<S, F> where
S: Observable,
F: FnMut(S::Item) -> Option<Item>,
[src]impl<A, B> Observable for ZipOp<A, B> where
A: Observable,
B: Observable<Err = A::Err>,
[src]
impl<A, B> Observable for ZipOp<A, B> where
A: Observable,
B: Observable<Err = A::Err>,
[src]impl<Emit> Observable for ObservableBase<Emit> where
Emit: Emitter,
[src]
impl<Emit> Observable for ObservableBase<Emit> where
Emit: Emitter,
[src]impl<Item, Err> Observable for SharedBoxOp<Item, Err>
[src]
impl<Item, Err> Observable for SharedBoxOp<Item, Err>
[src]impl<Item, Err> Observable for SharedCloneBoxOp<Item, Err>
[src]
impl<Item, Err> Observable for SharedCloneBoxOp<Item, Err>
[src]impl<Item, Err> Observable for SharedSubject<Item, Err>
[src]
impl<Item, Err> Observable for SharedSubject<Item, Err>
[src]impl<Item, Err, S> Observable for SharedRefCount<S, Item, Err> where
S: SharedObservable<Item = Item, Err = Err>,
[src]
impl<Item, Err, S> Observable for SharedRefCount<S, Item, Err> where
S: SharedObservable<Item = Item, Err = Err>,
[src]impl<Item, S> Observable for LastOp<S, Item> where
S: Observable<Item = Item>,
[src]
impl<Item, S> Observable for LastOp<S, Item> where
S: Observable<Item = Item>,
[src]impl<Item, S, M> Observable for MapOp<S, M> where
S: Observable,
M: FnMut(S::Item) -> Item,
[src]
impl<Item, S, M> Observable for MapOp<S, M> where
S: Observable,
M: FnMut(S::Item) -> Item,
[src]impl<Outer, Inner, Item, Err> Observable for FlattenOp<Outer, Inner> where
Outer: Observable<Item = Inner, Err = Err>,
Inner: Observable<Item = Item, Err = Err>,
[src]
impl<Outer, Inner, Item, Err> Observable for FlattenOp<Outer, Inner> where
Outer: Observable<Item = Inner, Err = Err>,
Inner: Observable<Item = Item, Err = Err>,
[src]impl<OutputItem, Source, BinaryOp> Observable for ScanOp<Source, BinaryOp, OutputItem> where
Source: Observable,
OutputItem: Clone,
BinaryOp: FnMut(OutputItem, Source::Item) -> OutputItem,
[src]
impl<OutputItem, Source, BinaryOp> Observable for ScanOp<Source, BinaryOp, OutputItem> where
Source: Observable,
OutputItem: Clone,
BinaryOp: FnMut(OutputItem, Source::Item) -> OutputItem,
[src]impl<S1, S2> Observable for MergeOp<S1, S2> where
S1: Observable,
S2: Observable<Item = S1::Item, Err = S1::Err>,
[src]
impl<S1, S2> Observable for MergeOp<S1, S2> where
S1: Observable,
S2: Observable<Item = S1::Item, Err = S1::Err>,
[src]impl<S> Observable for DefaultIfEmptyOp<S> where
S: Observable,
[src]
impl<S> Observable for DefaultIfEmptyOp<S> where
S: Observable,
[src]impl<S> Observable for DistinctOp<S> where
S: Observable,
[src]
impl<S> Observable for DistinctOp<S> where
S: Observable,
[src]impl<S> Observable for SkipOp<S> where
S: Observable,
[src]
impl<S> Observable for SkipOp<S> where
S: Observable,
[src]impl<S> Observable for SkipLastOp<S> where
S: Observable,
[src]
impl<S> Observable for SkipLastOp<S> where
S: Observable,
[src]impl<S> Observable for TakeOp<S> where
S: Observable,
[src]
impl<S> Observable for TakeOp<S> where
S: Observable,
[src]impl<S> Observable for TakeLastOp<S> where
S: Observable,
[src]
impl<S> Observable for TakeLastOp<S> where
S: Observable,
[src]impl<S> Observable for Shared<S> where
S: Observable,
[src]
impl<S> Observable for Shared<S> where
S: Observable,
[src]impl<S, B> Observable for MapToOp<S, B> where
S: Observable,
[src]
impl<S, B> Observable for MapToOp<S, B> where
S: Observable,
[src]impl<S, F> Observable for FilterOp<S, F> where
S: Observable,
F: FnMut(&S::Item) -> bool,
[src]
impl<S, F> Observable for FilterOp<S, F> where
S: Observable,
F: FnMut(&S::Item) -> bool,
[src]impl<S, F> Observable for FinalizeOp<S, F> where
S: Observable,
F: FnMut(),
[src]
impl<S, F> Observable for FinalizeOp<S, F> where
S: Observable,
F: FnMut(),
[src]impl<S, F> Observable for SkipWhileOp<S, F> where
S: Observable,
F: FnMut(&S::Item) -> bool,
[src]
impl<S, F> Observable for SkipWhileOp<S, F> where
S: Observable,
F: FnMut(&S::Item) -> bool,
[src]impl<S, F> Observable for TakeWhileOp<S, F> where
S: Observable,
F: FnMut(&S::Item) -> bool,
[src]
impl<S, F> Observable for TakeWhileOp<S, F> where
S: Observable,
F: FnMut(&S::Item) -> bool,
[src]impl<S, Item> Observable for ContainsOp<S, Item> where
S: Observable<Item = Item>,
[src]
impl<S, Item> Observable for ContainsOp<S, Item> where
S: Observable<Item = Item>,
[src]impl<S, N> Observable for TakeUntilOp<S, N> where
S: Observable,
[src]
impl<S, N> Observable for TakeUntilOp<S, N> where
S: Observable,
[src]impl<S, SD> Observable for DebounceOp<S, SD> where
S: Observable,
[src]
impl<S, SD> Observable for DebounceOp<S, SD> where
S: Observable,
[src]impl<S, SD> Observable for DelayOp<S, SD> where
S: Observable,
[src]
impl<S, SD> Observable for DelayOp<S, SD> where
S: Observable,
[src]impl<S, SD> Observable for ObserveOnOp<S, SD> where
S: Observable,
[src]
impl<S, SD> Observable for ObserveOnOp<S, SD> where
S: Observable,
[src]impl<S, SD> Observable for SubscribeOnOP<S, SD> where
S: Observable,
[src]
impl<S, SD> Observable for SubscribeOnOP<S, SD> where
S: Observable,
[src]impl<S, SD> Observable for ThrottleTimeOp<S, SD> where
S: Observable,
[src]
impl<S, SD> Observable for ThrottleTimeOp<S, SD> where
S: Observable,
[src]impl<Source, Sampling> Observable for SampleOp<Source, Sampling> where
Source: Observable,
Sampling: Observable,
[src]
impl<Source, Sampling> Observable for SampleOp<Source, Sampling> where
Source: Observable,
Sampling: Observable,
[src]impl<Source, Subject> Observable for ConnectableObservable<Source, Subject> where
Source: Observable,
[src]
impl<Source, Subject> Observable for ConnectableObservable<Source, Subject> where
Source: Observable,
[src]