rx_event_bus 0.2.2


RX Event Bus

A simple, agnostic reactive extensions event bus utilising rxRust under the hood.

  • Events are just a generic user-supplied type - implement however you wish
  • Validate and reject invalid events
  • Validation is a generic user-defined function - implement according to the event design
  • Single-threaded LocalEventBus and thread-safe SharedEventBus variants using the equivalent rxRust Local and Shared contexts
  • Async subscription execution
  • No channels! The event stream is an rxRust Observable that can be filtered accordingly

Note: this library uses rxRust v1.0.0-rc3, which has much broader reactive extension coverage than the latest stable version, v0.15.0, published in 2021.

Installation

cargo add rx_event_bus

Sharing the event bus

Both LocalEventBus and SharedEventBus are designed to be shared by cloning. Cloning is cheap (just reference count bumps) and all clones share the same underlying event stream — publishing on one clone delivers events to subscribers on any other.
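To make the clone-sharing behaviour concrete, here is a minimal std-only sketch of the idea. MiniBus, its methods, and demo are hypothetical names invented for this illustration; this is not how rx_event_bus is implemented (the real bus is built on rxRust), it only shows why a clone that copies a reference-counted pointer sees the same subscribers.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in for a bus: a handler list behind a shared pointer.
// NOT the rx_event_bus implementation, only an illustration of clone-sharing.
type Handler<E> = Box<dyn Fn(&E)>;

struct MiniBus<E> {
    handlers: Rc<RefCell<Vec<Handler<E>>>>,
}

// Manual Clone so that the event type itself does not need to be Clone.
impl<E> Clone for MiniBus<E> {
    fn clone(&self) -> Self {
        // Only the reference count is bumped; the handler list is shared.
        MiniBus { handlers: Rc::clone(&self.handlers) }
    }
}

impl<E> MiniBus<E> {
    fn new() -> Self {
        MiniBus { handlers: Rc::new(RefCell::new(Vec::new())) }
    }

    fn subscribe(&self, handler: impl Fn(&E) + 'static) {
        self.handlers.borrow_mut().push(Box::new(handler));
    }

    fn publish(&self, event: E) {
        // Every handler registered through any clone is called.
        for handler in self.handlers.borrow().iter() {
            handler(&event);
        }
    }
}

// Publishes on one clone and returns what a subscriber on another clone saw.
fn demo() -> Vec<String> {
    let bus: MiniBus<String> = MiniBus::new();
    let subscriber_side = bus.clone();
    let publisher_side = bus.clone();

    let seen = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&seen);
    subscriber_side.subscribe(move |e: &String| sink.borrow_mut().push(e.clone()));

    publisher_side.publish("hello".to_string());
    let out = seen.borrow().to_vec();
    out
}

fn main() {
    println!("{:?}", demo());
}
```

The subscriber was registered on one clone and the event was published on another, yet the event is delivered, because both clones point at the same handler list.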

Passing to structs

Clone the bus and pass it to any struct that needs to publish or subscribe:

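// `MyEvent`, `MyError`, and `validate` stand for your own event type, error type,
// and validation function (see the full examples below).
// `MyEvent` needs to implement `Debug` for the `{:?}` formatting used here.
use rx_event_bus::LocalEventBus;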

struct Producer {
    bus: LocalEventBus<MyEvent, MyError>,
}

impl Producer {
    fn do_work(&self) {
        self.bus.publish(MyEvent::Something).unwrap();
    }
}

struct Consumer {
    bus: LocalEventBus<MyEvent, MyError>,
}

impl Consumer {
    fn listen(&self) {
        self.bus.subscribe(|event| {
            println!("{:?}", event);
        });
    }
}

let bus = LocalEventBus::new(validate);
let producer = Producer { bus: bus.clone() };
let consumer = Consumer { bus: bus.clone() };

consumer.listen();
producer.do_work(); // consumer receives the event

Passing to functions

fn setup_logging(bus: &LocalEventBus<MyEvent, MyError>) {
    bus.subscribe(|event| {
        println!("[log] {:?}", event);
    });
}

let bus = LocalEventBus::new(validate);
setup_logging(&bus);
bus.publish(MyEvent::Something).unwrap(); // logger receives the event

Choosing between Local and Shared

  • LocalEventBus — uses Rc internally, so all clones must stay on the same thread. Use this when your application is single-threaded or all publishers and subscribers live on one thread.
  • SharedEventBus — uses Arc internally and requires Send bounds, so clones can be sent across threads. Use this when you need to publish or subscribe from multiple threads.

The core API shown in the examples below (new, publish, subscribe, events) is the same for both; the only difference is the thread-safety guarantees, so switching between them requires minimal code changes.
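The Rc versus Arc distinction is plain Rust rather than anything specific to this crate, and it can be sketched with the standard library alone. The function name collect_across_threads is made up for this example, and no rx_event_bus types are involved.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Pushes one value per thread into a buffer shared through Arc clones.
// Swapping Arc for Rc here would not compile: Rc is not Send, so a clone
// of it cannot be moved into thread::spawn. That is the same restriction
// that keeps an Rc-based bus on a single thread.
fn collect_across_threads(n: usize) -> Vec<usize> {
    let shared = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..n)
        .map(|i| {
            let clone = Arc::clone(&shared);
            thread::spawn(move || {
                clone.lock().unwrap().push(i);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Threads may finish in any order, so sort before returning.
    let mut out = shared.lock().unwrap().clone();
    out.sort();
    out
}

fn main() {
    println!("{:?}", collect_across_threads(4));
}
```

The Mutex is what makes shared mutation safe across threads; Arc only makes the sharing itself possible. The locking is the extra cost of the thread-safe variant.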

Single-threaded LocalEventBus

use rx_event_bus::LocalEventBus;
use rxrust::Observable;

#[derive(Clone, Debug)]
enum ChatEvent {
    Message { user: String, text: String },
    UserJoined { user: String },
}

#[derive(Debug)]
enum ChatError {
    EmptyMessage,
    EmptyUsername,
}

fn validate(event: ChatEvent) -> Result<ChatEvent, ChatError> {
    match &event {
        ChatEvent::Message { user, text } => {
            if user.is_empty() {
                return Err(ChatError::EmptyUsername);
            }
            if text.is_empty() {
                return Err(ChatError::EmptyMessage);
            }
        }
        ChatEvent::UserJoined { user } => {
            if user.is_empty() {
                return Err(ChatError::EmptyUsername);
            }
        }
    }
    Ok(event)
}

fn main() {
    let bus = LocalEventBus::new(validate);

    // Subscribe to all events
    bus.subscribe(|event| {
        println!("  [log] {:?}", event);
    });

    // Subscribe to the event stream with filtering (only messages)
    bus.events()
        .filter(|e| matches!(e, ChatEvent::Message { .. }))
        .subscribe(|event| {
            if let ChatEvent::Message { user, text } = event {
                println!("  [chat] {user}: {text}");
            }
        });

    // Publish some events
    println!("Publishing UserJoined:");
    bus.publish(ChatEvent::UserJoined {
        user: "Alice".into(),
    })
    .unwrap();

    println!("Publishing Message:");
    bus.publish(ChatEvent::Message {
        user: "Alice".into(),
        text: "Hello, world!".into(),
    })
    .unwrap();

    // Validation rejects invalid events
    println!("Publishing invalid (empty message):");
    if let Err(e) = bus.publish(ChatEvent::Message {
        user: "Alice".into(),
        text: "".into(),
    }) {
        println!("  Rejected: {:?}", e);
    }
}
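Because validate is an ordinary function, its rules can be exercised without constructing a bus at all. The sketch below repeats the types and the validate function from the example above so that it is self-contained; the PartialEq derives are an addition made here only so that results can be compared in assertions.

```rust
// Same event and error types as above; PartialEq is added for the assertions.
#[derive(Clone, Debug, PartialEq)]
enum ChatEvent {
    Message { user: String, text: String },
    UserJoined { user: String },
}

#[derive(Debug, PartialEq)]
enum ChatError {
    EmptyMessage,
    EmptyUsername,
}

// Same validation rules as in the example above.
fn validate(event: ChatEvent) -> Result<ChatEvent, ChatError> {
    match &event {
        ChatEvent::Message { user, text } => {
            if user.is_empty() {
                return Err(ChatError::EmptyUsername);
            }
            if text.is_empty() {
                return Err(ChatError::EmptyMessage);
            }
        }
        ChatEvent::UserJoined { user } => {
            if user.is_empty() {
                return Err(ChatError::EmptyUsername);
            }
        }
    }
    Ok(event)
}

fn main() {
    // A valid event is handed back unchanged.
    let joined = ChatEvent::UserJoined { user: "Alice".into() };
    assert_eq!(validate(joined.clone()), Ok(joined));

    // The username is checked before the message text.
    let both_empty = ChatEvent::Message { user: "".into(), text: "".into() };
    assert_eq!(validate(both_empty), Err(ChatError::EmptyUsername));

    // An empty message from a named user is rejected.
    let empty_text = ChatEvent::Message { user: "Alice".into(), text: "".into() };
    assert_eq!(validate(empty_text), Err(ChatError::EmptyMessage));

    println!("all validation checks passed");
}
```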

Multi-threaded SharedEventBus

use std::sync::{Arc, Mutex};

use rx_event_bus::SharedEventBus;
use rxrust::Observable;

#[derive(Clone, Debug)]
enum ChatEvent {
    Message { user: String, text: String },
    UserJoined { user: String },
}

#[derive(Clone, Debug)]
enum ChatError {
    EmptyMessage,
    EmptyUsername,
}

fn validate(event: ChatEvent) -> Result<ChatEvent, ChatError> {
    match &event {
        ChatEvent::Message { user, text } => {
            if user.is_empty() {
                return Err(ChatError::EmptyUsername);
            }
            if text.is_empty() {
                return Err(ChatError::EmptyMessage);
            }
        }
        ChatEvent::UserJoined { user } => {
            if user.is_empty() {
                return Err(ChatError::EmptyUsername);
            }
        }
    }
    Ok(event)
}

fn main() {
    let bus = SharedEventBus::new(validate);

    // Collect received events in a thread-safe buffer shared with the subscriber closure
    let events: Arc<Mutex<Vec<ChatEvent>>> = Arc::new(Mutex::new(Vec::new()));
    let events_clone = events.clone();

    // Subscribe to all events
    let _sub = bus.subscribe(move |event: ChatEvent| {
        println!("  [log] {:?}", event);
        events_clone.lock().unwrap().push(event);
    });

    // Subscribe to the event stream with filtering (only messages)
    let _sub2 = bus
        .events()
        .filter(|e| matches!(e, ChatEvent::Message { .. }))
        .subscribe(move |event| {
            if let ChatEvent::Message { user, text } = event {
                println!("  [chat] {user}: {text}");
            }
        });

    // Publish from the main thread
    println!("Publishing UserJoined:");
    bus.publish(ChatEvent::UserJoined {
        user: "Alice".into(),
    })
    .unwrap();

    // Publish from a spawned thread
    let bus_clone = bus.clone();
    let handle = std::thread::spawn(move || {
        println!("Publishing Message from another thread:");
        bus_clone
            .publish(ChatEvent::Message {
                user: "Bob".into(),
                text: "Hello from another thread!".into(),
            })
            .unwrap();
    });
    handle.join().unwrap();

    // Validation rejects invalid events
    println!("Publishing invalid (empty message):");
    if let Err(e) = bus.publish(ChatEvent::Message {
        user: "Alice".into(),
        text: "".into(),
    }) {
        println!("  Rejected: {:?}", e);
    }

    // Show collected events
    let collected = events.lock().unwrap();
    println!("\nCollected {} events total.", collected.len());
}

Async subscriptions

By default, publish() calls each subscriber synchronously — if a subscriber does heavy work, it blocks the publisher and other subscribers. SharedEventBus provides async variants that use rxRust's observe_on(SharedScheduler) to dispatch subscriber execution to the tokio thread pool, so publish() returns immediately.

  • events_async() — like events(), but emissions are observed on the SharedScheduler. Chain additional rxRust operators before subscribing as usual.
  • subscribe_async(handler) — convenience for events_async().subscribe(handler).

These require a tokio runtime to be active (e.g. #[tokio::main]).

WASM note: On wasm32 targets, SharedScheduler falls back to spawn_local (single-threaded) since WASM does not support OS threads. The Send bounds are still enforced at compile time for portability, but subscribers will run on the same thread as the publisher. This means events_async() and subscribe_async() will still work, but will not provide the non-blocking benefit — heavy subscribers will still block publish() just as with the synchronous methods. If you are targeting WASM exclusively, prefer LocalEventBus with the synchronous API to avoid the unnecessary Send overhead.

use rx_event_bus::SharedEventBus;
use rxrust::Observable;

#[tokio::main]
async fn main() {
    let bus = SharedEventBus::new(|event: String| {
        if event.is_empty() { Err("empty") } else { Ok(event) }
    });

    // This subscriber runs on the tokio thread pool — publish() won't block.
    let _sub = bus.subscribe_async(|event| {
        std::thread::sleep(std::time::Duration::from_secs(5)); // simulate heavy, blocking work
        println!("Processed: {}", event);
    });

    // You can also use events_async() with operators:
    let _sub2 = bus
        .events_async()
        .filter(|e: &String| e.starts_with("important"))
        .subscribe(|event| {
            println!("Filtered: {}", event);
        });

    // Returns immediately — subscribers run in the background.
    bus.publish("important update".into()).unwrap();

    // Give background tasks time to complete.
    tokio::time::sleep(std::time::Duration::from_secs(6)).await;
}
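The difference between synchronous and offloaded dispatch described at the start of this section can also be seen without any dependencies. The following is a std-only analogy using plain threads; slow_handler, publish_sync, and publish_offloaded are hypothetical names for this sketch, and it does not reproduce how rxRust's observe_on or the SharedScheduler work internally.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

// A deliberately slow subscriber: sleeps, then records the event.
fn slow_handler(log: &Mutex<Vec<String>>, event: String) {
    thread::sleep(Duration::from_millis(200));
    log.lock().unwrap().push(event);
}

// Synchronous dispatch: the caller waits for the handler to finish.
// Returns how long the "publish" call took.
fn publish_sync(log: &Arc<Mutex<Vec<String>>>, event: String) -> Duration {
    let start = Instant::now();
    slow_handler(log, event);
    start.elapsed()
}

// Offloaded dispatch: the handler runs on another thread, so the call
// returns right away. The JoinHandle lets the caller wait later if needed.
fn publish_offloaded(
    log: &Arc<Mutex<Vec<String>>>,
    event: String,
) -> (Duration, thread::JoinHandle<()>) {
    let start = Instant::now();
    let log = Arc::clone(log);
    let handle = thread::spawn(move || {
        slow_handler(&log, event);
    });
    (start.elapsed(), handle)
}

fn main() {
    let log = Arc::new(Mutex::new(Vec::new()));

    let waited = publish_sync(&log, "first".to_string());
    println!("sync publish returned after {:?}", waited);

    let (returned_after, handle) = publish_offloaded(&log, "second".to_string());
    println!("offloaded publish returned after {:?}", returned_after);

    // Wait for the background handler before reading the log.
    handle.join().unwrap();
    println!("handled {} events", log.lock().unwrap().len());
}
```

The synchronous call cannot return before the handler's 200 ms sleep has elapsed, whereas the offloaded call only pays for spawning the thread. The async methods on SharedEventBus aim for the second behaviour, with the tokio thread pool doing the work instead of a freshly spawned thread.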