Skip to main content

arcly_http/messaging/
runtime.rs

//! Consumer runtime: poll → dedupe → dispatch → ack / retry / dead-letter.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use futures::future::BoxFuture;
8
9use crate::core::engine::FrozenDiContainer;
10use crate::messaging::{EventContext, EventHandlerDescriptor, InboundMessage, MessageTransport};
11use crate::observability::propagation::TraceContext;
12
13type Handler = fn(EventContext) -> BoxFuture<'static, Result<(), String>>;
14
15/// Drives one transport against the link-time handler registry.
16///
17/// Spawn from `ArclyPlugin::on_start`. The dispatch map is frozen at spawn —
18/// the polling loop performs immutable `HashMap` reads only.
19pub struct ConsumerRuntime {
20    pub transport: Arc<dyn MessageTransport>,
21    pub container: &'static FrozenDiContainer,
22    pub poll: Duration,
23    pub batch: usize,
24    /// Failed deliveries per message before dead-lettering.
25    pub max_retries: u32,
26    /// TTL for the consume-side dedupe claim (when an `IdempotencyStore`
27    /// is available in the DI container).
28    pub dedupe_ttl_secs: u64,
29}
30
31impl ConsumerRuntime {
32    pub fn spawn(self) {
33        // Freeze the dispatch map from the inventory registry — once, here.
34        let dispatch: HashMap<&'static str, Handler> =
35            inventory::iter::<&'static EventHandlerDescriptor>
36                .into_iter()
37                .map(|d| (d.topic, d.handler))
38                .collect();
39
40        for d in inventory::iter::<&'static EventHandlerDescriptor> {
41            tracing::info!(
42                topic = d.topic,
43                consumer = d.consumer,
44                "event handler registered"
45            );
46        }
47
48        tokio::spawn(async move {
49            // Per-key failure counts. Single-task state — no shared locks.
50            let mut attempts: HashMap<String, u32> = HashMap::new();
51            let mut tick = tokio::time::interval(self.poll);
52
53            loop {
54                tick.tick().await;
55                let batch = match self.transport.poll(self.batch).await {
56                    Ok(b) => b,
57                    Err(e) => {
58                        tracing::warn!(error = %e, "transport poll failed — retrying next tick");
59                        continue;
60                    }
61                };
62
63                for msg in batch {
64                    self.process(&dispatch, &mut attempts, msg).await;
65                }
66            }
67        });
68    }
69
70    async fn process(
71        &self,
72        dispatch: &HashMap<&'static str, Handler>,
73        attempts: &mut HashMap<String, u32>,
74        msg: InboundMessage,
75    ) {
76        let Some(handler) = dispatch.get(msg.topic.as_str()) else {
77            // No subscriber for this topic — ack so it doesn't loop forever.
78            tracing::debug!(topic = %msg.topic, "no handler — acking unrouted message");
79            let _ = self.transport.ack(&msg).await;
80            return;
81        };
82
83        // Consume-side dedupe (at-least-once → effectively-once per TTL).
84        // Same contract as the HTTP idempotency layer: claim before running,
85        // `complete` only AFTER success, `release` on failure — so retries of
86        // a failed delivery pass through instead of being replay-swallowed.
87        let store = self
88            .container
89            .try_get::<Box<dyn crate::web::idempotency::IdempotencyStore>>();
90        let dedupe_key = format!("consume:{}:{}", msg.topic, msg.idempotency_key);
91        if let Some(store) = store {
92            match store.claim(&dedupe_key, self.dedupe_ttl_secs).await {
93                crate::web::idempotency::IdempotencyDecision::Fresh => {}
94                crate::web::idempotency::IdempotencyDecision::Unavailable => {}
95                crate::web::idempotency::IdempotencyDecision::Replay { .. } => {
96                    metrics::counter!("events_deduped_total").increment(1);
97                    let _ = self.transport.ack(&msg).await;
98                    return;
99                }
100                crate::web::idempotency::IdempotencyDecision::InFlight => {
101                    // Our own released-then-retried claim or a concurrent
102                    // consumer; requeue and let the next poll settle it.
103                    let _ = self.transport.nack(&msg).await;
104                    return;
105                }
106            }
107        }
108
109        // Continue the producer's trace; fresh root when none was carried.
110        let trace = msg
111            .traceparent
112            .as_deref()
113            .and_then(TraceContext::from_traceparent)
114            .unwrap_or_else(TraceContext::new_root);
115
116        let ctx = EventContext {
117            message: msg.clone(),
118            container: self.container,
119            trace,
120        };
121
122        match handler(ctx).await {
123            Ok(()) => {
124                attempts.remove(&msg.idempotency_key);
125                if let Some(store) = store {
126                    store
127                        .complete(&dedupe_key, 200, b"", self.dedupe_ttl_secs)
128                        .await;
129                }
130                metrics::counter!("events_consumed_total", "topic" => msg.topic.clone())
131                    .increment(1);
132                if let Err(e) = self.transport.ack(&msg).await {
133                    tracing::warn!(error = %e, "ack failed — message may redeliver");
134                }
135            }
136            Err(reason) => {
137                if let Some(store) = store {
138                    store.release(&dedupe_key).await; // allow the retry through
139                }
140                let n = attempts.entry(msg.idempotency_key.clone()).or_insert(0);
141                *n += 1;
142                if *n > self.max_retries {
143                    attempts.remove(&msg.idempotency_key);
144                    metrics::counter!("events_dead_lettered_total").increment(1);
145                    tracing::error!(topic = %msg.topic, key = %msg.idempotency_key,
146                        attempts = self.max_retries, reason = %reason,
147                        "poison message → dead letter");
148                    // PII never parks raw in the DLQ: dead-lettered payloads
149                    // are masked first (the queue copy already served its
150                    // delivery purpose; the DLQ copy is for forensics).
151                    let mut parked = msg.clone();
152                    if let Some(masker) = self.container.try_get::<crate::compliance::Masker>() {
153                        masker.apply(&mut parked.payload);
154                    }
155                    let _ = self.transport.dead_letter(&parked, &reason).await;
156                } else {
157                    metrics::counter!("events_retried_total").increment(1);
158                    tracing::warn!(topic = %msg.topic, attempt = *n, reason = %reason,
159                        "event handler failed — nack for retry");
160                    let _ = self.transport.nack(&msg).await;
161                }
162            }
163        }
164    }
165}