Trait rxr::observable::ObservableExt
source · pub trait ObservableExt<T: 'static>: Subscribeable<ObsType = T> {
Show 19 methods
// Provided methods
fn map<U, F>(self, f: F) -> Observable<U>
where Self: Sized + Send + Sync + 'static,
F: FnOnce(T) -> U + Copy + Sync + Send + 'static,
U: 'static { ... }
fn filter<P>(self, predicate: P) -> Observable<T>
where Self: Sized + Send + Sync + 'static,
P: FnOnce(&T) -> bool + Copy + Sync + Send + 'static { ... }
fn skip(self, n: usize) -> Observable<T>
where Self: Sized + Send + Sync + 'static { ... }
fn delay(self, num_of_ms: u64) -> Observable<T>
where Self: Sized + Send + Sync + 'static { ... }
fn scan<U, F>(self, acc: F, seed: Option<U>) -> Observable<U>
where Self: Sized + Send + Sync + 'static,
F: FnOnce(U, T) -> U + Copy + Sync + Send + 'static,
U: From<T> + Clone + Send + Sync + 'static { ... }
fn connectable(self) -> Connectable<T>
where Self: Send + Sync + Sized + 'static,
T: Send + Sync + Clone { ... }
fn first<F>(self, predicate: F, default_value: Option<T>) -> Observable<T>
where Self: Sized + Send + Sync + 'static,
F: FnOnce(T, usize) -> bool + Copy + Send + Sync + 'static,
T: Clone + Send + Sync { ... }
fn zip(self, observable_inputs: Vec<Observable<T>>) -> Observable<Vec<T>>
where Self: Clone + Sized + Send + Sync + 'static,
T: Clone + Send { ... }
fn take(self, n: usize) -> Observable<T>
where Self: Sized + Send + Sync + 'static { ... }
fn take_until<U: 'static>(
self,
notifier: Observable<U>,
unsubscribe_notifier: bool
) -> Observable<T>
where Self: Sized + Send + Sync + 'static { ... }
fn take_while<P>(self, predicate: P) -> Observable<T>
where Self: Sized + Send + Sync + 'static,
P: FnOnce(&T) -> bool + Copy + Sync + Send + 'static { ... }
fn take_last(self, count: usize) -> Observable<T>
where Self: Sized + Send + Sync + 'static,
T: Send { ... }
fn tap(self, observer: Subscriber<T>) -> Observable<T>
where Self: Sized + Send + Sync + 'static,
T: Clone { ... }
fn merge(self, sources: Vec<Observable<T>>) -> Observable<T>
where Self: Sized + Send + Sync + 'static { ... }
fn merge_one(self, source: Observable<T>) -> Observable<T>
where Self: Sized + Send + Sync + 'static { ... }
fn switch_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static,
F: FnMut(T) -> Observable<R> + Sync + Send + 'static { ... }
fn merge_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static,
F: FnMut(T) -> Observable<R> + Sync + Send + 'static { ... }
fn concat_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static,
F: FnMut(T) -> Observable<R> + Sync + Send + 'static { ... }
fn exhaust_map<R: 'static, F>(self, project: F) -> Observable<R>
where Self: Sized + Send + Sync + 'static,
F: FnMut(T) -> Observable<R> + Sync + Send + 'static { ... }
}
Expand description
The `ObservableExt` trait provides a set of extension methods that can be applied
to observables to transform and manipulate their behavior.
This trait enhances the capabilities of the `Observable` struct by allowing users
to chain operators together, creating powerful reactive pipelines.
Provided Methods§
sourcefn map<U, F>(self, f: F) -> Observable<U>
fn map<U, F>(self, f: F) -> Observable<U>
Transforms the items emitted by the observable using a transformation function.
The transformation function `f` is applied to each item emitted by the
observable, and the resulting value is emitted by the resulting observable.
Examples found in repository?
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
/// Feeds one `Observable` into a `Subject` and attaches three subscribers to the
/// subject's receiver, each behind a different operator chain.
pub fn main() {
    // Source: pushes the integers 0 through 10, sleeping 1 ms after each one,
    // then signals completion.
    let mut observable = Observable::new(move |mut sink: Subscriber<_>| {
        for value in 0..=10 {
            sink.next(value);
            std::thread::sleep(Duration::from_millis(1));
        }
        sink.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Split a new `Subject` into its emitter and receiver halves.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // `Subscriber` 1 is registered on the receiver as-is.
    receiver.subscribe(create_subscriber(1));

    // `Subscriber` 2 is registered on a clone so the receiver stays usable below.
    // The clone is shallow: only the pointer to the `Subject` is copied.
    receiver
        .clone()
        // For performance, prefer to place `take()` first in the chain.
        .take(7)
        .delay(1000)
        .map(|value| format!("mapped {}", value))
        .subscribe(create_subscriber(2));

    // `Subscriber` 3 is registered behind `filter` and `map`.
    receiver
        .filter(|value| value % 2 == 0)
        .map(|value| format!("filtered {}", value))
        .subscribe(create_subscriber(3));

    // Turn the emitter into an observer and subscribe it to the source observable.
    observable.subscribe(emitter.into());
}
More examples
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
/// Walks through a `Subject`: subscribers registered at different moments only
/// receive what is emitted after they were registered.
pub fn main() {
    // Split a new `Subject` into its emitter and receiver halves.
    let (mut emitter, mut receiver) = Subject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // Both values go to `Subscriber` 1, the only one registered so far.
    emitter.next(101);
    emitter.next(102);

    // Every Observable operator is available on the receiver.
    // Register `Subscriber` 2 behind a `map`; the clone is shallow and copies only
    // the pointer to the `Subject` object.
    receiver
        .clone()
        .map(|value| format!("mapped {}", value))
        .subscribe(Subscriber::new(
            move |value| println!("Subscriber #2 emitted: {}", value),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Register `Subscriber` 3.
    receiver.subscribe(create_subscriber(3));

    // 103 is emitted to `Subscriber`s 1, 2 and 3.
    emitter.next(103);
    // `complete` is called on `Subscriber`s 1, 2 and 3.
    emitter.complete();

    // `Subscriber` 4 subscribes after completion and completes immediately.
    receiver.subscribe(create_subscriber(4));

    // Sent after completion, so nothing is emitted.
    emitter.next(104);
}
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
/// Walks through an `AsyncSubject`: only the latest value is kept, and it is
/// delivered when the subject completes.
pub fn main() {
    // Split a new `AsyncSubject` into its emitter and receiver halves.
    let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // 101 is stored as the latest value, then replaced by 102.
    emitter.next(101);
    emitter.next(102);

    // Every Observable operator is available on the receiver.
    // Register `Subscriber` 2 behind a `map`; the clone is shallow and copies only
    // the pointer to the `AsyncSubject` object.
    receiver
        .clone()
        .map(|value| format!("mapped {}", value))
        .subscribe(Subscriber::new(
            move |value| println!("Subscriber #2 emitted: {}", value),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Register `Subscriber` 3.
    receiver.subscribe(create_subscriber(3));

    // The latest value is now 103.
    emitter.next(103);

    // The latest value (103) is emitted to `Subscriber`s 1, 2 and 3, and
    // `complete` is called on each of them.
    emitter.complete();

    // `Subscriber` 4 subscribes after completion: it gets the latest value (103)
    // and completes.
    receiver.subscribe(create_subscriber(4));

    // Sent after completion, so nothing is emitted.
    emitter.next(104);
}
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
/// Walks through an `AsyncSubject` that terminates with an error instead of
/// completing.
pub fn main() {
    // Split a new `AsyncSubject` into its emitter and receiver halves.
    let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // 101 is stored as the latest value, then replaced by 102.
    emitter.next(101);
    emitter.next(102);

    // Every Observable operator is available on the receiver.
    // Register `Subscriber` 2 behind a `map`; the clone is shallow and copies only
    // the pointer to the `AsyncSubject` object.
    receiver
        .clone()
        .map(|value| format!("mapped {}", value))
        .subscribe(Subscriber::new(
            move |value| println!("Subscriber #2 emitted: {}", value),
            |err| eprintln!("Error: {} 2", err),
            || println!("Completed"),
        ));

    // Register `Subscriber` 3.
    receiver.subscribe(create_subscriber(3));

    // The latest value is now 103.
    emitter.next(103);

    // `error` is called on `Subscriber`s 1, 2 and 3.
    emitter.error(Arc::new(AsyncSubjectError(
        "AsyncSubject error".to_string(),
    )));

    // `Subscriber` 4 subscribes after the subject's error call: it receives the
    // error and nothing further.
    receiver.subscribe(create_subscriber(4));

    // Sent after the subject has errored, so nothing is emitted.
    emitter.next(104);
}
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
/// Walks through a `BehaviorSubject`: every new subscriber is first handed the
/// current default value.
pub fn main() {
    // Split a new `BehaviorSubject`, seeded with 100, into its emitter and
    // receiver halves.
    let (mut emitter, mut receiver) = BehaviorSubject::emitter_receiver(100);

    // Register `Subscriber` 1; it receives the default value 100.
    receiver.subscribe(create_subscriber(1));

    // Both values go to `Subscriber` 1, the only one registered so far.
    emitter.next(101);
    emitter.next(102);

    // Every Observable operator is available on the receiver.
    // Register `Subscriber` 2 behind a `map`; it receives 102, which is now the
    // default. The clone is shallow and copies only the pointer to the
    // `BehaviorSubject` object.
    receiver
        .clone()
        .map(|value| format!("mapped {}", value))
        .subscribe(Subscriber::new(
            move |value| println!("Subscriber #2 emitted: {}", value),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Register `Subscriber` 3; it also receives 102, the current default.
    receiver.subscribe(create_subscriber(3));

    // 103 is emitted to `Subscriber`s 1, 2 and 3.
    emitter.next(103);
    // `complete` is called on `Subscriber`s 1, 2 and 3.
    emitter.complete();

    // `Subscriber` 4 subscribes after completion and completes immediately.
    receiver.subscribe(create_subscriber(4));

    // Sent after completion, so nothing is emitted.
    emitter.next(104);
}
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
/// Walks through a `ReplaySubject` with an unbounded buffer: late subscribers are
/// sent the buffered values first.
pub fn main() {
    // Split a new `ReplaySubject` with an unbounded buffer size into its emitter
    // and receiver halves.
    let (mut emitter, mut receiver) = ReplaySubject::emitter_receiver(BufSize::Unbounded);

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // Each value is stored and emitted to `Subscriber` 1.
    emitter.next(101);
    emitter.next(102);

    // Every Observable operator is available on the receiver.
    // Register `Subscriber` 2 behind a `map`; it receives the buffered values
    // (101, 102). The clone is shallow and copies only the pointer to the
    // `ReplaySubject` object.
    receiver
        .clone()
        .map(|value| format!("mapped {}", value))
        .subscribe(Subscriber::new(
            move |value| println!("Subscriber #2 emitted: {}", value),
            |_| eprintln!("Error"),
            || println!("Completed 2"),
        ));

    // Register `Subscriber` 3; it also receives the buffered values (101, 102).
    receiver.subscribe(create_subscriber(3));

    // 103 is stored and emitted to `Subscriber`s 1, 2 and 3.
    emitter.next(103);
    // `complete` is called on `Subscriber`s 1, 2 and 3.
    emitter.complete();

    // `Subscriber` 4 subscribes after completion: it receives the buffered values
    // (101, 102, 103) and completes.
    receiver.subscribe(create_subscriber(4));

    // Sent after completion, so nothing is emitted.
    emitter.next(104);
}
sourcefn filter<P>(self, predicate: P) -> Observable<T>
fn filter<P>(self, predicate: P) -> Observable<T>
Filters the items emitted by the observable based on a predicate function.
Only items for which the predicate function returns `true` will be emitted
by the resulting observable.
Examples found in repository?
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
/// Feeds one `Observable` into a `Subject` and attaches three subscribers to the
/// subject's receiver, each behind a different operator chain.
pub fn main() {
    // Source: pushes the integers 0 through 10, sleeping 1 ms after each one,
    // then signals completion.
    let mut observable = Observable::new(move |mut sink: Subscriber<_>| {
        for value in 0..=10 {
            sink.next(value);
            std::thread::sleep(Duration::from_millis(1));
        }
        sink.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Split a new `Subject` into its emitter and receiver halves.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // `Subscriber` 1 is registered on the receiver as-is.
    receiver.subscribe(create_subscriber(1));

    // `Subscriber` 2 is registered on a clone so the receiver stays usable below.
    // The clone is shallow: only the pointer to the `Subject` is copied.
    receiver
        .clone()
        // For performance, prefer to place `take()` first in the chain.
        .take(7)
        .delay(1000)
        .map(|value| format!("mapped {}", value))
        .subscribe(create_subscriber(2));

    // `Subscriber` 3 is registered behind `filter` and `map`.
    receiver
        .filter(|value| value % 2 == 0)
        .map(|value| format!("filtered {}", value))
        .subscribe(create_subscriber(3));

    // Turn the emitter into an observer and subscribe it to the source observable.
    observable.subscribe(emitter.into());
}
More examples
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
/// Builds a custom observable whose emissions run on a separate OS thread, pipes
/// it through `filter` and `map`, and joins the thread via the `Subscription`.
fn main() {
    // Custom observable: the emission loop lives on its own thread.
    let observable = Observable::new(|mut sink| {
        // Spawn the thread that does the Observable's work and keep its handle.
        let worker = std::thread::spawn(move || {
            for value in 0..16 {
                // Hand the value to the subscriber.
                sink.next(value);
                // Important: pause after each emit (or after a few emits) so that
                // the `take()` operator can function properly.
                // This particular example does not need it.
                std::thread::sleep(Duration::from_millis(1));
            }
            // Tell the subscriber that the sequence is finished.
            sink.complete();
        });

        // Hand back the subscription.
        Subscription::new(
            // Unsubscribe logic is left out here. Without it there is no way to
            // unsubscribe, which stops `take()` and the higher-order operators
            // (`switch_map`, `merge_map`, `concat_map`, `exhaust_map`) from
            // functioning as expected.
            UnsubscribeLogic::Nil,
            // Keep the `JoinHandle` so the `Subscription` can be used to wait for
            // this Observable's thread to complete.
            SubscriptionHandle::JoinThread(worker),
        )
    });

    // Build the `Subscriber`: `next` is mandatory, `complete` is optional. This
    // simple example does not need an `error` function.
    let mut observer = Subscriber::on_next(|value| println!("Emitted {}", value));
    observer.on_complete(|| println!("Completed"));

    // The observable works on an OS thread, so the current thread is not blocked.
    // Observables are cold: comment out the statement below and nothing is emitted.
    let subscription = observable
        .filter(|&value| value <= 10)
        .map(|value| format!("Mapped {}", value))
        .subscribe(observer);

    // Other work can happen here.
    println!("Print something while Observable is emitting.");

    // The subscription spawned a thread, so wait on it through the `Subscription`.
    // Otherwise the main thread could terminate early and stop all child threads.
    if subscription.join().is_err() {
        // Handle error
    }

    println!("Custom Observable finished emmiting")
}
sourcefn skip(self, n: usize) -> Observable<T>
fn skip(self, n: usize) -> Observable<T>
Skips the first `n` items emitted by the observable and then emits the rest.
If `n` is greater than or equal to the total number of items, it behaves as
if the observable is complete and emits no items.
sourcefn delay(self, num_of_ms: u64) -> Observable<T>
fn delay(self, num_of_ms: u64) -> Observable<T>
Delays the emissions from the observable by the specified number of milliseconds.
The `delay` operator introduces a time delay for emissions from the
observable, determined by the specified duration.
Examples found in repository?
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
/// Feeds one `Observable` into a `Subject` and attaches three subscribers to the
/// subject's receiver, each behind a different operator chain.
pub fn main() {
    // Source: pushes the integers 0 through 10, sleeping 1 ms after each one,
    // then signals completion.
    let mut observable = Observable::new(move |mut sink: Subscriber<_>| {
        for value in 0..=10 {
            sink.next(value);
            std::thread::sleep(Duration::from_millis(1));
        }
        sink.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Split a new `Subject` into its emitter and receiver halves.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // `Subscriber` 1 is registered on the receiver as-is.
    receiver.subscribe(create_subscriber(1));

    // `Subscriber` 2 is registered on a clone so the receiver stays usable below.
    // The clone is shallow: only the pointer to the `Subject` is copied.
    receiver
        .clone()
        // For performance, prefer to place `take()` first in the chain.
        .take(7)
        .delay(1000)
        .map(|value| format!("mapped {}", value))
        .subscribe(create_subscriber(2));

    // `Subscriber` 3 is registered behind `filter` and `map`.
    receiver
        .filter(|value| value % 2 == 0)
        .map(|value| format!("filtered {}", value))
        .subscribe(create_subscriber(3));

    // Turn the emitter into an observer and subscribe it to the source observable.
    observable.subscribe(emitter.into());
}
More examples
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/// Chains `take`, `delay`, `merge_map` and `scan` on a task-backed observable and
/// waits for the pipeline to finish.
async fn main() {
    // Source observable: emits 0 through 99 from a spawned task, checking a shared
    // flag before each emission so that it stops once unsubscribed.
    let observable = Observable::new(|mut sink| {
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (unsub_tx, mut unsub_rx) = channel(10);

        // Listener task: copies a received unsubscribe signal into the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = unsub_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter task: its handle is stored in the returned `Subscription`.
        let worker = task::spawn(async move {
            for value in 0..=99 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                sink.next(value);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            sink.complete();
        });

        Subscription::new(
            // Unsubscribing sends the signal that the listener task waits for.
            UnsubscribeLogic::Future(Box::pin(async move {
                if unsub_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(worker),
        )
    });

    let mut observer = Subscriber::on_next(|value| println!("Emitted: {}", value));
    observer.on_complete(|| println!("Completed"));

    // `scan` folds the response strings into one string.
    // `total` and `n` may have different types provided `total` implements the
    // `From<n>` trait. Here `total` is a `String` and `n` is a `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|total, n| format!("{} {}", total, n), None)
        .subscribe(observer);

    // Other work can happen here.
    println!("Do something while Observable is emitting.");

    // Keep the program alive until the observable has finished.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
/// Stops a thread-backed observable with `take_until`, using a delayed `Subject`
/// as the notifier.
fn main() {
    // Source observable: emits 0 through 99 from a spawned thread, checking a
    // shared flag before each emission so that it stops once unsubscribed.
    let observable = Observable::new(|mut sink| {
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (unsub_tx, unsub_rx) = std::sync::mpsc::channel();

        // Listener thread: copies a received unsubscribe signal into the shared flag.
        std::thread::spawn(move || {
            if let Ok(UNSUBSCRIBE_SIGNAL) = unsub_rx.recv() {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter thread: its handle is stored in the returned `Subscription`.
        let worker = std::thread::spawn(move || {
            for value in 0..=99 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                sink.next(value);
                std::thread::sleep(Duration::from_millis(1));
            }
            sink.complete();
        });

        Subscription::new(
            // Unsubscribing sends the signal that the listener thread waits for.
            UnsubscribeLogic::Logic(Box::new(move || {
                if unsub_tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinThread(worker),
        )
    });

    let mut observer = Subscriber::on_next(|value| println!("Emitted {}", value));
    observer.on_complete(|| println!("Completed"));

    // The notifier may be an observable or any of the Subject variants.
    let (mut notifier_emitter, notifier_receiver) = Subject::emitter_receiver();

    // The Subject can be chained. `delay()` slows the notifier down so the source
    // observable has time to emit some values. A notifier chained with operators
    // does not need `into()`. To keep using the receiver afterwards, clone it
    // first, e.g. `receiver.clone().delay(20)`.
    let subscription = observable
        .take_until(notifier_receiver.delay(20), false)
        .subscribe(observer);

    // Give `take_until` a moment to register the notifier before signalling. This
    // is unnecessary when the signal is not sent immediately.
    std::thread::sleep(Duration::from_millis(1));

    // Signal the source observable to stop emitting values.
    notifier_emitter.next(());

    // Other work can happen here.
    println!("Do something while Observable is emitting.");

    // Keep the program alive until the observable has finished.
    if subscription.join().is_err() {
        // Handle error
    }

    println!("`main` function done")
}
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
/// Multicasts one source observable to two observers through a `Connectable`,
/// applying operators both before and after the `connectable()` call.
async fn main() {
    // Source observable: emits 0 through 10 from a spawned task, checking a shared
    // flag before each emission so that it stops once unsubscribed.
    let observable = Observable::new(|mut sink| {
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (unsub_tx, mut unsub_rx) = channel(10);

        // Listener task: copies a received unsubscribe signal into the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = unsub_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter task: its handle is stored in the returned `Subscription`.
        let worker = task::spawn(async move {
            for value in 0..=10 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                sink.next(value);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            sink.complete();
        });

        Subscription::new(
            // Unsubscribing sends the signal that the listener task waits for.
            UnsubscribeLogic::Future(Box::pin(async move {
                if unsub_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(worker),
        )
    });

    let mut observer1 = Subscriber::on_next(|value| println!("Observer 1 emitted {}", value));
    observer1.on_complete(|| println!("Observer 1 completed"));

    let mut observer2 = Subscriber::on_next(|value| println!("Observer 2 emitted {}", value));
    observer2.on_complete(|| println!("Observer 2 completed"));

    // Other operators may be applied before the `connectable()` operator.
    let tapped = observable.tap(Subscriber::on_next(|v| println!("... emitting {v}")));

    // Turn the source observable into a `Connectable` observable.
    let connectable = tapped.connectable();

    // To apply operators after `connectable()`, clone the `Connectable` first.
    let mut chained = connectable.clone().map(|value| value + 10).delay(1000);

    // Attach both observers to the chained `Connectable`.
    chained.subscribe(observer1);
    chained.subscribe(observer2);

    // `connect()` starts emissions to all `Subscriber`s; without it nothing is
    // emitted.
    let connected = connectable.connect();

    // Other work can happen here.
    println!("Do something while Observable is emitting.");

    // Keep the program alive until the `Connectable` observable has finished.
    // `connected.unsubscribe();` can be used instead to stop all emissions.
    if connected.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
/// Builds a custom observable whose emissions run on a Tokio task, limits it with
/// `take`, and waits for the pipeline to finish.
async fn main() {
    // Custom observable: the emission loop lives on a separate task.
    let observable = Observable::new(|mut sink| {
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (unsub_tx, mut unsub_rx) = channel(10);

        // Spawn a Tokio task that waits for the signal sent by the unsubscribe logic.
        task::spawn(async move {
            // Try to receive the signal sent by the unsubscribe logic.
            if let Some(UNSUBSCRIBE_SIGNAL) = unsub_rx.recv().await {
                // Store the received signal in the shared mutex.
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Spawn the Tokio task that does the Observable's work and keep its handle.
        let worker = task::spawn(async move {
            for value in 0..=10_000 {
                // Once an unsubscribe signal has arrived, leave the loop and stop
                // emitting.
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                // Hand the value to the subscriber.
                sink.next(value);
                // Important: put an await point after each emit (or after a few
                // emits) so that the `take()` operator can function properly.
                time::sleep(time::Duration::from_millis(1)).await;
            }
            // Tell the subscriber that the sequence is finished.
            sink.complete();
        });

        // Hand back a `Subscription` carrying custom unsubscribe logic.
        Subscription::new(
            // This future defines what happens on unsubscribe: it sends a signal
            // telling the asynchronous observable to stop emitting values. When
            // the unsubscribe logic needs Tokio tasks or channels to send its
            // signal, consider `UnsubscribeLogic::Future`.
            UnsubscribeLogic::Future(Box::pin(async move {
                if unsub_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            // Keep the `JoinHandle` so completion can be awaited via the `Subscription`.
            SubscriptionHandle::JoinTask(worker),
        )
    });

    // Build the `Subscriber`: `next` is mandatory, `complete` is optional. This
    // simple example does not need an `error` function.
    let mut observer = Subscriber::on_next(|value| println!("Emitted {}", value));
    observer.on_complete(|| println!("Completed"));

    // The observable does its work on a spawned Tokio task, so subscribing does not
    // block here. Observables are cold: comment out the statement below and nothing
    // is emitted.
    let subscription = observable
        // `take` relies on our unsubscribe function to stop background emissions
        // after the given number of items.
        .take(15)
        .map(|value| format!("Mapped {}", value))
        .delay(1000)
        .subscribe(observer);

    // Other work can happen here.
    println!("Do something while Observable is emitting.");

    // Wait for the subscription to either complete as a Tokio task or join an OS thread.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
sourcefn scan<U, F>(self, acc: F, seed: Option<U>) -> Observable<U>
fn scan<U, F>(self, acc: F, seed: Option<U>) -> Observable<U>
Accumulates values emitted by an observable over time, producing an accumulated result based on an accumulator function applied to each emitted value.
The `scan` operator applies an accumulator function over the values emitted by
the source observable. It accumulates values into a single accumulated result,
and each new value emitted by the source observable contributes to this
accumulation. The accumulated result is emitted by the resulting observable.
`seed` is optional. If omitted, the first emitted value is used as the `seed`.
Examples found in repository?
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/// Chains `take`, `delay`, `merge_map` and `scan` on a task-backed observable and
/// waits for the pipeline to finish.
async fn main() {
    // Source observable: emits 0 through 99 from a spawned task, checking a shared
    // flag before each emission so that it stops once unsubscribed.
    let observable = Observable::new(|mut sink| {
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (unsub_tx, mut unsub_rx) = channel(10);

        // Listener task: copies a received unsubscribe signal into the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = unsub_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter task: its handle is stored in the returned `Subscription`.
        let worker = task::spawn(async move {
            for value in 0..=99 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                sink.next(value);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            sink.complete();
        });

        Subscription::new(
            // Unsubscribing sends the signal that the listener task waits for.
            UnsubscribeLogic::Future(Box::pin(async move {
                if unsub_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(worker),
        )
    });

    let mut observer = Subscriber::on_next(|value| println!("Emitted: {}", value));
    observer.on_complete(|| println!("Completed"));

    // `scan` folds the response strings into one string.
    // `total` and `n` may have different types provided `total` implements the
    // `From<n>` trait. Here `total` is a `String` and `n` is a `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|total, n| format!("{} {}", total, n), None)
        .subscribe(observer);

    // Other work can happen here.
    println!("Do something while Observable is emitting.");

    // Keep the program alive until the observable has finished.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
sourcefn connectable(self) -> Connectable<T>
fn connectable(self) -> Connectable<T>
Creates a connectable observable from the source observable.
This operator converts the source observable into a connectable observable, allowing multiple subscribers to connect to the same source without causing multiple subscriptions to the underlying source.
The actual emission of values from the source observable will occur only
after calling the `connect()` method on the resulting `Connectable` instance.
Examples found in repository?
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
/// Multicasts one source observable to two observers through a `Connectable`,
/// applying operators both before and after the `connectable()` call.
async fn main() {
    // Source observable: emits 0 through 10 from a spawned task, checking a shared
    // flag before each emission so that it stops once unsubscribed.
    let observable = Observable::new(|mut sink| {
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (unsub_tx, mut unsub_rx) = channel(10);

        // Listener task: copies a received unsubscribe signal into the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = unsub_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter task: its handle is stored in the returned `Subscription`.
        let worker = task::spawn(async move {
            for value in 0..=10 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                sink.next(value);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            sink.complete();
        });

        Subscription::new(
            // Unsubscribing sends the signal that the listener task waits for.
            UnsubscribeLogic::Future(Box::pin(async move {
                if unsub_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(worker),
        )
    });

    let mut observer1 = Subscriber::on_next(|value| println!("Observer 1 emitted {}", value));
    observer1.on_complete(|| println!("Observer 1 completed"));

    let mut observer2 = Subscriber::on_next(|value| println!("Observer 2 emitted {}", value));
    observer2.on_complete(|| println!("Observer 2 completed"));

    // Other operators may be applied before the `connectable()` operator.
    let tapped = observable.tap(Subscriber::on_next(|v| println!("... emitting {v}")));

    // Turn the source observable into a `Connectable` observable.
    let connectable = tapped.connectable();

    // To apply operators after `connectable()`, clone the `Connectable` first.
    let mut chained = connectable.clone().map(|value| value + 10).delay(1000);

    // Attach both observers to the chained `Connectable`.
    chained.subscribe(observer1);
    chained.subscribe(observer2);

    // `connect()` starts emissions to all `Subscriber`s; without it nothing is
    // emitted.
    let connected = connectable.connect();

    // Other work can happen here.
    println!("Do something while Observable is emitting.");

    // Keep the program alive until the `Connectable` observable has finished.
    // `connected.unsubscribe();` can be used instead to stop all emissions.
    if connected.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
More examples
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/// Multicasts one source observable through a `Connectable` and removes one of
/// three observers before `connect()` is called.
async fn main() {
    // Source observable: emits 0 through 10 from a spawned task, checking a shared
    // flag before each emission so that it stops once unsubscribed.
    let observable = Observable::new(|mut sink| {
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (unsub_tx, mut unsub_rx) = channel(10);

        // Listener task: copies a received unsubscribe signal into the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = unsub_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter task: its handle is stored in the returned `Subscription`.
        let worker = task::spawn(async move {
            for value in 0..=10 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                sink.next(value);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            sink.complete();
        });

        Subscription::new(
            // Unsubscribing sends the signal that the listener task waits for.
            UnsubscribeLogic::Future(Box::pin(async move {
                if unsub_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(worker),
        )
    });

    let mut observer1 = Subscriber::on_next(|value| println!("Observer 1 emitted {}", value));
    observer1.on_complete(|| println!("Observer 1 completed"));

    let mut observer2 = Subscriber::on_next(|value| println!("Observer 2 emitted {}", value));
    observer2.on_complete(|| println!("Observer 2 completed"));

    let mut observer3 = Subscriber::on_next(|value| println!("Observer 3 emitted {}", value));
    observer3.on_complete(|| println!("Observer 3 completed"));

    // Turn the source observable into a `Connectable` observable.
    let mut connectable = observable.connectable();

    // Attach the first two observers to the `Connectable`.
    connectable.subscribe(observer1);
    connectable.subscribe(observer2);

    // A `Subscription` can also be kept in order to unsubscribe a single observer
    // before `connect()` is called. Here `observer3` is removed and will not
    // receive multicast emissions.
    let third_subscription = connectable.subscribe(observer3);
    third_subscription.unsubscribe();

    // `connect()` starts emissions to all `Subscriber`s; without it nothing is
    // emitted.
    let connected = connectable.connect();

    // Other work can happen here.
    println!("Do something while Observable is emitting.");

    // Keep the program alive until the `Connectable` observable has finished.
    // `connected.unsubscribe();` can be used instead to stop all emissions.
    if connected.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
sourcefn first<F>(self, predicate: F, default_value: Option<T>) -> Observable<T>
fn first<F>(self, predicate: F, default_value: Option<T>) -> Observable<T>
Emits only the first item emitted by the source observable that satisfies the provided `predicate`, optionally applying a default value if no items match the `predicate`.
The `predicate` function takes two arguments: the emitted item `T` and the index `usize` of the emission. It should return `true` if the item meets the criteria.
If a `default_value` is provided and no item satisfies the `predicate`, the observable emits the `default_value` instead. If no default value is provided and no item satisfies the `predicate`, the observable emits an `EmptyError`.
The `first` operator unsubscribes from the background emissions as soon as it takes the first item that satisfies the `predicate`.
sourcefn zip(self, observable_inputs: Vec<Observable<T>>) -> Observable<Vec<T>>
fn zip(self, observable_inputs: Vec<Observable<T>>) -> Observable<Vec<T>>
Zips the values emitted by multiple observables into a single observable.
This method combines the values emitted by multiple observables into a single observable, emitting a vector containing the latest value from each observable in order when all observables have emitted a new value. This method is non-blocking and combines the latest values emitted by observables without waiting for completion. It completes as soon as the first observable in the sequence completes and attempts to unsubscribe all zipped observables. If any observable within the sequence encounters an error, it stops emissions, emits that error, and tries to unsubscribe all observables in the sequence.
sourcefn take(self, n: usize) -> Observable<T>
fn take(self, n: usize) -> Observable<T>
Emits at most the first `n` items emitted by the observable, then unsubscribes.
If the observable emits more than `n` items, this operator will only allow the first `n` items to be emitted. After emitting `n` items, it will unsubscribe from the observable.
§Notes
For `Subject` variants, using `take(n)` as the initial operator (e.g., `subject.take(n).delay(n)`) will not call unsubscribe and remove registered subscribers, for performance reasons.
However, when used as a non-initial operator (e.g., `subject.delay(n).take(n)`), it will call unsubscribe and remove registered subscribers.
Examples found in repository?
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
pub fn main() {
    // Example: feed one observable into a `Subject` and register three subscribers
    // on the subject's receiver, two of them through operator chains.

    // Make an Observable. It emits 0 through 10 (inclusive) directly inside the
    // subscribe closure, pausing 1 ms after each value, and then completes.
    let mut observable = Observable::new(move |mut o: Subscriber<_>| {
        for i in 0..=10 {
            o.next(i);
            std::thread::sleep(Duration::from_millis(1));
        }
        o.complete();
        // No custom unsubscribe logic and no handle to join for this observable.
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Initialize a `Subject` and obtain its emitter and receiver.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // Register `Subscriber` 1.
    receiver.subscribe(create_subscriber(1));

    // Register `Subscriber` 2.
    receiver
        // We're cloning the receiver so we can use it again.
        // Shallow clone: clones only the pointer to the `Subject`.
        .clone()
        .take(7) // For performance, prioritize placing `take()` as the first operator.
        .delay(1000)
        .map(|v| format!("mapped {}", v))
        .subscribe(create_subscriber(2));

    // Register `Subscriber` 3.
    receiver
        .filter(|v| v % 2 == 0)
        .map(|v| format!("filtered {}", v))
        .subscribe(create_subscriber(3));

    // Convert the emitter into an observer and subscribe it to the observable.
    observable.subscribe(emitter.into());
}
More examples
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
async fn main() {
    // Example: limit, delay, flat-map and accumulate the values of a source
    // observable that emits from a spawned Tokio task.
    let observable = Observable::new(|mut subscriber| {
        // Stop flag shared by the emitting task and the unsubscribe listener.
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (signal_tx, mut signal_rx) = channel(10);

        // Listener: record the unsubscribe signal once it arrives.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = signal_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter: push 0..100, stopping early when the flag has been set.
        let emitter_task = task::spawn(async move {
            for value in 0..100 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                subscriber.next(value);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            subscriber.complete();
        });

        // Unsubscribing forwards the signal to the listener task.
        let unsubscribe_logic = UnsubscribeLogic::Future(Box::pin(async move {
            if signal_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                println!("Receiver dropped.");
            }
        }));
        Subscription::new(unsubscribe_logic, SubscriptionHandle::JoinTask(emitter_task))
    });

    let mut observer = Subscriber::on_next(|item| println!("Emitted: {}", item));
    observer.on_complete(|| println!("Completed"));

    // Accumulate response strings into a single string.
    // The accumulator and the incoming item may have different types as long as the
    // accumulator type implements `From` for the item type. Per the example's
    // author, the accumulator here is a `String` and each item is a `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|acc, item| format!("{} {}", acc, item), None)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
fn main() {
    // Build a custom observable whose values are produced on a separate OS thread.
    let observable = Observable::new(|mut subscriber| {
        // Stop flag shared by the emitting thread and the unsubscribe listener.
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (signal_tx, signal_rx) = std::sync::mpsc::channel();

        // Listener thread: block until the unsubscribe logic sends its signal,
        // then store the received value in the shared flag.
        std::thread::spawn(move || {
            if let Ok(UNSUBSCRIBE_SIGNAL) = signal_rx.recv() {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter thread: does the observable's work; its handle is kept so the
        // `Subscription` can join it.
        let emitter_thread = std::thread::spawn(move || {
            for value in 0..=10000 {
                // Leave the loop (and stop emitting) once unsubscribe was signalled.
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                // Hand the value to the subscriber.
                subscriber.next(value);
                // Important: pause after each emission (or after a few of them).
                // According to the example's author this is what lets the `take()`
                // operator function properly.
                std::thread::sleep(Duration::from_millis(1));
            }
            // Tell the subscriber that no more values will follow.
            subscriber.complete();
        });

        // Custom unsubscribe behavior: when the subscription is unsubscribed, the
        // closure sends the stop signal to the listener thread above.
        let unsubscribe_logic = UnsubscribeLogic::Logic(Box::new(move || {
            if signal_tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                println!("Receiver dropped.");
            }
        }));
        // The `JoinHandle` is stored so completion can be awaited via the `Subscription`.
        Subscription::new(unsubscribe_logic, SubscriptionHandle::JoinThread(emitter_thread))
    });

    // A `Subscriber` needs a `next` function; `complete` is optional. This simple
    // example does not register an `error` function.
    let mut observer = Subscriber::on_next(|item| println!("Emitted {}", item));
    observer.on_complete(|| println!("Completed"));

    // This observable uses OS threads so it will not block the current thread.
    // Observables are cold, so if you comment out the statement below nothing
    // will be emitted.
    let subscription = observable
        // `take` uses our unsubscribe function to stop background emissions after
        // the specified number of items.
        .take(500)
        .map(|item| format!("Mapped {}", item))
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Unsubscribe from the observable to stop emissions.
    subscription.unsubscribe();

    // Give the main thread some time to confirm that the observable really has
    // stopped emitting.
    std::thread::sleep(Duration::from_millis(2000));

    println!("`main` function done")
}
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
async fn main() {
    // Build a custom observable whose values are produced in a separate Tokio task.
    let observable = Observable::new(|mut subscriber| {
        // Stop flag shared by the emitting task and the unsubscribe listener.
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (signal_tx, mut signal_rx) = channel(10);

        // Listener task: wait until the unsubscribe logic sends its signal, then
        // store the received value in the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = signal_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter task: does the observable's work; its handle is kept so the
        // `Subscription` can await it.
        let emitter_task = task::spawn(async move {
            for value in 0..=10000 {
                // Leave the loop (and stop emitting) once unsubscribe was signalled.
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                // Hand the value to the subscriber.
                subscriber.next(value);
                // Important: put an await point after each emission (or after a
                // few of them). According to the example's author this is what
                // lets the `take()` operator function properly.
                time::sleep(time::Duration::from_millis(1)).await;
            }
            // Tell the subscriber that no more values will follow.
            subscriber.complete();
        });

        // Custom unsubscribe behavior: when the subscription is unsubscribed, the
        // future sends the stop signal to the listener task above. Use
        // `UnsubscribeLogic::Future` when the unsubscribe code needs Tokio tasks
        // or channels, as it does here.
        let unsubscribe_logic = UnsubscribeLogic::Future(Box::pin(async move {
            if signal_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                println!("Receiver dropped.");
            }
        }));
        // The `JoinHandle` is stored so completion can be awaited via the `Subscription`.
        Subscription::new(unsubscribe_logic, SubscriptionHandle::JoinTask(emitter_task))
    });

    // A `Subscriber` needs a `next` function; `complete` is optional. This simple
    // example does not register an `error` function.
    let mut observer = Subscriber::on_next(|item| println!("Emitted {}", item));
    observer.on_complete(|| println!("Completed"));

    // The observable emits from a spawned Tokio task rather than inside the
    // subscribe closure itself.
    // Observables are cold, so if you comment out the statement below nothing
    // will be emitted.
    let subscription = observable
        // `take` uses our unsubscribe function to stop background emissions after
        // the specified number of items.
        .take(15)
        .map(|item| format!("Mapped {}", item))
        .delay(1000)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for the subscription to either complete as a Tokio task or join an OS thread.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
sourcefn take_until<U: 'static>(
self,
notifier: Observable<U>,
unsubscribe_notifier: bool
) -> Observable<T>
fn take_until<U: 'static>( self, notifier: Observable<U>, unsubscribe_notifier: bool ) -> Observable<T>
Continuously emits the values from the source observable until an event occurs, triggered by an emitted value from a separate `notifier` observable.
The `take_until` operator subscribes to and starts replicating the behavior of the source observable. Simultaneously, it observes a second observable, referred to as the `notifier`, provided by the user. When the `notifier` emits a value, the resulting observable stops replicating the source observable and completes. If the `notifier` completes without emitting any value, `take_until` will pass all values from the source observable. When the `notifier` triggers its first emission, `take_until` unsubscribes from ongoing emissions of the source observable.
The `take_until` operator accepts a second parameter, `unsubscribe_notifier`, allowing control over whether `take_until` will attempt to unsubscribe from emissions of the `notifier` observable. When set to `true`, `take_until` actively attempts to unsubscribe from the `notifier`’s emissions. When set to `false`, `take_until` refrains from attempting to unsubscribe from the `notifier`, allowing the emissions to continue unaffected.
Examples found in repository?
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
async fn main() {
    // Example: stop a Tokio-driven source observable as soon as a `Subject`
    // used as notifier emits a value.
    let observable = Observable::new(|mut subscriber| {
        // Stop flag shared by the emitting task and the unsubscribe listener.
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (signal_tx, mut signal_rx) = channel(10);

        // Listener: record the unsubscribe signal once it arrives.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = signal_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter: push 0..100, stopping early when the flag has been set.
        let emitter_task = task::spawn(async move {
            for value in 0..100 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                subscriber.next(value);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            subscriber.complete();
        });

        // Unsubscribing forwards the signal to the listener task.
        let unsubscribe_logic = UnsubscribeLogic::Future(Box::pin(async move {
            if signal_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                println!("Receiver dropped.");
            }
        }));
        Subscription::new(unsubscribe_logic, SubscriptionHandle::JoinTask(emitter_task))
    });

    let mut observer = Subscriber::on_next(|item| println!("Emitted {}", item));
    observer.on_complete(|| println!("Completed"));

    // The notifier can be an observable or one of the `Subject` variants.
    let (mut notifier_emitter, notifier_receiver) = Subject::<()>::emitter_receiver();

    // `into()` turns the `Subject` receiver into an observable. To keep using the
    // receiver afterwards, clone it first, e.g. `receiver.clone().into()`.
    let subscription = observable
        .take_until(notifier_receiver.into(), false)
        .subscribe(observer);

    // Give `take_until` a moment to register the notifier before the signal is
    // sent. This is unnecessary when the signal is not sent immediately.
    time::sleep(time::Duration::from_millis(1)).await;

    // Send signal to stop source observable emitting values.
    notifier_emitter.next(());

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
More examples
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
fn main() {
    // Example: stop a thread-driven source observable when a delayed `Subject`
    // notifier emits a value.
    let observable = Observable::new(|mut subscriber| {
        // Stop flag shared by the emitting thread and the unsubscribe listener.
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (signal_tx, signal_rx) = std::sync::mpsc::channel();

        // Listener: record the unsubscribe signal once it arrives.
        std::thread::spawn(move || {
            if let Ok(UNSUBSCRIBE_SIGNAL) = signal_rx.recv() {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter: push 0..100, stopping early when the flag has been set.
        let emitter_thread = std::thread::spawn(move || {
            for value in 0..100 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                subscriber.next(value);
                std::thread::sleep(Duration::from_millis(1));
            }
            subscriber.complete();
        });

        // Unsubscribing forwards the signal to the listener thread.
        let unsubscribe_logic = UnsubscribeLogic::Logic(Box::new(move || {
            if signal_tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                println!("Receiver dropped.");
            }
        }));
        Subscription::new(unsubscribe_logic, SubscriptionHandle::JoinThread(emitter_thread))
    });

    let mut observer = Subscriber::on_next(|item| println!("Emitted {}", item));
    observer.on_complete(|| println!("Completed"));

    // The notifier can be an observable or one of the `Subject` variants.
    let (mut notifier_emitter, notifier_receiver) = Subject::emitter_receiver();

    // The `Subject` can be chained: `delay()` slows the notifier down so the
    // source observable has time to emit some values. When the notifier is
    // chained with operators, `into()` is not needed. To keep using the receiver
    // afterwards, clone it first, e.g. `receiver.clone().delay(20)`.
    let subscription = observable
        .take_until(notifier_receiver.delay(20), false)
        .subscribe(observer);

    // Give `take_until` a moment to register the notifier before the signal is
    // sent. This is unnecessary when the signal is not sent immediately.
    std::thread::sleep(Duration::from_millis(1));

    // Send signal to stop source observable emitting values.
    notifier_emitter.next(());

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join().is_err() {
        // Handle error
    }

    println!("`main` function done")
}
sourcefn take_while<P>(self, predicate: P) -> Observable<T>
fn take_while<P>(self, predicate: P) -> Observable<T>
Continues emitting values from the source observable as long as each value meets the specified `predicate` criteria. The operation concludes immediately upon encountering the first value that doesn’t satisfy the `predicate`.
Upon subscription, `take_while` starts replicating the source observable. Every emitted value from the source is evaluated by the `predicate` function, returning a boolean that represents a condition for the source values. The resulting observable continues emitting source values until the `predicate` yields `false`. When the specified condition is no longer met, `take_while` ceases mirroring the source, subsequently unsubscribing from the source to stop background emissions.
Examples found in repository?
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
async fn main() {
    // Example: mirror a Tokio-driven source observable only while its values
    // satisfy a predicate.
    let observable = Observable::new(|mut subscriber| {
        // Stop flag shared by the emitting task and the unsubscribe listener.
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (signal_tx, mut signal_rx) = channel(10);

        // Listener: record the unsubscribe signal once it arrives.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = signal_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter: push 0..100, stopping early when the flag has been set.
        let emitter_task = task::spawn(async move {
            for value in 0..100 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                subscriber.next(value);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            subscriber.complete();
        });

        // Unsubscribing forwards the signal to the listener task.
        let unsubscribe_logic = UnsubscribeLogic::Future(Box::pin(async move {
            if signal_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                println!("Receiver dropped.");
            }
        }));
        Subscription::new(unsubscribe_logic, SubscriptionHandle::JoinTask(emitter_task))
    });

    let mut observer = Subscriber::on_next(|item| println!("Emitted {}", item));
    observer.on_complete(|| println!("Completed"));

    // Keep emitting only while the values are less than or equal to 40.
    let subscription = observable
        .take_while(|item| item <= &40)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
sourcefn take_last(self, count: usize) -> Observable<T>
fn take_last(self, count: usize) -> Observable<T>
Produces an observable that emits, at maximum, the final `count` values emitted by the source observable.
Utilizing `take_last` creates an observable that retains up to `count` values in memory until the source observable completes. Upon completion, it delivers all stored values in their original order to the consumer and signals completion.
In scenarios where the source completes before reaching the specified `count` in `take_last`, it emits all received values up to that point and then signals completion.
§Notes
When applied to an observable that never completes, `take_last` yields an observable that doesn’t emit any value.
Examples found in repository?
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
async fn main() {
    // Example: deliver only the final values of a Tokio-driven source observable.
    let observable = Observable::new(|mut subscriber| {
        // Stop flag shared by the emitting task and the unsubscribe listener.
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (signal_tx, mut signal_rx) = channel(10);

        // Listener: record the unsubscribe signal once it arrives.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = signal_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter: push 0..100, stopping early when the flag has been set.
        let emitter_task = task::spawn(async move {
            for value in 0..100 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                subscriber.next(value);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            subscriber.complete();
        });

        // Unsubscribing forwards the signal to the listener task.
        let unsubscribe_logic = UnsubscribeLogic::Future(Box::pin(async move {
            if signal_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                println!("Receiver dropped.");
            }
        }));
        Subscription::new(unsubscribe_logic, SubscriptionHandle::JoinTask(emitter_task))
    });

    let mut observer = Subscriber::on_next(|item| println!("Emitted {}", item));
    observer.on_complete(|| println!("Completed"));

    // Capture and process only the last 8 values.
    let subscription = observable
        .take_last(8)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
sourcefn tap(self, observer: Subscriber<T>) -> Observable<T>
fn tap(self, observer: Subscriber<T>) -> Observable<T>
The `tap` operator allows you to intercept the items emitted by an observable and perform side effects on those items without modifying the emitted data or the stream itself.
This operator is used primarily for side effects. It allows you to perform actions or operations on the items emitted by an observable without affecting the stream itself. The `tap` operator is best used for debugging, logging, or performing actions that don’t change the emitted values but are necessary for monitoring or debugging purposes, such as console logging, data inspection, or triggering some external action based on the emitted values.
// Observer that logs every kind of notification seen after the filter.
let filtered_logger = Subscriber::new(
    |item| println!("Filtered {}", item),
    |error| println!("Filtered error {}", error),
    || println!("Filtered complete"),
);

observable
    // Peek at each item before the filter is applied.
    .tap(Subscriber::on_next(|item| println!("Before filtering: {}", item)))
    .filter(|item| item % 2 == 0)
    // Peek at what passed the filter.
    .tap(filtered_logger)
    .subscribe(observer);
Examples found in repository?
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
async fn main() {
    // Example: apply `tap` to a source observable, turn it into a `Connectable`,
    // and subscribe two observers through a chained clone of that `Connectable`.

    // Make a source observable.
    let observable = Observable::new(|mut o| {
        // Stop flag shared by the emitting task and the unsubscribe listener.
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, mut rx) = channel(10);

        // Wait for the unsubscribe signal and store it in the shared flag.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emit 0 through 10 (inclusive), checking the stop flag before each emission.
        let join_handle = task::spawn(async move {
            for i in 0..=10 {
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            o.complete();
        });

        // Unsubscribing sends the signal to the listener task spawned above; the
        // join handle is stored so the emitting task can be awaited later.
        Subscription::new(
            UnsubscribeLogic::Future(Box::pin(async move {
                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinTask(join_handle),
        )
    });

    let mut observer1 = Subscriber::on_next(|v| println!("Observer 1 emitted {}", v));
    observer1.on_complete(|| println!("Observer 1 completed"));

    let mut observer2 = Subscriber::on_next(|v| println!("Observer 2 emitted {}", v));
    observer2.on_complete(|| println!("Observer 2 completed"));

    // You can use other operators before calling `connectable()` operator.
    let observable = observable.tap(Subscriber::on_next(|v| println!("... emitting {v}")));

    // Make a `Connectable` observable from the source observable.
    let connectable = observable.connectable();

    // If you want to use other operators after calling `connectable()` operator you
    // can do it by cloning first.
    let mut connectable_chained = connectable.clone().map(|v| v + 10).delay(1000);

    // Subscribe observers to chained `Connectable`.
    connectable_chained.subscribe(observer1);
    connectable_chained.subscribe(observer2);

    // Connect `Connectable` to start emitting to all `Subscriber`'s.
    // No emissions happen if `connect()` is not called.
    let connected = connectable.connect();

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for `Connectable` observable to finish before exiting the program.
    // You can also use `connected.unsubscribe();` to stop all emissions.
    if connected.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
sourcefn merge(self, sources: Vec<Observable<T>>) -> Observable<T>
fn merge(self, sources: Vec<Observable<T>>) -> Observable<T>
Merges the current observable with a vector of observables, emitting items from all of them concurrently.
sourcefn merge_one(self, source: Observable<T>) -> Observable<T>
fn merge_one(self, source: Observable<T>) -> Observable<T>
Merges the current observable with another observable, emitting items from both concurrently.
sourcefn switch_map<R: 'static, F>(self, project: F) -> Observable<R>
fn switch_map<R: 'static, F>(self, project: F) -> Observable<R>
Transforms the items emitted by an observable into observables, and flattens the emissions into a single observable, ignoring previous emissions once a new one is encountered. This is similar to `map`, but switches to a new inner observable whenever a new item is emitted.
§Parameters
`project`: A closure that maps each source item to an observable. The closure returns the observable for each item, and the emissions from these observables are flattened into a single observable.
§Returns
An observable that emits the items from the most recently emitted inner observable.
sourcefn merge_map<R: 'static, F>(self, project: F) -> Observable<R>
fn merge_map<R: 'static, F>(self, project: F) -> Observable<R>
Transforms the items emitted by the source observable into other observables, and merges them into a single observable stream.
This operator applies the provided `project` function to each item emitted by the source observable. The function returns another observable. The operator subscribes to these inner observables concurrently and merges their emissions into one observable stream.
§Parameters
`project`: A closure that maps each item emitted by the source observable to another observable.
§Returns
An observable that emits items merged from the inner observables produced by the `project` function.
Examples found in repository?
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
async fn main() {
    // Example: map each source value to an inner observable with `merge_map` and
    // accumulate the merged results with `scan`.
    let observable = Observable::new(|mut subscriber| {
        // Stop flag shared by the emitting task and the unsubscribe listener.
        let stop_flag = Arc::new(Mutex::new(false));
        let stop_flag_writer = Arc::clone(&stop_flag);
        let (signal_tx, mut signal_rx) = channel(10);

        // Listener: record the unsubscribe signal once it arrives.
        task::spawn(async move {
            if let Some(UNSUBSCRIBE_SIGNAL) = signal_rx.recv().await {
                *stop_flag_writer.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emitter: push 0..100, stopping early when the flag has been set.
        let emitter_task = task::spawn(async move {
            for value in 0..100 {
                if *stop_flag.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                subscriber.next(value);
                time::sleep(time::Duration::from_millis(1)).await;
            }
            subscriber.complete();
        });

        // Unsubscribing forwards the signal to the listener task.
        let unsubscribe_logic = UnsubscribeLogic::Future(Box::pin(async move {
            if signal_tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
                println!("Receiver dropped.");
            }
        }));
        Subscription::new(unsubscribe_logic, SubscriptionHandle::JoinTask(emitter_task))
    });

    let mut observer = Subscriber::on_next(|item| println!("Emitted: {}", item));
    observer.on_complete(|| println!("Completed"));

    // Accumulate response strings into a single string.
    // The accumulator and the incoming item may have different types as long as the
    // accumulator type implements `From` for the item type. Per the example's
    // author, the accumulator here is a `String` and each item is a `&str`.
    let subscription = observable
        .take(6)
        .delay(100)
        .merge_map(|_| get_response_observable())
        .scan(|acc, item| format!("{} {}", acc, item), None)
        .subscribe(observer);

    // Do something else here.
    println!("Do something while Observable is emitting.");

    // Wait for observable to finish before exiting the program.
    if subscription.join_concurrent().await.is_err() {
        // Handle error
    }

    println!("`main` function done")
}
sourcefn concat_map<R: 'static, F>(self, project: F) -> Observable<R>
fn concat_map<R: 'static, F>(self, project: F) -> Observable<R>
Transforms the items emitted by the source observable into other observables using a closure, and concatenates them into a single observable stream, waiting for each inner observable to complete before moving to the next.
This operator applies the provided `project` function to each item emitted by the source observable. The function returns another observable. The operator subscribes to these inner observables sequentially and concatenates their emissions into one observable stream.
§Parameters
`project`: A closure that maps each item emitted by the source observable to another observable.
§Returns
An observable that emits items concatenated from the inner observables produced by the `project` function.
sourcefn exhaust_map<R: 'static, F>(self, project: F) -> Observable<R>
fn exhaust_map<R: 'static, F>(self, project: F) -> Observable<R>
Maps each item emitted by the source observable to an inner observable using a closure. It subscribes to these inner observables, ignoring new items until the current inner observable completes.
§Parameters
`project`: A closure that maps each item to an inner observable.
§Returns
An observable that emits inner observables exclusively. Inner observables do not emit and remain ignored if a preceding inner observable is still emitting. The emission of a subsequent inner observable is allowed only after the current one completes its emission.