Skip to main content

Crate jaeb

Crate jaeb 

Source
Expand description

§JAEB - Just Another Event Bus

crates.io docs.rs license

In-process, snapshot-driven event bus for Tokio applications.

JAEB provides:

  • sync + async listeners via a unified subscribe API
  • automatic dispatch-mode selection based on handler trait
  • explicit listener unsubscription via Subscription handles or RAII SubscriptionGuard
  • dependency injection via handler structs
  • retry policies with configurable strategy for async handlers
  • dead-letter stream for terminal failures
  • explicit Result-based error handling
  • graceful shutdown with in-flight task completion
  • idempotent shutdown
  • optional Prometheus-compatible metrics via the metrics crate
  • structured tracing with per-handler spans
  • summer-rs integration via summer-jaeb and #[event_listener] macro support summer-jaeb-macros

§Installation

[dependencies]
jaeb = { version = "0.3.2" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

To enable metrics instrumentation:

[dependencies]
jaeb = { version = "0.3.2", features = ["metrics"] }

§Quick Start

use std::time::Duration;

use jaeb::{
    DeadLetter, EventBus, EventBusError, EventHandler, FailurePolicy,
    HandlerResult, RetryStrategy, SyncEventHandler,
};

#[derive(Clone)]
struct OrderCheckoutEvent {
    order_id: i64,
}

struct AsyncCheckoutHandler;

impl EventHandler<OrderCheckoutEvent> for AsyncCheckoutHandler {
    async fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
        println!("async checkout {}", event.order_id);
        Ok(())
    }
}

struct SyncAuditHandler;

impl SyncEventHandler<OrderCheckoutEvent> for SyncAuditHandler {
    fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
        println!("sync audit {}", event.order_id);
        Ok(())
    }
}

struct DeadLetterLogger;

impl SyncEventHandler<DeadLetter> for DeadLetterLogger {
    fn handle(&self, dl: &DeadLetter) -> HandlerResult {
        eprintln!(
            "dead-letter: event={} listener={} attempts={} error={}",
            dl.event_name, dl.subscription_id, dl.attempts, dl.error
        );
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), EventBusError> {
    let bus = EventBus::new(64)?;

    let retry_policy = FailurePolicy::default()
        .with_max_retries(2)
        .with_retry_strategy(RetryStrategy::Fixed(Duration::from_millis(50)));

    // Async handler -- dispatch mode inferred from EventHandler impl
    let checkout_sub = bus
        .subscribe_with_policy(AsyncCheckoutHandler, retry_policy)
        .await?;

    // Sync handler -- dispatch mode inferred from SyncEventHandler impl
    let _audit_sub = bus.subscribe(SyncAuditHandler).await?;

    // Dead-letter handler (convenience method, auto-sets dead_letter: false)
    bus.subscribe_dead_letters(DeadLetterLogger).await?;

    bus.publish(OrderCheckoutEvent { order_id: 42 }).await?;
    bus.try_publish(OrderCheckoutEvent { order_id: 43 })?;

    checkout_sub.unsubscribe().await?;
    bus.shutdown().await?;
    Ok(())
}

§API Overview

§EventBus

  • EventBus::new(buffer) -> Result<EventBus, EventBusError> – create a new bus with the given channel capacity
  • EventBus::builder() – builder for fine-grained configuration (buffer size, timeouts, concurrency limits)
  • subscribe(handler) -> Result<Subscription, EventBusError> – dispatch mode inferred from trait
  • subscribe_with_policy(handler, policy) -> Result<Subscription, EventBusError> – policy type is compile-time checked (FailurePolicy for async, NoRetryPolicy for sync)
  • subscribe_dead_letters(handler) -> Result<Subscription, EventBusError>
  • publish(event) -> Result<(), EventBusError>
  • try_publish(event) -> Result<(), EventBusError> – non-blocking, returns ChannelFull when immediate dispatch capacity is unavailable
  • unsubscribe(subscription_id) -> Result<bool, EventBusError>
  • shutdown() -> Result<(), EventBusError> – idempotent, drains in-flight tasks
  • async fn is_healthy() -> bool – checks if the internal control loop is still running

EventBus is Clone – all clones share the same underlying runtime state.

§Handler Traits

  • EventHandler<E> – async handler, dispatched on a spawned task (requires E: Clone)
  • SyncEventHandler<E> – sync handler, awaited inline during dispatch

The dispatch mode is selected automatically based on which trait is implemented. The IntoHandler<E, Mode> trait performs the conversion; the Mode parameter (AsyncMode / SyncMode) is inferred, so callers simply write bus.subscribe(handler).

§Subscription & SubscriptionGuard

Subscription holds a SubscriptionId and a bus handle. Call subscription.unsubscribe() to remove the handler, or use bus.unsubscribe(id) directly.

SubscriptionGuard is an RAII wrapper that automatically unsubscribes the listener when dropped. Convert a Subscription via subscription.into_guard(). Call guard.disarm() to prevent the automatic unsubscribe.

§Types

  • Event – blanket trait implemented for all T: Send + Sync + 'static
  • HandlerResult = Result<(), Box<dyn Error + Send + Sync>>
  • FailurePolicy { max_retries, retry_strategy, dead_letter } – for async handlers
  • NoRetryPolicy { dead_letter } – for sync handlers (or async handlers that don’t need retries)
  • IntoFailurePolicy<M> – sealed trait enforcing compile-time policy/handler compatibility
  • DeadLetter { event_name, subscription_id, attempts, error, event, failed_at, listener_name }
  • SubscriptionId – opaque handler ID (wraps u64)

§Feature Flags

FlagDefaultDescription
metricsoffEnables Prometheus-compatible instrumentation via the metrics crate

When the metrics feature is enabled, the bus records:

  • eventbus.publish (counter, per event type)
  • eventbus.handler.duration (histogram, per event type)
  • eventbus.handler.error (counter, per event type)
  • eventbus.handler.join_error (counter, per event type)

§Observability

JAEB uses the tracing crate throughout. Key spans and events:

  • event_bus.publish / event_bus.subscribe / event_bus.shutdown – top-level operations
  • eventbus.handler span – per-handler execution with event, mode, and listener_id fields
  • handler.retry (warn) – logged on each retry attempt
  • handler.failed (error) – logged when retries are exhausted
  • async handler panics are surfaced as task failures and follow retry/dead-letter policy

§Architecture

JAEB uses a split control/data architecture:

  • A snapshot registry (ArcSwap<RegistrySnapshot>) stores listeners and middleware in immutable per-type slots for low-overhead publish-path reads.
  • A lightweight control loop handles async failure notifications, dead-letter routing, and shutdown coordination.

Dispatch uses two lanes per event type:

  • sync lane: serialized by a per-type gate (FIFO for sync dispatch)
  • async lane: spawned in background and not blocked by sync backlog

§Semantics

  • publish waits for dispatch and sync listeners to finish.
  • publish does not wait for async listeners to finish.
  • async handler failures can be retried based on FailurePolicy.
  • sync handlers execute exactly once – retries are not supported; passing a FailurePolicy with retries to a sync handler is a compile error via IntoFailurePolicy<M>.
  • after retries are exhausted (async) or on first failure (sync), dead-letter events are emitted when dead_letter: true.
  • dead-letter handlers themselves cannot trigger further dead letters (recursion guard).
  • shutdown waits for in-flight async listeners (with optional timeout).
  • after shutdown, all operations return EventBusError::Stopped.

§Error Variants

  • EventBusError::Stopped – shutdown has started or the bus is stopped
  • EventBusError::ChannelFull – publish saturation limit reached (try_publish only)
  • EventBusError::InvalidConfig(ConfigError) – invalid builder/constructor configuration (zero buffer size, zero concurrency)
  • EventBusError::MiddlewareRejected(String) – a middleware rejected the event before it reached any listener
  • EventBusError::ShutdownTimeout – the configured shutdown_timeout expired and in-flight async tasks were forcibly aborted

§Examples

See examples/jaeb-demo for a working demo that includes:

  • async and sync handlers with retry policies
  • dead-letter logging
  • Prometheus metrics exporter
  • structured tracing setup

Run it with:

cd examples/jaeb-demo
RUST_LOG=info,jaeb=trace cargo run

§Notes

  • JAEB requires a running Tokio runtime.
  • Events must be Send + Sync + 'static. Async handlers additionally require events to be Clone.
  • Events are in-process only (no persistence, replay, or broker integration).
  • The crate enforces #![forbid(unsafe_code)].

§License

jaeb is distributed under the MIT License.

Copyright (c) 2025-2026 Linke Thomas

This project uses third-party libraries. See THIRD-PARTY-LICENSES for the full list of dependencies, their versions, and their respective license terms.

Structs§

AsyncFnMode
Marker type that selects async function dispatch for IntoHandler.
AsyncMode
Marker type that selects async struct dispatch for IntoHandler.
BusStats
A point-in-time snapshot of the event bus internal state.
DeadLetter
A dead-letter record emitted when a handler exhausts all retry attempts.
EventBus
The central in-process event bus.
EventBusBuilder
Builder for constructing an EventBus with custom configuration.
FailurePolicy
Policy controlling how handler failures are treated.
ListenerInfo
Information about a single registered listener, as reported by BusStats.
NoRetryPolicy
Failure policy for handlers that do not support retries.
Subscription
Handle representing an active listener or middleware registration.
SubscriptionGuard
RAII guard that automatically unsubscribes a listener when dropped.
SubscriptionId
Unique identifier for a listener or middleware registration.
SyncFnMode
Marker type that selects sync function dispatch for IntoHandler.
SyncMode
Marker type that selects sync struct dispatch for IntoHandler.

Enums§

ConfigError
Specific reason why an EventBus configuration is invalid.
EventBusError
Errors returned by EventBus publish and shutdown operations.
MiddlewareDecision
Decision returned by a middleware after inspecting an event.
RetryStrategy
Strategy for computing the delay between retry attempts.

Traits§

Event
Marker trait for all publishable event types.
EventHandler
Trait for asynchronous event handlers.
IntoFailurePolicy
Trait that converts a policy type into a FailurePolicy suitable for the handler’s dispatch mode.
IntoHandler
Type-erases a concrete handler into the internal representation expected by the bus registry.
Middleware
Async middleware trait.
SyncEventHandler
Trait for synchronous event handlers.
SyncMiddleware
Sync middleware trait.
TypedMiddleware
Async middleware scoped to a specific event type E.
TypedSyncMiddleware
Sync middleware scoped to a specific event type E.

Type Aliases§

HandlerError
The error type returned by EventHandler::handle and SyncEventHandler::handle on failure.
HandlerResult
The result type for handler methods.