rxr
An implementation of reactive extensions for the Rust programming language,
inspired by the popular RxJS library for JavaScript. rxr currently implements
only a subset of the operators available in RxJS.
Design
rxr supports Observables and Subjects. You define your own Observables with
functionality you want. Your Observables can be synchronous or asynchronous. For
asynchronous Observables, you can utilize OS threads or Tokio tasks.
For examples of how to define your Observables, see the documentation.
To see which operators are currently implemented, check the ObservableExt trait.
Note that you don't have to use Tokio in your project to use the rxr library.
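The difference between synchronous and asynchronous emission can be sketched with the standard library alone. This is not rxr's API: `emit_sync` and `emit_async` are hypothetical helper names used only for illustration, and plain closures stand in for observers.

```rust
use std::thread::{self, JoinHandle};

/// Synchronous emission (hypothetical helper, not part of rxr): every value
/// is delivered on the caller's thread, so this function returns only after
/// the last value has been handled.
fn emit_sync(count: u32, mut on_next: impl FnMut(u32)) {
    for i in 0..count {
        on_next(i);
    }
}

/// Asynchronous emission (hypothetical helper, not part of rxr): values are
/// produced on a separate OS thread and the caller immediately gets back the
/// thread's join handle.
fn emit_async(
    count: u32,
    mut on_next: impl FnMut(u32) + Send + 'static,
) -> JoinHandle<()> {
    thread::spawn(move || {
        for i in 0..count {
            on_next(i);
        }
    })
}
```

Calling `emit_sync(3, |v| println!("{}", v))` blocks until all three values are printed, while `emit_async` returns at once and the caller decides when, or whether, to `join()` the handle.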
Examples
Asynchronous Observable with unsubscribe logic.
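use std::{
    sync::{Arc, Mutex},
    time::Duration,
};

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic, Unsubscribeable},
    Observable, ObservableExt, Observer, Subscribeable,
};

const UNSUBSCRIBE_SIGNAL: bool = true;

fn main() {
    // Define an Observable that emits values from a separate OS thread.
    let observable = Observable::new(|mut o| {
        let done = Arc::new(Mutex::new(false));
        let done_c = Arc::clone(&done);
        let (tx, rx) = std::sync::mpsc::channel();

        // Wait on a helper thread for the signal sent by the unsubscribe logic
        // and record it in the shared `done` flag.
        std::thread::spawn(move || {
            if let Ok(UNSUBSCRIBE_SIGNAL) = rx.recv() {
                *done_c.lock().unwrap() = UNSUBSCRIBE_SIGNAL;
            }
        });

        // Emit values on a second thread and keep its handle.
        let join_handle = std::thread::spawn(move || {
            for i in 0..=10000 {
                // Stop emitting once the unsubscribe signal has arrived.
                if *done.lock().unwrap() == UNSUBSCRIBE_SIGNAL {
                    break;
                }
                o.next(i);
                // Pause between emissions so the flag can be updated.
                std::thread::sleep(Duration::from_millis(1));
            }
            o.complete();
        });

        // Return a Subscription whose unsubscribe logic signals the helper
        // thread, together with the handle of the emitting thread.
        Subscription::new(
            UnsubscribeLogic::Logic(Box::new(move || {
                if tx.send(UNSUBSCRIBE_SIGNAL).is_err() {
                    println!("Receiver dropped.");
                }
            })),
            SubscriptionHandle::JoinThread(join_handle),
        )
    });

    // Build a Subscriber with a `next` handler and a `complete` handler.
    let mut observer = Subscriber::on_next(|v| println!("Emitted {}", v));
    observer.on_complete(|| println!("Completed"));

    // Apply operators, then subscribe. Emission runs on the spawned thread,
    // so `main` continues immediately.
    let subscription = observable
        .take(500)
        .map(|v| format!("Mapped {}", v))
        .subscribe(observer);

    println!("Do something while Observable is emitting.");

    // Run the unsubscribe logic defined above.
    subscription.unsubscribe();

    // Keep `main` alive for a while to observe that emissions have stopped.
    std::thread::sleep(Duration::from_millis(2000));
    println!("`main` function done")
}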
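The stop-flag pattern used in the example above can be isolated with the standard library only. The sketch below is a variation, not what the example or rxr does: it swaps the `Arc<Mutex<bool>>` plus channel for a single `Arc<AtomicBool>`, and `spawn_cancellable` is a hypothetical helper name.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical helper (not part of rxr): spawns a producer thread that calls
/// `on_next` for 0..limit, stopping early if the returned flag is set.
/// The thread's result is the number of values it actually emitted.
fn spawn_cancellable(
    limit: u32,
    mut on_next: impl FnMut(u32) + Send + 'static,
) -> (Arc<AtomicBool>, JoinHandle<u32>) {
    let stop = Arc::new(AtomicBool::new(false));
    let stop_c = Arc::clone(&stop);

    let handle = thread::spawn(move || {
        let mut emitted = 0;
        for i in 0..limit {
            // Check the flag before every emission.
            if stop_c.load(Ordering::Acquire) {
                break;
            }
            on_next(i);
            emitted += 1;
            // Pause between emissions, as the example above does.
            thread::sleep(Duration::from_millis(1));
        }
        emitted
    });

    (stop, handle)
}
```

To cancel, the caller stores `true` into the flag with `stop.store(true, Ordering::Release)` and then joins the handle. An atomic flag avoids the extra helper thread, at the cost of not being able to carry richer messages the way a channel can.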
Utilizing a Subject as an Observer. This can be done with any variant of Subject.
use std::{fmt::Display, time::Duration};

use rxr::{
    subscribe::{Subscriber, Subscription, SubscriptionHandle, UnsubscribeLogic},
    Observable, ObservableExt, Observer, Subject, Subscribeable,
};

// Build a Subscriber that prints every notification, tagged with its id.
pub fn create_subscriber<T: Display>(subscriber_id: u32) -> Subscriber<T> {
    Subscriber::new(
        move |v: T| println!("Subscriber {}: {}", subscriber_id, v),
        move |e| eprintln!("Error {}: {}", subscriber_id, e),
        move || println!("Completed Subscriber {}", subscriber_id),
    )
}

pub fn main() {
    // A synchronous Observable that emits the integers 0 through 10.
    let mut observable = Observable::new(move |mut o: Subscriber<_>| {
        for i in 0..=10 {
            o.next(i);
            std::thread::sleep(Duration::from_millis(1));
        }
        o.complete();
        Subscription::new(UnsubscribeLogic::Nil, SubscriptionHandle::Nil)
    });

    // Split the Subject into its emitting and receiving halves.
    let (emitter, mut receiver) = Subject::emitter_receiver();

    // Register three subscribers on the receiving half.
    receiver.subscribe(create_subscriber(1));

    receiver
        .clone()
        .take(7)
        .delay(1000)
        .map(|v| format!("mapped {}", v))
        .subscribe(create_subscriber(2));

    receiver
        .filter(|v| v % 2 == 0)
        .map(|v| format!("filtered {}", v))
        .subscribe(create_subscriber(3));

    // Use the Subject's emitter as the Observer of the source Observable.
    observable.subscribe(emitter.into());
}
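The idea of one source feeding several subscribers through a Subject can be illustrated without rxr. The following is a minimal single-threaded sketch, not rxr's implementation: `MiniSubject` is a hypothetical type that handles only `next` notifications and ignores errors, completion, and unsubscription.

```rust
/// Hypothetical stand-in for a subject (not part of rxr): it stores
/// subscriber callbacks and forwards every value it receives to each of
/// them, in subscription order.
struct MiniSubject<T> {
    subscribers: Vec<Box<dyn FnMut(T)>>,
}

impl<T: Clone> MiniSubject<T> {
    fn new() -> Self {
        Self {
            subscribers: Vec::new(),
        }
    }

    /// Observable side: register a callback for future values.
    fn subscribe(&mut self, on_next: impl FnMut(T) + 'static) {
        self.subscribers.push(Box::new(on_next));
    }

    /// Observer side: receive a value and fan it out to every subscriber.
    fn next(&mut self, value: T) {
        for subscriber in self.subscribers.iter_mut() {
            subscriber(value.clone());
        }
    }
}
```

A producer loop that calls `subject.next(i)` then plays the role of the source Observable, and each registered callback sees every value, which is the behavior the three subscribers in the example above rely on.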
Additional examples can be found in both the examples directory and the documentation.
Installation
Add the following to your Cargo.toml:
[dependencies]
rxr = "0.1.8"
License