async_subject/async_subject.rs
1//! `AsyncSubject` example
2//!
3//! This example demonstrates the usage of the `AsyncSubject` in the `rxr` library.
4//!
5//! The `AsyncSubject` is a type of subject in reactive programming that emits only
6//! the last value emitted by the source observable, only after that observable
7//! completes. If the source observable terminates with an error, the `AsyncSubject`
8//! will propagate that error.
9//!
10//! To run this example, execute `cargo run --example async_subject`.
11
12use std::fmt::Display;
13
14use rxr::{subjects::AsyncSubject, subscribe::Subscriber};
15use rxr::{ObservableExt, Observer, Subscribeable};
16
17pub fn create_subscriber<T: Display>(subscriber_id: i32) -> Subscriber<T> {
18 Subscriber::new(
19 move |v| println!("Subscriber #{} emitted: {}", subscriber_id, v),
20 |_| eprintln!("Error"),
21 move || println!("Completed {}", subscriber_id),
22 )
23}
24
25pub fn main() {
26 // Initialize a `AsyncSubject` and obtain its emitter and receiver.
27 let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();
28
29 // Registers `Subscriber` 1.
30 receiver.subscribe(create_subscriber(1));
31
32 emitter.next(101); // Stores 101 ast the latest value.
33 emitter.next(102); // Latest value is now 102.
34
35 // All Observable operators can be applied to the receiver.
36 // Registers mapped `Subscriber` 2.
37 receiver
38 .clone() // Shallow clone: clones only the pointer to the `AsyncSubject` object.
39 .map(|v| format!("mapped {}", v))
40 .subscribe(create_subscriber(2));
41
42 // Registers `Subscriber` 3.
43 receiver.subscribe(create_subscriber(3));
44
45 emitter.next(103); // Latest value is now 103.
46
47 // Emits latest value (103) to registered `Subscriber`'s 1, 2 and 3 and calls
48 // `complete` on each of them.
49 emitter.complete();
50
51 // Subscriber 4: post-completion subscribe, emits latest value (103) and completes.
52 receiver.subscribe(create_subscriber(4));
53
54 // Called post-completion, does not emit.
55 emitter.next(104);
56}