// take_last_operator/take_last_operator.rs
use std::sync::{Arc, Mutex};

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
    Observable, ObservableExt, Observer, Subscribeable,
};
use tokio::{sync::mpsc::channel, task, time};

/// Value sent over the unsubscribe channel to ask the emitting task to stop.
///
/// The same value is written into the shared `done` flag once the signal has
/// been received, and the emitting loop compares the flag against it.
const UNSUBSCRIBE_SIGNAL: bool = true;

18#[tokio::main]
19async fn main() {
20 let observable = Observable::new(|mut o| {
21 let done = Arc::new(Mutex::new(false));
22 let done_c = Arc::clone(&done);
23 let (tx, mut rx) = channel(10);
24
25 task::spawn(async move {
26 if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
27 *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
28 }
29 });
30
31 let join_handle = task::spawn(async move {
32 for i in 0..100 {
33 if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
34 break;
35 }
36 o.next(i);
37 time::sleep(time::Duration::from_millis(1)).await;
38 }
39 o.complete();
40 });
41
42 Subscription::new(
43 UnsubscribeLogic::Future(Box::pin(async move {
44 if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
45 println!("Receiver dropped.");
46 }
47 })),
48 SubscriptionHandle::JoinTask(join_handle),
49 )
50 });
51
52 let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
53 observer.on_complete(|| println!("Completed"));
54
55 let subscription = observable.take_last(8).subscribe(observer);
57
58 println!("Do something while Observable is emitting.");
60
61 if subscription.join_concurrent().await.is_err() {
63 }
65
66 println!("`main` function done")
67}