Skip to main content

arcly_http/messaging/
runtime.rs

1//! Consumer runtime: poll → dedupe → dispatch → ack / retry / dead-letter.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use futures::future::BoxFuture;
8
9use crate::core::engine::FrozenDiContainer;
10use crate::messaging::{
11    EventContext, EventError, EventHandlerDescriptor, InboundMessage, MessageTransport,
12};
13
/// Signature stored in the dispatch map for every registered event handler:
/// a plain function pointer that takes the per-delivery [`EventContext`] and
/// returns a boxed `'static` future resolving to `Ok(())` or an [`EventError`].
type Handler = fn(EventContext) -> BoxFuture<'static, Result<(), EventError>>;
15
/// Drives one transport against the link-time handler registry.
///
/// Spawn from `ArclyPlugin::on_start`. The dispatch map is frozen at spawn —
/// the polling loop performs immutable `HashMap` reads only.
///
/// All fields are plain configuration; `spawn` consumes the value, so set
/// everything before calling it.
pub struct ConsumerRuntime {
    /// Transport that is polled for messages and told each message's fate
    /// (`ack`, `nack` or `dead_letter`).
    pub transport: Arc<dyn MessageTransport>,
    /// Frozen DI container handed to every handler through `EventContext`,
    /// and used by the runtime itself to look up optional collaborators
    /// (idempotency store, tenant registry, payload masker).
    pub container: &'static FrozenDiContainer,
    /// Period of the polling timer; one `transport.poll` call per tick.
    pub poll: Duration,
    /// Value passed to `transport.poll` on every tick — presumably the
    /// maximum number of messages to return (confirm against the transport).
    pub batch: usize,
    /// Failed deliveries per message before dead-lettering: a message is
    /// parked once its failure count exceeds this value.
    pub max_retries: u32,
    /// TTL for the consume-side dedupe claim (when an `IdempotencyStore`
    /// is available in the DI container).
    pub dedupe_ttl_secs: u64,
    /// Messages processed concurrently per polled batch. `0`/`1` keeps the
    /// historical strictly-ordered sequential behaviour; higher values trade
    /// cross-message ordering within a batch for throughput (per-message
    /// dedupe and retries are unaffected — `attempts` is a sharded map).
    pub concurrency: usize,
}
36
37impl ConsumerRuntime {
38    pub fn spawn(self) {
39        // Freeze the dispatch map from the inventory registry — once, here.
40        let dispatch: HashMap<&'static str, Handler> =
41            inventory::iter::<&'static EventHandlerDescriptor>
42                .into_iter()
43                .map(|d| (d.topic, d.handler))
44                .collect();
45
46        for d in inventory::iter::<&'static EventHandlerDescriptor> {
47            tracing::info!(
48                topic = d.topic,
49                consumer = d.consumer,
50                "event handler registered"
51            );
52        }
53
54        let runtime = Arc::new(self);
55        tokio::spawn(async move {
56            // Per-key failure counts — sharded so concurrent processing
57            // never contends on a single lock.
58            let attempts: Arc<dashmap::DashMap<String, u32>> = Arc::new(dashmap::DashMap::new());
59            let mut tick = tokio::time::interval(runtime.poll);
60            let limit = runtime.concurrency.max(1);
61
62            loop {
63                tick.tick().await;
64                // Drain-aware (like the outbox relay): once shutdown begins,
65                // stop claiming work — unacked messages redeliver elsewhere.
66                if crate::observability::health::is_draining() {
67                    tracing::info!("consumer runtime: drain flag set — stopping");
68                    return;
69                }
70                let batch = match runtime.transport.poll(runtime.batch).await {
71                    Ok(b) => b,
72                    Err(e) => {
73                        tracing::warn!(error = %e, "transport poll failed — retrying next tick");
74                        continue;
75                    }
76                };
77
78                use futures::StreamExt;
79                futures::stream::iter(batch)
80                    .for_each_concurrent(limit, |msg| {
81                        let runtime = Arc::clone(&runtime);
82                        let attempts = Arc::clone(&attempts);
83                        let dispatch = &dispatch;
84                        async move {
85                            runtime.process(dispatch, &attempts, msg).await;
86                        }
87                    })
88                    .await;
89            }
90        });
91    }
92
93    async fn process(
94        &self,
95        dispatch: &HashMap<&'static str, Handler>,
96        attempts: &dashmap::DashMap<String, u32>,
97        msg: InboundMessage,
98    ) {
99        let Some(handler) = dispatch.get(msg.topic.as_str()) else {
100            // No subscriber for this topic — ack so it doesn't loop forever.
101            tracing::debug!(topic = %msg.topic, "no handler — acking unrouted message");
102            let _ = self.transport.ack(&msg).await;
103            return;
104        };
105
106        // Consume-side dedupe (at-least-once → effectively-once per TTL).
107        // Same contract as the HTTP idempotency layer: claim before running,
108        // `complete` only AFTER success, `release` on failure — so retries of
109        // a failed delivery pass through instead of being replay-swallowed.
110        let store = self
111            .container
112            .try_get::<Box<dyn crate::web::idempotency::IdempotencyStore>>();
113        let dedupe_key = format!("consume:{}:{}", msg.topic, msg.idempotency_key);
114        if let Some(store) = store {
115            match store.claim(&dedupe_key, self.dedupe_ttl_secs).await {
116                crate::web::idempotency::IdempotencyDecision::Fresh => {}
117                crate::web::idempotency::IdempotencyDecision::Unavailable => {}
118                crate::web::idempotency::IdempotencyDecision::Replay { .. } => {
119                    metrics::counter!("events_deduped_total").increment(1);
120                    let _ = self.transport.ack(&msg).await;
121                    return;
122                }
123                crate::web::idempotency::IdempotencyDecision::InFlight => {
124                    // Our own released-then-retried claim or a concurrent
125                    // consumer; requeue and let the next poll settle it.
126                    let _ = self.transport.nack(&msg).await;
127                    return;
128                }
129            }
130        }
131
132        // One shared extraction (pipeline::Provenance): the producer's trace
133        // continues (fresh root when none was carried) and the envelope's
134        // tenant id is validated against the SAME registry as HTTP traffic.
135        let provenance = crate::pipeline::Provenance::from_message(&msg, self.container);
136
137        // A suspended (or unknown) tenant's queued events must stop being
138        // processed, exactly like its HTTP requests — park them out of band
139        // rather than retry-looping or silently processing. Only enforced
140        // when a TenantRegistry is actually configured.
141        if msg.tenant.is_some()
142            && provenance.tenant.is_none()
143            && self
144                .container
145                .try_get::<crate::web::tenant::TenantRegistry>()
146                .is_some()
147        {
148            metrics::counter!("events_tenant_rejected_total").increment(1);
149            tracing::warn!(
150                topic = %msg.topic,
151                tenant = msg.tenant.as_deref().unwrap_or(""),
152                "event tenant suspended or unknown — dead-lettering"
153            );
154            if let Some(store) = store {
155                store.release(&dedupe_key).await;
156            }
157            let _ = self
158                .transport
159                .dead_letter(&msg, "tenant suspended or unknown")
160                .await;
161            return;
162        }
163
164        let ctx = EventContext {
165            message: msg.clone(),
166            container: self.container,
167            trace: provenance.trace,
168            tenant: provenance.tenant,
169        };
170
171        match handler(ctx).await {
172            Ok(()) => {
173                attempts.remove(&msg.idempotency_key);
174                if let Some(store) = store {
175                    store
176                        .complete(&dedupe_key, 200, b"", self.dedupe_ttl_secs)
177                        .await;
178                }
179                metrics::counter!("events_consumed_total", "topic" => msg.topic.clone())
180                    .increment(1);
181                if let Err(e) = self.transport.ack(&msg).await {
182                    tracing::warn!(error = %e, "ack failed — message may redeliver");
183                }
184            }
185            Err(error) => {
186                if let Some(store) = store {
187                    store.release(&dedupe_key).await; // allow the retry through
188                }
189                // Typed fate: a permanent failure skips the retry budget and
190                // parks immediately; transient failures take the bounded
191                // retry path below.
192                let (reason, poison) = match error {
193                    EventError::DeadLetter(m) => {
194                        metrics::counter!("events_poisoned_total").increment(1);
195                        (m, true)
196                    }
197                    EventError::Retry(m) => (m, false),
198                };
199                // Bump-and-read, dropping the shard guard BEFORE any await
200                // below (dead_letter / nack are async).
201                let n = {
202                    let mut entry = attempts.entry(msg.idempotency_key.clone()).or_insert(0);
203                    *entry += 1;
204                    *entry
205                };
206                if poison || n > self.max_retries {
207                    attempts.remove(&msg.idempotency_key);
208                    metrics::counter!("events_dead_lettered_total").increment(1);
209                    tracing::error!(topic = %msg.topic, key = %msg.idempotency_key,
210                        attempts = n, reason = %reason,
211                        "poison message → dead letter");
212                    // PII never parks raw in the DLQ: dead-lettered payloads
213                    // are masked first (the queue copy already served its
214                    // delivery purpose; the DLQ copy is for forensics).
215                    let mut parked = msg.clone();
216                    if let Some(masker) = self.container.try_get::<crate::compliance::Masker>() {
217                        masker.apply(&mut parked.payload);
218                    }
219                    let _ = self.transport.dead_letter(&parked, &reason).await;
220                } else {
221                    metrics::counter!("events_retried_total").increment(1);
222                    tracing::warn!(topic = %msg.topic, attempt = n, reason = %reason,
223                        "event handler failed — nack for retry");
224                    let _ = self.transport.nack(&msg).await;
225                }
226            }
227        }
228    }
229}