arcly_http/messaging/
runtime.rs1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use futures::future::BoxFuture;
8
9use crate::core::engine::FrozenDiContainer;
10use crate::messaging::{
11 EventContext, EventError, EventHandlerDescriptor, InboundMessage, MessageTransport,
12};
13
/// Signature of a registered event handler: a plain function pointer that takes ownership of
/// an [`EventContext`] and returns a boxed `'static` future resolving to `Ok(())` or an
/// [`EventError`]. Values of this type are copied out of `EventHandlerDescriptor::handler`
/// when the dispatch table is built.
type Handler = fn(EventContext) -> BoxFuture<'static, Result<(), EventError>>;
15
/// Polling consumer that pulls message batches from a transport and routes each message to the
/// event handler registered for its topic.
pub struct ConsumerRuntime {
    /// Transport that is polled for batches and used to ack, nack, or dead-letter messages.
    pub transport: Arc<dyn MessageTransport>,
    /// Frozen DI container handed to handlers through `EventContext`; also the place where the
    /// optional idempotency store, tenant registry, and payload masker are looked up.
    pub container: &'static FrozenDiContainer,
    /// Period of the polling interval.
    pub poll: Duration,
    /// Value passed to `transport.poll` on every tick
    /// (presumably the maximum batch size — confirm against `MessageTransport::poll`).
    pub batch: usize,
    /// A message is dead-lettered once its recorded failure count exceeds this value.
    pub max_retries: u32,
    /// TTL, in seconds, passed to the idempotency store when claiming and completing a key.
    pub dedupe_ttl_secs: u64,
    /// Upper bound on messages of one batch processed concurrently; values below 1 are
    /// treated as 1.
    pub concurrency: usize,
}
36
37impl ConsumerRuntime {
38 pub fn spawn(self) {
39 let dispatch: HashMap<&'static str, Handler> =
41 inventory::iter::<&'static EventHandlerDescriptor>
42 .into_iter()
43 .map(|d| (d.topic, d.handler))
44 .collect();
45
46 for d in inventory::iter::<&'static EventHandlerDescriptor> {
47 tracing::info!(
48 topic = d.topic,
49 consumer = d.consumer,
50 "event handler registered"
51 );
52 }
53
54 let runtime = Arc::new(self);
55 tokio::spawn(async move {
56 let attempts: Arc<dashmap::DashMap<String, u32>> = Arc::new(dashmap::DashMap::new());
59 let mut tick = tokio::time::interval(runtime.poll);
60 let limit = runtime.concurrency.max(1);
61
62 loop {
63 tick.tick().await;
64 if crate::observability::health::is_draining() {
67 tracing::info!("consumer runtime: drain flag set — stopping");
68 return;
69 }
70 let batch = match runtime.transport.poll(runtime.batch).await {
71 Ok(b) => b,
72 Err(e) => {
73 tracing::warn!(error = %e, "transport poll failed — retrying next tick");
74 continue;
75 }
76 };
77
78 use futures::StreamExt;
79 futures::stream::iter(batch)
80 .for_each_concurrent(limit, |msg| {
81 let runtime = Arc::clone(&runtime);
82 let attempts = Arc::clone(&attempts);
83 let dispatch = &dispatch;
84 async move {
85 runtime.process(dispatch, &attempts, msg).await;
86 }
87 })
88 .await;
89 }
90 });
91 }
92
93 async fn process(
94 &self,
95 dispatch: &HashMap<&'static str, Handler>,
96 attempts: &dashmap::DashMap<String, u32>,
97 msg: InboundMessage,
98 ) {
99 let Some(handler) = dispatch.get(msg.topic.as_str()) else {
100 tracing::debug!(topic = %msg.topic, "no handler — acking unrouted message");
102 let _ = self.transport.ack(&msg).await;
103 return;
104 };
105
106 let store = self
111 .container
112 .try_get::<Box<dyn crate::web::idempotency::IdempotencyStore>>();
113 let dedupe_key = format!("consume:{}:{}", msg.topic, msg.idempotency_key);
114 if let Some(store) = store {
115 match store.claim(&dedupe_key, self.dedupe_ttl_secs).await {
116 crate::web::idempotency::IdempotencyDecision::Fresh => {}
117 crate::web::idempotency::IdempotencyDecision::Unavailable => {}
118 crate::web::idempotency::IdempotencyDecision::Replay { .. } => {
119 metrics::counter!("events_deduped_total").increment(1);
120 let _ = self.transport.ack(&msg).await;
121 return;
122 }
123 crate::web::idempotency::IdempotencyDecision::InFlight => {
124 let _ = self.transport.nack(&msg).await;
127 return;
128 }
129 }
130 }
131
132 let provenance = crate::pipeline::Provenance::from_message(&msg, self.container);
136
137 if msg.tenant.is_some()
142 && provenance.tenant.is_none()
143 && self
144 .container
145 .try_get::<crate::web::tenant::TenantRegistry>()
146 .is_some()
147 {
148 metrics::counter!("events_tenant_rejected_total").increment(1);
149 tracing::warn!(
150 topic = %msg.topic,
151 tenant = msg.tenant.as_deref().unwrap_or(""),
152 "event tenant suspended or unknown — dead-lettering"
153 );
154 if let Some(store) = store {
155 store.release(&dedupe_key).await;
156 }
157 let _ = self
158 .transport
159 .dead_letter(&msg, "tenant suspended or unknown")
160 .await;
161 return;
162 }
163
164 let ctx = EventContext {
165 message: msg.clone(),
166 container: self.container,
167 trace: provenance.trace,
168 tenant: provenance.tenant,
169 };
170
171 match handler(ctx).await {
172 Ok(()) => {
173 attempts.remove(&msg.idempotency_key);
174 if let Some(store) = store {
175 store
176 .complete(&dedupe_key, 200, b"", self.dedupe_ttl_secs)
177 .await;
178 }
179 metrics::counter!("events_consumed_total", "topic" => msg.topic.clone())
180 .increment(1);
181 if let Err(e) = self.transport.ack(&msg).await {
182 tracing::warn!(error = %e, "ack failed — message may redeliver");
183 }
184 }
185 Err(error) => {
186 if let Some(store) = store {
187 store.release(&dedupe_key).await; }
189 let (reason, poison) = match error {
193 EventError::DeadLetter(m) => {
194 metrics::counter!("events_poisoned_total").increment(1);
195 (m, true)
196 }
197 EventError::Retry(m) => (m, false),
198 };
199 let n = {
202 let mut entry = attempts.entry(msg.idempotency_key.clone()).or_insert(0);
203 *entry += 1;
204 *entry
205 };
206 if poison || n > self.max_retries {
207 attempts.remove(&msg.idempotency_key);
208 metrics::counter!("events_dead_lettered_total").increment(1);
209 tracing::error!(topic = %msg.topic, key = %msg.idempotency_key,
210 attempts = n, reason = %reason,
211 "poison message → dead letter");
212 let mut parked = msg.clone();
216 if let Some(masker) = self.container.try_get::<crate::compliance::Masker>() {
217 masker.apply(&mut parked.payload);
218 }
219 let _ = self.transport.dead_letter(&parked, &reason).await;
220 } else {
221 metrics::counter!("events_retried_total").increment(1);
222 tracing::warn!(topic = %msg.topic, attempt = n, reason = %reason,
223 "event handler failed — nack for retry");
224 let _ = self.transport.nack(&msg).await;
225 }
226 }
227 }
228 }
229}