arcly-http 0.1.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Consumer runtime: poll → dedupe → dispatch → ack / retry / dead-letter.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;

use crate::core::engine::FrozenDiContainer;
use crate::messaging::{EventContext, EventHandlerDescriptor, InboundMessage, MessageTransport};
use crate::observability::propagation::TraceContext;

/// Function-pointer signature of an event handler as stored in the dispatch
/// map: it takes the [`EventContext`] by value and returns a boxed `'static`
/// future that resolves to `Ok(())` on success or to an error string
/// describing the failure.
type Handler = fn(EventContext) -> BoxFuture<'static, Result<(), String>>;

/// Drives one transport against the link-time handler registry.
///
/// Spawn from `ArclyPlugin::on_start`. The dispatch map is frozen at spawn —
/// the polling loop performs immutable `HashMap` reads only.
pub struct ConsumerRuntime {
    /// Transport that is polled for messages and that receives every ack,
    /// nack and dead-letter decision made by the runtime.
    pub transport: Arc<dyn MessageTransport>,
    /// DI container handed to each handler through `EventContext`; it is also
    /// queried for the optional `IdempotencyStore` and `Masker`.
    pub container: &'static FrozenDiContainer,
    /// Period of the polling interval.
    pub poll: Duration,
    /// Value passed to `MessageTransport::poll` on every tick — presumably
    /// the maximum number of messages per poll (confirm against the trait).
    pub batch: usize,
    /// Failed deliveries tolerated per message: a message is dead-lettered on
    /// the failure that pushes its count above this value.
    pub max_retries: u32,
    /// TTL for the consume-side dedupe claim (when an `IdempotencyStore`
    /// is available in the DI container).
    pub dedupe_ttl_secs: u64,
}

impl ConsumerRuntime {
    pub fn spawn(self) {
        // Freeze the dispatch map from the inventory registry — once, here.
        let dispatch: HashMap<&'static str, Handler> =
            inventory::iter::<&'static EventHandlerDescriptor>
                .into_iter()
                .map(|d| (d.topic, d.handler))
                .collect();

        for d in inventory::iter::<&'static EventHandlerDescriptor> {
            tracing::info!(
                topic = d.topic,
                consumer = d.consumer,
                "event handler registered"
            );
        }

        tokio::spawn(async move {
            // Per-key failure counts. Single-task state — no shared locks.
            let mut attempts: HashMap<String, u32> = HashMap::new();
            let mut tick = tokio::time::interval(self.poll);

            loop {
                tick.tick().await;
                let batch = match self.transport.poll(self.batch).await {
                    Ok(b) => b,
                    Err(e) => {
                        tracing::warn!(error = %e, "transport poll failed — retrying next tick");
                        continue;
                    }
                };

                for msg in batch {
                    self.process(&dispatch, &mut attempts, msg).await;
                }
            }
        });
    }

    async fn process(
        &self,
        dispatch: &HashMap<&'static str, Handler>,
        attempts: &mut HashMap<String, u32>,
        msg: InboundMessage,
    ) {
        let Some(handler) = dispatch.get(msg.topic.as_str()) else {
            // No subscriber for this topic — ack so it doesn't loop forever.
            tracing::debug!(topic = %msg.topic, "no handler — acking unrouted message");
            let _ = self.transport.ack(&msg).await;
            return;
        };

        // Consume-side dedupe (at-least-once → effectively-once per TTL).
        // Same contract as the HTTP idempotency layer: claim before running,
        // `complete` only AFTER success, `release` on failure — so retries of
        // a failed delivery pass through instead of being replay-swallowed.
        let store = self
            .container
            .try_get::<Box<dyn crate::web::idempotency::IdempotencyStore>>();
        let dedupe_key = format!("consume:{}:{}", msg.topic, msg.idempotency_key);
        if let Some(store) = store {
            match store.claim(&dedupe_key, self.dedupe_ttl_secs).await {
                crate::web::idempotency::IdempotencyDecision::Fresh => {}
                crate::web::idempotency::IdempotencyDecision::Unavailable => {}
                crate::web::idempotency::IdempotencyDecision::Replay { .. } => {
                    metrics::counter!("events_deduped_total").increment(1);
                    let _ = self.transport.ack(&msg).await;
                    return;
                }
                crate::web::idempotency::IdempotencyDecision::InFlight => {
                    // Our own released-then-retried claim or a concurrent
                    // consumer; requeue and let the next poll settle it.
                    let _ = self.transport.nack(&msg).await;
                    return;
                }
            }
        }

        // Continue the producer's trace; fresh root when none was carried.
        let trace = msg
            .traceparent
            .as_deref()
            .and_then(TraceContext::from_traceparent)
            .unwrap_or_else(TraceContext::new_root);

        let ctx = EventContext {
            message: msg.clone(),
            container: self.container,
            trace,
        };

        match handler(ctx).await {
            Ok(()) => {
                attempts.remove(&msg.idempotency_key);
                if let Some(store) = store {
                    store
                        .complete(&dedupe_key, 200, b"", self.dedupe_ttl_secs)
                        .await;
                }
                metrics::counter!("events_consumed_total", "topic" => msg.topic.clone())
                    .increment(1);
                if let Err(e) = self.transport.ack(&msg).await {
                    tracing::warn!(error = %e, "ack failed — message may redeliver");
                }
            }
            Err(reason) => {
                if let Some(store) = store {
                    store.release(&dedupe_key).await; // allow the retry through
                }
                let n = attempts.entry(msg.idempotency_key.clone()).or_insert(0);
                *n += 1;
                if *n > self.max_retries {
                    attempts.remove(&msg.idempotency_key);
                    metrics::counter!("events_dead_lettered_total").increment(1);
                    tracing::error!(topic = %msg.topic, key = %msg.idempotency_key,
                        attempts = self.max_retries, reason = %reason,
                        "poison message → dead letter");
                    // PII never parks raw in the DLQ: dead-lettered payloads
                    // are masked first (the queue copy already served its
                    // delivery purpose; the DLQ copy is for forensics).
                    let mut parked = msg.clone();
                    if let Some(masker) = self.container.try_get::<crate::compliance::Masker>() {
                        masker.apply(&mut parked.payload);
                    }
                    let _ = self.transport.dead_letter(&parked, &reason).await;
                } else {
                    metrics::counter!("events_retried_total").increment(1);
                    tracing::warn!(topic = %msg.topic, attempt = *n, reason = %reason,
                        "event handler failed — nack for retry");
                    let _ = self.transport.nack(&msg).await;
                }
            }
        }
    }
}