take_until_operator_os/
take_until_operator_os.rs

1//! Demonstrating the `take_until()` operator with a notifier observable using OS threads
2//!
3//! This example illustrates the functionality of the `take_until()` operator by
4//! creating a stream and emitting elements until a notifier observable emits a signal.
5//!
6//! To run this example, execute `cargo run --example take_until_operator_os`.
7
8use std::{
9    sync::{Arc, Mutex},
10    time::Duration,
11};
12
13use rxr::{
14    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
15    Observable, ObservableExt, Observer, Subject, Subscribeable,
16};
17
18const UNSUBSCRIBE_SIGNAL: bool = true;
19
20fn main() {
21    let observable = Observable::new(|mut o| {
22        let done = Arc::new(Mutex::new(false));
23        let done_c = Arc::clone(&done);
24        let (tx, rx) = std::sync::mpsc::channel();
25
26        std::thread::spawn(move || {
27            if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
28                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
29            }
30        });
31
32        let join_handle = std::thread::spawn(move || {
33            for i in 0..100 {
34                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
35                    break;
36                }
37                o.next(i);
38                std::thread::sleep(Duration::from_millis(1));
39            }
40            o.complete();
41        });
42
43        Subscription::new(
44            UnsubscribeLogic::Logic(Box::new(move || {
45                if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
46                    println!("Receiver dropped.");
47                }
48            })),
49            SubscriptionHandle::JoinThread(join_handle),
50        )
51    });
52
53    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
54    observer.on_complete(|| println!("Completed"));
55
56    // Create notifier, it can be observable or one of the Subject variants.
57    let (mut emitter, receiver) = Subject::emitter_receiver();
58
59    // We can chain the Subject, here we use `delay()` to slow down the notifier so
60    // source observable has time to emmit some values. Note when we chain the
61    // notifier with operators we don't have to use `into()`. To continue using the
62    // receiver later, utilize `.clone()`, e.g. `receiver.clone().delay(20)`.
63    let subscription = observable
64        .take_until(receiver.delay(20), false)
65        .subscribe(observer);
66
67    // Allowing some time for the `take_until` function to register the notifier
68    // before emitting a signal. This step is unnecessary if you're not immediately
69    // sending a signal.
70    std::thread::sleep(Duration::from_millis(1));
71
72    // Send signal to stop source observable emitting values.
73    emitter.next(());
74
75    // Do something else here.
76    println!("Do something while Observable is emitting.");
77
78    // Wait for observable to finish before exiting the program.
79    if subscription.join().is_err() {
80        // Handle error
81    }
82
83    println!("`main` function done")
84}