Event consumer mesh — the inbound half of the event architecture.
The transactional outbox (data::outbox) made producing events safe;
this module gives consuming them the same framework-grade guarantees,
with NestJS-style ergonomics:
pub struct OrderEventsConsumer;

#[EventConsumer]
impl OrderEventsConsumer {
    #[EventPattern("order.confirmed")]
    async fn on_confirmed(ctx: EventContext) -> Result<(), String> {
        let evt: OrderConfirmed = ctx.payload()?;
        ctx.inject::<InventoryService>().reserve(evt.order_id).await
    }
}

§Guarantees
- Frozen dispatch: #[EventPattern] handlers register through inventory (the same link-time mechanism as #[Controller]); the topic→handler map is built once at runtime start and never mutated.
- At-least-once + dedupe: when an IdempotencyStore is in the DI container, each idempotency_key is consumed exactly once per TTL.
- Bounded retries → DLQ: a failing message is nacked and retried up to max_retries, then dead-lettered with the failure reason; one poison message can never wedge a topic.
- Trace continuity: the envelope's traceparent (stamped by the outbox producer) becomes this hop's parent span; async hops chain in the trace UI instead of starting orphan roots.
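To make the trace continuity guarantee concrete, here is a minimal sketch of reading a W3C Trace Context traceparent value so that its span id can serve as the parent of the consumer hop. The TraceParent struct and parse_traceparent function are hypothetical names for illustration, not this crate's API, and the sketch only accepts the version 00 layout.

```rust
/// Hypothetical parsed form of a W3C `traceparent` value (version 00 layout:
/// `00-<32 hex trace id>-<16 hex parent span id>-<2 hex flags>`).
#[derive(Debug, PartialEq, Eq)]
pub struct TraceParent {
    pub trace_id: String,
    pub parent_span_id: String,
    pub sampled: bool,
}

/// Parse a version-00 `traceparent`. Returns `None` for anything malformed,
/// in which case a consumer would have to start a fresh root span.
pub fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let parts: Vec<&str> = header.trim().split('-').collect();
    if parts.len() != 4 || parts[0] != "00" {
        return None;
    }
    let (trace_id, span_id, flags) = (parts[1], parts[2], parts[3]);

    // The spec requires lowercase hex of fixed width for every field.
    let is_hex = |s: &str, len: usize| {
        s.len() == len && s.bytes().all(|b| b.is_ascii_digit() || (b'a'..=b'f').contains(&b))
    };
    if !is_hex(trace_id, 32) || !is_hex(span_id, 16) || !is_hex(flags, 2) {
        return None;
    }
    // All-zero ids are explicitly invalid.
    if trace_id.bytes().all(|b| b == b'0') || span_id.bytes().all(|b| b == b'0') {
        return None;
    }

    let flags = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceParent {
        trace_id: trace_id.to_owned(),
        parent_span_id: span_id.to_owned(),
        sampled: flags & 0x01 == 0x01, // bit 0 is the "sampled" flag
    })
}
```

A consumer built this way would open its handler span with trace_id carried over and parent_span_id as the parent, which is what keeps async hops attached to the producer's trace.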
The transport (Kafka / AMQP / NATS / in-process bridge) is app-provided
via MessageTransport — the same boundary rule as every other driver.
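As an illustration of what an app-provided transport boundary can look like, the sketch below defines a hypothetical, synchronous trait and an in-process implementation. The real MessageTransport signature is not shown on this page, so TransportSketch, Msg, InProcessBridge and every method name here are assumptions, not the crate's API.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a transport-agnostic inbound message.
#[derive(Debug, Clone, PartialEq)]
pub struct Msg {
    pub topic: String,
    pub payload: Vec<u8>,
    pub attempt: u32,
}

/// Hypothetical shape of a broker adapter: the runtime only ever talks to
/// this boundary, never to Kafka / AMQP / NATS types directly.
pub trait TransportSketch {
    fn poll(&mut self) -> Option<Msg>;
    fn ack(&mut self, msg: &Msg);
    fn nack(&mut self, msg: Msg);
    fn dead_letter(&mut self, msg: Msg, reason: String);
}

/// In-process bridge: a queue plus bookkeeping, handy for tests.
#[derive(Default)]
pub struct InProcessBridge {
    pub queue: VecDeque<Msg>,
    pub acked: Vec<Msg>,
    pub dead: Vec<(Msg, String)>,
}

impl TransportSketch for InProcessBridge {
    fn poll(&mut self) -> Option<Msg> {
        self.queue.pop_front()
    }
    fn ack(&mut self, msg: &Msg) {
        self.acked.push(msg.clone());
    }
    fn nack(&mut self, mut msg: Msg) {
        // Redeliver at the back of the queue with the attempt count bumped.
        msg.attempt += 1;
        self.queue.push_back(msg);
    }
    fn dead_letter(&mut self, msg: Msg, reason: String) {
        self.dead.push((msg, reason));
    }
}
```

Keeping the trait this small is a design choice of the sketch: a real adapter would likely be async and carry broker-specific acknowledgement handles.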
Re-exports§
pub use runtime::ConsumerRuntime;
Modules§
- event: In-process async event bus, actually lock-free on the emit path.
- runtime: Consumer runtime (poll → dedupe → dispatch → ack / retry / dead-letter).
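The poll → dedupe → dispatch → ack / retry / dead-letter cycle can be sketched as a single synchronous loop. This is a simplified illustration under stated assumptions, not the actual ConsumerRuntime: Msg, Handler, Outcome and drain are hypothetical names, the dedupe set ignores TTL, and a VecDeque stands in for the broker.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Hypothetical inbound message for the sketch.
#[derive(Debug, Clone)]
pub struct Msg {
    pub topic: String,
    pub idempotency_key: String,
    pub attempt: u32,
}

pub type Handler = fn(&Msg) -> Result<(), String>;

/// What happened to each message, by idempotency key.
#[derive(Debug, Default)]
pub struct Outcome {
    pub handled: Vec<String>,
    pub duplicates: Vec<String>,
    pub dead: Vec<(String, String)>, // (key, failure reason)
}

/// Drain the queue: skip already-seen keys, dispatch by topic, requeue
/// failures until `max_retries` is exhausted, then dead-letter.
pub fn drain(
    queue: &mut VecDeque<Msg>,
    handlers: &HashMap<&'static str, Handler>,
    max_retries: u32,
) -> Outcome {
    let mut seen: HashSet<String> = HashSet::new(); // no TTL in this sketch
    let mut out = Outcome::default();

    while let Some(mut msg) = queue.pop_front() {
        // Dedupe: a key that already succeeded is acknowledged, not re-run.
        if seen.contains(&msg.idempotency_key) {
            out.duplicates.push(msg.idempotency_key);
            continue;
        }
        // Dispatch through the (read-only) topic → handler map.
        let result = match handlers.get(msg.topic.as_str()) {
            Some(handler) => handler(&msg),
            None => Err(format!("no handler for topic {}", msg.topic)),
        };
        match result {
            Ok(()) => {
                seen.insert(msg.idempotency_key.clone());
                out.handled.push(msg.idempotency_key);
            }
            // Retries exhausted: park the message with its failure reason.
            Err(reason) if msg.attempt >= max_retries => {
                out.dead.push((msg.idempotency_key, reason));
            }
            // Otherwise nack: requeue behind other messages and try again.
            Err(_) => {
                msg.attempt += 1;
                queue.push_back(msg);
            }
        }
    }
    out
}

/// Example handlers used to exercise the loop.
pub fn reserve_ok(_msg: &Msg) -> Result<(), String> {
    Ok(())
}
pub fn always_fail(_msg: &Msg) -> Result<(), String> {
    Err("inventory unavailable".to_string())
}
```

Because a failing message is requeued behind the others and eventually dead-lettered, the loop always terminates, which is the sense in which a poison message cannot wedge a topic.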
Structs§
- EventContext: What an #[EventPattern] handler receives: DI + payload + trace, no HTTP types anywhere.
- EventHandlerDescriptor: Static descriptor emitted by the #[EventPattern] expansion and collected at link time; the runtime freezes these into its dispatch map at start.
- InboundMessage: One inbound message, transport-agnostic.
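The "collect static descriptors, then freeze them into a dispatch map" idea can be sketched with the standard library alone. In this sketch a plain static slice stands in for the link-time collection that the inventory crate performs, and HandlerDescriptor, DESCRIPTORS, dispatch_map and dispatch are hypothetical names, not the crate's real types.

```rust
use std::collections::HashMap;
use std::sync::OnceLock;

pub type Handler = fn(&str) -> String;

/// Hypothetical static descriptor: one topic pattern plus its handler.
pub struct HandlerDescriptor {
    pub pattern: &'static str,
    pub handler: Handler,
}

fn on_confirmed(payload: &str) -> String {
    format!("reserved:{payload}")
}
fn on_cancelled(payload: &str) -> String {
    format!("released:{payload}")
}

// Stand-in for descriptors gathered at link time (an assumption of this sketch).
static DESCRIPTORS: &[HandlerDescriptor] = &[
    HandlerDescriptor { pattern: "order.confirmed", handler: on_confirmed },
    HandlerDescriptor { pattern: "order.cancelled", handler: on_cancelled },
];

// Built on first use and never mutated afterwards: OnceLock only hands out
// shared references once the value is set.
static DISPATCH: OnceLock<HashMap<&'static str, Handler>> = OnceLock::new();

pub fn dispatch_map() -> &'static HashMap<&'static str, Handler> {
    DISPATCH.get_or_init(|| DESCRIPTORS.iter().map(|d| (d.pattern, d.handler)).collect())
}

/// Look up the handler for a topic and run it; `None` means no handler.
pub fn dispatch(topic: &str, payload: &str) -> Option<String> {
    dispatch_map().get(topic).map(|handler| handler(payload))
}
```

Since the map is only reachable through a shared reference, lookups need no locking and no handler can be added or swapped after startup.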
Enums§
- EventError: What the mesh should do with a failed message. Handlers may keep returning Result<(), String> (strings convert to EventError::Retry) or return EventError directly to decide the message's fate.
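The two handler styles can be illustrated with a stand-in enum. Only the Retry variant and the String conversion are stated on this page; the payload carried by Retry, the DeadLetter variant, and all function names below are hypothetical and used purely for illustration.

```rust
/// Hypothetical stand-in for the mesh's error type.
#[derive(Debug, PartialEq)]
pub enum EventErrorSketch {
    /// Transient failure: nack and try again (the default for plain strings).
    Retry(String),
    /// Assumed terminal variant: retrying can never help, park the message.
    DeadLetter(String),
}

/// Plain string errors become `Retry`, so string-returning code keeps working.
impl From<String> for EventErrorSketch {
    fn from(reason: String) -> Self {
        EventErrorSketch::Retry(reason)
    }
}

/// Style 1: a handler that keeps returning `Result<(), String>`.
pub fn legacy_handler(raw: &str) -> Result<(), String> {
    raw.parse::<u32>()
        .map(|_| ())
        .map_err(|e| format!("bad quantity {raw:?}: {e}"))
}

/// `?` applies the `From<String>` conversion, so the failure becomes `Retry`.
pub fn via_question_mark(raw: &str) -> Result<(), EventErrorSketch> {
    legacy_handler(raw)?;
    Ok(())
}

/// Style 2: a handler that decides the message's fate itself.
pub fn explicit_handler(raw: &str) -> Result<(), EventErrorSketch> {
    // A malformed payload will not parse on the next attempt either.
    let qty: u32 = raw
        .parse()
        .map_err(|_| EventErrorSketch::DeadLetter(format!("unparseable quantity: {raw}")))?;
    if qty == 0 {
        // Treated as transient in this sketch, so it goes through `Retry`.
        return Err("inventory temporarily unavailable".to_string().into());
    }
    Ok(())
}
```

The practical difference is that the first style always costs the full retry budget before a bad message is parked, while the second can short-circuit when the handler knows a retry is pointless.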
Traits§
- MessageTransport: App-provided broker adapter (Kafka / AMQP / NATS / in-process bridge).
Type Aliases§
- BoxError: Boxed error for infrastructure trait contracts (transports, sinks, publishers). String and &str convert via .into(), so simple implementations stay simple while real ones keep their source chain.
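A short sketch of both halves of that claim follows. It assumes BoxError is the conventional alias Box<dyn Error + Send + Sync>; the exact definition is not shown on this page. PublishError and the three functions are hypothetical examples.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Assumed definition: the usual thread-safe boxed error alias.
pub type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// Simple implementations: `&str` and `String` convert with `.into()`.
pub fn simple_failure() -> Result<(), BoxError> {
    Err("broker unreachable".into())
}
pub fn owned_failure(topic: &str) -> Result<(), BoxError> {
    Err(format!("publish to {topic} failed").into())
}

/// Real implementations: a typed error that keeps its underlying cause.
#[derive(Debug)]
pub struct PublishError {
    pub topic: String,
    pub source: io::Error,
}

impl fmt::Display for PublishError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "publish to {} failed", self.topic)
    }
}

impl Error for PublishError {
    // Exposing the cause is what preserves the source chain through the box.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.source)
    }
}

pub fn chained_failure() -> Result<(), BoxError> {
    let cause = io::Error::new(io::ErrorKind::ConnectionReset, "connection reset");
    Err(PublishError { topic: "order.confirmed".to_string(), source: cause }.into())
}
```

Callers can walk the chain with Error::source on the boxed value, so logs can show both the high-level failure and the I/O error that caused it.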