arcly_http/messaging/mod.rs
1//! Event consumer mesh — the inbound half of the event architecture.
2//!
3//! The transactional outbox (`data::outbox`) made *producing* events safe;
4//! this module gives *consuming* them the same framework-grade guarantees,
5//! with NestJS-style ergonomics:
6//!
7//! ```ignore
8//! pub struct OrderEventsConsumer;
9//!
10//! #[EventConsumer]
11//! impl OrderEventsConsumer {
12//! #[EventPattern("order.confirmed")]
13//! async fn on_confirmed(ctx: EventContext) -> Result<(), String> {
14//! let evt: OrderConfirmed = ctx.payload()?;
15//! ctx.inject::<InventoryService>().reserve(evt.order_id).await
16//! }
17//! }
18//! ```
19//!
20//! ## Guarantees
21//!
22//! - **Frozen dispatch** — `#[EventPattern]` handlers register through
23//! `inventory` (the same link-time mechanism as `#[Controller]`); the
24//! topic→handler map is built once at runtime start and never mutated.
25//! - **At-least-once + dedupe** — when an `IdempotencyStore` is in the DI
26//! container, each `idempotency_key` is consumed exactly once per TTL.
27//! - **Bounded retries → DLQ** — a failing message is `nack`ed and retried
28//! up to `max_retries`, then dead-lettered with the failure reason; one
29//! poison message can never wedge a topic.
30//! - **Trace continuity** — the envelope's `traceparent` (stamped by the
31//! outbox producer) becomes this hop's parent span; async hops chain in
32//! the trace UI instead of starting orphan roots.
33//!
34//! The transport (Kafka / AMQP / NATS / in-process bridge) is app-provided
35//! via [`MessageTransport`] — the same boundary rule as every other driver.
36
37pub mod event;
38pub mod runtime;
39
40pub use runtime::ConsumerRuntime;
41
42use futures::future::BoxFuture;
43
44use crate::core::engine::FrozenDiContainer;
45use crate::observability::propagation::TraceContext;
46
47// ─── Envelope ─────────────────────────────────────────────────────────────────
48
49/// One inbound message, transport-agnostic.
50#[derive(Clone, Debug)]
51pub struct InboundMessage {
52 pub topic: String,
53 pub payload: serde_json::Value,
54 /// Consumers dedupe on this under at-least-once delivery.
55 pub idempotency_key: String,
56 pub tenant: Option<String>,
57 /// W3C trace context stamped by the producer (outbox), if any.
58 pub traceparent: Option<String>,
59}
60
61// ─── Transport contract ───────────────────────────────────────────────────────
62
63/// App-provided broker adapter (Kafka / AMQP / NATS / in-process bridge).
64/// Boxed error for infrastructure trait contracts (transports, sinks,
65/// publishers). `String` and `&str` convert via `.into()`, so simple
66/// implementations stay simple while real ones keep their source chain.
67pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
68
69pub trait MessageTransport: Send + Sync + 'static {
70 /// Pull up to `max` messages. An empty Vec means "nothing right now".
71 fn poll(&self, max: usize) -> BoxFuture<'_, Result<Vec<InboundMessage>, BoxError>>;
72 /// Acknowledge successful processing (message will not be redelivered).
73 fn ack<'a>(&'a self, msg: &'a InboundMessage) -> BoxFuture<'a, Result<(), BoxError>>;
74 /// Negative-ack: requeue for a later retry.
75 fn nack<'a>(&'a self, msg: &'a InboundMessage) -> BoxFuture<'a, Result<(), BoxError>>;
76 /// Park a poison message out of band, with the final failure reason.
77 fn dead_letter<'a>(
78 &'a self,
79 msg: &'a InboundMessage,
80 reason: &'a str,
81 ) -> BoxFuture<'a, Result<(), BoxError>>;
82}
83
84// ─── Consumer-side context ────────────────────────────────────────────────────
85
86/// What an `#[EventPattern]` handler receives — DI + payload + trace, no
87/// HTTP types anywhere.
88#[non_exhaustive]
89pub struct EventContext {
90 pub message: InboundMessage,
91 pub container: &'static FrozenDiContainer,
92 /// Continues the producer's trace (or a fresh root when none was carried).
93 pub trace: TraceContext,
94 /// The envelope's tenant, resolved + validated against the same
95 /// `TenantRegistry` as HTTP traffic (suspended tenants never reach
96 /// handlers — their events are dead-lettered upstream).
97 pub tenant: Option<std::sync::Arc<crate::web::tenant::TenantConfig>>,
98}
99
100impl EventContext {
101 /// Resolve a singleton service. O(1), no locks. Panics when absent.
102 #[inline]
103 pub fn inject<T: Send + Sync + 'static>(&self) -> &'static T {
104 self.container.get::<T>()
105 }
106
107 /// Non-panicking variant of [`Self::inject`].
108 #[inline]
109 pub fn try_inject<T: Send + Sync + 'static>(&self) -> Option<&'static T> {
110 self.container.try_get::<T>()
111 }
112
113 /// Deserialize the payload into a typed event.
114 ///
115 /// A malformed payload is permanent by definition, so the error is
116 /// already [`EventError::DeadLetter`] — `ctx.payload()?` does the right
117 /// thing without manual mapping.
118 pub fn payload<T: serde::de::DeserializeOwned>(&self) -> Result<T, EventError> {
119 // By-ref deserialization — no clone of the (possibly large) payload.
120 serde::Deserialize::deserialize(&self.message.payload)
121 .map_err(|e| EventError::DeadLetter(format!("payload decode failed: {e}")))
122 }
123
124 /// `traceparent` for forwarding to the next hop (HTTP call, next queue).
125 pub fn traceparent(&self) -> String {
126 self.trace.to_traceparent()
127 }
128}
129
130// ─── Typed handler errors ─────────────────────────────────────────────────────
131
132/// What the mesh should do with a failed message. Handlers may keep
133/// returning `Result<(), String>` (strings convert to [`EventError::Retry`])
134/// or return `EventError` directly to decide the message's fate:
135///
136/// ```ignore
137/// #[EventPattern("order.confirmed")]
138/// async fn on_confirmed(ctx: EventContext) -> Result<(), EventError> {
139/// let order: Order = ctx
140/// .payload()
141/// .map_err(EventError::DeadLetter)?; // malformed: retrying won't help
142/// warehouse.reserve(&order).await
143/// .map_err(|e| EventError::Retry(e.to_string())) // transient
144/// }
145/// ```
146#[derive(Debug)]
147pub enum EventError {
148 /// Transient failure (downstream 5xx, lock contention): nack → bounded
149 /// retries → dead-letter at `max_retries`.
150 Retry(String),
151 /// Permanent failure (malformed payload, violated business invariant):
152 /// dead-letter **immediately** — burning the retry budget on a poison
153 /// message only delays the alert.
154 DeadLetter(String),
155}
156
157impl From<String> for EventError {
158 fn from(s: String) -> Self {
159 Self::Retry(s)
160 }
161}
162impl From<&str> for EventError {
163 fn from(s: &str) -> Self {
164 Self::Retry(s.to_owned())
165 }
166}
167impl std::fmt::Display for EventError {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 match self {
170 Self::Retry(m) => write!(f, "retryable: {m}"),
171 Self::DeadLetter(m) => write!(f, "poison: {m}"),
172 }
173 }
174}
175impl std::error::Error for EventError {}
176
177// ─── Handler registration (filled in by #[EventConsumer]) ────────────────────
178
179/// Static descriptor emitted by the `#[EventPattern]` expansion and collected
180/// at link time — the runtime freezes these into its dispatch map at start.
181pub struct EventHandlerDescriptor {
182 pub topic: &'static str,
183 pub consumer: &'static str,
184 pub handler: fn(EventContext) -> BoxFuture<'static, Result<(), EventError>>,
185}
186
187inventory::collect!(&'static EventHandlerDescriptor);