jaeb 0.2.2

simple actor based event bus
Documentation
# JAEB - Just Another Event Bus

[![crates.io](https://img.shields.io/crates/v/jaeb.svg)](https://crates.io/crates/jaeb)
[![docs.rs](https://docs.rs/jaeb/badge.svg)](https://docs.rs/jaeb)
[![license](https://img.shields.io/crates/l/jaeb.svg)](https://github.com/LinkeTh/jaeb/blob/main/LICENSE)

In-process, actor-based event bus for Tokio applications.

JAEB provides:

- sync + async listeners via a unified `subscribe` API
- automatic dispatch-mode selection based on handler trait
- explicit listener unsubscription via `Subscription` handles or RAII `SubscriptionGuard`
- dependency injection via handler structs
- retry policies with configurable delay for async handlers
- dead-letter stream for terminal failures
- explicit `Result`-based error handling (no panics in the public API)
- graceful shutdown with queue draining and in-flight task completion
- idempotent shutdown (safe to call multiple times)
- optional Prometheus-compatible metrics via the `metrics` crate
- structured tracing with per-handler spans
- `#![forbid(unsafe_code)]`

## Installation

```toml
[dependencies]
jaeb = { version = "0.2.2" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```

To enable metrics instrumentation:

```toml
[dependencies]
jaeb = { version = "0.2.2", features = ["metrics"] }
```

## Quick Start

```rust
use std::time::Duration;

use jaeb::{
    DeadLetter, EventBus, EventBusError, EventHandler, FailurePolicy,
    HandlerResult, SyncEventHandler,
};

#[derive(Clone)]
struct OrderCheckoutEvent {
    order_id: i64,
}

struct AsyncCheckoutHandler;

impl EventHandler<OrderCheckoutEvent> for AsyncCheckoutHandler {
    async fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
        println!("async checkout {}", event.order_id);
        Ok(())
    }
}

struct SyncAuditHandler;

impl SyncEventHandler<OrderCheckoutEvent> for SyncAuditHandler {
    fn handle(&self, event: &OrderCheckoutEvent) -> HandlerResult {
        println!("sync audit {}", event.order_id);
        Ok(())
    }
}

struct DeadLetterLogger;

impl SyncEventHandler<DeadLetter> for DeadLetterLogger {
    fn handle(&self, dl: &DeadLetter) -> HandlerResult {
        eprintln!(
            "dead-letter: event={} listener={} attempts={} error={}",
            dl.event_name, dl.subscription_id, dl.attempts, dl.error
        );
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), EventBusError> {
    let bus = EventBus::new(64)?;

    let retry_policy = FailurePolicy::default()
        .with_max_retries(2)
        .with_retry_delay(Duration::from_millis(50));

    // Async handler -- dispatch mode inferred from EventHandler impl
    let checkout_sub = bus
        .subscribe_with_policy(AsyncCheckoutHandler, retry_policy)
        .await?;

    // Sync handler -- dispatch mode inferred from SyncEventHandler impl
    let _audit_sub = bus.subscribe(SyncAuditHandler).await?;

    // Dead-letter handler (convenience method, auto-sets dead_letter: false)
    bus.subscribe_dead_letters(DeadLetterLogger).await?;

    bus.publish(OrderCheckoutEvent { order_id: 42 }).await?;
    bus.try_publish(OrderCheckoutEvent { order_id: 43 })?;

    checkout_sub.unsubscribe().await?;
    bus.shutdown().await?;
    Ok(())
}
```

## API Overview

### EventBus

- `EventBus::new(buffer) -> Result<EventBus, EventBusError>` -- create a new bus with the given channel capacity
- `EventBus::builder()` -- builder for fine-grained configuration (buffer size, timeouts, concurrency limits)
- `subscribe(handler) -> Result<Subscription, EventBusError>` -- dispatch mode inferred from trait
- `subscribe_with_policy(handler, FailurePolicy) -> Result<Subscription, EventBusError>`
- `subscribe_dead_letters(handler) -> Result<Subscription, EventBusError>`
- `publish(event) -> Result<(), EventBusError>`
- `try_publish(event) -> Result<(), EventBusError>` -- non-blocking, returns `ChannelFull` when queue is full
- `unsubscribe(subscription_id) -> Result<bool, EventBusError>`
- `shutdown() -> Result<(), EventBusError>` -- idempotent, drains queue + in-flight tasks
- `is_healthy() -> bool` -- **async**, checks if the internal actor is still running

`EventBus` is `Clone` -- all clones share the same underlying actor.

### Handler Traits

- `EventHandler<E>` -- async handler, dispatched on a spawned task (requires `E: Clone`)
- `SyncEventHandler<E>` -- sync handler, awaited inline during dispatch

The dispatch mode is selected automatically based on which trait is implemented.
The `IntoHandler<E, Mode>` trait performs the conversion; the `Mode` parameter
(`AsyncMode` / `SyncMode`) is inferred, so callers simply write `bus.subscribe(handler)`.

### Subscription & SubscriptionGuard

`Subscription` holds a `SubscriptionId` and a bus handle. Call `subscription.unsubscribe()`
to remove the handler, or use `bus.unsubscribe(id)` directly.

`SubscriptionGuard` is an RAII wrapper that automatically unsubscribes the listener when
dropped. Convert a `Subscription` via `subscription.into_guard()`. Call `guard.disarm()`
to prevent the automatic unsubscribe.

### Types

- `Event` -- blanket trait implemented for all `T: Send + Sync + 'static`
- `HandlerResult = Result<(), Box<dyn Error + Send + Sync>>`
- `FailurePolicy { max_retries, retry_delay, dead_letter }`
- `DeadLetter { event_name, subscription_id, attempts, error }`
- `SubscriptionId` -- opaque handler ID (wraps `u64`)

## Feature Flags

| Flag      | Default | Description                                                           |
|-----------|---------|-----------------------------------------------------------------------|
| `metrics` | off     | Enables Prometheus-compatible instrumentation via the `metrics` crate |

When the `metrics` feature is enabled, the bus records:

- `eventbus.publish` (counter, per event type)
- `eventbus.handler.duration` (histogram, per event type)
- `eventbus.handler.error` (counter, per event type)
- `eventbus.handler.join_error` (counter, per event type)

## Observability

JAEB uses the `tracing` crate throughout. Key spans and events:

- `event_bus.publish` / `event_bus.subscribe` / `event_bus.shutdown` -- top-level operations
- `eventbus.handler` span -- per-handler execution with `event`, `mode`, and `listener_id` fields
- `handler.retry` (warn) -- logged on each retry attempt
- `handler.failed` (error) -- logged when retries are exhausted
- `handler.join_error` (error) -- logged when a spawned task panics

## Semantics

- `publish` waits for dispatch and sync listeners to finish.
- `publish` does **not** wait for async listeners to finish.
- async handler failures can be retried based on `FailurePolicy`.
- sync handlers execute exactly once -- retries are not supported (`SyncRetryNotSupported`).
- after retries are exhausted (async) or on first failure (sync), dead-letter events are emitted when `dead_letter: true`.
- dead-letter handlers themselves cannot trigger further dead letters (recursion guard).
- `shutdown` drains queued publish messages and waits for in-flight async listeners.
- after shutdown, all operations return `EventBusError::ActorStopped`.

## Error Variants

- `EventBusError::ActorStopped` -- the actor has shut down or the channel is closed
- `EventBusError::ChannelFull` -- the internal queue is full (`try_publish` only)
- `EventBusError::InvalidConfig(ConfigError)` -- invalid builder/constructor configuration (zero buffer size, zero concurrency)
- `EventBusError::ShutdownTimeout` -- the configured `shutdown_timeout` expired and in-flight async tasks were forcibly aborted
- `EventBusError::SyncRetryNotSupported` -- a sync handler was subscribed with `max_retries > 0` (retries are async-only)

## Examples

See [`examples/jaeb-demo`](examples/jaeb-demo) for a working demo that includes:

- async and sync handlers with retry policies
- dead-letter logging
- Prometheus metrics exporter
- structured tracing setup

Run it with:

```sh
cd examples/jaeb-demo
RUST_LOG=info,jaeb=trace cargo run
```

## Notes

- JAEB requires a running Tokio runtime.
- Events must be `Send + Sync + 'static`. Async handlers additionally require events to be `Clone`.
- Events are in-process only (no persistence, replay, or broker integration).
- The crate enforces `#![forbid(unsafe_code)]`.

## License

jaeb is distributed under the [MIT License](LICENSE).

Copyright (c) 2025-2026 Linke Thomas

This project uses third-party libraries. See [THIRD-PARTY-LICENSES](THIRD-PARTY-LICENSES)
for the full list of dependencies, their versions, and their respective license terms.