async_observable_tokio/
async_observable_tokio.rs

1//! This `Observable` emits values and completes, returning a `Subscription` that can
2//! be unsubscribed from, enabling all operators to function correctly. It uses `Tokio`
3//! tasks for asynchronous processing, preventing it from blocking the current thread.
4//!
5//! To run this example, execute `cargo run --example async_observable_tokio`.
6
7use std::sync::{Arc, Mutex};
8
9use rxr::{
10    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
11    Observable, ObservableExt, Observer, Subscribeable,
12};
13
14use tokio::{sync::mpsc::channel, task, time};
15
/// Value sent through the channel by the unsubscribe logic and then stored in
/// the shared `done` flag; the emitting task stops once the flag equals it.
16const UNSUBSCRIBE_SIGNAL: bool = true;
17
18#[tokio::main()]
19async fn main() {
20    // Create a custom observable that emits values in a separate task.
21    let observable = Observable::new(|mut o| {
22        let done = Arc::new(Mutex::new(false));
23        let done_c = Arc::clone(&done);
24        let (tx, mut rx) = channel(10);
25
26        // Spawn a new Tokio task to await a signal sent from the unsubscribe logic.
27        task::spawn(async move {
28            // Attempt to receive a signal sent from the unsubscribe logic.
29            if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
30                // Update the `done_c` mutex with the received signal.
31                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
32            }
33        });
34
35        // Launch a new Tokio task for the Observable's processing and store its handle.
36        let join_handle = task::spawn(async move {
37            for i in 0..=10000 {
38                // If an unsubscribe signal is received, exit the loop and stop emissions.
39                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
40                    break;
41                }
42                // Emit the value to the subscriber.
43                o.next(i);
44                // Important. Put an await point after each emit or after some emits.
45                // This allows the `take()` operator to function properly.
46                time::sleep(time::Duration::from_millis(1)).await;
47            }
48            // Signal completion to the subscriber.
49            o.complete();
50        });
51
52        // Return a new `Subscription` with custom unsubscribe logic.
53        Subscription::new(
54            // The provided closure defines the behavior of the subscription when it
55            // is unsubscribed. In this case, it sends a signal to an asynchronous
56            // observable to stop emitting values. If your closure requires Tokio
57            // tasks or channels to send unsubscribe signals, consider using
58            // `UnsubscribeLogic::Future`.
59            UnsubscribeLogic::Future(Box::pin(async move {
60                if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
61                    println!("Receiver dropped.");
62                }
63            })),
64            // Store the `JoinHandle` for awaiting completion using the `Subscription`.
65            SubscriptionHandle::JoinTask(join_handle),
66        )
67    });
68
69    // Create the `Subscriber` with a mandatory `next` function, and optional
70    // `complete` function. No need for `error` function in this simple example.
71    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
72    observer.on_complete(|| println!("Completed"));
73
74    // This observable uses OS threads so it will not block the current thread.
75    // Observables are cold so if you comment out the statement bellow nothing
76    // will be emitted.
77    let subscription = observable
78        // take utilizes our unsubscribe function to stop background emissions after
79        // a specified item count.
80        .take(15)
81        .map(|v| format!("Mapped {}", v))
82        .delay(1000)
83        .subscribe(observer);
84
85    // Do something else here.
86    println!("Do something while Observable is emitting.");
87
88    // Wait for the subscription to either complete as a Tokio task or join an OS thread.
89    if subscription.join_concurrent().await.is_err() {
90        // Handle error
91    }
92
93    println!("`main` function done")
94}