// subject_as_observer/subject_as_observer.rs
use std::{fmt::Display, time::Duration};

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
    Observable, ObservableExt, Observer, Subject, Subscribeable,
};
11
12pub fn create_subscriber<T: Display>(subscriber_id: u32) -> Subscriber<T> {
13 Subscriber::new(
14 move |v: T| println!("Subscriber {}: {}", subscriber_id, v),
15 move |e| eprintln!("Error {}: {}", subscriber_id, e),
16 move || println!("Completed Subscriber {}", subscriber_id),
17 )
18}
19
20pub fn main() {
21 let mut observable = Observable::new(move |mut o: Subscriber<_>| {
23 for i in 0..10 + 1 {
24 o.next(i);
25 std::thread::sleep(Duration::from_millis(1));
26 }
27 o.complete();
28 Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
29 });
30
31 let (emitter, mut receiver) = Subject::emitter_receiver();
33
34 receiver.subscribe(create_subscriber(1));
36
37 receiver
39 .clone()
42 .take(7) .delay(1000)
44 .map(|v| format!("mapped {}", v))
45 .subscribe(create_subscriber(2));
46
47 receiver
49 .filter(|v| v % 2 == 0)
50 .map(|v| format!("filtered {}", v))
51 .subscribe(create_subscriber(3));
52
53 observable.subscribe(emitter.into());
55}