subject_as_observer/
subject_as_observer.rs

1//! Demonstrates how to use `Subject` emitter as an observer.
2//!
3//! To run this example, execute `cargo run --example subject_as_observer`.
4
5use std::{fmt::Display, time::Duration};
6
7use rxr::{
8    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
9    Observable, ObservableExt, Observer, Subject, Subscribeable,
10};
11
12pub fn create_subscriber<T: Display>(subscriber_id: u32) -> Subscriber<T> {
13    Subscriber::new(
14        move |v: T| println!("Subscriber {}: {}", subscriber_id, v),
15        move |e| eprintln!("Error {}: {}", subscriber_id, e),
16        move || println!("Completed Subscriber {}", subscriber_id),
17    )
18}
19
20pub fn main() {
21    // Make an Observable.
22    let mut observable = Observable::new(move |mut o: Subscriber<_>| {
23        for i in 0..10 + 1 {
24            o.next(i);
25            std::thread::sleep(Duration::from_millis(1));
26        }
27        o.complete();
28        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
29    });
30
31    // Initialize a `Subject` and obtain its emitter and receiver.
32    let (emitter, mut receiver) = Subject::emitter_receiver();
33
34    // Register `Subscriber` 1.
35    receiver.subscribe(create_subscriber(1));
36
37    // Register `Subscriber` 2.
38    receiver
39        // We're cloning the receiver so we can use it again.
40        // Shallow clone: clones only the pointer to the `Subject`.
41        .clone()
42        .take(7) // For performance, prioritize placing `take()` as the first operator.
43        .delay(1000)
44        .map(|v| format!("mapped {}", v))
45        .subscribe(create_subscriber(2));
46
47    // Register `Subscriber` 3.
48    receiver
49        .filter(|v| v % 2 == 0)
50        .map(|v| format!("filtered {}", v))
51        .subscribe(create_subscriber(3));
52
53    // Convert the emitter into an observer and subscribe it to the observable.
54    observable.subscribe(emitter.into());
55}