rx_event_bus 0.2.2

A simple, agnostic reactive extensions event bus utilising rxRust under the hood.
Documentation
# RX Event Bus

A simple, agnostic reactive extensions event bus utilising [rxRust](https://github.com/rxRust/rxRust) under the hood.

- Events are just a generic user-supplied type - implement however you wish
- Validate and reject invalid events
- Validation is a generic user-defined function - implement according to the event design
- Single-threaded `LocalEventBus` and thread-safe `SharedEventBus` variants using the equivalent rxRust `Local` and `Shared` contexts
- Async subscription execution
- No channels! The event stream is an rxRust Observable that can be filtered accordingly

*Note: this library uses rxRust v1.0.0-rc3. This has much better reactive extension coverage relative to the latest stable version v0.15.0 published in 2021.*

## Installation

```shell
cargo add rx_event_bus
````

## Sharing the event bus

Both `LocalEventBus` and `SharedEventBus` are designed to be shared by cloning. Cloning is cheap (just reference count bumps) and all clones share the same underlying event stream — publishing on one clone delivers events to subscribers on any other.

### Passing to structs

Clone the bus and pass it to any struct that needs to publish or subscribe:

```rust
use rx_event_bus::LocalEventBus;

struct Producer {
    bus: LocalEventBus<MyEvent, MyError>,
}

impl Producer {
    fn do_work(&self) {
        self.bus.publish(MyEvent::Something).unwrap();
    }
}

struct Consumer {
    bus: LocalEventBus<MyEvent, MyError>,
}

impl Consumer {
    fn listen(&self) {
        self.bus.subscribe(|event| {
            println!("{:?}", event);
        });
    }
}

let bus = LocalEventBus::new(validate);
let producer = Producer { bus: bus.clone() };
let consumer = Consumer { bus: bus.clone() };

consumer.listen();
producer.do_work(); // consumer receives the event
```

### Passing to functions

```rust
fn setup_logging(bus: &LocalEventBus<MyEvent, MyError>) {
    bus.subscribe(|event| {
        println!("[log] {:?}", event);
    });
}

let bus = LocalEventBus::new(validate);
setup_logging(&bus);
bus.publish(MyEvent::Something).unwrap(); // logger receives the event
```

### Choosing between Local and Shared

- **`LocalEventBus`** — uses `Rc` internally, so all clones must stay on the same thread. Use this when your application is single-threaded or all publishers and subscribers live on one thread.
- **`SharedEventBus`** — uses `Arc` internally and requires `Send` bounds, so clones can be sent across threads. Use this when you need to publish or subscribe from multiple threads.

The API is identical for both — the only difference is the thread-safety guarantees, so switching between them requires minimal code changes.

## Single-threaded LocalEventBus

```rust
use rx_event_bus::LocalEventBus;
use rxrust::Observable;

#[derive(Clone, Debug)]
enum ChatEvent {
    Message { user: String, text: String },
    UserJoined { user: String },
}

#[derive(Debug)]
enum ChatError {
    EmptyMessage,
    EmptyUsername,
}

fn validate(event: ChatEvent) -> Result<ChatEvent, ChatError> {
    match &event {
        ChatEvent::Message { user, text } => {
            if user.is_empty() {
                return Err(ChatError::EmptyUsername);
            }
            if text.is_empty() {
                return Err(ChatError::EmptyMessage);
            }
        }
        ChatEvent::UserJoined { user } => {
            if user.is_empty() {
                return Err(ChatError::EmptyUsername);
            }
        }
    }
    Ok(event)
}

fn main() {
    let bus = LocalEventBus::new(validate);

    // Subscribe to all events
    bus.subscribe(|event| {
        println!("  [log] {:?}", event);
    });

    // Subscribe to the event stream with filtering (only messages)
    bus.events()
        .filter(|e| matches!(e, ChatEvent::Message { .. }))
        .subscribe(|event| {
            if let ChatEvent::Message { user, text } = event {
                println!("  [chat] {user}: {text}");
            }
        });

    // Publish some events
    println!("Publishing UserJoined:");
    bus.publish(ChatEvent::UserJoined {
        user: "Alice".into(),
    })
    .unwrap();

    println!("Publishing Message:");
    bus.publish(ChatEvent::Message {
        user: "Alice".into(),
        text: "Hello, world!".into(),
    })
    .unwrap();

    // Validation rejects invalid events
    println!("Publishing invalid (empty message):");
    if let Err(e) = bus.publish(ChatEvent::Message {
        user: "Alice".into(),
        text: "".into(),
    }) {
        println!("  Rejected: {:?}", e);
    }
}
```

## Multi-threaded SharedEventBus

```rust
use std::sync::{Arc, Mutex};

use rx_event_bus::SharedEventBus;
use rxrust::Observable;

#[derive(Clone, Debug)]
enum ChatEvent {
    Message { user: String, text: String },
    UserJoined { user: String },
}

#[derive(Clone, Debug)]
enum ChatError {
    EmptyMessage,
    EmptyUsername,
}

fn validate(event: ChatEvent) -> Result<ChatEvent, ChatError> {
    match &event {
        ChatEvent::Message { user, text } => {
            if user.is_empty() {
                return Err(ChatError::EmptyUsername);
            }
            if text.is_empty() {
                return Err(ChatError::EmptyMessage);
            }
        }
        ChatEvent::UserJoined { user } => {
            if user.is_empty() {
                return Err(ChatError::EmptyUsername);
            }
        }
    }
    Ok(event)
}

fn main() {
    let bus = SharedEventBus::new(validate);

    // Collect events from a subscriber thread
    let events: Arc<Mutex<Vec<ChatEvent>>> = Arc::new(Mutex::new(Vec::new()));
    let events_clone = events.clone();

    // Subscribe to all events
    let _sub = bus.subscribe(move |event: ChatEvent| {
        println!("  [log] {:?}", event);
        events_clone.lock().unwrap().push(event);
    });

    // Subscribe to the event stream with filtering (only messages)
    let _sub2 = bus
        .events()
        .filter(|e| matches!(e, ChatEvent::Message { .. }))
        .subscribe(move |event| {
            if let ChatEvent::Message { user, text } = event {
                println!("  [chat] {user}: {text}");
            }
        });

    // Publish from the main thread
    println!("Publishing UserJoined:");
    bus.publish(ChatEvent::UserJoined {
        user: "Alice".into(),
    })
    .unwrap();

    // Publish from a spawned thread
    let bus_clone = bus.clone();
    let handle = std::thread::spawn(move || {
        println!("Publishing Message from another thread:");
        bus_clone
            .publish(ChatEvent::Message {
                user: "Bob".into(),
                text: "Hello from another thread!".into(),
            })
            .unwrap();
    });
    handle.join().unwrap();

    // Validation rejects invalid events
    println!("Publishing invalid (empty message):");
    if let Err(e) = bus.publish(ChatEvent::Message {
        user: "Alice".into(),
        text: "".into(),
    }) {
        println!("  Rejected: {:?}", e);
    }

    // Show collected events
    let collected = events.lock().unwrap();
    println!("\nCollected {} events total.", collected.len());
}
```

## Async subscriptions

By default, `publish()` calls each subscriber synchronously — if a subscriber does heavy work, it blocks the publisher and other subscribers. `SharedEventBus` provides async variants that use rxRust's `observe_on(SharedScheduler)` to dispatch subscriber execution to the tokio thread pool, so `publish()` returns immediately.

- **`events_async()`** — like `events()`, but emissions are observed on the `SharedScheduler`. Chain additional rxRust operators before subscribing as usual.
- **`subscribe_async(handler)`** — convenience for `events_async().subscribe(handler)`.

These require a tokio runtime to be active (e.g. `#[tokio::main]`).

**WASM note:** On `wasm32` targets, `SharedScheduler` falls back to `spawn_local` (single-threaded) since WASM does not support OS threads. The `Send` bounds are still enforced at compile time for portability, but subscribers will run on the same thread as the publisher. This means `events_async()` and `subscribe_async()` will still work, but will not provide the non-blocking benefit — heavy subscribers will still block `publish()` just as with the synchronous methods. If you are targeting WASM exclusively, prefer `LocalEventBus` with the synchronous API to avoid the unnecessary `Send` overhead.

```rust
use rx_event_bus::SharedEventBus;
use rxrust::Observable;

#[tokio::main]
async fn main() {
    let bus = SharedEventBus::new(|event: String| {
        if event.is_empty() { Err("empty") } else { Ok(event) }
    });

    // This subscriber runs on the tokio thread pool — publish() won't block.
    let _sub = bus.subscribe_async(|event| {
        std::thread::sleep(std::time::Duration::from_secs(5)); // heavy work
        println!("Processed: {}", event);
    });

    // You can also use events_async() with operators:
    let _sub2 = bus
        .events_async()
        .filter(|e: &String| e.starts_with("important"))
        .subscribe(|event| {
            println!("Filtered: {}", event);
        });

    // Returns immediately — subscribers run in the background.
    bus.publish("important update".into()).unwrap();

    // Give background tasks time to complete.
    tokio::time::sleep(std::time::Duration::from_secs(6)).await;
}
```