take_until_operator/
take_until_operator.rs

1//! Demonstrating the `take_until()` operator with a notifier observable
2//!
3//! This example illustrates the functionality of the `take_until()` operator by
4//! creating a stream and emitting elements until a notifier observable emits a signal.
5//!
6//! To run this example, execute `cargo run --example take_until_operator`.
7
8use std::sync::{Arc, Mutex};
9
10use rxr::{
11    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
12    Observable, ObservableExt, Observer, Subject, Subscribeable,
13};
14use tokio::{sync::mpsc::channel, task, time};
15
16const UNSUBSCRIBE_SIGNAL: bool = true;
17
18#[tokio::main]
19async fn main() {
20    let observable = Observable::new(|mut o| {
21        let done = Arc::new(Mutex::new(false));
22        let done_c = Arc::clone(&done);
23        let (tx, mut rx) = channel(10);
24
25        task::spawn(async move {
26            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
27                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
28            }
29        });
30
31        let join_handle = task::spawn(async move {
32            for i in 0..100 {
33                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
34                    break;
35                }
36                o.next(i);
37                time::sleep(time::Duration::from_millis(1)).await;
38            }
39            o.complete();
40        });
41
42        Subscription::new(
43            UnsubscribeLogic::Future(Box::pin(async move {
44                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
45                    println!("Receiver dropped.");
46                }
47            })),
48            SubscriptionHandle::JoinTask(join_handle),
49        )
50    });
51
52    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
53    observer.on_complete(|| println!("Completed"));
54
55    // Create notifier, it can be observable or one of the Subject variants.
56    let (mut emitter, receiver) = Subject::<()>::emitter_receiver();
57
58    // Turning Subject into an observable. To continue using the receiver later,
59    // utilize `.clone()`, e.g. `receiver.clone().into()`.
60    let subscription = observable
61        .take_until(receiver.into(), false)
62        .subscribe(observer);
63
64    // Allowing some time for the `take_until` function to register the notifier
65    // before emitting a signal. This step is unnecessary if you're not immediately
66    // sending a signal.
67    time::sleep(time::Duration::from_millis(1)).await;
68
69    // Send signal to stop source observable emitting values.
70    emitter.next(());
71
72    // Do something else here.
73    println!("Do something while Observable is emitting.");
74
75    // Wait for observable to finish before exiting the program.
76    if subscription.join_concurrent().await.is_err() {
77        // Handle error
78    }
79
80    println!("`main` function done")
81}