Struct rxr::observable::Observable
pub struct Observable<T> { /* private fields */ }
The `Observable` struct represents a source of values that can be observed and transformed.
This struct serves as the foundation for creating, transforming, and working with observables. It provides methods for applying operators, subscribing to emitted values, and creating new observables.
§Example: basic synchronous Observable
This simple `Observable` emits values and completes. It returns an empty `Subscription`, so it cannot be unsubscribed from. Some operators, such as `take`, `switch_map`, `merge_map`, `concat_map`, and `exhaust_map`, require unsubscribe functionality to work correctly.
Additionally, this is a synchronous `Observable`, so it blocks the current thread until it completes emission.
use rxr::subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic};
use rxr::{Observable, Observer, Subscribeable};
// Create a custom observable that emits values from 1 to 10.
let mut emit_10_observable = Observable::new(|mut subscriber| {
let mut i = 1;
while i <= 10 {
// Emit the value to the subscriber.
subscriber.next(i);
i += 1;
}
// Signal completion to the subscriber.
subscriber.complete();
// Return the empty subscription.
Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
});
// Create the `Subscriber` with a mandatory `next` function, and optional
// `complete` function. No need for `error` function in this simple example.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// This observable blocks until completion since it doesn't use async or
// threads. If you comment out the line below, no emissions will occur
// because observables are cold.
emit_10_observable.subscribe(observer);
println!("Custom Observable finished emitting")
§Example: basic asynchronous Observable
Emits values and completes, returning an empty `Subscription`, so it cannot be unsubscribed from. Some operators, such as `take`, `switch_map`, `merge_map`, `concat_map`, and `exhaust_map`, require unsubscribe functionality to work correctly.
Utilizes an OS thread for asynchronous processing, preventing it from blocking the current thread.
use std::time::Duration;
use rxr::{
subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
Observable, ObservableExt, Observer, Subscribeable,
};
// Create a custom observable that emits values from 0 to 15 in a separate thread.
let observable = Observable::new(|mut o| {
// Launch a new thread for the Observable's processing and store its handle.
let join_handle = std::thread::spawn(move || {
for i in 0..=15 {
// Emit the value to the subscriber.
o.next(i);
// Important. Put an await point after each emit or after some emits.
// This allows the `take()` operator to function properly.
// Not required in this example.
std::thread::sleep(Duration::from_millis(1));
}
// Signal completion to the subscriber.
o.complete();
});
// Return the subscription.
Subscription::new(
// In this example, we omit the unsubscribe functionality. Without it, we
// can't unsubscribe, which prevents the `take()` operator, as well as
// higher-order operators like `switch_map`, `merge_map`, `concat_map`,
// and `exhaust_map`, from functioning as expected.
UnsubscribeLogic::Nil,
// Store the `JoinHandle` to enable waiting functionality using the
// `Subscription` for this Observable thread to complete.
SubscriptionHandle::JoinThread(join_handle),
)
});
// Create the `Subscriber` with a mandatory `next` function, and optional
// `complete` function. No need for `error` function in this simple example.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// This observable uses OS threads so it will not block the current thread.
// Observables are cold so if you comment out the statement below nothing
// will be emitted.
let subscription = observable
.filter(|&v| v <= 10)
.map(|v| format!("Mapped {}", v))
.subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Because the subscription creates a new thread, we can utilize the `Subscription`
// to wait for its completion. This ensures that the main thread won't terminate
// prematurely and stop all child threads.
if subscription.join().is_err() {
// Handle error
}
println!("Custom Observable finished emitting")
§Example: asynchronous Observable with unsubscribe
Emits values and completes, returning a `Subscription` that can be unsubscribed from, enabling all operators to function correctly. Utilizes an OS thread for asynchronous processing, preventing it from blocking the current thread.
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use rxr::{
subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic, Unsubscribeable},
Observable, ObservableExt, Observer, Subscribeable,
};
const UNSUBSCRIBE_SIGNAL: bool = true;
// Create a custom observable that emits values in a separate thread.
let observable = Observable::new(|mut o| {
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
let (tx, rx) = std::sync::mpsc::channel();
// Spawn a new thread to await a signal sent from the unsubscribe logic.
std::thread::spawn(move || {
// Attempt to receive a signal sent from the unsubscribe logic.
if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
// Update the `done_c` mutex with the received signal.
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Launch a new thread for the Observable's processing and store its handle.
let join_handle = std::thread::spawn(move || {
for i in 0..=10000 {
// If an unsubscribe signal is received, exit the loop and stop emissions.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
// Emit the value to the subscriber.
o.next(i);
// Important. Put an await point after each emit or after some emits.
// This allows the `take()` operator to function properly.
std::thread::sleep(Duration::from_millis(1));
}
// Signal completion to the subscriber.
o.complete();
});
// Return a new `Subscription` with custom unsubscribe logic.
Subscription::new(
// The provided closure defines the behavior of the subscription when it
// is unsubscribed. In this case, it sends a signal to an asynchronous
// observable to stop emitting values.
UnsubscribeLogic::Logic(Box::new(move || {
if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
println!("Receiver dropped.");
}
})),
// Store the `JoinHandle` for awaiting completion using the `Subscription`.
SubscriptionHandle::JoinThread(join_handle),
)
});
// Create the `Subscriber` with a mandatory `next` function, and optional
// `complete` function. No need for `error` function in this simple example.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// This observable uses OS threads so it will not block the current thread.
// Observables are cold so if you comment out the statement below nothing
// will be emitted.
let subscription = observable
// `take` utilizes our unsubscribe function to stop background emissions
// after a specified item count.
.take(500)
.map(|v| format!("Mapped {}", v))
.subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Unsubscribe from the observable to stop emissions.
subscription.unsubscribe();
// Allow some time for the main thread to confirm that the observable indeed
// isn't emitting.
std::thread::sleep(Duration::from_millis(2000));
println!("`main` function done")
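The stop-signal pattern in the example above can also be sketched with the standard library alone. The following is a simplified variant, not the `rxr` implementation: it swaps the `Mutex<bool>` plus signal-forwarding thread for a shared `AtomicBool`, and `ToySubscription`, `spawn_producer`, and `take_n` are hypothetical names introduced only for illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical handle (not an `rxr` type). It mirrors the two roles of the
/// `Subscription` in the example: requesting a stop and joining the thread.
pub struct ToySubscription {
    stop: Arc<AtomicBool>,
    worker: JoinHandle<()>,
}

impl ToySubscription {
    /// Ask the producer thread to stop before its next emission.
    pub fn unsubscribe(&self) {
        self.stop.store(true, Ordering::SeqCst);
    }

    /// Wait for the producer thread to finish.
    pub fn join(self) -> thread::Result<()> {
        self.worker.join()
    }
}

/// Spawns a producer that sends 0, 1, 2, ... on `tx` until asked to stop.
pub fn spawn_producer(tx: mpsc::Sender<u32>) -> ToySubscription {
    let stop = Arc::new(AtomicBool::new(false));
    let stop_in_worker = Arc::clone(&stop);
    let worker = thread::spawn(move || {
        for i in 0..10_000 {
            // Check the flag before every emission.
            if stop_in_worker.load(Ordering::SeqCst) {
                break;
            }
            if tx.send(i).is_err() {
                break; // The receiver was dropped.
            }
            // Pause between emissions so a stop request is noticed promptly.
            thread::sleep(Duration::from_millis(1));
        }
    });
    ToySubscription { stop, worker }
}

/// Receives `n` values, then unsubscribes and joins the producer.
pub fn take_n(n: usize) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    let subscription = spawn_producer(tx);
    let received: Vec<u32> = rx.iter().take(n).collect();
    subscription.unsubscribe();
    subscription.join().expect("producer thread panicked");
    received
}
```

Calling `take_n(5)` collects the first five values and then stops the background thread, which is roughly the job `take` delegates to the unsubscribe logic in the example.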
§Example: asynchronous Observable with Tokio
Emits values and completes, returning a `Subscription` that can be unsubscribed from, enabling all operators to function correctly. Utilizes Tokio tasks for asynchronous processing, preventing it from blocking the current thread.
use std::sync::{Arc, Mutex};
use rxr::{
subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
Observable, ObservableExt, Observer, Subscribeable,
};
use tokio::{task, time, sync::mpsc::channel};
const UNSUBSCRIBE_SIGNAL: bool = true;
#[tokio::main()]
async fn main() {
// Create a custom observable that emits values in a separate task.
let observable = Observable::new(|mut o| {
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
let (tx, mut rx) = channel(10);
// Spawn a new Tokio task to await a signal sent from the unsubscribe logic.
task::spawn(async move {
// Attempt to receive a signal sent from the unsubscribe logic.
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
// Update the `done_c` mutex with the received signal.
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
// Launch a new Tokio task for the Observable's processing and store its handle.
let join_handle = task::spawn(async move {
for i in 0..=10000 {
// If an unsubscribe signal is received, exit the loop and stop emissions.
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
// Emit the value to the subscriber.
o.next(i);
// Important. Put an await point after each emit or after some emits.
// This allows the `take()` operator to function properly.
time::sleep(time::Duration::from_millis(1)).await;
}
// Signal completion to the subscriber.
o.complete();
});
// Return a new `Subscription` with custom unsubscribe logic.
Subscription::new(
// The provided closure defines the behavior of the subscription when it
// is unsubscribed. In this case, it sends a signal to an asynchronous
// observable to stop emitting values. If your closure requires Tokio
// tasks or channels to send unsubscribe signals, use `UnsubscribeLogic::Future`.
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
// Store the `JoinHandle` for awaiting completion using the `Subscription`.
SubscriptionHandle::JoinTask(join_handle),
)
});
// Create the `Subscriber` with a mandatory `next` function, and optional
// `complete` function. No need for `error` function in this simple example.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// This observable uses Tokio tasks so it will not block the current thread.
// Observables are cold so if you comment out the statement below nothing
// will be emitted.
let subscription = observable
// `take` utilizes our unsubscribe function to stop background emissions
// after a specified item count.
.take(15)
.map(|v| format!("Mapped {}", v))
.delay(1000)
.subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for the subscription to either complete as a Tokio task or join an OS thread.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
§Example: Observable with error handling
Waits for user input and emits both a value and a completion signal upon success. In case of any errors, it signals them to the attached `Observer`.
Ensure errors are wrapped in an `Arc` before passing them to the `Observer`’s `error` function.
use std::{error::Error, fmt::Display, io, sync::Arc};
use rxr::{subscribe::*, Observable, Observer, Subscribeable};
#[derive(Debug)]
struct MyErr(i32);
impl Display for MyErr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "number should be less than 100, you entered {}", self.0)
}
}
impl Error for MyErr {}
// Creates an `Observable<i32>` that processes user input and emits or signals errors.
pub fn get_less_than_100() -> Observable<i32> {
Observable::new(|mut observer| {
let mut input = String::new();
println!("Please enter an integer (less than 100):");
if let Err(e) = io::stdin().read_line(&mut input) {
// Send input error to the observer.
observer.error(Arc::new(e));
return Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil);
}
match input.trim().parse::<i32>() {
Err(e) => {
// Send parsing error to the observer.
observer.error(Arc::new(e));
}
Ok(num) if num >= 100 => {
// Send custom error to the observer.
observer.error(Arc::new(MyErr(num)))
}
Ok(num) => {
// Emit the parsed value to the observer.
observer.next(num);
}
}
// Signal completion if there are no errors.
// Note: `complete` does not affect the outcome if `error` was called before it.
observer.complete();
Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
})
}
let observer = Subscriber::new(
|input| println!("You entered: {}", input),
|e| eprintln!("{}", e),
|| println!("User input handled"),
);
let mut observable = get_less_than_100();
observable.subscribe(observer);
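The example passes three different error types to `error`, each wrapped in an `Arc`. A minimal standard-library sketch of that idea follows. It assumes the shared error is a trait object of the form `Arc<dyn Error + Send + Sync>`; the alias `SharedError` and the helpers `share`, `parse_less_than_100`, and `report_twice` are hypothetical, and the exact type `rxr` uses may differ.

```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;

/// Assumed shape of the shared error type; the type `rxr` uses may differ.
pub type SharedError = Arc<dyn Error + Send + Sync>;

#[derive(Debug)]
pub struct TooLarge(pub i32);

impl fmt::Display for TooLarge {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "number should be less than 100, you entered {}", self.0)
    }
}

impl Error for TooLarge {}

/// Erases the concrete error type behind a cheaply clonable pointer.
pub fn share<E: Error + Send + Sync + 'static>(e: E) -> SharedError {
    Arc::new(e)
}

/// Parses `input` and maps both failure modes onto one shared error type.
pub fn parse_less_than_100(input: &str) -> Result<i32, SharedError> {
    match input.trim().parse::<i32>() {
        Err(e) => Err(share(e)),
        Ok(n) if n >= 100 => Err(share(TooLarge(n))),
        Ok(n) => Ok(n),
    }
}

/// Hands the same error to two independent handlers by cloning the `Arc`,
/// which copies the pointer rather than the error itself.
pub fn report_twice(err: &SharedError) -> (String, String) {
    let for_first = Arc::clone(err);
    let for_second = Arc::clone(err);
    (for_first.to_string(), format!("logged: {}", for_second))
}
```

Because the pointer is reference counted, one error value can reach several handlers without requiring the error type to implement `Clone`.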
Implementations§
impl<T> Observable<T>
pub fn new(sf: impl FnMut(Subscriber<T>) -> Subscription + Send + Sync + 'static) -> Self
Creates a new `Observable` with the provided subscribe function.
This method allows you to define custom behavior for the `Observable` by providing a subscribe function (`sf`), a closure that defines the behavior of the `Observable` when subscribed. When the `Observable` is subscribed to, the `sf` function is invoked to manage the delivery of values to the `Subscriber`. It should also return a `Subscription` that enables unsubscribing and can be used for awaiting Tokio tasks or joining OS threads when the `Observable` is asynchronous.
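The relationship between the stored subscribe function and `subscribe` can be illustrated with a small standard-library model. This is a toy under stated assumptions, not the `rxr` implementation: `ToyObservable` and `cold_demo` are hypothetical, the sketch is single-threaded, and it omits the `Subscription` return value.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Toy stand-in for a cold observable: it only stores the subscribe function.
/// Hypothetical type for illustration; not part of `rxr`.
pub struct ToyObservable<T> {
    subscribe_fn: Box<dyn FnMut(&mut dyn FnMut(T))>,
}

impl<T> ToyObservable<T> {
    /// Stores `sf` without running it.
    pub fn new(sf: impl FnMut(&mut dyn FnMut(T)) + 'static) -> Self {
        ToyObservable { subscribe_fn: Box::new(sf) }
    }

    /// Runs the stored subscribe function, delivering values to `on_next`.
    pub fn subscribe(&mut self, mut on_next: impl FnMut(T)) {
        (self.subscribe_fn)(&mut on_next);
    }
}

/// Returns (runs before subscribing, runs after two subscriptions, values seen).
pub fn cold_demo() -> (u32, u32, Vec<i32>) {
    // Counts how many times the subscribe function has been invoked.
    let runs = Rc::new(RefCell::new(0u32));
    let runs_in_sf = Rc::clone(&runs);
    let mut observable = ToyObservable::new(move |next: &mut dyn FnMut(i32)| {
        *runs_in_sf.borrow_mut() += 1;
        for i in 1..=3 {
            next(i);
        }
    });

    // Nothing has run yet: creating the observable does not emit.
    let before = *runs.borrow();
    let mut seen = Vec::new();
    observable.subscribe(|v| seen.push(v));
    observable.subscribe(|v| seen.push(v * 10));
    let after = *runs.borrow();
    (before, after, seen)
}
```

In this model, constructing the observable runs nothing, and each call to `subscribe` re-runs the subscribe function from the start, which is the cold behavior the examples on this page rely on.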
Examples found in repository
fn get_response_observable() -> Observable<&'static str> {
Observable::new(|mut o| {
task::spawn(async move {
o.next("response");
time::sleep(time::Duration::from_millis(1)).await;
o.complete();
});
Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
})
}
#[tokio::main]
async fn main() {
let observable = Observable::new(|mut o| {
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
let (tx, mut rx) = channel(10);
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
let join_handle = task::spawn(async move {
for i in 0..100 {
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
o.complete();
});
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
observer.on_complete(|| println!("Completed"));
// Accumulate response strings into a single string.
// The types of `total` and `n` may differ as long as `total` implements the `From<n>` trait.
// In this example, `total` is of type `String`, and `n` is of type `&str`.
let subscription = observable
.take(6)
.delay(100)
.merge_map(|_| get_response_observable())
.scan(|total, n| format!("{} {}", total, n), None)
.subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
More examples
fn main() {
// Create a custom observable that emits values from 1 to 10.
let mut emit_10_observable = Observable::new(|mut subscriber| {
let mut i = 1;
while i <= 10 {
// Emit the value to the subscriber.
subscriber.next(i);
i += 1;
}
// Signal completion to the subscriber.
subscriber.complete();
// Return the empty subscription.
Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
});
// Create the `Subscriber` with a mandatory `next` function, and optional
// `complete` function. No need for `error` function in this simple example.
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// This observable does not use async or threads so it will block until it is done.
// Observables are cold so if you comment out the line below nothing will be emitted.
emit_10_observable.subscribe(observer);
println!("Custom Observable finished emitting")
}
pub fn get_less_than_100() -> Observable<i32> {
Observable::new(|mut observer| {
let mut input = String::new();
println!("Please enter an integer (less than 100):");
if let Err(e) = io::stdin().read_line(&mut input) {
// Send input error to the observer.
observer.error(Arc::new(e));
return Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil);
}
match input.trim().parse::<i32>() {
Err(e) => {
// Send parsing error to the observer.
observer.error(Arc::new(e));
}
Ok(num) if num >= 100 => {
// Send custom error to the observer.
observer.error(Arc::new(MyErr(num)))
}
Ok(num) => {
// Emit the parsed value to the observer.
observer.next(num);
}
}
// Signal completion if there are no errors.
// Note: `complete` does not affect the outcome if `error` was called before it.
observer.complete();
Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
})
}
pub fn main() {
// Make an Observable.
let mut observable = Observable::new(move |mut o: Subscriber<_>| {
for i in 0..10 + 1 {
o.next(i);
std::thread::sleep(Duration::from_millis(1));
}
o.complete();
Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
});
// Initialize a `Subject` and obtain its emitter and receiver.
let (emitter, mut receiver) = Subject::emitter_receiver();
// Register `Subscriber` 1.
receiver.subscribe(create_subscriber(1));
// Register `Subscriber` 2.
receiver
// We're cloning the receiver so we can use it again.
// Shallow clone: clones only the pointer to the `Subject`.
.clone()
.take(7) // For performance, prioritize placing `take()` as the first operator.
.delay(1000)
.map(|v| format!("mapped {}", v))
.subscribe(create_subscriber(2));
// Register `Subscriber` 3.
receiver
.filter(|v| v % 2 == 0)
.map(|v| format!("filtered {}", v))
.subscribe(create_subscriber(3));
// Convert the emitter into an observer and subscribe it to the observable.
observable.subscribe(emitter.into());
}
async fn main() {
let observable = Observable::new(|mut o| {
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
let (tx, mut rx) = channel(10);
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
let join_handle = task::spawn(async move {
for i in 0..100 {
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
o.complete();
});
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// Capture and process only the last 8 values.
let subscription = observable.take_last(8).subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
async fn main() {
let observable = Observable::new(|mut o| {
let done = Arc::new(Mutex::new(false));
let done_c = Arc::clone(&done);
let (tx, mut rx) = channel(10);
task::spawn(async move {
if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
*done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
}
});
let join_handle = task::spawn(async move {
for i in 0..100 {
if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
break;
}
o.next(i);
time::sleep(time::Duration::from_millis(1)).await;
}
o.complete();
});
Subscription::new(
UnsubscribeLogic::Future(Box::pin(async move {
if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
println!("Receiver dropped.");
}
})),
SubscriptionHandle::JoinTask(join_handle),
)
});
let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
observer.on_complete(|| println!("Completed"));
// Emit values only when they are less than or equal to 40.
let subscription = observable.take_while(|v| v <= &40).subscribe(observer);
// Do something else here.
println!("Do something while Observable is emitting.");
// Wait for observable to finish before exiting the program.
if subscription.join_concurrent().await.is_err() {
// Handle error
}
println!("`main` function done")
}
pub fn empty() -> Self
Creates an empty observable.
The resulting observable does not emit any values and immediately completes upon subscription. It serves as a placeholder or a base case for some observable operations.
pub fn fuse(self) -> Self
Fuse the observable, allowing it to complete at most once.
If `complete()` is called on a fused observable, any subsequent emissions will have no effect. This ensures that the observable is closed after the first completion call.
By default, observables are not fused, allowing them to emit values even after calling `complete()` and permitting multiple calls to `complete()`.
When an observable emits an error, it is considered closed and will no longer emit any further values, regardless of being fused or not.
§Notes
`fuse()` does not unsubscribe ongoing emissions from the observable; it simply ignores them after the first `complete()` call, ensuring that no more values are emitted.
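The difference between fused and non-fused completion can be modeled in a few lines. This is a toy sketch of the semantics described above, not `rxr` code: `ToySink` and `drive` are hypothetical names, and error handling is left out.

```rust
/// Toy sink that records what it receives. Hypothetical, not an `rxr` type.
#[derive(Debug, Default)]
pub struct ToySink {
    fused: bool,
    completed: bool,
    pub values: Vec<i32>,
    pub completions: u32,
}

impl ToySink {
    pub fn new(fused: bool) -> Self {
        ToySink { fused, ..Default::default() }
    }

    /// Records a value unless the sink is fused and has already completed.
    pub fn next(&mut self, v: i32) {
        if self.fused && self.completed {
            return;
        }
        self.values.push(v);
    }

    /// Records a completion; a fused sink accepts only the first one.
    pub fn complete(&mut self) {
        if self.fused && self.completed {
            return;
        }
        self.completed = true;
        self.completions += 1;
    }
}

/// Drives a sink with the sequence: next(1), complete, next(2), complete.
pub fn drive(fused: bool) -> (Vec<i32>, u32) {
    let mut sink = ToySink::new(fused);
    sink.next(1);
    sink.complete();
    sink.next(2);
    sink.complete();
    (sink.values, sink.completions)
}
```

With `drive(true)` the value `2` and the second completion are ignored, while `drive(false)` records both values and both completions. In either case the producer keeps running; only delivery is suppressed, which matches the note that `fuse()` does not unsubscribe.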
pub fn defuse(self) -> Self
Defuse the observable, allowing it to complete and emit values after calling `complete()`.
Observables are defused by default, enabling them to emit values even after completion and allowing multiple calls to `complete()`. Calling `defuse()` is not necessary unless the observable has been previously fused using `fuse()`. Once an observable is defused, it can emit values and call `complete()` multiple times on its observers.
§Notes
Defusing an observable does not allow it to emit an error after the first error emission. Once an error is emitted, the observable is considered closed and will not emit any further values, regardless of being defused or not.
Trait Implementations§
impl<T: Clone> Clone for Observable<T>
fn clone(&self) -> Observable<T>
1.0.0 · fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`.
impl<T: Clone + Send + Sync + 'static> From<AsyncSubjectReceiver<T>> for Observable<T>
fn from(value: AsyncSubjectReceiver<T>) -> Self
impl<T: Clone + Send + Sync + 'static> From<BehaviorSubjectReceiver<T>> for Observable<T>
fn from(value: BehaviorSubjectReceiver<T>) -> Self
impl<T: Clone + Send + Sync + 'static> From<ReplaySubjectReceiver<T>> for Observable<T>
fn from(value: ReplaySubjectReceiver<T>) -> Self
impl<T: Clone + Send + Sync + 'static> From<SubjectReceiver<T>> for Observable<T>
fn from(value: SubjectReceiver<T>) -> Self
impl<T: 'static> Subscribeable for Observable<T>
fn subscribe(&mut self, v: Subscriber<Self::ObsType>) -> Subscription
fn is_subject(&self) -> bool
… `Subject`.
Auto Trait Implementations§
impl<T> Freeze for Observable<T>
impl<T> RefUnwindSafe for Observable<T>
impl<T> Send for Observable<T>
impl<T> Sync for Observable<T>
impl<T> Unpin for Observable<T>
impl<T> UnwindSafe for Observable<T>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<O, T> ObservableExt<T> for O
where
T: 'static,
O: Subscribeable<ObsType = T>,
fn map<U, F>(self, f: F) -> Observable<U>
fn filter<P>(self, predicate: P) -> Observable<T>
fn skip(self, n: usize) -> Observable<T>
… `n` items emitted by the observable and then emits the rest.
fn delay(self, num_of_ms: u64) -> Observable<T>
fn scan<U, F>(self, acc: F, seed: Option<U>) -> Observable<U>
fn connectable(self) -> Connectable<T>
fn first<F>(self, predicate: F, default_value: Option<T>) -> Observable<T>
… `predicate`, optionally applying a default value if no items match the `predicate`.
fn zip(self, observable_inputs: Vec<Observable<T>>) -> Observable<Vec<T>>
fn take(self, n: usize) -> Observable<T>
… `n` items emitted by the observable, then unsubscribes.
fn take_until<U: 'static>(self, notifier: Observable<U>, unsubscribe_notifier: bool) -> Observable<T>
… `notifier` observable.
fn take_while<P>(self, predicate: P) -> Observable<T>
… `predicate` criteria. The operation concludes immediately upon encountering the first value that doesn’t satisfy the `predicate`.
fn take_last(self, count: usize) -> Observable<T>
… `count` values emitted by the source observable.
fn tap(self, observer: Subscriber<T>) -> Observable<T>
… `tap` operator allows you to intercept the items emitted by an observable and perform side effects on those items without modifying the emitted data or the stream itself.
fn merge(self, sources: Vec<Observable<T>>) -> Observable<T>
fn merge_one(self, source: Observable<T>) -> Observable<T>
fn switch_map<R: 'static, F>(self, project: F) -> Observable<R>
… `map`, but switches to a new inner observable whenever a new item is emitted.