connectable_chained_operator/
connectable_chained_operator.rs1use std::sync::{Arc, Mutex};
12
13use rxr::{
14 subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
15 Observable, ObservableExt, Observer, Subscribeable,
16};
17use tokio::{sync::mpsc::channel, task, time};
18
/// Value used as the "stop emitting" signal throughout this example.
///
/// The unsubscribe future sends it over the channel, the listener task matches
/// on it when receiving, and the emit loop compares the shared flag against it
/// to decide when to break.
const UNSUBSCRIBE_SIGNAL: bool = true;
20
21#[tokio::main]
22async fn main() {
23 let observable = Observable::new(|mut o| {
25 let done = Arc::new(Mutex::new(false));
26 let done_c = Arc::clone(&done);
27 let (tx, mut rx) = channel(10);
28
29 task::spawn(async move {
30 if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
31 *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
32 }
33 });
34
35 let join_handle = task::spawn(async move {
36 for i in 0..10 + 1 {
37 if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
38 break;
39 }
40 o.next(i);
41 time::sleep(time::Duration::from_millis(1)).await;
42 }
43 o.complete();
44 });
45
46 Subscription::new(
47 UnsubscribeLogic::Future(Box::pin(async move {
48 if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
49 println!("Receiver dropped.");
50 }
51 })),
52 SubscriptionHandle::JoinTask(join_handle),
53 )
54 });
55
56 let mut observer1 = Subscriber::on_next(|v| println!("Observer 1 emitted {}", v));
57 observer1.on_complete(|| println!("Observer 1 completed"));
58
59 let mut observer2 = Subscriber::on_next(|v| println!("Observer 2 emitted {}", v));
60 observer2.on_complete(|| println!("Observer 2 completed"));
61
62 let observable = observable.tap(Subscriber::on_next(|v| println!("... emitting {v}")));
64
65 let connectable = observable.connectable();
67
68 let mut connectable_chained = connectable.clone().map(|v| v + 10).delay(1000);
71
72 connectable_chained.subscribe(observer1);
74 connectable_chained.subscribe(observer2);
75
76 let connected = connectable.connect();
79
80 println!("Do something while Observable is emitting.");
82
83 if connected.join_concurrent().await.is_err() {
86 }
88
89 println!("`main` function done")
90}