# JAEB - Just Another Event Bus
[](https://crates.io/crates/jaeb)
[](https://docs.rs/jaeb)
[](https://github.com/LinkeTh/jaeb/blob/main/LICENSE)
In-process, actor-based event bus for Tokio applications.
JAEB provides:
- sync + async listeners via a unified `subscribe` API
- automatic dispatch-mode selection based on handler trait
- explicit listener unsubscription via `Subscription` handles or RAII `SubscriptionGuard`
- dependency injection via handler structs
- retry policies with configurable delay for async handlers
- dead-letter stream for terminal failures
- explicit `Result`-based error handling (no panics in the public API)
- graceful shutdown with queue draining and in-flight task completion
- idempotent shutdown (safe to call multiple times)
- optional Prometheus-compatible metrics via the `metrics` crate
- structured tracing with per-handler spans
- `#![forbid(unsafe_code)]`
## Installation
```toml
[dependencies]
jaeb = { version = "0.2.2" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```
To enable metrics instrumentation:
```toml
[dependencies]
jaeb = { version = "0.2.2", features = ["metrics"] }
```
## Quick Start
```rust
use std::time::Duration;
use jaeb::{
DeadLetter, EventBus, EventBusError, EventHandler, FailurePolicy,
HandlerResult, SyncEventHandler,
};
#[derive(Clone)]
struct OrderCheckoutEvent {
order_id: i64,
}
struct AsyncCheckoutHandler;
impl EventHandler<OrderCheckoutEvent> for AsyncCheckoutHandler {
async fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
println!("async checkout {}", event.order_id);
Ok(())
}
}
struct SyncAuditHandler;
impl SyncEventHandler<OrderCheckoutEvent> for SyncAuditHandler {
fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
println!("sync audit {}", event.order_id);
Ok(())
}
}
struct DeadLetterLogger;
impl SyncEventHandler<DeadLetter> for DeadLetterLogger {
fn handle(&self, dl: &DeadLetter) -> HandlerResult {
eprintln!(
"dead-letter: event={} listener={} attempts={} error={}",
dl.event_name, dl.subscription_id, dl.attempts, dl.error
);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), EventBusError> {
let bus = EventBus::new(64)?;
let retry_policy = FailurePolicy::default()
.with_max_retries(2)
.with_retry_delay(Duration::from_millis(50));
// Async handler -- dispatch mode inferred from EventHandler impl
let checkout_sub = bus
.subscribe_with_policy(AsyncCheckoutHandler, retry_policy)
.await?;
// Sync handler -- dispatch mode inferred from SyncEventHandler impl
let _audit_sub = bus.subscribe(SyncAuditHandler).await?;
// Dead-letter handler (convenience method, auto-sets dead_letter: false)
bus.subscribe_dead_letters(DeadLetterLogger).await?;
bus.publish(OrderCheckoutEvent { order_id: 42 }).await?;
bus.try_publish(OrderCheckoutEvent { order_id: 43 })?;
checkout_sub.unsubscribe().await?;
bus.shutdown().await?;
Ok(())
}
```
## API Overview
### EventBus
- `EventBus::new(buffer) -> Result<EventBus, EventBusError>` -- create a new bus with the given channel capacity
- `EventBus::builder()` -- builder for fine-grained configuration (buffer size, timeouts, concurrency limits)
- `subscribe(handler) -> Result<Subscription, EventBusError>` -- dispatch mode inferred from trait
- `subscribe_with_policy(handler, FailurePolicy) -> Result<Subscription, EventBusError>`
- `subscribe_dead_letters(handler) -> Result<Subscription, EventBusError>`
- `publish(event) -> Result<(), EventBusError>`
- `try_publish(event) -> Result<(), EventBusError>` -- non-blocking, returns `ChannelFull` when queue is full
- `unsubscribe(subscription_id) -> Result<bool, EventBusError>`
- `shutdown() -> Result<(), EventBusError>` -- idempotent, drains queue + in-flight tasks
- `is_healthy() -> bool` -- **async**, checks if the internal actor is still running
`EventBus` is `Clone` -- all clones share the same underlying actor.
### Handler Traits
- `EventHandler<E>` -- async handler, dispatched on a spawned task (requires `E: Clone`)
- `SyncEventHandler<E>` -- sync handler, awaited inline during dispatch
The dispatch mode is selected automatically based on which trait is implemented.
The `IntoHandler<E, Mode>` trait performs the conversion; the `Mode` parameter
(`AsyncMode` / `SyncMode`) is inferred, so callers simply write `bus.subscribe(handler)`.
### Subscription & SubscriptionGuard
`Subscription` holds a `SubscriptionId` and a bus handle. Call `subscription.unsubscribe()`
to remove the handler, or use `bus.unsubscribe(id)` directly.
`SubscriptionGuard` is an RAII wrapper that automatically unsubscribes the listener when
dropped. Convert a `Subscription` via `subscription.into_guard()`. Call `guard.disarm()`
to prevent the automatic unsubscribe.
### Types
- `Event` -- blanket trait implemented for all `T: Send + Sync + 'static`
- `HandlerResult = Result<(), Box<dyn Error + Send + Sync>>`
- `FailurePolicy { max_retries, retry_delay, dead_letter }`
- `DeadLetter { event_name, subscription_id, attempts, error }`
- `SubscriptionId` -- opaque handler ID (wraps `u64`)
## Feature Flags
| `metrics` | off | Enables Prometheus-compatible instrumentation via the `metrics` crate |
When the `metrics` feature is enabled, the bus records:
- `eventbus.publish` (counter, per event type)
- `eventbus.handler.duration` (histogram, per event type)
- `eventbus.handler.error` (counter, per event type)
- `eventbus.handler.join_error` (counter, per event type)
## Observability
JAEB uses the `tracing` crate throughout. Key spans and events:
- `event_bus.publish` / `event_bus.subscribe` / `event_bus.shutdown` -- top-level operations
- `eventbus.handler` span -- per-handler execution with `event`, `mode`, and `listener_id` fields
- `handler.retry` (warn) -- logged on each retry attempt
- `handler.failed` (error) -- logged when retries are exhausted
- `handler.join_error` (error) -- logged when a spawned task panics
## Semantics
- `publish` waits for dispatch and sync listeners to finish.
- `publish` does **not** wait for async listeners to finish.
- async handler failures can be retried based on `FailurePolicy`.
- sync handlers execute exactly once -- retries are not supported (`SyncRetryNotSupported`).
- after retries are exhausted (async) or on first failure (sync), dead-letter events are emitted when `dead_letter: true`.
- dead-letter handlers themselves cannot trigger further dead letters (recursion guard).
- `shutdown` drains queued publish messages and waits for in-flight async listeners.
- after shutdown, all operations return `EventBusError::ActorStopped`.
## Error Variants
- `EventBusError::ActorStopped` -- the actor has shut down or the channel is closed
- `EventBusError::ChannelFull` -- the internal queue is full (`try_publish` only)
- `EventBusError::InvalidConfig(ConfigError)` -- invalid builder/constructor configuration (zero buffer size, zero concurrency)
- `EventBusError::ShutdownTimeout` -- the configured `shutdown_timeout` expired and in-flight async tasks were forcibly aborted
- `EventBusError::SyncRetryNotSupported` -- a sync handler was subscribed with `max_retries > 0` (retries are async-only)
## Examples
See [`examples/jaeb-demo`](examples/jaeb-demo) for a working demo that includes:
- async and sync handlers with retry policies
- dead-letter logging
- Prometheus metrics exporter
- structured tracing setup
Run it with:
```sh
cd examples/jaeb-demo
RUST_LOG=info,jaeb=trace cargo run
```
## Notes
- JAEB requires a running Tokio runtime.
- Events must be `Send + Sync + 'static`. Async handlers additionally require events to be `Clone`.
- Events are in-process only (no persistence, replay, or broker integration).
- The crate enforces `#![forbid(unsafe_code)]`.
## License
jaeb is distributed under the [MIT License](LICENSE).
Copyright (c) 2025-2026 Linke Thomas
This project uses third-party libraries. See [THIRD-PARTY-LICENSES](THIRD-PARTY-LICENSES)
for the full list of dependencies, their versions, and their respective license terms.