arcly-http 0.1.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Event consumer mesh — the inbound half of the event architecture.
//!
//! The transactional outbox (`data::outbox`) made *producing* events safe;
//! this module gives *consuming* them the same framework-grade guarantees,
//! with NestJS-style ergonomics:
//!
//! ```ignore
//! pub struct OrderEventsConsumer;
//!
//! #[EventConsumer]
//! impl OrderEventsConsumer {
//!     #[EventPattern("order.confirmed")]
//!     async fn on_confirmed(ctx: EventContext) -> Result<(), String> {
//!         let evt: OrderConfirmed = ctx.payload()?;
//!         ctx.inject::<InventoryService>().reserve(evt.order_id).await
//!     }
//! }
//! ```
//!
//! ## Guarantees
//!
//! - **Frozen dispatch** — `#[EventPattern]` handlers register through
//!   `inventory` (the same link-time mechanism as `#[Controller]`); the
//!   topic→handler map is built once at runtime start and never mutated.
//! - **At-least-once + dedupe** — when an `IdempotencyStore` is in the DI
//!   container, each `idempotency_key` is consumed exactly once per TTL.
//! - **Bounded retries → DLQ** — a failing message is `nack`ed and retried
//!   up to `max_retries`, then dead-lettered with the failure reason; one
//!   poison message can never wedge a topic.
//! - **Trace continuity** — the envelope's `traceparent` (stamped by the
//!   outbox producer) becomes this hop's parent span; async hops chain in
//!   the trace UI instead of starting orphan roots.
//!
//! The transport (Kafka / AMQP / NATS / in-process bridge) is app-provided
//! via [`MessageTransport`] — the same boundary rule as every other driver.

pub mod runtime;

pub use runtime::ConsumerRuntime;

use futures::future::BoxFuture;

use crate::core::engine::FrozenDiContainer;
use crate::observability::propagation::TraceContext;

// ─── Envelope ─────────────────────────────────────────────────────────────────

/// One inbound message, transport-agnostic.
///
/// This is the unit a [`MessageTransport`] hands over from `poll` and the
/// value an [`EventContext`] wraps for the handler.
#[derive(Clone, Debug)]
pub struct InboundMessage {
    /// Topic the message belongs to.
    // NOTE(review): presumably matched against `EventHandlerDescriptor::topic`
    // by the runtime — the matching rule is not visible in this file.
    pub topic: String,
    /// Raw JSON body; decode it into a typed event via `EventContext::payload`.
    pub payload: serde_json::Value,
    /// Consumers dedupe on this under at-least-once delivery.
    pub idempotency_key: String,
    /// Optional tenant identifier carried with the message.
    // NOTE(review): how the tenant is consumed (data routing, scoping) is not
    // shown here — confirm against the runtime.
    pub tenant: Option<String>,
    /// W3C trace context stamped by the producer (outbox), if any.
    pub traceparent: Option<String>,
}

// ─── Transport contract ───────────────────────────────────────────────────────

/// App-provided broker adapter (Kafka / AMQP / NATS / in-process bridge).
///
/// Every operation returns a boxed future resolving to `Result<_, String>`;
/// the `Err` string is an adapter-defined failure description. Implementors
/// must be `Send + Sync + 'static`.
///
/// The four operations cover the full message lifecycle: fetch (`poll`),
/// success (`ack`), retryable failure (`nack`) and terminal failure
/// (`dead_letter`). When each one is invoked is decided by the caller; this
/// trait does not enforce any retry policy itself.
pub trait MessageTransport: Send + Sync + 'static {
    /// Pull up to `max` messages. An empty Vec means "nothing right now".
    fn poll(&self, max: usize) -> BoxFuture<'_, Result<Vec<InboundMessage>, String>>;
    /// Acknowledge successful processing (message will not be redelivered).
    fn ack<'a>(&'a self, msg: &'a InboundMessage) -> BoxFuture<'a, Result<(), String>>;
    /// Negative-ack: requeue for a later retry.
    fn nack<'a>(&'a self, msg: &'a InboundMessage) -> BoxFuture<'a, Result<(), String>>;
    /// Park a poison message out of band, with the final failure reason.
    ///
    /// `reason` is the human-readable cause recorded alongside the message.
    fn dead_letter<'a>(
        &'a self,
        msg: &'a InboundMessage,
        reason: &'a str,
    ) -> BoxFuture<'a, Result<(), String>>;
}

// ─── Consumer-side context ────────────────────────────────────────────────────

/// What an `#[EventPattern]` handler receives — DI + payload + trace, no
/// HTTP types anywhere.
///
/// Handlers take this by value (see `EventHandlerDescriptor::handler`).
pub struct EventContext {
    /// The message being handled; its JSON body is decoded by `payload()`.
    pub message: InboundMessage,
    /// Frozen DI container that `inject` / `try_inject` resolve services from.
    pub container: &'static FrozenDiContainer,
    /// Continues the producer's trace (or a fresh root when none was carried).
    pub trace: TraceContext,
}

impl EventContext {
    /// Resolve a singleton service from the DI container. O(1), no locks.
    ///
    /// # Panics
    ///
    /// Panics when `T` is absent from the container; use
    /// [`Self::try_inject`] when absence is an expected condition.
    #[inline]
    pub fn inject<T: Send + Sync + 'static>(&self) -> &'static T {
        self.container.get::<T>()
    }

    /// Non-panicking variant of [`Self::inject`]: `None` when `T` is absent.
    #[inline]
    pub fn try_inject<T: Send + Sync + 'static>(&self) -> Option<&'static T> {
        self.container.try_get::<T>()
    }

    /// Deserialize the payload into a typed event.
    ///
    /// The message's payload is left untouched, so this may be called more
    /// than once (e.g. to decode into different views of the same event).
    ///
    /// # Errors
    ///
    /// Returns `Err("payload decode failed: …")` when the JSON payload does
    /// not match the shape of `T`.
    pub fn payload<T: serde::de::DeserializeOwned>(&self) -> Result<T, String> {
        // `&serde_json::Value` is itself a `Deserializer`, so decode straight
        // from the borrowed tree instead of deep-cloning the whole payload
        // only to hand the copy to `from_value`.
        T::deserialize(&self.message.payload)
            .map_err(|e| format!("payload decode failed: {e}"))
    }

    /// `traceparent` for forwarding to the next hop (HTTP call, next queue).
    pub fn traceparent(&self) -> String {
        self.trace.to_traceparent()
    }
}

// ─── Handler registration (filled in by #[EventConsumer]) ────────────────────

/// Static descriptor emitted by the `#[EventPattern]` expansion and collected
/// at link time — the runtime freezes these into its dispatch map at start.
pub struct EventHandlerDescriptor {
    /// Topic this handler is registered for (the `#[EventPattern("…")]` value).
    pub topic: &'static str,
    /// Identifies the consumer that owns the handler.
    // NOTE(review): presumably the `#[EventConsumer]` type name, used for
    // diagnostics — confirm against the macro expansion.
    pub consumer: &'static str,
    /// Entry point: takes the context by value and returns a boxed `'static`
    /// future resolving to `Ok(())` on success or an error message on failure.
    pub handler: fn(EventContext) -> BoxFuture<'static, Result<(), String>>,
}

// Declares `&'static EventHandlerDescriptor` as an `inventory` collection
// type, so descriptors submitted anywhere in the final binary can be iterated.
inventory::collect!(&'static EventHandlerDescriptor);