Expand description
§JAEB - Just Another Event Bus
In-process, snapshot-driven event bus for Tokio applications.
JAEB provides:
- sync + async listeners via a unified
subscribeAPI - automatic dispatch-mode selection based on handler trait
- explicit listener unsubscription via
Subscriptionhandles or RAIISubscriptionGuard - dependency injection via handler structs
- retry policies with configurable strategy for async handlers
- dead-letter stream for terminal failures
- explicit
Result-based error handling - graceful shutdown with in-flight task completion
- idempotent shutdown
- optional Prometheus-compatible metrics via the
metricscrate - structured tracing with per-handler spans
- summer-rs integration via summer-jaeb and
#[event_listener]macro support summer-jaeb-macros
§Installation
[dependencies]
jaeb = { version = "0.3.2" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }To enable metrics instrumentation:
[dependencies]
jaeb = { version = "0.3.2", features = ["metrics"] }§Quick Start
use std::time::Duration;
use jaeb::{
DeadLetter, EventBus, EventBusError, EventHandler, FailurePolicy,
HandlerResult, RetryStrategy, SyncEventHandler,
};
#[derive(Clone)]
struct OrderCheckoutEvent {
order_id: i64,
}
struct AsyncCheckoutHandler;
impl EventHandler<OrderCheckoutEvent> for AsyncCheckoutHandler {
async fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
println!("async checkout {}", event.order_id);
Ok(())
}
}
struct SyncAuditHandler;
impl SyncEventHandler<OrderCheckoutEvent> for SyncAuditHandler {
fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
println!("sync audit {}", event.order_id);
Ok(())
}
}
struct DeadLetterLogger;
impl SyncEventHandler<DeadLetter> for DeadLetterLogger {
fn handle(&self, dl: &DeadLetter) -> HandlerResult {
eprintln!(
"dead-letter: event={} listener={} attempts={} error={}",
dl.event_name, dl.subscription_id, dl.attempts, dl.error
);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), EventBusError> {
let bus = EventBus::new(64)?;
let retry_policy = FailurePolicy::default()
.with_max_retries(2)
.with_retry_strategy(RetryStrategy::Fixed(Duration::from_millis(50)));
// Async handler -- dispatch mode inferred from EventHandler impl
let checkout_sub = bus
.subscribe_with_policy(AsyncCheckoutHandler, retry_policy)
.await?;
// Sync handler -- dispatch mode inferred from SyncEventHandler impl
let _audit_sub = bus.subscribe(SyncAuditHandler).await?;
// Dead-letter handler (convenience method, auto-sets dead_letter: false)
bus.subscribe_dead_letters(DeadLetterLogger).await?;
bus.publish(OrderCheckoutEvent { order_id: 42 }).await?;
bus.try_publish(OrderCheckoutEvent { order_id: 43 })?;
checkout_sub.unsubscribe().await?;
bus.shutdown().await?;
Ok(())
}§API Overview
§EventBus
EventBus::new(buffer) -> Result<EventBus, EventBusError>– create a new bus with the given channel capacityEventBus::builder()– builder for fine-grained configuration (buffer size, timeouts, concurrency limits)subscribe(handler) -> Result<Subscription, EventBusError>– dispatch mode inferred from traitsubscribe_with_policy(handler, policy) -> Result<Subscription, EventBusError>– policy type is compile-time checked (FailurePolicyfor async,NoRetryPolicyfor sync)subscribe_dead_letters(handler) -> Result<Subscription, EventBusError>publish(event) -> Result<(), EventBusError>try_publish(event) -> Result<(), EventBusError>– non-blocking, returnsChannelFullwhen immediate dispatch capacity is unavailableunsubscribe(subscription_id) -> Result<bool, EventBusError>shutdown() -> Result<(), EventBusError>– idempotent, drains in-flight tasksasync fn is_healthy() -> bool– checks if the internal control loop is still running
EventBus is Clone – all clones share the same underlying runtime state.
§Handler Traits
EventHandler<E>– async handler, dispatched on a spawned task (requiresE: Clone)SyncEventHandler<E>– sync handler, awaited inline during dispatch
The dispatch mode is selected automatically based on which trait is implemented.
The IntoHandler<E, Mode> trait performs the conversion; the Mode parameter
(AsyncMode / SyncMode) is inferred, so callers simply write bus.subscribe(handler).
§Subscription & SubscriptionGuard
Subscription holds a SubscriptionId and a bus handle. Call subscription.unsubscribe()
to remove the handler, or use bus.unsubscribe(id) directly.
SubscriptionGuard is an RAII wrapper that automatically unsubscribes the listener when
dropped. Convert a Subscription via subscription.into_guard(). Call guard.disarm()
to prevent the automatic unsubscribe.
§Types
Event– blanket trait implemented for allT: Send + Sync + 'staticHandlerResult = Result<(), Box<dyn Error + Send + Sync>>FailurePolicy { max_retries, retry_strategy, dead_letter }– for async handlersNoRetryPolicy { dead_letter }– for sync handlers (or async handlers that don’t need retries)IntoFailurePolicy<M>– sealed trait enforcing compile-time policy/handler compatibilityDeadLetter { event_name, subscription_id, attempts, error, event, failed_at, listener_name }SubscriptionId– opaque handler ID (wrapsu64)
§Feature Flags
| Flag | Default | Description |
|---|---|---|
metrics | off | Enables Prometheus-compatible instrumentation via the metrics crate |
When the metrics feature is enabled, the bus records:
eventbus.publish(counter, per event type)eventbus.handler.duration(histogram, per event type)eventbus.handler.error(counter, per event type)eventbus.handler.join_error(counter, per event type)
§Observability
JAEB uses the tracing crate throughout. Key spans and events:
event_bus.publish/event_bus.subscribe/event_bus.shutdown– top-level operationseventbus.handlerspan – per-handler execution withevent,mode, andlistener_idfieldshandler.retry(warn) – logged on each retry attempthandler.failed(error) – logged when retries are exhausted- async handler panics are surfaced as task failures and follow retry/dead-letter policy
§Architecture
JAEB uses a split control/data architecture:
- A snapshot registry (
ArcSwap<RegistrySnapshot>) stores listeners and middleware in immutable per-type slots for low-overhead publish-path reads. - A lightweight control loop handles async failure notifications, dead-letter routing, and shutdown coordination.
Dispatch uses two lanes per event type:
- sync lane: serialized by a per-type gate (FIFO for sync dispatch)
- async lane: spawned in background and not blocked by sync backlog
§Semantics
publishwaits for dispatch and sync listeners to finish.publishdoes not wait for async listeners to finish.- async handler failures can be retried based on
FailurePolicy. - sync handlers execute exactly once – retries are not supported; passing a
FailurePolicywith retries to a sync handler is a compile error viaIntoFailurePolicy<M>. - after retries are exhausted (async) or on first failure (sync), dead-letter events are emitted when
dead_letter: true. - dead-letter handlers themselves cannot trigger further dead letters (recursion guard).
shutdownwaits for in-flight async listeners (with optional timeout).- after shutdown, all operations return
EventBusError::Stopped.
§Error Variants
EventBusError::Stopped– shutdown has started or the bus is stoppedEventBusError::ChannelFull– publish saturation limit reached (try_publishonly)EventBusError::InvalidConfig(ConfigError)– invalid builder/constructor configuration (zero buffer size, zero concurrency)EventBusError::MiddlewareRejected(String)– a middleware rejected the event before it reached any listenerEventBusError::ShutdownTimeout– the configuredshutdown_timeoutexpired and in-flight async tasks were forcibly aborted
§Examples
See examples/jaeb-demo for a working demo that includes:
- async and sync handlers with retry policies
- dead-letter logging
- Prometheus metrics exporter
- structured tracing setup
Run it with:
cd examples/jaeb-demo
RUST_LOG=info,jaeb=trace cargo run§Notes
- JAEB requires a running Tokio runtime.
- Events must be
Send + Sync + 'static. Async handlers additionally require events to beClone. - Events are in-process only (no persistence, replay, or broker integration).
- The crate enforces
#![forbid(unsafe_code)].
§License
jaeb is distributed under the MIT License.
Copyright (c) 2025-2026 Linke Thomas
This project uses third-party libraries. See THIRD-PARTY-LICENSES for the full list of dependencies, their versions, and their respective license terms.
Structs§
- Async
FnMode - Marker type that selects async function dispatch for
IntoHandler. - Async
Mode - Marker type that selects async struct dispatch for
IntoHandler. - BusStats
- A point-in-time snapshot of the event bus internal state.
- Dead
Letter - A dead-letter record emitted when a handler exhausts all retry attempts.
- Event
Bus - The central in-process event bus.
- Event
BusBuilder - Builder for constructing an
EventBuswith custom configuration. - Failure
Policy - Policy controlling how handler failures are treated.
- Listener
Info - Information about a single registered listener, as reported by
BusStats. - NoRetry
Policy - Failure policy for handlers that do not support retries.
- Subscription
- Handle representing an active listener or middleware registration.
- Subscription
Guard - RAII guard that automatically unsubscribes a listener when dropped.
- Subscription
Id - Unique identifier for a listener or middleware registration.
- Sync
FnMode - Marker type that selects sync function dispatch for
IntoHandler. - Sync
Mode - Marker type that selects sync struct dispatch for
IntoHandler.
Enums§
- Config
Error - Specific reason why an
EventBusconfiguration is invalid. - Event
BusError - Errors returned by
EventBuspublish and shutdown operations. - Middleware
Decision - Decision returned by a middleware after inspecting an event.
- Retry
Strategy - Strategy for computing the delay between retry attempts.
Traits§
- Event
- Marker trait for all publishable event types.
- Event
Handler - Trait for asynchronous event handlers.
- Into
Failure Policy - Trait that converts a policy type into a
FailurePolicysuitable for the handler’s dispatch mode. - Into
Handler - Type-erases a concrete handler into the internal representation expected by the bus registry.
- Middleware
- Async middleware trait.
- Sync
Event Handler - Trait for synchronous event handlers.
- Sync
Middleware - Sync middleware trait.
- Typed
Middleware - Async middleware scoped to a specific event type
E. - Typed
Sync Middleware - Sync middleware scoped to a specific event type
E.
Type Aliases§
- Handler
Error - The error type returned by
EventHandler::handleandSyncEventHandler::handleon failure. - Handler
Result - The result type for handler methods.