async_observable_tokio/async_observable_tokio.rs
1//! This `Observable` emits values and completes, returning a `Subscription` that can
2//! be unsubscribed from, enabling all operators to function correctly. It uses `Tokio`
3//! tasks for asynchronous processing, preventing it from blocking the current thread.
4//!
5//! To run this example, execute `cargo run --example async_observable_tokio`.
6
7use std::sync::{Arc, Mutex};
8
9use rxr::{
10 subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
11 Observable, ObservableExt, Observer, Subscribeable,
12};
13
14use tokio::{sync::mpsc::channel, task, time};
15
/// Value sent over the unsubscribe channel and then stored in the shared `done`
/// flag; the emitting task stops its loop once the flag equals this value.
16const UNSUBSCRIBE_SIGNAL: bool = true;
17
18#[tokio::main()]
19async fn main() {
20 // Create a custom observable that emits values in a separate task.
21 let observable = Observable::new(|mut o| {
22 let done = Arc::new(Mutex::new(false));
23 let done_c = Arc::clone(&done);
24 let (tx, mut rx) = channel(10);
25
26 // Spawn a new Tokio task to await a signal sent from the unsubscribe logic.
27 task::spawn(async move {
28 // Attempt to receive a signal sent from the unsubscribe logic.
29 if let Some(UNSUBSCRIBE_SIGNAL) = rx.recv().await {
30 // Update the `done_c` mutex with the received signal.
31 *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
32 }
33 });
34
35 // Launch a new Tokio task for the Observable's processing and store its handle.
36 let join_handle = task::spawn(async move {
37 for i in 0..=10000 {
38 // If an unsubscribe signal is received, exit the loop and stop emissions.
39 if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
40 break;
41 }
42 // Emit the value to the subscriber.
43 o.next(i);
44 // Important. Put an await point after each emit or after some emits.
45 // This allows the `take()` operator to function properly.
46 time::sleep(time::Duration::from_millis(1)).await;
47 }
48 // Signal completion to the subscriber.
49 o.complete();
50 });
51
52 // Return a new `Subscription` with custom unsubscribe logic.
53 Subscription::new(
54 // The provided closure defines the behavior of the subscription when it
55 // is unsubscribed. In this case, it sends a signal to an asynchronous
56 // observable to stop emitting values. If your closure requires Tokio
57 // tasks or channels to send unsubscribe signals, consider using
58 // `UnsubscribeLogic::Future`.
59 UnsubscribeLogic::Future(Box::pin(async move {
60 if tx.send(UNSUBSCRIBE_SIGNAL).await.is_err() {
61 println!("Receiver dropped.");
62 }
63 })),
64 // Store the `JoinHandle` for awaiting completion using the `Subscription`.
65 SubscriptionHandle::JoinTask(join_handle),
66 )
67 });
68
69 // Create the `Subscriber` with a mandatory `next` function, and optional
70 // `complete` function. No need for `error` function in this simple example.
71 let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
72 observer.on_complete(|| println!("Completed"));
73
74 // This observable uses OS threads so it will not block the current thread.
75 // Observables are cold so if you comment out the statement bellow nothing
76 // will be emitted.
77 let subscription = observable
78 // take utilizes our unsubscribe function to stop background emissions after
79 // a specified item count.
80 .take(15)
81 .map(|v| format!("Mapped {}", v))
82 .delay(1000)
83 .subscribe(observer);
84
85 // Do something else here.
86 println!("Do something while Observable is emitting.");
87
88 // Wait for the subscription to either complete as a Tokio task or join an OS thread.
89 if subscription.join_concurrent().await.is_err() {
90 // Handle error
91 }
92
93 println!("`main` function done")
94}