scan_operator/
scan_operator.rs1use std::sync::{Arc, Mutex};
15
16use rxr::{
17 subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
18 Observable, ObservableExt, Observer, Subscribeable,
19};
20use tokio::{sync::mpsc::channel, task, time};
21
22const UNSUBSCRIBE_SIGNAL: bool = true;
23
24fn get_response_observable() -> Observable<&'static str> {
25 Observable::new(|mut o| {
26 task::spawn(async move {
27 o.next("response");
28 time::sleep(time::Duration::from_millis(1)).await;
29 o.complete();
30 });
31 Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
32 })
33}
34
35#[tokio::main]
36async fn main() {
37 let observable = Observable::new(|mut o| {
38 let done = Arc::new(Mutex::new(false));
39 let done_c = Arc::clone(&done);
40 let (tx, mut rx) = channel(10);
41
42 task::spawn(async move {
43 if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
44 *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
45 }
46 });
47
48 let join_handle = task::spawn(async move {
49 for i in 0..100 {
50 if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
51 break;
52 }
53 o.next(i);
54 time::sleep(time::Duration::from_millis(1)).await;
55 }
56 o.complete();
57 });
58
59 Subscription::new(
60 UnsubscribeLogic::Future(Box::pin(async move {
61 if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
62 println!("Receiver dropped.");
63 }
64 })),
65 SubscriptionHandle::JoinTask(join_handle),
66 )
67 });
68
69 let mut observer = Subscriber::on_next(|v| println!("Emitted: {}", v));
70 observer.on_complete(|| println!("Completed"));
71
72 let subscription = observable
76 .take(6)
77 .delay(100)
78 .merge_map(|_| get_response_observable())
79 .scan(|total, n| format!("{} {}", total, n), None)
80 .subscribe(observer);
81
82 println!("Do something while Observable is emitting.");
84
85 if subscription.join_concurrent().await.is_err() {
87 }
89
90 println!("`main` function done")
91}