take_while_operator/
take_while_operator.rs

1//! Demonstrating the `take_while()` operator
2//!
3//! This example showcases the functionality of the `take_while()` operator by
4//! creating a stream and extracting elements while a specified condition is met.
5//!
6//! The `take_while()` operator retrieves elements from a stream as long as the
7//! provided predicate returns true.
8//!
9//! To run this example, execute `cargo run --example take_while_operator`.
10
11use std::sync::{Arc, Mutex};
12
13use rxr::{
14    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
15    Observable, ObservableExt, Observer, Subscribeable,
16};
17use tokio::{sync::mpsc::channel, task, time};
18
19const UNSUBSCRIBE_SIGNAL: bool = true;
20
21#[tokio::main]
22async fn main() {
23    let observable = Observable::new(|mut o| {
24        let done = Arc::new(Mutex::new(false));
25        let done_c = Arc::clone(&done);
26        let (tx, mut rx) = channel(10);
27
28        task::spawn(async move {
29            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
30                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
31            }
32        });
33
34        let join_handle = task::spawn(async move {
35            for i in 0..100 {
36                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
37                    break;
38                }
39                o.next(i);
40                time::sleep(time::Duration::from_millis(1)).await;
41            }
42            o.complete();
43        });
44
45        Subscription::new(
46            UnsubscribeLogic::Future(Box::pin(async move {
47                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
48                    println!("Receiver dropped.");
49                }
50            })),
51            SubscriptionHandle::JoinTask(join_handle),
52        )
53    });
54
55    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
56    observer.on_complete(|| println!("Completed"));
57
58    // Emit values only when they are less than or equal to 40.
59    let subscription = observable.take_while(|v| v <= &40).subscribe(observer);
60
61    // Do something else here.
62    println!("Do something while Observable is emitting.");
63
64    // Wait for observable to finish before exiting the program.
65    if subscription.join_concurrent().await.is_err() {
66        // Handle error
67    }
68
69    println!("`main` function done")
70}