async_subject/
async_subject.rs

//! `AsyncSubject` example
//!
//! This example demonstrates the usage of the `AsyncSubject` in the `rxr` library.
//!
//! The `AsyncSubject` is a type of subject in reactive programming that emits only
//! the last value emitted by the source observable, and only after that observable
//! completes. If the source observable terminates with an error, the `AsyncSubject`
//! will propagate that error.
//!
//! To run this example, execute `cargo run --example async_subject`.
11
12use std::fmt::Display;
13
14use rxr::{subjects::AsyncSubject, subscribe::Subscriber};
15use rxr::{ObservableExt, Observer, Subscribeable};
16
17pub fn create_subscriber<T: Display>(subscriber_id: i32) -> Subscriber<T> {
18    Subscriber::new(
19        move |v| println!("Subscriber #{} emitted: {}", subscriber_id, v),
20        |_| eprintln!("Error"),
21        move || println!("Completed {}", subscriber_id),
22    )
23}
24
25pub fn main() {
26    // Initialize a `AsyncSubject` and obtain its emitter and receiver.
27    let (mut emitter, mut receiver) = AsyncSubject::emitter_receiver();
28
29    // Registers `Subscriber` 1.
30    receiver.subscribe(create_subscriber(1));
31
32    emitter.next(101); // Stores 101 ast the latest value.
33    emitter.next(102); // Latest value is now 102.
34
35    // All Observable operators can be applied to the receiver.
36    // Registers mapped `Subscriber` 2.
37    receiver
38        .clone() // Shallow clone: clones only the pointer to the `AsyncSubject` object.
39        .map(|v| format!("mapped {}", v))
40        .subscribe(create_subscriber(2));
41
42    // Registers `Subscriber` 3.
43    receiver.subscribe(create_subscriber(3));
44
45    emitter.next(103); // Latest value is now 103.
46
47    // Emits latest value (103) to registered `Subscriber`'s 1, 2 and 3 and calls
48    // `complete` on each of them.
49    emitter.complete();
50
51    // Subscriber 4: post-completion subscribe, emits latest value (103) and completes.
52    receiver.subscribe(create_subscriber(4));
53
54    // Called post-completion, does not emit.
55    emitter.next(104);
56}