jaeb 0.2.3

simple actor based event bus
Documentation
jaeb-0.2.3 has been yanked.

JAEB - Just Another Event Bus

crates.io docs.rs license

In-process, actor-based event bus for Tokio applications.

JAEB provides:

  • sync + async listeners via a unified subscribe API
  • automatic dispatch-mode selection based on handler trait
  • explicit listener unsubscription via Subscription handles or RAII SubscriptionGuard
  • dependency injection via handler structs
  • retry policies with configurable strategy for async handlers
  • dead-letter stream for terminal failures
  • explicit Result-based error handling (no panics in the public API)
  • graceful shutdown with queue draining and in-flight task completion
  • idempotent shutdown (safe to call multiple times)
  • optional Prometheus-compatible metrics via the metrics crate
  • structured tracing with per-handler spans
  • #![forbid(unsafe_code)]

Installation

[dependencies]
jaeb = { version = "0.2.3" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

To enable metrics instrumentation:

[dependencies]
jaeb = { version = "0.2.3", features = ["metrics"] }

Quick Start

use std::time::Duration;

use jaeb::{
    DeadLetter, EventBus, EventBusError, EventHandler, FailurePolicy,
    HandlerResult, RetryStrategy, SyncEventHandler,
};

#[derive(Clone)]
struct OrderCheckoutEvent {
    order_id: i64,
}

struct AsyncCheckoutHandler;

impl EventHandler<OrderCheckoutEvent> for AsyncCheckoutHandler {
    async fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
        println!("async checkout {}", event.order_id);
        Ok(())
    }
}

struct SyncAuditHandler;

impl SyncEventHandler<OrderCheckoutEvent> for SyncAuditHandler {
    fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
        println!("sync audit {}", event.order_id);
        Ok(())
    }
}

struct DeadLetterLogger;

impl SyncEventHandler<DeadLetter> for DeadLetterLogger {
    fn handle(&self, dl: &DeadLetter) -> HandlerResult {
        eprintln!(
            "dead-letter: event={} listener={} attempts={} error={}",
            dl.event_name, dl.subscription_id, dl.attempts, dl.error
        );
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), EventBusError> {
    let bus = EventBus::new(64)?;

    let retry_policy = FailurePolicy::default()
        .with_max_retries(2)
        .with_retry_strategy(RetryStrategy::Fixed(Duration::from_millis(50)));

    // Async handler -- dispatch mode inferred from EventHandler impl
    let checkout_sub = bus
        .subscribe_with_policy(AsyncCheckoutHandler, retry_policy)
        .await?;

    // Sync handler -- dispatch mode inferred from SyncEventHandler impl
    let _audit_sub = bus.subscribe(SyncAuditHandler).await?;

    // Dead-letter handler (convenience method, auto-sets dead_letter: false)
    bus.subscribe_dead_letters(DeadLetterLogger).await?;

    bus.publish(OrderCheckoutEvent { order_id: 42 }).await?;
    bus.try_publish(OrderCheckoutEvent { order_id: 43 })?;

    checkout_sub.unsubscribe().await?;
    bus.shutdown().await?;
    Ok(())
}

API Overview

EventBus

  • EventBus::new(buffer) -> Result<EventBus, EventBusError> -- create a new bus with the given channel capacity
  • EventBus::builder() -- builder for fine-grained configuration (buffer size, timeouts, concurrency limits)
  • subscribe(handler) -> Result<Subscription, EventBusError> -- dispatch mode inferred from trait
  • subscribe_with_policy(handler, policy) -> Result<Subscription, EventBusError> -- policy type is compile-time checked (FailurePolicy for async, NoRetryPolicy for sync)
  • subscribe_dead_letters(handler) -> Result<Subscription, EventBusError>
  • publish(event) -> Result<(), EventBusError>
  • try_publish(event) -> Result<(), EventBusError> -- non-blocking, returns ChannelFull when queue is full
  • unsubscribe(subscription_id) -> Result<bool, EventBusError>
  • shutdown() -> Result<(), EventBusError> -- idempotent, drains queue + in-flight tasks
  • is_healthy() -> bool -- async, checks if the internal actor is still running

EventBus is Clone -- all clones share the same underlying actor.

Handler Traits

  • EventHandler<E> -- async handler, dispatched on a spawned task (requires E: Clone)
  • SyncEventHandler<E> -- sync handler, awaited inline during dispatch

The dispatch mode is selected automatically based on which trait is implemented. The IntoHandler<E, Mode> trait performs the conversion; the Mode parameter (AsyncMode / SyncMode) is inferred, so callers simply write bus.subscribe(handler).

Subscription & SubscriptionGuard

Subscription holds a SubscriptionId and a bus handle. Call subscription.unsubscribe() to remove the handler, or use bus.unsubscribe(id) directly.

SubscriptionGuard is an RAII wrapper that automatically unsubscribes the listener when dropped. Convert a Subscription via subscription.into_guard(). Call guard.disarm() to prevent the automatic unsubscribe.

Types

  • Event -- blanket trait implemented for all T: Send + Sync + 'static
  • HandlerResult = Result<(), Box<dyn Error + Send + Sync>>
  • FailurePolicy { max_retries, retry_strategy, dead_letter } -- for async handlers
  • NoRetryPolicy { dead_letter } -- for sync handlers (or async handlers that don't need retries)
  • IntoFailurePolicy<M> -- sealed trait enforcing compile-time policy/handler compatibility
  • DeadLetter { event_name, subscription_id, attempts, error }
  • SubscriptionId -- opaque handler ID (wraps u64)

Feature Flags

Flag Default Description
metrics off Enables Prometheus-compatible instrumentation via the metrics crate

When the metrics feature is enabled, the bus records:

  • eventbus.publish (counter, per event type)
  • eventbus.handler.duration (histogram, per event type)
  • eventbus.handler.error (counter, per event type)
  • eventbus.handler.join_error (counter, per event type)

Observability

JAEB uses the tracing crate throughout. Key spans and events:

  • event_bus.publish / event_bus.subscribe / event_bus.shutdown -- top-level operations
  • eventbus.handler span -- per-handler execution with event, mode, and listener_id fields
  • handler.retry (warn) -- logged on each retry attempt
  • handler.failed (error) -- logged when retries are exhausted
  • handler.join_error (error) -- logged when a spawned task panics

Semantics

  • publish waits for dispatch and sync listeners to finish.
  • publish does not wait for async listeners to finish.
  • async handler failures can be retried based on FailurePolicy.
  • sync handlers execute exactly once -- retries are not supported; passing a FailurePolicy with retries to a sync handler is a compile error via IntoFailurePolicy<M>.
  • after retries are exhausted (async) or on first failure (sync), dead-letter events are emitted when dead_letter: true.
  • dead-letter handlers themselves cannot trigger further dead letters (recursion guard).
  • shutdown drains queued publish messages and waits for in-flight async listeners.
  • after shutdown, all operations return EventBusError::ActorStopped.

Error Variants

  • EventBusError::ActorStopped -- the actor has shut down or the channel is closed
  • EventBusError::ChannelFull -- the internal queue is full (try_publish only)
  • EventBusError::InvalidConfig(ConfigError) -- invalid builder/constructor configuration (zero buffer size, zero concurrency)
  • EventBusError::MiddlewareRejected(String) -- a middleware rejected the event before it reached any listener
  • EventBusError::ShutdownTimeout -- the configured shutdown_timeout expired and in-flight async tasks were forcibly aborted

Examples

See examples/jaeb-demo for a working demo that includes:

  • async and sync handlers with retry policies
  • dead-letter logging
  • Prometheus metrics exporter
  • structured tracing setup

Run it with:

cd examples/jaeb-demo
RUST_LOG=info,jaeb=trace cargo run

Notes

  • JAEB requires a running Tokio runtime.
  • Events must be Send + Sync + 'static. Async handlers additionally require events to be Clone.
  • Events are in-process only (no persistence, replay, or broker integration).
  • The crate enforces #![forbid(unsafe_code)].

License

jaeb is distributed under the MIT License.

Copyright (c) 2025-2026 Linke Thomas

This project uses third-party libraries. See THIRD-PARTY-LICENSES for the full list of dependencies, their versions, and their respective license terms.