arcly_http/messaging/mod.rs
1//! Event consumer mesh — the inbound half of the event architecture.
2//!
3//! The transactional outbox (`data::outbox`) made *producing* events safe;
4//! this module gives *consuming* them the same framework-grade guarantees,
5//! with NestJS-style ergonomics:
6//!
7//! ```ignore
8//! pub struct OrderEventsConsumer;
9//!
10//! #[EventConsumer]
11//! impl OrderEventsConsumer {
12//! #[EventPattern("order.confirmed")]
13//! async fn on_confirmed(ctx: EventContext) -> Result<(), String> {
14//! let evt: OrderConfirmed = ctx.payload()?;
15//! ctx.inject::<InventoryService>().reserve(evt.order_id).await
16//! }
17//! }
18//! ```
19//!
20//! ## Guarantees
21//!
22//! - **Frozen dispatch** — `#[EventPattern]` handlers register through
23//! `inventory` (the same link-time mechanism as `#[Controller]`); the
24//! topic→handler map is built once at runtime start and never mutated.
25//! - **At-least-once + dedupe** — when an `IdempotencyStore` is in the DI
26//! container, each `idempotency_key` is consumed exactly once per TTL.
27//! - **Bounded retries → DLQ** — a failing message is `nack`ed and retried
28//! up to `max_retries`, then dead-lettered with the failure reason; one
29//! poison message can never wedge a topic.
30//! - **Trace continuity** — the envelope's `traceparent` (stamped by the
31//! outbox producer) becomes this hop's parent span; async hops chain in
32//! the trace UI instead of starting orphan roots.
33//!
34//! The transport (Kafka / AMQP / NATS / in-process bridge) is app-provided
35//! via [`MessageTransport`] — the same boundary rule as every other driver.
36
37pub mod runtime;
38
39pub use runtime::ConsumerRuntime;
40
41use futures::future::BoxFuture;
42
43use crate::core::engine::FrozenDiContainer;
44use crate::observability::propagation::TraceContext;
45
46// ─── Envelope ─────────────────────────────────────────────────────────────────
47
48/// One inbound message, transport-agnostic.
49#[derive(Clone, Debug)]
50pub struct InboundMessage {
51 pub topic: String,
52 pub payload: serde_json::Value,
53 /// Consumers dedupe on this under at-least-once delivery.
54 pub idempotency_key: String,
55 pub tenant: Option<String>,
56 /// W3C trace context stamped by the producer (outbox), if any.
57 pub traceparent: Option<String>,
58}
59
60// ─── Transport contract ───────────────────────────────────────────────────────
61
62/// App-provided broker adapter (Kafka / AMQP / NATS / in-process bridge).
63pub trait MessageTransport: Send + Sync + 'static {
64 /// Pull up to `max` messages. An empty Vec means "nothing right now".
65 fn poll(&self, max: usize) -> BoxFuture<'_, Result<Vec<InboundMessage>, String>>;
66 /// Acknowledge successful processing (message will not be redelivered).
67 fn ack<'a>(&'a self, msg: &'a InboundMessage) -> BoxFuture<'a, Result<(), String>>;
68 /// Negative-ack: requeue for a later retry.
69 fn nack<'a>(&'a self, msg: &'a InboundMessage) -> BoxFuture<'a, Result<(), String>>;
70 /// Park a poison message out of band, with the final failure reason.
71 fn dead_letter<'a>(
72 &'a self,
73 msg: &'a InboundMessage,
74 reason: &'a str,
75 ) -> BoxFuture<'a, Result<(), String>>;
76}
77
78// ─── Consumer-side context ────────────────────────────────────────────────────
79
80/// What an `#[EventPattern]` handler receives — DI + payload + trace, no
81/// HTTP types anywhere.
82pub struct EventContext {
83 pub message: InboundMessage,
84 pub container: &'static FrozenDiContainer,
85 /// Continues the producer's trace (or a fresh root when none was carried).
86 pub trace: TraceContext,
87}
88
89impl EventContext {
90 /// Resolve a singleton service. O(1), no locks. Panics when absent.
91 #[inline]
92 pub fn inject<T: Send + Sync + 'static>(&self) -> &'static T {
93 self.container.get::<T>()
94 }
95
96 /// Non-panicking variant of [`Self::inject`].
97 #[inline]
98 pub fn try_inject<T: Send + Sync + 'static>(&self) -> Option<&'static T> {
99 self.container.try_get::<T>()
100 }
101
102 /// Deserialize the payload into a typed event.
103 pub fn payload<T: serde::de::DeserializeOwned>(&self) -> Result<T, String> {
104 serde_json::from_value(self.message.payload.clone())
105 .map_err(|e| format!("payload decode failed: {e}"))
106 }
107
108 /// `traceparent` for forwarding to the next hop (HTTP call, next queue).
109 pub fn traceparent(&self) -> String {
110 self.trace.to_traceparent()
111 }
112}
113
114// ─── Handler registration (filled in by #[EventConsumer]) ────────────────────
115
116/// Static descriptor emitted by the `#[EventPattern]` expansion and collected
117/// at link time — the runtime freezes these into its dispatch map at start.
118pub struct EventHandlerDescriptor {
119 pub topic: &'static str,
120 pub consumer: &'static str,
121 pub handler: fn(EventContext) -> BoxFuture<'static, Result<(), String>>,
122}
123
124inventory::collect!(&'static EventHandlerDescriptor);