arcly-http 0.1.1

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! Compliance-grade audit trail.
//!
//! Answers the regulator's question — *who did what, to which resource, when,
//! from where, and did it succeed* — for every annotated mutation, on a
//! tamper-evident (hash-chained) append-only stream.
//!
//! ## Hot-path contract
//!
//! [`AuditPipeline::record`] is a single `try_send` onto a bounded MPSC
//! channel: O(1), lock-free, never blocks the request. A background worker
//! (spawned once at construction) batches records, computes the hash chain,
//! and flushes to the app-provided [`AuditSink`] (Postgres append-only table,
//! Kafka, S3 WORM bucket, …). If the channel is full the record is dropped
//! and `audit_dropped_total` is incremented — alert on it; silently blocking
//! requests on a slow sink would be the worse failure mode.
//!
//! ## Usage
//!
//! ```ignore
//! // boot:
//! ctx.provide(AuditPipeline::new(Arc::new(PgAuditSink::new(pool)), 8192, 64));
//!
//! // handler — declarative:
//! #[Delete("/:id", status(204), security("bearer"))]
//! #[AuditLog(action = "user.delete", resource = "user")]
//! async fn delete_user(ctx: RequestContext, #[Param("id")] id: u64) -> Result<Json<Value>, HttpException> { /* ... */ }
//! ```

use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use futures::future::BoxFuture;
use sha2::{Digest, Sha256};
use tokio::sync::mpsc;

use crate::web::context::RequestContext;

// ─── Record ───────────────────────────────────────────────────────────────────

#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditOutcome {
    Success,
    Denied,
    Error,
}

/// One immutable audit entry. `prev_hash` chains each record to its
/// predecessor (SHA-256), so post-hoc tampering breaks the chain visibly.
#[derive(Clone, Debug, serde::Serialize)]
pub struct AuditRecord {
    pub timestamp_ms: u64,
    pub action: &'static str,
    pub resource: &'static str,
    pub actor_sub: Option<String>,
    pub actor_role: Option<String>,
    pub tenant: Option<String>,
    pub trace_id: String,
    pub method: String,
    pub path: String,
    pub outcome: AuditOutcome,
    pub status: u16,
    /// Filled by the pipeline worker — hash of the previous record's
    /// serialized form. Empty for the first record after boot.
    pub prev_hash: String,
}

// ─── Sink ─────────────────────────────────────────────────────────────────────

/// Durable destination, implemented by the app. Must be append-only in
/// production deployments — the framework never updates or deletes records.
pub trait AuditSink: Send + Sync + 'static {
    fn write_batch<'a>(&'a self, records: &'a [AuditRecord]) -> BoxFuture<'a, Result<(), String>>;
}

// ─── Pipeline ─────────────────────────────────────────────────────────────────

/// Lock-free front door for audit records.
///
/// Not `#[Injectable]` — provide via `ctx.provide(AuditPipeline::new(...))`.
/// When absent from the DI container, `#[AuditLog]` routes are a no-op.
pub struct AuditPipeline {
    tx: mpsc::Sender<AuditRecord>,
}

impl AuditPipeline {
    /// `capacity` bounds hot-path memory; `flush_batch` caps records per sink
    /// write. The worker drains greedily, so latency under light load is one
    /// scheduler tick, not a timer.
    pub fn new(sink: Arc<dyn AuditSink>, capacity: usize, flush_batch: usize) -> Self {
        let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity);

        tokio::spawn(async move {
            let mut buf: Vec<AuditRecord> = Vec::with_capacity(flush_batch);
            let mut prev_hash = String::new();

            while rx.recv_many(&mut buf, flush_batch).await > 0 {
                // Hash-chain inside the single worker — strictly ordered,
                // no cross-thread coordination needed.
                for rec in buf.iter_mut() {
                    rec.prev_hash = std::mem::take(&mut prev_hash);
                    let serialized = serde_json::to_vec(&rec).unwrap_or_default();
                    prev_hash = hex(&Sha256::digest(&serialized));
                }
                if let Err(e) = sink.write_batch(&buf).await {
                    metrics::counter!("audit_sink_errors_total").increment(1);
                    tracing::error!(error = %e, lost = buf.len(), "audit sink write failed");
                }
                buf.clear();
            }
        });

        Self { tx }
    }

    /// O(1), lock-free, never blocks. Drops (and counts) when the buffer is
    /// full rather than stalling request handling.
    #[inline]
    pub fn record(&self, rec: AuditRecord) {
        if self.tx.try_send(rec).is_err() {
            metrics::counter!("audit_dropped_total").increment(1);
        }
    }
}

fn hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}

// ─── Macro support ────────────────────────────────────────────────────────────

/// Called by the `#[AuditLog]` expansion after the response is produced.
/// Maps HTTP status → outcome: 2xx/3xx = Success, 401/403 = Denied,
/// everything else = Error.
#[doc(hidden)]
pub fn emit_route_audit(
    ctx: &RequestContext,
    action: &'static str,
    resource: &'static str,
    status: u16,
) {
    let Some(pipeline) = ctx.try_inject::<AuditPipeline>() else {
        return;
    };

    let outcome = match status {
        200..=399 => AuditOutcome::Success,
        401 | 403 => AuditOutcome::Denied,
        _ => AuditOutcome::Error,
    };

    let claims = ctx.claims();
    pipeline.record(AuditRecord {
        timestamp_ms: SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0),
        action,
        resource,
        actor_sub: claims
            .and_then(|c| c.get("sub"))
            .and_then(|v| v.as_str())
            .map(str::to_owned),
        actor_role: claims
            .and_then(|c| c.get("role"))
            .and_then(|v| v.as_str())
            .map(str::to_owned),
        tenant: ctx.tenant().map(|t| t.id.as_str().to_owned()),
        trace_id: ctx.trace_id_hex(),
        method: ctx.method().to_string(),
        path: ctx.path().to_owned(),
        outcome,
        status,
        prev_hash: String::new(), // assigned by the pipeline worker
    });
}