take_last_operator/
take_last_operator.rs

//! Demonstrating the `take_last()` operator
//!
//! This example showcases the functionality of the `take_last()` operator by
//! creating a stream and extracting the last `n` elements from it.
//!
//! To run this example, execute `cargo run --example take_last_operator`.
7
8use std::sync::{Arc, Mutex};
9
10use rxr::{
11    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
12    Observable, ObservableExt, Observer, Subscribeable,
13};
14use tokio::{sync::mpsc::channel, task, time};
15
16const UNSUBSCRIBE_SIGNAL: bool = true;
17
18#[tokio::main]
19async fn main() {
20    let observable = Observable::new(|mut o| {
21        let done = Arc::new(Mutex::new(false));
22        let done_c = Arc::clone(&done);
23        let (tx, mut rx) = channel(10);
24
25        task::spawn(async move {
26            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
27                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
28            }
29        });
30
31        let join_handle = task::spawn(async move {
32            for i in 0..100 {
33                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
34                    break;
35                }
36                o.next(i);
37                time::sleep(time::Duration::from_millis(1)).await;
38            }
39            o.complete();
40        });
41
42        Subscription::new(
43            UnsubscribeLogic::Future(Box::pin(async move {
44                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
45                    println!("Receiver dropped.");
46                }
47            })),
48            SubscriptionHandle::JoinTask(join_handle),
49        )
50    });
51
52    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
53    observer.on_complete(|| println!("Completed"));
54
55    // Capture and process only the last 8 values.
56    let subscription = observable.take_last(8).subscribe(observer);
57
58    // Do something else here.
59    println!("Do something while Observable is emitting.");
60
61    // Wait for observable to finish before exiting the program.
62    if subscription.join_concurrent().await.is_err() {
63        // Handle error
64    }
65
66    println!("`main` function done")
67}