take_while_operator/
take_while_operator.rs1use std::sync::{Arc, Mutex};
12
13use rxr::{
14 subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
15 Observable, ObservableExt, Observer, Subscribeable,
16};
17use tokio::{sync::mpsc::channel, task, time};
18
/// Value that signals "stop emitting".
///
/// It is sent over the channel by the subscription's unsubscribe future,
/// matched by the listener task, and stored in / compared against the shared
/// stop flag polled by the emitting loop.
const UNSUBSCRIBE_SIGNAL: bool = true;
20
21#[tokio::main]
22async fn main() {
23 let observable = Observable::new(|mut o| {
24 let done = Arc::new(Mutex::new(false));
25 let done_c = Arc::clone(&done);
26 let (tx, mut rx) = channel(10);
27
28 task::spawn(async move {
29 if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
30 *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
31 }
32 });
33
34 let join_handle = task::spawn(async move {
35 for i in 0..100 {
36 if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
37 break;
38 }
39 o.next(i);
40 time::sleep(time::Duration::from_millis(1)).await;
41 }
42 o.complete();
43 });
44
45 Subscription::new(
46 UnsubscribeLogic::Future(Box::pin(async move {
47 if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
48 println!("Receiver dropped.");
49 }
50 })),
51 SubscriptionHandle::JoinTask(join_handle),
52 )
53 });
54
55 let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
56 observer.on_complete(|| println!("Completed"));
57
58 let subscription = observable.take_while(|v| v <= &40).subscribe(observer);
60
61 println!("Do something while Observable is emitting.");
63
64 if subscription.join_concurrent().await.is_err() {
66 }
68
69 println!("`main` function done")
70}