// arcly_http/messaging/mod.rs

//! Event consumer mesh — the inbound half of the event architecture.
//!
//! The transactional outbox (`data::outbox`) made *producing* events safe;
//! this module gives *consuming* them the same framework-grade guarantees,
//! with NestJS-style ergonomics:
//!
//! ```ignore
//! pub struct OrderEventsConsumer;
//!
//! #[EventConsumer]
//! impl OrderEventsConsumer {
//!     #[EventPattern("order.confirmed")]
//!     async fn on_confirmed(ctx: EventContext) -> Result<(), String> {
//!         let evt: OrderConfirmed = ctx.payload()?;
//!         ctx.inject::<InventoryService>().reserve(evt.order_id).await
//!     }
//! }
//! ```
//!
//! ## Guarantees
//!
//! - **Frozen dispatch** — `#[EventPattern]` handlers register through
//!   `inventory` (the same link-time mechanism as `#[Controller]`); the
//!   topic→handler map is built once at runtime start and never mutated.
//! - **At-least-once + dedupe** — when an `IdempotencyStore` is in the DI
//!   container, each `idempotency_key` is consumed exactly once per TTL.
//! - **Bounded retries → DLQ** — a failing message is `nack`ed and retried
//!   up to `max_retries`, then dead-lettered with the failure reason; one
//!   poison message can never wedge a topic.
//! - **Trace continuity** — the envelope's `traceparent` (stamped by the
//!   outbox producer) becomes this hop's parent span; async hops chain in
//!   the trace UI instead of starting orphan roots.
//!
//! The transport (Kafka / AMQP / NATS / in-process bridge) is app-provided
//! via [`MessageTransport`] — the same boundary rule as every other driver.

37pub mod runtime;
38
39pub use runtime::ConsumerRuntime;
40
41use futures::future::BoxFuture;
42
43use crate::core::engine::FrozenDiContainer;
44use crate::observability::propagation::TraceContext;
45
46// ─── Envelope ─────────────────────────────────────────────────────────────────
47
48/// One inbound message, transport-agnostic.
49#[derive(Clone, Debug)]
50pub struct InboundMessage {
51    pub topic: String,
52    pub payload: serde_json::Value,
53    /// Consumers dedupe on this under at-least-once delivery.
54    pub idempotency_key: String,
55    pub tenant: Option<String>,
56    /// W3C trace context stamped by the producer (outbox), if any.
57    pub traceparent: Option<String>,
58}
59
60// ─── Transport contract ───────────────────────────────────────────────────────
61
62/// App-provided broker adapter (Kafka / AMQP / NATS / in-process bridge).
63pub trait MessageTransport: Send + Sync + 'static {
64    /// Pull up to `max` messages. An empty Vec means "nothing right now".
65    fn poll(&self, max: usize) -> BoxFuture<'_, Result<Vec<InboundMessage>, String>>;
66    /// Acknowledge successful processing (message will not be redelivered).
67    fn ack<'a>(&'a self, msg: &'a InboundMessage) -> BoxFuture<'a, Result<(), String>>;
68    /// Negative-ack: requeue for a later retry.
69    fn nack<'a>(&'a self, msg: &'a InboundMessage) -> BoxFuture<'a, Result<(), String>>;
70    /// Park a poison message out of band, with the final failure reason.
71    fn dead_letter<'a>(
72        &'a self,
73        msg: &'a InboundMessage,
74        reason: &'a str,
75    ) -> BoxFuture<'a, Result<(), String>>;
76}
77
78// ─── Consumer-side context ────────────────────────────────────────────────────
79
80/// What an `#[EventPattern]` handler receives — DI + payload + trace, no
81/// HTTP types anywhere.
82pub struct EventContext {
83    pub message: InboundMessage,
84    pub container: &'static FrozenDiContainer,
85    /// Continues the producer's trace (or a fresh root when none was carried).
86    pub trace: TraceContext,
87}
88
89impl EventContext {
90    /// Resolve a singleton service. O(1), no locks. Panics when absent.
91    #[inline]
92    pub fn inject<T: Send + Sync + 'static>(&self) -> &'static T {
93        self.container.get::<T>()
94    }
95
96    /// Non-panicking variant of [`Self::inject`].
97    #[inline]
98    pub fn try_inject<T: Send + Sync + 'static>(&self) -> Option<&'static T> {
99        self.container.try_get::<T>()
100    }
101
102    /// Deserialize the payload into a typed event.
103    pub fn payload<T: serde::de::DeserializeOwned>(&self) -> Result<T, String> {
104        serde_json::from_value(self.message.payload.clone())
105            .map_err(|e| format!("payload decode failed: {e}"))
106    }
107
108    /// `traceparent` for forwarding to the next hop (HTTP call, next queue).
109    pub fn traceparent(&self) -> String {
110        self.trace.to_traceparent()
111    }
112}
113
114// ─── Handler registration (filled in by #[EventConsumer]) ────────────────────
115
116/// Static descriptor emitted by the `#[EventPattern]` expansion and collected
117/// at link time — the runtime freezes these into its dispatch map at start.
118pub struct EventHandlerDescriptor {
119    pub topic: &'static str,
120    pub consumer: &'static str,
121    pub handler: fn(EventContext) -> BoxFuture<'static, Result<(), String>>,
122}
123
124inventory::collect!(&'static EventHandlerDescriptor);