RX Event Bus
A simple, agnostic reactive extensions event bus utilising rxRust under the hood.
- Events are just a generic user-supplied type - implement however you wish
- Validate and reject invalid events
- Validation is a generic user-defined function - implement according to the event design
- Single-threaded
LocalEventBus and thread-safe SharedEventBus variants using the equivalent rxRust Local and Shared contexts
- Async subscription execution
- No channels! The event stream is an rxRust Observable that can be filtered accordingly
Note: this library uses rxRust v1.0.0-rc3. This has much better reactive extension coverage relative to the latest stable version v0.15.0 published in 2021.
Installation
cargo add rx_event_bus
Sharing the event bus
Both LocalEventBus and SharedEventBus are designed to be shared by cloning. Cloning is cheap (just reference count bumps) and all clones share the same underlying event stream — publishing on one clone delivers events to subscribers on any other.
Passing to structs
Clone the bus and pass it to any struct that needs to publish or subscribe:
use rx_event_bus::LocalEventBus;
struct Producer {
bus: LocalEventBus<MyEvent, MyError>,
}
impl Producer {
fn do_work(&self) {
self.bus.publish(MyEvent::Something).unwrap();
}
}
struct Consumer {
bus: LocalEventBus<MyEvent, MyError>,
}
impl Consumer {
fn listen(&self) {
self.bus.subscribe(|event| {
println!("{:?}", event);
});
}
}
let bus = LocalEventBus::new(validate);
let producer = Producer { bus: bus.clone() };
let consumer = Consumer { bus: bus.clone() };
consumer.listen();
producer.do_work();
Passing to functions
fn setup_logging(bus: &LocalEventBus<MyEvent, MyError>) {
bus.subscribe(|event| {
println!("[log] {:?}", event);
});
}
let bus = LocalEventBus::new(validate);
setup_logging(&bus);
bus.publish(MyEvent::Something).unwrap();
Choosing between Local and Shared
LocalEventBus — uses Rc internally, so all clones must stay on the same thread. Use this when your application is single-threaded or all publishers and subscribers live on one thread.
SharedEventBus — uses Arc internally and requires Send bounds, so clones can be sent across threads. Use this when you need to publish or subscribe from multiple threads.
The API is identical for both — the only difference is the thread-safety guarantees, so switching between them requires minimal code changes.
Single-threaded LocalEventBus
use rx_event_bus::LocalEventBus;
use rxrust::Observable;
#[derive(Clone, Debug)]
enum ChatEvent {
Message { user: String, text: String },
UserJoined { user: String },
}
#[derive(Debug)]
enum ChatError {
EmptyMessage,
EmptyUsername,
}
fn validate(event: ChatEvent) -> Result<ChatEvent, ChatError> {
match &event {
ChatEvent::Message { user, text } => {
if user.is_empty() {
return Err(ChatError::EmptyUsername);
}
if text.is_empty() {
return Err(ChatError::EmptyMessage);
}
}
ChatEvent::UserJoined { user } => {
if user.is_empty() {
return Err(ChatError::EmptyUsername);
}
}
}
Ok(event)
}
fn main() {
let bus = LocalEventBus::new(validate);
bus.subscribe(|event| {
println!(" [log] {:?}", event);
});
bus.events()
.filter(|e| matches!(e, ChatEvent::Message { .. }))
.subscribe(|event| {
if let ChatEvent::Message { user, text } = event {
println!(" [chat] {user}: {text}");
}
});
println!("Publishing UserJoined:");
bus.publish(ChatEvent::UserJoined {
user: "Alice".into(),
})
.unwrap();
println!("Publishing Message:");
bus.publish(ChatEvent::Message {
user: "Alice".into(),
text: "Hello, world!".into(),
})
.unwrap();
println!("Publishing invalid (empty message):");
if let Err(e) = bus.publish(ChatEvent::Message {
user: "Alice".into(),
text: "".into(),
}) {
println!(" Rejected: {:?}", e);
}
}
Multi-threaded SharedEventBus
use std::sync::{Arc, Mutex};
use rx_event_bus::SharedEventBus;
use rxrust::Observable;
#[derive(Clone, Debug)]
enum ChatEvent {
Message { user: String, text: String },
UserJoined { user: String },
}
#[derive(Clone, Debug)]
enum ChatError {
EmptyMessage,
EmptyUsername,
}
fn validate(event: ChatEvent) -> Result<ChatEvent, ChatError> {
match &event {
ChatEvent::Message { user, text } => {
if user.is_empty() {
return Err(ChatError::EmptyUsername);
}
if text.is_empty() {
return Err(ChatError::EmptyMessage);
}
}
ChatEvent::UserJoined { user } => {
if user.is_empty() {
return Err(ChatError::EmptyUsername);
}
}
}
Ok(event)
}
fn main() {
let bus = SharedEventBus::new(validate);
let events: Arc<Mutex<Vec<ChatEvent>>> = Arc::new(Mutex::new(Vec::new()));
let events_clone = events.clone();
let _sub = bus.subscribe(move |event: ChatEvent| {
println!(" [log] {:?}", event);
events_clone.lock().unwrap().push(event);
});
let _sub2 = bus
.events()
.filter(|e| matches!(e, ChatEvent::Message { .. }))
.subscribe(move |event| {
if let ChatEvent::Message { user, text } = event {
println!(" [chat] {user}: {text}");
}
});
println!("Publishing UserJoined:");
bus.publish(ChatEvent::UserJoined {
user: "Alice".into(),
})
.unwrap();
let bus_clone = bus.clone();
let handle = std::thread::spawn(move || {
println!("Publishing Message from another thread:");
bus_clone
.publish(ChatEvent::Message {
user: "Bob".into(),
text: "Hello from another thread!".into(),
})
.unwrap();
});
handle.join().unwrap();
println!("Publishing invalid (empty message):");
if let Err(e) = bus.publish(ChatEvent::Message {
user: "Alice".into(),
text: "".into(),
}) {
println!(" Rejected: {:?}", e);
}
let collected = events.lock().unwrap();
println!("\nCollected {} events total.", collected.len());
}
Async subscriptions
By default, publish() calls each subscriber synchronously — if a subscriber does heavy work, it blocks the publisher and other subscribers. SharedEventBus provides async variants that use rxRust's observe_on(SharedScheduler) to dispatch subscriber execution to the tokio thread pool, so publish() returns immediately.
events_async() — like events(), but emissions are observed on the SharedScheduler. Chain additional rxRust operators before subscribing as usual.
subscribe_async(handler) — convenience for events_async().subscribe(handler).
These require a tokio runtime to be active (e.g. #[tokio::main]).
WASM note: On wasm32 targets, SharedScheduler falls back to spawn_local (single-threaded) since WASM does not support OS threads. The Send bounds are still enforced at compile time for portability, but subscribers will run on the same thread as the publisher. This means events_async() and subscribe_async() will still work, but will not provide the non-blocking benefit — heavy subscribers will still block publish() just as with the synchronous methods. If you are targeting WASM exclusively, prefer LocalEventBus with the synchronous API to avoid the unnecessary Send overhead.
use rx_event_bus::SharedEventBus;
use rxrust::Observable;
#[tokio::main]
async fn main() {
let bus = SharedEventBus::new(|event: String| {
if event.is_empty() { Err("empty") } else { Ok(event) }
});
let _sub = bus.subscribe_async(|event| {
std::thread::sleep(std::time::Duration::from_secs(5)); println!("Processed: {}", event);
});
let _sub2 = bus
.events_async()
.filter(|e: &String| e.starts_with("important"))
.subscribe(|event| {
println!("Filtered: {}", event);
});
bus.publish("important update".into()).unwrap();
tokio::time::sleep(std::time::Duration::from_secs(6)).await;
}