Expand description
§⚡ JAEB — Just Another Event Bus
A lightweight, in-process event bus for Tokio — snapshot-driven dispatch with retry, dead-letter, and middleware support.
§✨ Highlights
- 🔀 Sync + Async handlers behind one
subscribeAPI - 🔁 Retry & Dead Letters with per-listener policies
- 🧩 Typed & Global Middleware pipeline
- 📊 Optional Metrics (Prometheus-compatible via
metricscrate) - 🔍 Built-in Tracing support (
tracefeature) - 🛑 Graceful Shutdown with async drain + timeout
- 🏗️ summer-rs Integration for plugin-based auto-registration
§When to use JAEB
Use JAEB when you need:
- domain events inside one process (e.g.
OrderCreated-> projections, notifications, audit) - decoupled modules with type-safe fan-out
- retry/dead-letter behavior per listener
- deterministic sync-lane ordering with priority hints
JAEB is not a message broker — it does not provide persistence, replay, or cross-process delivery. If you need durable messaging, consider pairing JAEB with an external queue for outbox-style patterns.
§Installation
[dependencies]
jaeb = "0.4.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }With metrics instrumentation:
[dependencies]
jaeb = { version = "0.4.0", features = ["metrics"] }
With tracing:
[dependencies]
jaeb = { version = "0.4.0", features = ["trace"] }With standalone handler macros:
[dependencies]
jaeb = { version = "0.4.0", features = ["macros"] }§⚡ Quick Start
Full example with sync/async handlers, retry policies, and dead-letter handling:
use std::time::Duration;
use jaeb::{
DeadLetter, EventBus, EventBusError, EventHandler, HandlerResult, RetryStrategy, SubscriptionPolicy, SyncEventHandler,
};
#[derive(Clone)]
struct OrderCheckoutEvent {
order_id: i64,
}
struct AsyncCheckoutHandler;
impl EventHandler<OrderCheckoutEvent> for AsyncCheckoutHandler {
async fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
println!("async checkout {}", event.order_id);
Ok(())
}
}
struct SyncAuditHandler;
impl SyncEventHandler<OrderCheckoutEvent> for SyncAuditHandler {
fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
println!("sync audit {}", event.order_id);
Ok(())
}
}
struct DeadLetterLogger;
impl SyncEventHandler<DeadLetter> for DeadLetterLogger {
fn handle(&self, dl: &DeadLetter) -> HandlerResult {
eprintln!(
"dead-letter: event={} listener={} attempts={} error={}",
dl.event_name, dl.subscription_id, dl.attempts, dl.error
);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), EventBusError> {
let bus = EventBus::builder().buffer_size(64).build().await?;
let retry_policy = SubscriptionPolicy::default()
.with_priority(10)
.with_max_retries(2)
.with_retry_strategy(RetryStrategy::Fixed(Duration::from_millis(50)));
let checkout_sub = bus
.subscribe_with_policy::<OrderCheckoutEvent, _, _>(AsyncCheckoutHandler, retry_policy)
.await?;
let _audit_sub = bus.subscribe::<OrderCheckoutEvent, _, _>(SyncAuditHandler).await?;
let _dl_sub = bus.subscribe_dead_letters(DeadLetterLogger).await?;
bus.publish(OrderCheckoutEvent { order_id: 42 }).await?;
bus.try_publish(OrderCheckoutEvent { order_id: 43 })?;
checkout_sub.unsubscribe().await?;
bus.shutdown().await?;
Ok(())
}Detailed usage patterns (builder descriptors, direct subscribe*, manual descriptors,
Deps/Dep<T>, and trade-offs) are documented in USAGE.md.
§Architecture
JAEB uses an immutable snapshot registry (ArcSwap) for hot-path reads:
graph LR
P[publish] --> S[Load Snapshot]
S --> GM[Global Middleware]
GM --> TM[Typed Middleware]
TM --> AL[Async Lane - spawned]
TM --> SL[Sync Lane - serialized FIFO]- async and sync listeners are separated per event type
- priority is applied per lane (higher first)
- equal priority preserves registration order
§API Highlights
EventBus::builder()for buffer size, timeouts, concurrency limit, and default policydefault_subscription_policy(SubscriptionPolicy)sets fallback policy forsubscribesubscribe_with_policy(handler, policy)accepts:SubscriptionPolicyfor async handlersSyncSubscriptionPolicyfor sync handlers and once handlers
publishwaits for sync listeners and task-spawn for async listenerstry_publishis non-blocking and returnsEventBusError::ChannelFullon saturation
Core policy types:
SubscriptionPolicy { priority, max_retries, retry_strategy, dead_letter }SyncSubscriptionPolicy { priority, dead_letter }IntoSubscriptionPolicy<M>sealed trait for compile-time mode/policy safety
Deprecated aliases (will be removed in the next major version)
FailurePolicy->SubscriptionPolicyNoRetryPolicy->SyncSubscriptionPolicyIntoFailurePolicy->IntoSubscriptionPolicy
§Examples
examples/basic-pubsub- minimal publish/subscribeexamples/sync-handler- sync dispatch lane behaviorexamples/closure-handlers- closure-based handlersexamples/retry-strategies- fixed/exponential/jitter retry configurationexamples/dead-letters- dead-letter subscription and inspectionexamples/middleware- global and typed middlewareexamples/backpressure-try_publishsaturation behaviorexamples/concurrency-limit- max concurrent async handlersexamples/graceful-shutdown- controlled shutdown and drainingexamples/introspection-EventBus::stats()outputexamples/fire-once- one-shot / fire-once handlerexamples/panic-safety- panic handling behavior in handlersexamples/subscription-lifecycle- subscribe/unsubscribe lifecycleexamples/axum-integration- axum REST app publishing domain eventsexamples/macro-handlers- standalone#[handler]+#[dead_letter_handler]with builderexamples/jaeb-demo- full demo with tracing + metrics exporterexamples/summer-jaeb-demo- summer-rs plugin +#[event_listener]examples/observability-stack- Grafana + Prometheus + Loki + Tempo demoexamples/jaeb-visualizer- TUI visualizer for event bus activity

Run an example:
cargo run -p axum-integration§Feature Flags
| Flag | Default | Description |
|---|---|---|
macros | off | Re-exports #[handler] and #[dead_letter_handler] |
metrics | off | Enables Prometheus-compatible instrumentation via metrics |
trace | off | Enables tracing spans and events for dispatch diagnostics |
test-utils | off | Exposes TestBus helpers for integration tests |
When metrics is enabled, JAEB records:
eventbus.publish(counter, labels:event)eventbus.handler.duration(histogram, labels:event,handler)eventbus.handler.error(counter, labels:event,listener)eventbus.dead_letter(counter, labels:event,handler) — fires when a dead letter is createdeventbus.handler.join_error(counter, labels:event)
§summer-rs Integration
Use summer-jaeb and summer-jaeb-macros for plugin-based auto-registration via #[event_listener].
Macro support includes:
retriesretry_strategyretry_base_msretry_max_msdead_letterpriorityname
use jaeb::{DeadLetter, EventBus, HandlerResult};
use summer::{App, AppBuilder, async_trait};
use summer::extractor::Component;
use summer::plugin::{MutableComponentRegistry, Plugin};
use summer_jaeb::{SummerJaeb, event_listener};
#[derive(Clone, Debug)]
struct OrderPlacedEvent {
order_id: u32,
}
/// A dummy database pool registered as a summer Component via a plugin.
#[derive(Clone, Debug)]
struct DbPool;
impl DbPool {
fn log_order(&self, order_id: u32) {
println!("DbPool: persisted order {order_id}");
}
}
struct DbPoolPlugin;
#[async_trait]
impl Plugin for DbPoolPlugin {
async fn build(&self, app: &mut AppBuilder) {
app.add_component(DbPool);
}
fn name(&self) -> &str { "DbPoolPlugin" }
}
/// Async listener — `DbPool` is injected automatically from summer's DI container.
#[event_listener(retries = 2, retry_strategy = "fixed", retry_base_ms = 500, dead_letter = true)]
async fn on_order_placed(event: &OrderPlacedEvent, Component(db): Component<DbPool>) -> HandlerResult {
db.log_order(event.order_id);
Ok(())
}
/// Sync dead-letter listener — auto-detected from the `DeadLetter` event type.
#[event_listener(name = "dead_letter")]
fn on_dead_letter(event: &DeadLetter) -> HandlerResult {
eprintln!("dead letter: event={}, attempts={}", event.event_name, event.attempts);
Ok(())
}
#[tokio::main]
async fn main() {
App::new()
.add_plugin(DbPoolPlugin)
.add_plugin(SummerJaeb::new().with_dependency("DbPoolPlugin"))
.run()
.await;
}All #[event_listener] functions are auto-discovered via inventory and subscribed
during plugin startup — no manual registration needed.
§Standalone Macros
Enable the macros feature to use #[handler] and #[dead_letter_handler]
without summer-rs.
#[handler] generates a struct named <FunctionName>Handler that implements
HandlerDescriptor. Register it via EventBusBuilder::handler. Policy
attributes are supported:
retriesretry_strategyretry_base_msretry_max_msdead_letterpriorityname
#[dead_letter_handler] generates a struct that implements
DeadLetterDescriptor. The function must be synchronous and accept &DeadLetter.
Register it via EventBusBuilder::dead_letter.
Dep<T> parameters are supported for both macros and are resolved from
EventBusBuilder::deps(...) at build time. Supported forms:
Dep(name): Dep<T>name: Dep<T>
use std::time::Duration;
use jaeb::{DeadLetter, EventBus, HandlerResult, dead_letter_handler, handler};
#[derive(Clone, Debug)]
struct Payment {
id: u32,
}
#[handler(retries = 2, retry_strategy = "fixed", retry_base_ms = 50, dead_letter = true, name = "payment-processor")]
async fn process_payment(event: &Payment) -> HandlerResult {
println!("processing payment {}", event.id);
Ok(())
}
#[dead_letter_handler]
fn on_dead_letter(event: &DeadLetter) -> HandlerResult {
println!(
"dead-letter: event={}, handler={:?}, attempts={}, error={}",
event.event_name, event.handler_name, event.attempts, event.error
);
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), jaeb::EventBusError> {
let bus = EventBus::builder()
.buffer_size(64)
.handler(process_payment)
.dead_letter(on_dead_letter)
.build()
.await?;
bus.publish(Payment { id: 7 }).await?;
tokio::time::sleep(Duration::from_millis(300)).await;
bus.shutdown().await
}Dep<T> example:
use std::sync::Arc;
use jaeb::{Dep, Deps, EventBus, HandlerResult, handler};
#[derive(Clone)]
struct AuditLog;
#[derive(Clone)]
struct Payment;
#[handler]
async fn process_with_dep(_event: &Payment, Dep(log): Dep<Arc<AuditLog>>) -> HandlerResult {
let _ = log;
Ok(())
}
#[handler]
fn process_with_wrapper(_event: &Payment, log: Dep<Arc<AuditLog>>) -> HandlerResult {
let _inner: Arc<AuditLog> = log.0;
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), jaeb::EventBusError> {
let log = Arc::new(AuditLog);
let bus = EventBus::builder()
.handler(process_with_dep)
.handler(process_with_wrapper)
.deps(Deps::new().insert(log))
.build()
.await?;
bus.shutdown().await
}§Notes
- JAEB requires a running Tokio runtime.
- Events must be
Send + Sync + 'static; async handlers also requireClone. - The crate enforces
#![forbid(unsafe_code)].
§License
jaeb is distributed under the MIT License.
Copyright (c) 2025-2026 Linke Thomas
This project uses third-party libraries. See THIRD-PARTY-LICENSES for dependency and license details.
Re-exports§
Modules§
Structs§
- Async
FnMode - Marker type that selects async function dispatch for
IntoHandler. - Async
Mode - Marker type that selects async struct dispatch for
IntoHandler. - BusStats
- A point-in-time snapshot of the event bus internal state.
- Dead
Letter - A dead-letter record emitted when a handler exhausts all retry attempts.
- Event
Bus - The central in-process event bus.
- Event
BusBuilder - Builder for constructing an
EventBuswith custom configuration and pre-registered handlers. - Handler
Info - Information about a single registered handler, as reported by
BusStats. - Subscription
- Handle representing an active listener or middleware registration.
- Subscription
Guard - RAII guard that automatically unsubscribes a listener when dropped.
- Subscription
Id - Unique identifier for a listener or middleware registration.
- Subscription
Policy - Policy controlling how a subscription is scheduled and how failures are treated.
- Sync
FnMode - Marker type that selects sync function dispatch for
IntoHandler. - Sync
Mode - Marker type that selects sync struct dispatch for
IntoHandler. - Sync
Subscription Policy - Subscription policy for handlers that do not support retries.
Enums§
- Config
Error - Specific reason why an
EventBusconfiguration is invalid. - Event
BusError - Errors returned by
EventBuspublish and shutdown operations. - Middleware
Decision - Decision returned by a middleware after inspecting an event.
- Retry
Strategy - Strategy for computing the delay between retry attempts.
Traits§
- Dead
Letter Descriptor - Trait for types that register a dead-letter handler on an
EventBus. - Event
- Marker trait for all publishable event types.
- Event
Handler - Trait for asynchronous event handlers.
- Handler
Descriptor - Trait for types that can register themselves as a handler on an
EventBususing dependencies supplied by aDepscontainer. - Into
Handler - Type-erases a concrete handler into the internal representation expected by the bus registry.
- Into
Subscription Policy - Trait that converts a policy type into a
SubscriptionPolicysuitable for the handler’s dispatch mode. - Middleware
- Async middleware trait.
- Sync
Event Handler - Trait for synchronous event handlers.
- Sync
Middleware - Sync middleware trait.
- Typed
Middleware - Async middleware scoped to a specific event type
E. - Typed
Sync Middleware - Sync middleware scoped to a specific event type
E.
Type Aliases§
- Handler
Error - The error type returned by
EventHandler::handleandSyncEventHandler::handleon failure. - Handler
Result - The result type for handler methods.