unsubscribe_observable/
unsubscribe_observable.rs

1//! This `Observable` emits values and completes, returning a `Subscription` that can be
2//! unsubscribed from, enabling all operators to function correctly. It utilizes an OS
3//! thread for asynchronous processing, preventing it from blocking the current thread.
4//!
5//! To run this example, execute `cargo run --example unsubscribe_observable`.
6
7use std::{
8    sync::{Arc, Mutex},
9    time::Duration,
10};
11
12use rxr::{
13    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic, Unsubscribeable},
14    Observable, ObservableExt, Observer, Subscribeable,
15};
16
/// Value that represents an unsubscribe request in this example.
///
/// The unsubscribe logic sends it over a channel, the watcher thread writes it
/// into the shared flag, and the emitting thread stops once the flag equals it.
const UNSUBSCRIBE_SIGNAL: bool = true;
18
19fn main() {
20    // Create a custom observable that emits values in a separate thread.
21    let observable = Observable::new(|mut o| {
22        let done = Arc::new(Mutex::new(false));
23        let done_c = Arc::clone(&done);
24        let (tx, rx) = std::sync::mpsc::channel();
25
26        // Spawn a new thread to await a signal sent from the unsubscribe logic.
27        std::thread::spawn(move || {
28            // Attempt to receive a signal sent from the unsubscribe logic.
29            if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
30                // Update the `done_c` mutex with the received signal.
31                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
32            }
33        });
34
35        // Launch a new thread for the Observable's processing and store its handle.
36        let join_handle = std::thread::spawn(move || {
37            for i in 0..=10000 {
38                // If an unsubscribe signal is received, exit the loop and stop emissions.
39                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
40                    break;
41                }
42                // Emit the value to the subscriber.
43                o.next(i);
44                // Important. Put an await point after each emit or after some emits.
45                // This allows the `take()` operator to function properly.
46                std::thread::sleep(Duration::from_millis(1));
47            }
48            // Signal completion to the subscriber.
49            o.complete();
50        });
51
52        // Return a new `Subscription` with custom unsubscribe logic.
53        Subscription::new(
54            // The provided closure defines the behavior of the subscription when it
55            // is unsubscribed. In this case, it sends a signal to an asynchronous
56            // observable to stop emitting values.
57            UnsubscribeLogic::Logic(Box::new(move || {
58                if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
59                    println!("Receiver dropped.");
60                }
61            })),
62            // Store the `JoinHandle` for awaiting completion using the `Subscription`.
63            SubscriptionHandle::JoinThread(join_handle),
64        )
65    });
66
67    // Create the `Subscriber` with a mandatory `next` function, and optional
68    // `complete` function. No need for `error` function in this simple example.
69    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
70    observer.on_complete(|| println!("Completed"));
71
72    // This observable uses OS threads so it will not block the current thread.
73    // Observables are cold so if you comment out the statement bellow nothing
74    // will be emitted.
75    let subscription = observable
76        // take utilizes our unsubscribe function to stop background emissions after
77        // a specified item count.
78        .take(500)
79        .map(|v| format!("Mapped {}", v))
80        .subscribe(observer);
81
82    // Do something else here.
83    println!("Do something while Observable is emitting.");
84
85    // Unsubscribe from the observable to stop emissions.
86    subscription.unsubscribe();
87
88    // Allow some time for the main thread to confirm that the observable indeed
89    // isn't emitting.
90    std::thread::sleep(Duration::from_millis(2000));
91    println!("`main` function done")
92}