arcly-http 0.2.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Consumer runtime: poll → dedupe → dispatch → ack / retry / dead-letter.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;

use crate::core::engine::FrozenDiContainer;
use crate::messaging::{
    EventContext, EventError, EventHandlerDescriptor, InboundMessage, MessageTransport,
};

/// Signature of a registered event handler: takes the per-message
/// [`EventContext`] by value and returns a boxed `'static` future resolving to
/// `Ok(())` on success or an [`EventError`] whose variant (`Retry` /
/// `DeadLetter`) decides the message's fate in `ConsumerRuntime::process`.
type Handler = fn(EventContext) -> BoxFuture<'static, Result<(), EventError>>;

/// Consumer loop binding a single [`MessageTransport`] to the handlers
/// registered at link time.
///
/// Start it from `ArclyPlugin::on_start`. The topic → handler table is built
/// exactly once, when `spawn` runs; afterwards the polling loop only reads it
/// (immutable `HashMap` lookups, no locking).
#[non_exhaustive]
pub struct ConsumerRuntime {
    // --- wiring -----------------------------------------------------------
    /// Queue that is polled, and against which messages are acked, nacked
    /// and dead-lettered.
    pub transport: Arc<dyn MessageTransport>,
    /// Frozen DI container handed to every handler.
    pub container: &'static FrozenDiContainer,

    // --- tuning -----------------------------------------------------------
    /// Delay between two consecutive transport polls.
    pub poll: Duration,
    /// Batch size passed to each `MessageTransport::poll` call.
    pub batch: usize,
    /// How many messages of one polled batch run at the same time.
    /// `0` and `1` both mean strictly sequential, in-order processing.
    /// Larger values give up ordering between messages of the same batch in
    /// exchange for throughput; per-message dedupe and retry accounting are
    /// unaffected, since the retry counters live in a sharded map.
    pub concurrency: usize,
    /// Number of failed deliveries a message may accumulate before it is
    /// moved to the dead-letter queue.
    pub max_retries: u32,
    /// Lifetime, in seconds, of the consume-side dedupe claim. Only used
    /// when the DI container provides an `IdempotencyStore`.
    pub dedupe_ttl_secs: u64,
}

impl ConsumerRuntime {
    /// Sensible defaults: 300ms poll, batch 32, 3 retries, 1h dedupe TTL,
    /// sequential processing. Chain setters to adjust.
    pub fn new(
        transport: Arc<dyn MessageTransport>,
        container: &'static FrozenDiContainer,
    ) -> Self {
        Self {
            transport,
            container,
            poll: Duration::from_millis(300),
            batch: 32,
            max_retries: 3,
            dedupe_ttl_secs: 3600,
            concurrency: 1,
        }
    }
    pub fn poll(mut self, v: Duration) -> Self {
        self.poll = v;
        self
    }
    pub fn batch(mut self, v: usize) -> Self {
        self.batch = v;
        self
    }
    pub fn max_retries(mut self, v: u32) -> Self {
        self.max_retries = v;
        self
    }
    pub fn dedupe_ttl_secs(mut self, v: u64) -> Self {
        self.dedupe_ttl_secs = v;
        self
    }
    pub fn concurrency(mut self, v: usize) -> Self {
        self.concurrency = v;
        self
    }

    pub fn spawn(self) {
        // Freeze the dispatch map from the inventory registry — once, here.
        let dispatch: HashMap<&'static str, Handler> =
            inventory::iter::<&'static EventHandlerDescriptor>
                .into_iter()
                .map(|d| (d.topic, d.handler))
                .collect();

        for d in inventory::iter::<&'static EventHandlerDescriptor> {
            tracing::info!(
                topic = d.topic,
                consumer = d.consumer,
                "event handler registered"
            );
        }

        let runtime = Arc::new(self);
        tokio::spawn(async move {
            // Per-key failure counts — sharded so concurrent processing
            // never contends on a single lock.
            let attempts: Arc<dashmap::DashMap<String, u32>> = Arc::new(dashmap::DashMap::new());
            let mut tick = tokio::time::interval(runtime.poll);
            let limit = runtime.concurrency.max(1);

            loop {
                tick.tick().await;
                // Drain-aware (like the outbox relay): once shutdown begins,
                // stop claiming work — unacked messages redeliver elsewhere.
                if crate::observability::health::is_draining() {
                    tracing::info!("consumer runtime: drain flag set — stopping");
                    return;
                }
                let batch = match runtime.transport.poll(runtime.batch).await {
                    Ok(b) => b,
                    Err(e) => {
                        tracing::warn!(error = %e, "transport poll failed — retrying next tick");
                        continue;
                    }
                };

                use futures::StreamExt;
                futures::stream::iter(batch)
                    .for_each_concurrent(limit, |msg| {
                        let runtime = Arc::clone(&runtime);
                        let attempts = Arc::clone(&attempts);
                        let dispatch = &dispatch;
                        async move {
                            runtime.process(dispatch, &attempts, msg).await;
                        }
                    })
                    .await;
            }
        });
    }

    async fn process(
        &self,
        dispatch: &HashMap<&'static str, Handler>,
        attempts: &dashmap::DashMap<String, u32>,
        msg: InboundMessage,
    ) {
        let Some(handler) = dispatch.get(msg.topic.as_str()) else {
            // No subscriber for this topic — ack so it doesn't loop forever.
            tracing::debug!(topic = %msg.topic, "no handler — acking unrouted message");
            let _ = self.transport.ack(&msg).await;
            return;
        };

        // Consume-side dedupe (at-least-once → effectively-once per TTL).
        // Same contract as the HTTP idempotency layer: claim before running,
        // `complete` only AFTER success, `release` on failure — so retries of
        // a failed delivery pass through instead of being replay-swallowed.
        let store = self
            .container
            .try_get::<Box<dyn crate::web::idempotency::IdempotencyStore>>();
        let dedupe_key = format!("consume:{}:{}", msg.topic, msg.idempotency_key);
        if let Some(store) = store {
            match store.claim(&dedupe_key, self.dedupe_ttl_secs).await {
                crate::web::idempotency::IdempotencyDecision::Fresh => {}
                crate::web::idempotency::IdempotencyDecision::Unavailable => {}
                crate::web::idempotency::IdempotencyDecision::Replay { .. } => {
                    metrics::counter!("events_deduped_total").increment(1);
                    let _ = self.transport.ack(&msg).await;
                    return;
                }
                crate::web::idempotency::IdempotencyDecision::InFlight => {
                    // Our own released-then-retried claim or a concurrent
                    // consumer; requeue and let the next poll settle it.
                    let _ = self.transport.nack(&msg).await;
                    return;
                }
            }
        }

        // One shared extraction (pipeline::Provenance): the producer's trace
        // continues (fresh root when none was carried) and the envelope's
        // tenant id is validated against the SAME registry as HTTP traffic.
        let provenance = crate::pipeline::Provenance::from_message(&msg, self.container);

        // A suspended (or unknown) tenant's queued events must stop being
        // processed, exactly like its HTTP requests — park them out of band
        // rather than retry-looping or silently processing. Only enforced
        // when a TenantRegistry is actually configured.
        if msg.tenant.is_some()
            && provenance.tenant.is_none()
            && self
                .container
                .try_get::<crate::web::tenant::TenantRegistry>()
                .is_some()
        {
            metrics::counter!("events_tenant_rejected_total").increment(1);
            tracing::warn!(
                topic = %msg.topic,
                tenant = msg.tenant.as_deref().unwrap_or(""),
                "event tenant suspended or unknown — dead-lettering"
            );
            if let Some(store) = store {
                store.release(&dedupe_key).await;
            }
            let _ = self
                .transport
                .dead_letter(&msg, "tenant suspended or unknown")
                .await;
            return;
        }

        let ctx = EventContext {
            message: msg.clone(),
            container: self.container,
            trace: provenance.trace,
            tenant: provenance.tenant,
        };

        match handler(ctx).await {
            Ok(()) => {
                attempts.remove(&msg.idempotency_key);
                if let Some(store) = store {
                    store
                        .complete(&dedupe_key, 200, b"", self.dedupe_ttl_secs)
                        .await;
                }
                metrics::counter!("events_consumed_total", "topic" => msg.topic.clone())
                    .increment(1);
                if let Err(e) = self.transport.ack(&msg).await {
                    tracing::warn!(error = %e, "ack failed — message may redeliver");
                }
            }
            Err(error) => {
                if let Some(store) = store {
                    store.release(&dedupe_key).await; // allow the retry through
                }
                // Typed fate: a permanent failure skips the retry budget and
                // parks immediately; transient failures take the bounded
                // retry path below.
                let (reason, poison) = match error {
                    EventError::DeadLetter(m) => {
                        metrics::counter!("events_poisoned_total").increment(1);
                        (m, true)
                    }
                    EventError::Retry(m) => (m, false),
                };
                // Bump-and-read, dropping the shard guard BEFORE any await
                // below (dead_letter / nack are async).
                let n = {
                    let mut entry = attempts.entry(msg.idempotency_key.clone()).or_insert(0);
                    *entry += 1;
                    *entry
                };
                if poison || n > self.max_retries {
                    attempts.remove(&msg.idempotency_key);
                    metrics::counter!("events_dead_lettered_total").increment(1);
                    tracing::error!(topic = %msg.topic, key = %msg.idempotency_key,
                        attempts = n, reason = %reason,
                        "poison message → dead letter");
                    // PII never parks raw in the DLQ: dead-lettered payloads
                    // are masked first (the queue copy already served its
                    // delivery purpose; the DLQ copy is for forensics).
                    let mut parked = msg.clone();
                    if let Some(masker) = self.container.try_get::<crate::compliance::Masker>() {
                        masker.apply(&mut parked.payload);
                    }
                    let _ = self.transport.dead_letter(&parked, &reason).await;
                } else {
                    metrics::counter!("events_retried_total").increment(1);
                    tracing::warn!(topic = %msg.topic, attempt = n, reason = %reason,
                        "event handler failed — nack for retry");
                    let _ = self.transport.nack(&msg).await;
                }
            }
        }
    }
}