// unsubscribe_observable/unsubscribe_observable.rs
1//! This `Observable` emits values and completes, returning a `Subscription` that can be
2//! unsubscribed from, enabling all operators to function correctly. It utilizes an OS
3//! thread for asynchronous processing, preventing it from blocking the current thread.
4//!
5//! To run this example, execute `cargo run --example unsubscribe_observable`.
6
7use std::{
8 sync::{Arc, Mutex},
9 time::Duration,
10};
11
12use rxr::{
13 subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic, Unsubscribeable},
14 Observable, ObservableExt, Observer, Subscribeable,
15};
16
/// Value sent over the channel by the subscription's unsubscribe logic.
///
/// The relay thread copies it into the shared flag, and the emitting thread
/// leaves its loop as soon as the flag equals this value.
const UNSUBSCRIBE_SIGNAL: bool = true;
18
19fn main() {
20 // Create a custom observable that emits values in a separate thread.
21 let observable = Observable::new(|mut o| {
22 let done = Arc::new(Mutex::new(false));
23 let done_c = Arc::clone(&done);
24 let (tx, rx) = std::sync::mpsc::channel();
25
26 // Spawn a new thread to await a signal sent from the unsubscribe logic.
27 std::thread::spawn(move || {
28 // Attempt to receive a signal sent from the unsubscribe logic.
29 if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
30 // Update the `done_c` mutex with the received signal.
31 *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
32 }
33 });
34
35 // Launch a new thread for the Observable's processing and store its handle.
36 let join_handle = std::thread::spawn(move || {
37 for i in 0..=10000 {
38 // If an unsubscribe signal is received, exit the loop and stop emissions.
39 if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
40 break;
41 }
42 // Emit the value to the subscriber.
43 o.next(i);
44 // Important. Put an await point after each emit or after some emits.
45 // This allows the `take()` operator to function properly.
46 std::thread::sleep(Duration::from_millis(1));
47 }
48 // Signal completion to the subscriber.
49 o.complete();
50 });
51
52 // Return a new `Subscription` with custom unsubscribe logic.
53 Subscription::new(
54 // The provided closure defines the behavior of the subscription when it
55 // is unsubscribed. In this case, it sends a signal to an asynchronous
56 // observable to stop emitting values.
57 UnsubscribeLogic::Logic(Box::new(move || {
58 if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
59 println!("Receiver dropped.");
60 }
61 })),
62 // Store the `JoinHandle` for awaiting completion using the `Subscription`.
63 SubscriptionHandle::JoinThread(join_handle),
64 )
65 });
66
67 // Create the `Subscriber` with a mandatory `next` function, and optional
68 // `complete` function. No need for `error` function in this simple example.
69 let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
70 observer.on_complete(|| println!("Completed"));
71
72 // This observable uses OS threads so it will not block the current thread.
73 // Observables are cold so if you comment out the statement bellow nothing
74 // will be emitted.
75 let subscription = observable
76 // take utilizes our unsubscribe function to stop background emissions after
77 // a specified item count.
78 .take(500)
79 .map(|v| format!("Mapped {}", v))
80 .subscribe(observer);
81
82 // Do something else here.
83 println!("Do something while Observable is emitting.");
84
85 // Unsubscribe from the observable to stop emissions.
86 subscription.unsubscribe();
87
88 // Allow some time for the main thread to confirm that the observable indeed
89 // isn't emitting.
90 std::thread::sleep(Duration::from_millis(2000));
91 println!("`main` function done")
92}