arcly_http/messaging/
runtime.rs1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use futures::future::BoxFuture;
8
9use crate::core::engine::FrozenDiContainer;
10use crate::messaging::{EventContext, EventHandlerDescriptor, InboundMessage, MessageTransport};
11use crate::observability::propagation::TraceContext;
12
13type Handler = fn(EventContext) -> BoxFuture<'static, Result<(), String>>;
14
15pub struct ConsumerRuntime {
20 pub transport: Arc<dyn MessageTransport>,
21 pub container: &'static FrozenDiContainer,
22 pub poll: Duration,
23 pub batch: usize,
24 pub max_retries: u32,
26 pub dedupe_ttl_secs: u64,
29}
30
31impl ConsumerRuntime {
32 pub fn spawn(self) {
33 let dispatch: HashMap<&'static str, Handler> =
35 inventory::iter::<&'static EventHandlerDescriptor>
36 .into_iter()
37 .map(|d| (d.topic, d.handler))
38 .collect();
39
40 for d in inventory::iter::<&'static EventHandlerDescriptor> {
41 tracing::info!(
42 topic = d.topic,
43 consumer = d.consumer,
44 "event handler registered"
45 );
46 }
47
48 tokio::spawn(async move {
49 let mut attempts: HashMap<String, u32> = HashMap::new();
51 let mut tick = tokio::time::interval(self.poll);
52
53 loop {
54 tick.tick().await;
55 let batch = match self.transport.poll(self.batch).await {
56 Ok(b) => b,
57 Err(e) => {
58 tracing::warn!(error = %e, "transport poll failed — retrying next tick");
59 continue;
60 }
61 };
62
63 for msg in batch {
64 self.process(&dispatch, &mut attempts, msg).await;
65 }
66 }
67 });
68 }
69
70 async fn process(
71 &self,
72 dispatch: &HashMap<&'static str, Handler>,
73 attempts: &mut HashMap<String, u32>,
74 msg: InboundMessage,
75 ) {
76 let Some(handler) = dispatch.get(msg.topic.as_str()) else {
77 tracing::debug!(topic = %msg.topic, "no handler — acking unrouted message");
79 let _ = self.transport.ack(&msg).await;
80 return;
81 };
82
83 let store = self
88 .container
89 .try_get::<Box<dyn crate::web::idempotency::IdempotencyStore>>();
90 let dedupe_key = format!("consume:{}:{}", msg.topic, msg.idempotency_key);
91 if let Some(store) = store {
92 match store.claim(&dedupe_key, self.dedupe_ttl_secs).await {
93 crate::web::idempotency::IdempotencyDecision::Fresh => {}
94 crate::web::idempotency::IdempotencyDecision::Unavailable => {}
95 crate::web::idempotency::IdempotencyDecision::Replay { .. } => {
96 metrics::counter!("events_deduped_total").increment(1);
97 let _ = self.transport.ack(&msg).await;
98 return;
99 }
100 crate::web::idempotency::IdempotencyDecision::InFlight => {
101 let _ = self.transport.nack(&msg).await;
104 return;
105 }
106 }
107 }
108
109 let trace = msg
111 .traceparent
112 .as_deref()
113 .and_then(TraceContext::from_traceparent)
114 .unwrap_or_else(TraceContext::new_root);
115
116 let ctx = EventContext {
117 message: msg.clone(),
118 container: self.container,
119 trace,
120 };
121
122 match handler(ctx).await {
123 Ok(()) => {
124 attempts.remove(&msg.idempotency_key);
125 if let Some(store) = store {
126 store
127 .complete(&dedupe_key, 200, b"", self.dedupe_ttl_secs)
128 .await;
129 }
130 metrics::counter!("events_consumed_total", "topic" => msg.topic.clone())
131 .increment(1);
132 if let Err(e) = self.transport.ack(&msg).await {
133 tracing::warn!(error = %e, "ack failed — message may redeliver");
134 }
135 }
136 Err(reason) => {
137 if let Some(store) = store {
138 store.release(&dedupe_key).await; }
140 let n = attempts.entry(msg.idempotency_key.clone()).or_insert(0);
141 *n += 1;
142 if *n > self.max_retries {
143 attempts.remove(&msg.idempotency_key);
144 metrics::counter!("events_dead_lettered_total").increment(1);
145 tracing::error!(topic = %msg.topic, key = %msg.idempotency_key,
146 attempts = self.max_retries, reason = %reason,
147 "poison message → dead letter");
148 let mut parked = msg.clone();
152 if let Some(masker) = self.container.try_get::<crate::compliance::Masker>() {
153 masker.apply(&mut parked.payload);
154 }
155 let _ = self.transport.dead_letter(&parked, &reason).await;
156 } else {
157 metrics::counter!("events_retried_total").increment(1);
158 tracing::warn!(topic = %msg.topic, attempt = *n, reason = %reason,
159 "event handler failed — nack for retry");
160 let _ = self.transport.nack(&msg).await;
161 }
162 }
163 }
164 }
165}