// connectable_operator/connectable_operator.rs
use std::sync::{Arc, Mutex};

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
    Observable, ObservableExt, Observer, Subscribeable, Unsubscribeable,
};
use tokio::{sync::mpsc::channel, task, time};

/// Value sent over the channel, and then stored in the shared stop flag, to
/// tell the emitting task to stop producing values.
const UNSUBSCRIBE_SIGNAL: bool = true;

20#[tokio::main]
21async fn main() {
22 let observable = Observable::new(|mut o| {
24 let done = Arc::new(Mutex::new(false));
25 let done_c = Arc::clone(&done);
26 let (tx, mut rx) = channel(10);
27
28 task::spawn(async move {
29 if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
30 *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
31 }
32 });
33
34 let join_handle = task::spawn(async move {
35 for i in 0..10 + 1 {
36 if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
37 break;
38 }
39 o.next(i);
40 time::sleep(time::Duration::from_millis(1)).await;
41 }
42 o.complete();
43 });
44
45 Subscription::new(
46 UnsubscribeLogic::Future(Box::pin(async move {
47 if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
48 println!("Receiver dropped.");
49 }
50 })),
51 SubscriptionHandle::JoinTask(join_handle),
52 )
53 });
54
55 let mut observer1 = Subscriber::on_next(|v| println!("Observer 1 emitted {}", v));
56 observer1.on_complete(|| println!("Observer 1 completed"));
57
58 let mut observer2 = Subscriber::on_next(|v| println!("Observer 2 emitted {}", v));
59 observer2.on_complete(|| println!("Observer 2 completed"));
60
61 let mut observer3 = Subscriber::on_next(|v| println!("Observer 3 emitted {}", v));
62 observer3.on_complete(|| println!("Observer 3 completed"));
63
64 let mut connectable = observable.connectable();
66
67 connectable.subscribe(observer1);
69 connectable.subscribe(observer2);
70
71 let subscription3 = connectable.subscribe(observer3);
75 subscription3.unsubscribe();
76
77 let connected = connectable.connect();
80
81 println!("Do something while Observable is emitting.");
83
84 if connected.join_concurrent().await.is_err() {
87 }
89
90 println!("`main` function done")
91}