Skip to main content

arcly_http/messaging/
runtime.rs

1//! Consumer runtime: poll → dedupe → dispatch → ack / retry / dead-letter.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use futures::future::BoxFuture;
8
9use crate::core::engine::FrozenDiContainer;
10use crate::messaging::{
11    EventContext, EventError, EventHandlerDescriptor, InboundMessage, MessageTransport,
12};
13
14type Handler = fn(EventContext) -> BoxFuture<'static, Result<(), EventError>>;
15
16/// Drives one transport against the link-time handler registry.
17///
18/// Spawn from `ArclyPlugin::on_start`. The dispatch map is frozen at spawn —
19/// the polling loop performs immutable `HashMap` reads only.
20#[non_exhaustive]
21pub struct ConsumerRuntime {
22    pub transport: Arc<dyn MessageTransport>,
23    pub container: &'static FrozenDiContainer,
24    pub poll: Duration,
25    pub batch: usize,
26    /// Failed deliveries per message before dead-lettering.
27    pub max_retries: u32,
28    /// TTL for the consume-side dedupe claim (when an `IdempotencyStore`
29    /// is available in the DI container).
30    pub dedupe_ttl_secs: u64,
31    /// Messages processed concurrently per polled batch. `0`/`1` keeps the
32    /// historical strictly-ordered sequential behaviour; higher values trade
33    /// cross-message ordering within a batch for throughput (per-message
34    /// dedupe and retries are unaffected — `attempts` is a sharded map).
35    pub concurrency: usize,
36}
37
38impl ConsumerRuntime {
39    /// Sensible defaults: 300ms poll, batch 32, 3 retries, 1h dedupe TTL,
40    /// sequential processing. Chain setters to adjust.
41    pub fn new(
42        transport: Arc<dyn MessageTransport>,
43        container: &'static FrozenDiContainer,
44    ) -> Self {
45        Self {
46            transport,
47            container,
48            poll: Duration::from_millis(300),
49            batch: 32,
50            max_retries: 3,
51            dedupe_ttl_secs: 3600,
52            concurrency: 1,
53        }
54    }
55    pub fn poll(mut self, v: Duration) -> Self {
56        self.poll = v;
57        self
58    }
59    pub fn batch(mut self, v: usize) -> Self {
60        self.batch = v;
61        self
62    }
63    pub fn max_retries(mut self, v: u32) -> Self {
64        self.max_retries = v;
65        self
66    }
67    pub fn dedupe_ttl_secs(mut self, v: u64) -> Self {
68        self.dedupe_ttl_secs = v;
69        self
70    }
71    pub fn concurrency(mut self, v: usize) -> Self {
72        self.concurrency = v;
73        self
74    }
75
76    pub fn spawn(self) {
77        // Freeze the dispatch map from the inventory registry — once, here.
78        let dispatch: HashMap<&'static str, Handler> =
79            inventory::iter::<&'static EventHandlerDescriptor>
80                .into_iter()
81                .map(|d| (d.topic, d.handler))
82                .collect();
83
84        for d in inventory::iter::<&'static EventHandlerDescriptor> {
85            tracing::info!(
86                topic = d.topic,
87                consumer = d.consumer,
88                "event handler registered"
89            );
90        }
91
92        let runtime = Arc::new(self);
93        tokio::spawn(async move {
94            // Per-key failure counts — sharded so concurrent processing
95            // never contends on a single lock.
96            let attempts: Arc<dashmap::DashMap<String, u32>> = Arc::new(dashmap::DashMap::new());
97            let mut tick = tokio::time::interval(runtime.poll);
98            let limit = runtime.concurrency.max(1);
99
100            loop {
101                tick.tick().await;
102                // Drain-aware (like the outbox relay): once shutdown begins,
103                // stop claiming work — unacked messages redeliver elsewhere.
104                if crate::observability::health::is_draining() {
105                    tracing::info!("consumer runtime: drain flag set — stopping");
106                    return;
107                }
108                let batch = match runtime.transport.poll(runtime.batch).await {
109                    Ok(b) => b,
110                    Err(e) => {
111                        tracing::warn!(error = %e, "transport poll failed — retrying next tick");
112                        continue;
113                    }
114                };
115
116                use futures::StreamExt;
117                futures::stream::iter(batch)
118                    .for_each_concurrent(limit, |msg| {
119                        let runtime = Arc::clone(&runtime);
120                        let attempts = Arc::clone(&attempts);
121                        let dispatch = &dispatch;
122                        async move {
123                            runtime.process(dispatch, &attempts, msg).await;
124                        }
125                    })
126                    .await;
127            }
128        });
129    }
130
131    async fn process(
132        &self,
133        dispatch: &HashMap<&'static str, Handler>,
134        attempts: &dashmap::DashMap<String, u32>,
135        msg: InboundMessage,
136    ) {
137        let Some(handler) = dispatch.get(msg.topic.as_str()) else {
138            // No subscriber for this topic — ack so it doesn't loop forever.
139            tracing::debug!(topic = %msg.topic, "no handler — acking unrouted message");
140            let _ = self.transport.ack(&msg).await;
141            return;
142        };
143
144        // Consume-side dedupe (at-least-once → effectively-once per TTL).
145        // Same contract as the HTTP idempotency layer: claim before running,
146        // `complete` only AFTER success, `release` on failure — so retries of
147        // a failed delivery pass through instead of being replay-swallowed.
148        let store = self
149            .container
150            .try_get::<Box<dyn crate::web::idempotency::IdempotencyStore>>();
151        let dedupe_key = format!("consume:{}:{}", msg.topic, msg.idempotency_key);
152        if let Some(store) = store {
153            match store.claim(&dedupe_key, self.dedupe_ttl_secs).await {
154                crate::web::idempotency::IdempotencyDecision::Fresh => {}
155                crate::web::idempotency::IdempotencyDecision::Unavailable => {}
156                crate::web::idempotency::IdempotencyDecision::Replay { .. } => {
157                    metrics::counter!("events_deduped_total").increment(1);
158                    let _ = self.transport.ack(&msg).await;
159                    return;
160                }
161                crate::web::idempotency::IdempotencyDecision::InFlight => {
162                    // Our own released-then-retried claim or a concurrent
163                    // consumer; requeue and let the next poll settle it.
164                    let _ = self.transport.nack(&msg).await;
165                    return;
166                }
167            }
168        }
169
170        // One shared extraction (pipeline::Provenance): the producer's trace
171        // continues (fresh root when none was carried) and the envelope's
172        // tenant id is validated against the SAME registry as HTTP traffic.
173        let provenance = crate::pipeline::Provenance::from_message(&msg, self.container);
174
175        // A suspended (or unknown) tenant's queued events must stop being
176        // processed, exactly like its HTTP requests — park them out of band
177        // rather than retry-looping or silently processing. Only enforced
178        // when a TenantRegistry is actually configured.
179        if msg.tenant.is_some()
180            && provenance.tenant.is_none()
181            && self
182                .container
183                .try_get::<crate::web::tenant::TenantRegistry>()
184                .is_some()
185        {
186            metrics::counter!("events_tenant_rejected_total").increment(1);
187            tracing::warn!(
188                topic = %msg.topic,
189                tenant = msg.tenant.as_deref().unwrap_or(""),
190                "event tenant suspended or unknown — dead-lettering"
191            );
192            if let Some(store) = store {
193                store.release(&dedupe_key).await;
194            }
195            let _ = self
196                .transport
197                .dead_letter(&msg, "tenant suspended or unknown")
198                .await;
199            return;
200        }
201
202        let ctx = EventContext {
203            message: msg.clone(),
204            container: self.container,
205            trace: provenance.trace,
206            tenant: provenance.tenant,
207        };
208
209        match handler(ctx).await {
210            Ok(()) => {
211                attempts.remove(&msg.idempotency_key);
212                if let Some(store) = store {
213                    store
214                        .complete(&dedupe_key, 200, b"", self.dedupe_ttl_secs)
215                        .await;
216                }
217                metrics::counter!("events_consumed_total", "topic" => msg.topic.clone())
218                    .increment(1);
219                if let Err(e) = self.transport.ack(&msg).await {
220                    tracing::warn!(error = %e, "ack failed — message may redeliver");
221                }
222            }
223            Err(error) => {
224                if let Some(store) = store {
225                    store.release(&dedupe_key).await; // allow the retry through
226                }
227                // Typed fate: a permanent failure skips the retry budget and
228                // parks immediately; transient failures take the bounded
229                // retry path below.
230                let (reason, poison) = match error {
231                    EventError::DeadLetter(m) => {
232                        metrics::counter!("events_poisoned_total").increment(1);
233                        (m, true)
234                    }
235                    EventError::Retry(m) => (m, false),
236                };
237                // Bump-and-read, dropping the shard guard BEFORE any await
238                // below (dead_letter / nack are async).
239                let n = {
240                    let mut entry = attempts.entry(msg.idempotency_key.clone()).or_insert(0);
241                    *entry += 1;
242                    *entry
243                };
244                if poison || n > self.max_retries {
245                    attempts.remove(&msg.idempotency_key);
246                    metrics::counter!("events_dead_lettered_total").increment(1);
247                    tracing::error!(topic = %msg.topic, key = %msg.idempotency_key,
248                        attempts = n, reason = %reason,
249                        "poison message → dead letter");
250                    // PII never parks raw in the DLQ: dead-lettered payloads
251                    // are masked first (the queue copy already served its
252                    // delivery purpose; the DLQ copy is for forensics).
253                    let mut parked = msg.clone();
254                    if let Some(masker) = self.container.try_get::<crate::compliance::Masker>() {
255                        masker.apply(&mut parked.payload);
256                    }
257                    let _ = self.transport.dead_letter(&parked, &reason).await;
258                } else {
259                    metrics::counter!("events_retried_total").increment(1);
260                    tracing::warn!(topic = %msg.topic, attempt = n, reason = %reason,
261                        "event handler failed — nack for retry");
262                    let _ = self.transport.nack(&msg).await;
263                }
264            }
265        }
266    }
267}