Crate jaeb

§JAEB - Just Another Event Bus

In-process, snapshot-driven event bus for Tokio applications.

JAEB focuses on correctness and observability for monolith-style event-driven Rust services:

  • sync + async handlers behind one subscribe API
  • compile-time policy validation (retry policies cannot be used with sync handlers)
  • listener priority with FIFO stability for equal priorities
  • typed and global middleware
  • dead-letter stream with recursion guard
  • graceful shutdown with in-flight async drain
  • optional metrics (metrics feature) and built-in tracing
  • optional standalone macros (macros feature): #[handler] and register_handlers!
  • summer-rs integration via summer-jaeb, with #[event_listener] macro support via summer-jaeb-macros

§When to use JAEB

Use JAEB when you need:

  • domain events inside one process (e.g. OrderCreated -> projections, notifications, audit)
  • decoupled modules with type-safe fan-out
  • retry/dead-letter behavior per listener
  • deterministic sync-lane ordering with priority hints

JAEB is not a message broker. It does not provide persistence, replay, or cross-process delivery.

§Installation

[dependencies]
jaeb = "0.3.6"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

With metrics instrumentation:

[dependencies]
jaeb = { version = "0.3.6", features = ["metrics"] }

With standalone handler macros:

[dependencies]
jaeb = { version = "0.3.6", features = ["macros"] }

§Quick Start

use std::time::Duration;

use jaeb::{
    DeadLetter, EventBus, EventBusError, EventHandler, HandlerResult, RetryStrategy, SubscriptionPolicy, SyncEventHandler,
};

#[derive(Clone)]
struct OrderCheckoutEvent {
    order_id: i64,
}

struct AsyncCheckoutHandler;

impl EventHandler<OrderCheckoutEvent> for AsyncCheckoutHandler {
    async fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
        println!("async checkout {}", event.order_id);
        Ok(())
    }
}

struct SyncAuditHandler;

impl SyncEventHandler<OrderCheckoutEvent> for SyncAuditHandler {
    fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
        println!("sync audit {}", event.order_id);
        Ok(())
    }
}

struct DeadLetterLogger;

impl SyncEventHandler<DeadLetter> for DeadLetterLogger {
    fn handle(&self, dl: &DeadLetter) -> HandlerResult {
        eprintln!(
            "dead-letter: event={} listener={} attempts={} error={}",
            dl.event_name, dl.subscription_id, dl.attempts, dl.error
        );
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), EventBusError> {
    let bus = EventBus::new(64)?;

    // Retry settings are only accepted for async handlers.
    let retry_policy = SubscriptionPolicy::default()
        .with_priority(10)
        .with_max_retries(2)
        .with_retry_strategy(RetryStrategy::Fixed(Duration::from_millis(50)));

    let checkout_sub = bus
        .subscribe_with_policy::<OrderCheckoutEvent, _, _>(AsyncCheckoutHandler, retry_policy)
        .await?;

    // The sync handler and the dead-letter listener use the bus's fallback policy.
    let _audit_sub = bus.subscribe::<OrderCheckoutEvent, _, _>(SyncAuditHandler).await?;
    let _dl_sub = bus.subscribe_dead_letters(DeadLetterLogger).await?;

    // `publish` waits for sync listeners; `try_publish` never blocks and
    // returns `EventBusError::ChannelFull` when the bus is saturated.
    bus.publish(OrderCheckoutEvent { order_id: 42 }).await?;
    bus.try_publish(OrderCheckoutEvent { order_id: 43 })?;

    // Remove the async listener, then shut down, draining in-flight async work.
    checkout_sub.unsubscribe().await?;
    bus.shutdown().await?;
    Ok(())
}

§Architecture

JAEB uses an immutable snapshot registry (ArcSwap) for hot-path reads:

publish(event)
  -> load snapshot (lock-free)
  -> global middleware
  -> typed middleware
  -> async lane (spawned)
  -> sync lane (serialized FIFO, priority-ordered)

  • async and sync listeners are separated per event type
  • priority is applied per lane (higher first)
  • equal priority preserves registration order
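
The ordering rule above (higher priority first, registration order for ties) can be illustrated with a stable sort. This is a minimal standalone sketch, not JAEB's internal code: the `Listener` struct and `dispatch_order` function are hypothetical names introduced only for illustration.

```rust
/// Hypothetical listener record; the fields are illustrative, not JAEB internals.
#[derive(Debug, Clone, PartialEq)]
struct Listener {
    name: &'static str,
    priority: i32,
}

/// Returns the listeners in the order one lane would run them:
/// higher priority first, registration order among equal priorities.
fn dispatch_order(registered: &[Listener]) -> Vec<Listener> {
    let mut ordered = registered.to_vec();
    // `sort_by_key` is a stable sort, so listeners with the same priority
    // keep the relative order they had in `registered`.
    ordered.sort_by_key(|l| std::cmp::Reverse(l.priority));
    ordered
}
```

Because the sort is stable, no extra sequence number is needed to break ties; the registration order of the input slice is enough.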

§API Highlights

  • EventBus::builder() for buffer size, timeouts, concurrency limit, and default policy
  • default_subscription_policy(SubscriptionPolicy) sets the fallback policy used by subscribe
  • subscribe_with_policy(handler, policy) accepts:
    • SubscriptionPolicy for async handlers
    • SyncSubscriptionPolicy for sync handlers and once handlers
  • publish waits for sync listeners to finish and for async listener tasks to be spawned
  • try_publish is non-blocking and returns EventBusError::ChannelFull on saturation
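
To make the saturation behavior of a non-blocking publish concrete, here is a rough sketch built on the standard library's bounded channel. It does not use JAEB and says nothing about how JAEB is implemented; `BoundedQueue`, `PublishError`, and the `Closed` variant are hypothetical names chosen for this illustration (only the `ChannelFull` name mirrors the variant mentioned above).

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum PublishError {
    ChannelFull,
    Closed,
}

/// Hypothetical bounded queue that refuses new events instead of blocking.
struct BoundedQueue<E> {
    tx: SyncSender<E>,
}

impl<E> BoundedQueue<E> {
    /// Creates a queue holding at most `capacity` pending events.
    fn new(capacity: usize) -> (Self, Receiver<E>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx }, rx)
    }

    /// Never blocks: a full buffer is reported to the caller as an error.
    fn try_publish(&self, event: E) -> Result<(), PublishError> {
        self.tx.try_send(event).map_err(|e| match e {
            TrySendError::Full(_) => PublishError::ChannelFull,
            TrySendError::Disconnected(_) => PublishError::Closed,
        })
    }
}
```

With a capacity of 2, a third `try_publish` fails with `ChannelFull` until the receiver takes an event off the queue. The caller then decides whether to drop the event, retry later, or fall back to a waiting publish.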

Core policy types:

  • SubscriptionPolicy { priority, max_retries, retry_strategy, dead_letter }
  • SyncSubscriptionPolicy { priority, dead_letter }
  • IntoSubscriptionPolicy<M> sealed trait for compile-time mode/policy safety
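
The idea of a sealed trait that ties each policy type to a dispatch mode can be sketched in plain Rust. The code below is an assumption-laden illustration of the general pattern, not JAEB's actual definitions: `AsyncMarker`, `SyncMarker`, `AsyncPolicy`, `SyncPolicy`, `ResolvedPolicy`, `IntoPolicy`, and the two `subscribe_*` functions are all hypothetical names.

```rust
// Hypothetical dispatch-mode markers, used only as type parameters.
struct AsyncMarker;
struct SyncMarker;

/// Hypothetical policy for async handlers: retries are allowed.
#[derive(Debug, Clone, Copy, PartialEq)]
struct AsyncPolicy {
    priority: i32,
    max_retries: u32,
}

/// Hypothetical policy for sync handlers: there is no retry field to set.
#[derive(Debug, Clone, Copy, PartialEq)]
struct SyncPolicy {
    priority: i32,
}

/// What the bus would store internally in this sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ResolvedPolicy {
    priority: i32,
    max_retries: u32,
}

mod sealed {
    // Only this file can implement `Sealed`, so no other code can add new
    // (mode, policy) pairings.
    pub trait Sealed<M> {}
}

trait IntoPolicy<M>: sealed::Sealed<M> {
    fn into_policy(self) -> ResolvedPolicy;
}

impl sealed::Sealed<AsyncMarker> for AsyncPolicy {}
impl IntoPolicy<AsyncMarker> for AsyncPolicy {
    fn into_policy(self) -> ResolvedPolicy {
        ResolvedPolicy { priority: self.priority, max_retries: self.max_retries }
    }
}

impl sealed::Sealed<SyncMarker> for SyncPolicy {}
impl IntoPolicy<SyncMarker> for SyncPolicy {
    fn into_policy(self) -> ResolvedPolicy {
        // Sync handlers never retry in this sketch.
        ResolvedPolicy { priority: self.priority, max_retries: 0 }
    }
}

fn subscribe_async(policy: impl IntoPolicy<AsyncMarker>) -> ResolvedPolicy {
    policy.into_policy()
}

fn subscribe_sync(policy: impl IntoPolicy<SyncMarker>) -> ResolvedPolicy {
    policy.into_policy()
}
```

In this sketch, `subscribe_sync(AsyncPolicy { .. })` does not compile because `AsyncPolicy` has no `IntoPolicy<SyncMarker>` impl, which is the kind of compile-time rejection the feature list describes for retry policies on sync handlers.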

Backward-compatible aliases remain available (deprecated):

  • FailurePolicy -> SubscriptionPolicy
  • NoRetryPolicy -> SyncSubscriptionPolicy
  • IntoFailurePolicy -> IntoSubscriptionPolicy

§Examples

  • examples/basic-pubsub - minimal publish/subscribe
  • examples/sync-handler - sync dispatch lane behavior
  • examples/closure-handlers - closure-based handlers
  • examples/retry-strategies - fixed/exponential/jitter retry configuration
  • examples/dead-letters - dead-letter subscription and inspection
  • examples/middleware - global and typed middleware
  • examples/backpressure - try_publish saturation behavior
  • examples/concurrency-limit - max concurrent async handlers
  • examples/graceful-shutdown - controlled shutdown and draining
  • examples/introspection - EventBus::stats() output
  • examples/axum-integration - axum REST app publishing domain events
  • examples/macro-handlers - standalone #[handler] + register_handlers!
  • examples/macro-handlers-auto - standalone #[handler] auto-discovery with register_handlers!(bus)
  • examples/jaeb-demo - full demo with tracing + metrics exporter
  • examples/summer-jaeb-demo - summer-rs plugin + #[event_listener]

Run an example:

cargo run -p axum-integration

§Feature Flags

Flag        Default  Description
macros      off      Re-exports #[handler] and register_handlers!
metrics     off      Enables Prometheus-compatible instrumentation via metrics
test-utils  off      Exposes TestBus helpers for integration tests

When metrics is enabled, JAEB records:

  • eventbus.publish (counter, per event type)
  • eventbus.handler.duration (histogram, per event type)
  • eventbus.handler.error (counter, per event type)
  • eventbus.handler.join_error (counter, per event type)

§summer-rs Integration

Use summer-jaeb and summer-jaeb-macros for plugin-based auto-registration via #[event_listener].

The #[event_listener] macro supports these attributes:

  • retries
  • retry_strategy
  • retry_base_ms
  • retry_max_ms
  • dead_letter
  • priority
  • name

§Standalone Macros

Enable the macros feature to use #[handler] and register_handlers! without summer-rs.

The #[handler] macro generates a struct named <FunctionName>Handler and an async register(&EventBus) method. Policy attributes are supported:

  • retries
  • retry_strategy
  • retry_base_ms
  • retry_max_ms
  • dead_letter
  • priority
  • name
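
The retry_base_ms and retry_max_ms attributes suggest a backoff that starts at a base delay and is capped at a maximum. The sketch below shows one common way such delays are computed. It is an assumption about the general technique, not a description of JAEB's RetryStrategy: the `Backoff` enum, its `Exponential` variant, and the doubling rule are hypothetical, and jitter is left out.

```rust
use std::time::Duration;

/// Hypothetical backoff description for this sketch.
enum Backoff {
    /// Always wait the same amount of time.
    Fixed(Duration),
    /// Wait `base * 2^attempt`, never more than `max`.
    Exponential { base: Duration, max: Duration },
}

impl Backoff {
    /// Delay before retry number `attempt`, where 0 is the first retry.
    fn delay(&self, attempt: u32) -> Duration {
        match *self {
            Backoff::Fixed(d) => d,
            Backoff::Exponential { base, max } => {
                // Saturate instead of overflowing for very large attempt counts.
                let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
                base.checked_mul(factor).unwrap_or(max).min(max)
            }
        }
    }
}
```

With a base of 100 ms and a cap of 1000 ms, the delays in this sketch are 100, 200, 400, and 800 ms, and every later retry waits the full 1000 ms.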

§Notes

  • JAEB requires a running Tokio runtime.
  • Events must be Send + Sync + 'static; events delivered to async handlers must also be Clone.
  • The crate enforces #![forbid(unsafe_code)].

§License

jaeb is distributed under the MIT License.

Copyright (c) 2025-2026 Linke Thomas

This project uses third-party libraries. See THIRD-PARTY-LICENSES for dependency and license details.

Structs§

AsyncFnMode
Marker type that selects async function dispatch for IntoHandler.
AsyncMode
Marker type that selects async struct dispatch for IntoHandler.
BusStats
A point-in-time snapshot of the event bus internal state.
DeadLetter
A dead-letter record emitted when a handler exhausts all retry attempts.
EventBus
The central in-process event bus.
EventBusBuilder
Builder for constructing an EventBus with custom configuration.
HandlerInfo
Information about a single registered handler, as reported by BusStats.
Subscription
Handle representing an active listener or middleware registration.
SubscriptionGuard
RAII guard that automatically unsubscribes a listener when dropped.
SubscriptionId
Unique identifier for a listener or middleware registration.
SubscriptionPolicy
Policy controlling how a subscription is scheduled and how failures are treated.
SyncFnMode
Marker type that selects sync function dispatch for IntoHandler.
SyncMode
Marker type that selects sync struct dispatch for IntoHandler.
SyncSubscriptionPolicy
Subscription policy for handlers that do not support retries.

Enums§

ConfigError
Specific reason why an EventBus configuration is invalid.
EventBusError
Errors returned by EventBus publish and shutdown operations.
MiddlewareDecision
Decision returned by a middleware after inspecting an event.
RetryStrategy
Strategy for computing the delay between retry attempts.

Traits§

Event
Marker trait for all publishable event types.
EventHandler
Trait for asynchronous event handlers.
IntoFailurePolicy
Trait that converts a policy type into a SubscriptionPolicy suitable for the handler’s dispatch mode.
IntoHandler
Type-erases a concrete handler into the internal representation expected by the bus registry.
IntoSubscriptionPolicy
Trait that converts a policy type into a SubscriptionPolicy suitable for the handler’s dispatch mode.
Middleware
Async middleware trait.
SyncEventHandler
Trait for synchronous event handlers.
SyncMiddleware
Sync middleware trait.
TypedMiddleware
Async middleware scoped to a specific event type E.
TypedSyncMiddleware
Sync middleware scoped to a specific event type E.

Type Aliases§

FailurePolicy (deprecated)
HandlerError
The error type returned by EventHandler::handle and SyncEventHandler::handle on failure.
HandlerResult
The result type for handler methods.
ListenerInfo (deprecated)
NoRetryPolicy (deprecated)