arcly_http/messaging/
runtime.rs1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use futures::future::BoxFuture;
8
9use crate::core::engine::FrozenDiContainer;
10use crate::messaging::{
11 EventContext, EventError, EventHandlerDescriptor, InboundMessage, MessageTransport,
12};
13
14type Handler = fn(EventContext) -> BoxFuture<'static, Result<(), EventError>>;
15
16#[non_exhaustive]
21pub struct ConsumerRuntime {
22 pub transport: Arc<dyn MessageTransport>,
23 pub container: &'static FrozenDiContainer,
24 pub poll: Duration,
25 pub batch: usize,
26 pub max_retries: u32,
28 pub dedupe_ttl_secs: u64,
31 pub concurrency: usize,
36}
37
38impl ConsumerRuntime {
39 pub fn new(
42 transport: Arc<dyn MessageTransport>,
43 container: &'static FrozenDiContainer,
44 ) -> Self {
45 Self {
46 transport,
47 container,
48 poll: Duration::from_millis(300),
49 batch: 32,
50 max_retries: 3,
51 dedupe_ttl_secs: 3600,
52 concurrency: 1,
53 }
54 }
55 pub fn poll(mut self, v: Duration) -> Self {
56 self.poll = v;
57 self
58 }
59 pub fn batch(mut self, v: usize) -> Self {
60 self.batch = v;
61 self
62 }
63 pub fn max_retries(mut self, v: u32) -> Self {
64 self.max_retries = v;
65 self
66 }
67 pub fn dedupe_ttl_secs(mut self, v: u64) -> Self {
68 self.dedupe_ttl_secs = v;
69 self
70 }
71 pub fn concurrency(mut self, v: usize) -> Self {
72 self.concurrency = v;
73 self
74 }
75
76 pub fn spawn(self) {
77 let dispatch: HashMap<&'static str, Handler> =
79 inventory::iter::<&'static EventHandlerDescriptor>
80 .into_iter()
81 .map(|d| (d.topic, d.handler))
82 .collect();
83
84 for d in inventory::iter::<&'static EventHandlerDescriptor> {
85 tracing::info!(
86 topic = d.topic,
87 consumer = d.consumer,
88 "event handler registered"
89 );
90 }
91
92 let runtime = Arc::new(self);
93 tokio::spawn(async move {
94 let attempts: Arc<dashmap::DashMap<String, u32>> = Arc::new(dashmap::DashMap::new());
97 let mut tick = tokio::time::interval(runtime.poll);
98 let limit = runtime.concurrency.max(1);
99
100 loop {
101 tick.tick().await;
102 if crate::observability::health::is_draining() {
105 tracing::info!("consumer runtime: drain flag set — stopping");
106 return;
107 }
108 let batch = match runtime.transport.poll(runtime.batch).await {
109 Ok(b) => b,
110 Err(e) => {
111 tracing::warn!(error = %e, "transport poll failed — retrying next tick");
112 continue;
113 }
114 };
115
116 use futures::StreamExt;
117 futures::stream::iter(batch)
118 .for_each_concurrent(limit, |msg| {
119 let runtime = Arc::clone(&runtime);
120 let attempts = Arc::clone(&attempts);
121 let dispatch = &dispatch;
122 async move {
123 runtime.process(dispatch, &attempts, msg).await;
124 }
125 })
126 .await;
127 }
128 });
129 }
130
131 async fn process(
132 &self,
133 dispatch: &HashMap<&'static str, Handler>,
134 attempts: &dashmap::DashMap<String, u32>,
135 msg: InboundMessage,
136 ) {
137 let Some(handler) = dispatch.get(msg.topic.as_str()) else {
138 tracing::debug!(topic = %msg.topic, "no handler — acking unrouted message");
140 let _ = self.transport.ack(&msg).await;
141 return;
142 };
143
144 let store = self
149 .container
150 .try_get::<Box<dyn crate::web::idempotency::IdempotencyStore>>();
151 let dedupe_key = format!("consume:{}:{}", msg.topic, msg.idempotency_key);
152 if let Some(store) = store {
153 match store.claim(&dedupe_key, self.dedupe_ttl_secs).await {
154 crate::web::idempotency::IdempotencyDecision::Fresh => {}
155 crate::web::idempotency::IdempotencyDecision::Unavailable => {}
156 crate::web::idempotency::IdempotencyDecision::Replay { .. } => {
157 metrics::counter!("events_deduped_total").increment(1);
158 let _ = self.transport.ack(&msg).await;
159 return;
160 }
161 crate::web::idempotency::IdempotencyDecision::InFlight => {
162 let _ = self.transport.nack(&msg).await;
165 return;
166 }
167 }
168 }
169
170 let provenance = crate::pipeline::Provenance::from_message(&msg, self.container);
174
175 if msg.tenant.is_some()
180 && provenance.tenant.is_none()
181 && self
182 .container
183 .try_get::<crate::web::tenant::TenantRegistry>()
184 .is_some()
185 {
186 metrics::counter!("events_tenant_rejected_total").increment(1);
187 tracing::warn!(
188 topic = %msg.topic,
189 tenant = msg.tenant.as_deref().unwrap_or(""),
190 "event tenant suspended or unknown — dead-lettering"
191 );
192 if let Some(store) = store {
193 store.release(&dedupe_key).await;
194 }
195 let _ = self
196 .transport
197 .dead_letter(&msg, "tenant suspended or unknown")
198 .await;
199 return;
200 }
201
202 let ctx = EventContext {
203 message: msg.clone(),
204 container: self.container,
205 trace: provenance.trace,
206 tenant: provenance.tenant,
207 };
208
209 match handler(ctx).await {
210 Ok(()) => {
211 attempts.remove(&msg.idempotency_key);
212 if let Some(store) = store {
213 store
214 .complete(&dedupe_key, 200, b"", self.dedupe_ttl_secs)
215 .await;
216 }
217 metrics::counter!("events_consumed_total", "topic" => msg.topic.clone())
218 .increment(1);
219 if let Err(e) = self.transport.ack(&msg).await {
220 tracing::warn!(error = %e, "ack failed — message may redeliver");
221 }
222 }
223 Err(error) => {
224 if let Some(store) = store {
225 store.release(&dedupe_key).await; }
227 let (reason, poison) = match error {
231 EventError::DeadLetter(m) => {
232 metrics::counter!("events_poisoned_total").increment(1);
233 (m, true)
234 }
235 EventError::Retry(m) => (m, false),
236 };
237 let n = {
240 let mut entry = attempts.entry(msg.idempotency_key.clone()).or_insert(0);
241 *entry += 1;
242 *entry
243 };
244 if poison || n > self.max_retries {
245 attempts.remove(&msg.idempotency_key);
246 metrics::counter!("events_dead_lettered_total").increment(1);
247 tracing::error!(topic = %msg.topic, key = %msg.idempotency_key,
248 attempts = n, reason = %reason,
249 "poison message → dead letter");
250 let mut parked = msg.clone();
254 if let Some(masker) = self.container.try_get::<crate::compliance::Masker>() {
255 masker.apply(&mut parked.payload);
256 }
257 let _ = self.transport.dead_letter(&parked, &reason).await;
258 } else {
259 metrics::counter!("events_retried_total").increment(1);
260 tracing::warn!(topic = %msg.topic, attempt = n, reason = %reason,
261 "event handler failed — nack for retry");
262 let _ = self.transport.nack(&msg).await;
263 }
264 }
265 }
266 }
267}