Skip to main content

arcly_http/observability/
audit.rs

//! Compliance-grade audit trail.
//!
//! Answers the regulator's question — *who did what, to which resource, when,
//! from where, and did it succeed* — for every annotated mutation, on a
//! tamper-evident (hash-chained) append-only stream.
//!
//! ## Hot-path contract
//!
//! [`AuditPipeline::record`] is a single `try_send` onto a bounded MPSC
//! channel: O(1), lock-free, never blocks the request. A background worker
//! (spawned once at construction) batches records, computes the hash chain,
//! and flushes to the app-provided [`AuditSink`] (Postgres append-only table,
//! Kafka, S3 WORM bucket, …). If the channel is full the record is dropped
//! and `audit_dropped_total` is incremented — alert on it; silently blocking
//! requests on a slow sink would be the worse failure mode.
//!
//! ## Usage
//!
//! ```ignore
//! // boot:
//! ctx.provide(AuditPipeline::new(Arc::new(PgAuditSink::new(pool)), 8192, 64));
//!
//! // handler — declarative:
//! #[Delete("/:id", status(204), security("bearer"))]
//! #[AuditLog(action = "user.delete", resource = "user")]
//! async fn delete_user(ctx: RequestContext, #[Param("id")] id: u64) -> Result<Json<Value>, HttpException> { /* ... */ }
//! ```
28
29use std::sync::Arc;
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use futures::future::BoxFuture;
33use sha2::{Digest, Sha256};
34use tokio::sync::mpsc;
35
36use crate::web::context::RequestContext;
37
38// ─── Record ───────────────────────────────────────────────────────────────────
39
40#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)]
41#[serde(rename_all = "snake_case")]
42pub enum AuditOutcome {
43    Success,
44    Denied,
45    Error,
46}
47
48/// One immutable audit entry. `prev_hash` chains each record to its
49/// predecessor (SHA-256), so post-hoc tampering breaks the chain visibly.
50#[derive(Clone, Debug, serde::Serialize)]
51pub struct AuditRecord {
52    pub timestamp_ms: u64,
53    pub action: &'static str,
54    pub resource: &'static str,
55    pub actor_sub: Option<String>,
56    pub actor_role: Option<String>,
57    pub tenant: Option<String>,
58    pub trace_id: String,
59    pub method: String,
60    pub path: String,
61    pub outcome: AuditOutcome,
62    pub status: u16,
63    /// Filled by the pipeline worker — hash of the previous record's
64    /// serialized form. Empty for the first record after boot.
65    pub prev_hash: String,
66}
67
68// ─── Sink ─────────────────────────────────────────────────────────────────────
69
70/// Durable destination, implemented by the app. Must be append-only in
71/// production deployments — the framework never updates or deletes records.
72pub trait AuditSink: Send + Sync + 'static {
73    fn write_batch<'a>(
74        &'a self,
75        records: &'a [AuditRecord],
76    ) -> BoxFuture<'a, Result<(), crate::messaging::BoxError>>;
77}
78
79// ─── Pipeline ─────────────────────────────────────────────────────────────────
80
81/// Lock-free front door for audit records.
82///
83/// Not `#[Injectable]` — provide via `ctx.provide(AuditPipeline::new(...))`.
84/// When absent from the DI container, `#[AuditLog]` routes are a no-op.
85pub struct AuditPipeline {
86    tx: mpsc::Sender<AuditRecord>,
87}
88
89impl AuditPipeline {
90    /// `capacity` bounds hot-path memory; `flush_batch` caps records per sink
91    /// write. The worker drains greedily, so latency under light load is one
92    /// scheduler tick, not a timer.
93    pub fn new(sink: Arc<dyn AuditSink>, capacity: usize, flush_batch: usize) -> Self {
94        let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity);
95
96        tokio::spawn(async move {
97            let mut buf: Vec<AuditRecord> = Vec::with_capacity(flush_batch);
98            let mut prev_hash = String::new();
99
100            while rx.recv_many(&mut buf, flush_batch).await > 0 {
101                // Hash-chain inside the single worker — strictly ordered,
102                // no cross-thread coordination needed.
103                for rec in buf.iter_mut() {
104                    rec.prev_hash = std::mem::take(&mut prev_hash);
105                    let serialized = serde_json::to_vec(&rec).unwrap_or_default();
106                    prev_hash = hex(&Sha256::digest(&serialized));
107                }
108                if let Err(e) = sink.write_batch(&buf).await {
109                    metrics::counter!("audit_sink_errors_total").increment(1);
110                    tracing::error!(error = %e, lost = buf.len(), "audit sink write failed");
111                }
112                buf.clear();
113            }
114        });
115
116        Self { tx }
117    }
118
119    /// O(1), lock-free, never blocks. Drops (and counts) when the buffer is
120    /// full rather than stalling request handling.
121    #[inline]
122    pub fn record(&self, rec: AuditRecord) {
123        if self.tx.try_send(rec).is_err() {
124            metrics::counter!("audit_dropped_total").increment(1);
125        }
126    }
127}
128
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// Writes into one pre-sized buffer instead of formatting a temporary
/// `String` for every byte.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble; both indices are < 16.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
132
133// ─── Macro support ────────────────────────────────────────────────────────────
134
135/// Called by the `#[AuditLog]` expansion after the response is produced.
136/// Maps HTTP status → outcome: 2xx/3xx = Success, 401/403 = Denied,
137/// everything else = Error.
138#[doc(hidden)]
139pub fn emit_route_audit(
140    ctx: &RequestContext,
141    action: &'static str,
142    resource: &'static str,
143    status: u16,
144) {
145    let Some(pipeline) = ctx.try_inject::<AuditPipeline>() else {
146        return;
147    };
148
149    let outcome = match status {
150        200..=399 => AuditOutcome::Success,
151        401 | 403 => AuditOutcome::Denied,
152        _ => AuditOutcome::Error,
153    };
154
155    let claims = ctx.claims();
156    pipeline.record(AuditRecord {
157        timestamp_ms: SystemTime::now()
158            .duration_since(UNIX_EPOCH)
159            .map(|d| d.as_millis() as u64)
160            .unwrap_or(0),
161        action,
162        resource,
163        actor_sub: claims
164            .and_then(|c| c.get("sub"))
165            .and_then(|v| v.as_str())
166            .map(str::to_owned),
167        actor_role: claims
168            .and_then(|c| c.get("role"))
169            .and_then(|v| v.as_str())
170            .map(str::to_owned),
171        tenant: ctx.tenant().map(|t| t.id.as_str().to_owned()),
172        trace_id: ctx.trace_id_hex(),
173        method: ctx.method().to_string(),
174        path: ctx.path().to_owned(),
175        outcome,
176        status,
177        prev_hash: String::new(), // assigned by the pipeline worker
178    });
179}