Skip to main content

arcly_http/observability/
audit.rs

1//! Compliance-grade audit trail.
2//!
3//! Answers the regulator's question — *who did what, to which resource, when,
4//! from where, and did it succeed* — for every annotated mutation, on a
5//! tamper-evident (hash-chained) append-only stream.
6//!
7//! ## Hot-path contract
8//!
9//! [`AuditPipeline::record`] is a single `try_send` onto a bounded MPSC
10//! channel: O(1), lock-free, never blocks the request. A background worker
11//! (spawned once at construction) batches records, computes the hash chain,
12//! and flushes to the app-provided [`AuditSink`] (Postgres append-only table,
13//! Kafka, S3 WORM bucket, …). If the channel is full the record is dropped
14//! and `audit_dropped_total` is incremented — alert on it; silently blocking
15//! requests on a slow sink would be the worse failure mode.
16//!
17//! ## Usage
18//!
19//! ```ignore
20//! // boot:
21//! ctx.provide(AuditPipeline::new(Arc::new(PgAuditSink::new(pool)), 8192, 64));
22//!
23//! // handler — declarative:
24//! #[Delete("/:id", status(204), security("bearer"))]
25//! #[AuditLog(action = "user.delete", resource = "user")]
26//! async fn delete_user(ctx: RequestContext, #[Param("id")] id: u64) -> Result<Json<Value>, HttpException> { /* ... */ }
27//! ```
28
29use std::sync::Arc;
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use futures::future::BoxFuture;
33use sha2::{Digest, Sha256};
34use tokio::sync::mpsc;
35
36use crate::web::context::RequestContext;
37
38// ─── Record ───────────────────────────────────────────────────────────────────
39
/// Coarse result classification stored on every [`AuditRecord`].
///
/// Variant names are serialized in `snake_case`. [`emit_route_audit`]
/// derives the value from the HTTP status of the finished response.
#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditOutcome {
    /// The response status was in `200..=399`.
    Success,
    /// The response status was `401` or `403`.
    Denied,
    /// Any other response status.
    Error,
}
47
/// One immutable audit entry. `prev_hash` chains each record to its
/// predecessor (SHA-256), so post-hoc tampering breaks the chain visibly.
///
/// Field descriptions below reflect how [`emit_route_audit`] populates the
/// record; code that builds records by hand may fill them differently.
#[derive(Clone, Debug, serde::Serialize)]
pub struct AuditRecord {
    /// Wall-clock time the record was built, in milliseconds since the Unix
    /// epoch (`0` if the system clock reads earlier than the epoch).
    pub timestamp_ms: u64,
    /// Action name taken from the `#[AuditLog(action = "...")]` annotation.
    pub action: &'static str,
    /// Resource kind taken from the `#[AuditLog(resource = "...")]` annotation.
    pub resource: &'static str,
    /// The `sub` claim of the caller, when claims exist and it is a string.
    pub actor_sub: Option<String>,
    /// The `role` claim of the caller, when claims exist and it is a string.
    pub actor_role: Option<String>,
    /// Id of the tenant attached to the request, if any.
    pub tenant: Option<String>,
    /// Hex trace id of the request (`RequestContext::trace_id_hex`).
    pub trace_id: String,
    /// Request method, rendered via `to_string()`.
    pub method: String,
    /// Request path as reported by the request context.
    pub path: String,
    /// Classification of `status` — see [`AuditOutcome`].
    pub outcome: AuditOutcome,
    /// HTTP status code of the response.
    pub status: u16,
    /// Filled by the pipeline worker — hash of the previous record's
    /// serialized form. Empty for the first record after boot.
    ///
    /// The worker stores the lowercase-hex SHA-256 of the predecessor's JSON
    /// serialization (which itself includes that predecessor's `prev_hash`).
    pub prev_hash: String,
}
67
68// ─── Sink ─────────────────────────────────────────────────────────────────────
69
/// Durable destination, implemented by the app. Must be append-only in
/// production deployments — the framework never updates or deletes records.
///
/// The pipeline worker is the only framework caller: it passes each batch
/// after assigning `prev_hash` to every record in it. When the returned
/// future resolves to `Err`, the worker increments `audit_sink_errors_total`,
/// logs the error, and discards the batch — it does not retry.
pub trait AuditSink: Send + Sync + 'static {
    /// Persists `records` in slice order. The `String` error is only used
    /// for logging by the worker.
    fn write_batch<'a>(&'a self, records: &'a [AuditRecord]) -> BoxFuture<'a, Result<(), String>>;
}
75
76// ─── Pipeline ─────────────────────────────────────────────────────────────────
77
/// Lock-free front door for audit records.
///
/// Not `#[Injectable]` — provide via `ctx.provide(AuditPipeline::new(...))`.
/// When absent from the DI container, `#[AuditLog]` routes are a no-op.
pub struct AuditPipeline {
    // Sending half of the bounded channel; the receiving half is owned by
    // the background worker spawned in `AuditPipeline::new`.
    tx: mpsc::Sender<AuditRecord>,
}
85
86impl AuditPipeline {
87    /// `capacity` bounds hot-path memory; `flush_batch` caps records per sink
88    /// write. The worker drains greedily, so latency under light load is one
89    /// scheduler tick, not a timer.
90    pub fn new(sink: Arc<dyn AuditSink>, capacity: usize, flush_batch: usize) -> Self {
91        let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity);
92
93        tokio::spawn(async move {
94            let mut buf: Vec<AuditRecord> = Vec::with_capacity(flush_batch);
95            let mut prev_hash = String::new();
96
97            while rx.recv_many(&mut buf, flush_batch).await > 0 {
98                // Hash-chain inside the single worker — strictly ordered,
99                // no cross-thread coordination needed.
100                for rec in buf.iter_mut() {
101                    rec.prev_hash = std::mem::take(&mut prev_hash);
102                    let serialized = serde_json::to_vec(&rec).unwrap_or_default();
103                    prev_hash = hex(&Sha256::digest(&serialized));
104                }
105                if let Err(e) = sink.write_batch(&buf).await {
106                    metrics::counter!("audit_sink_errors_total").increment(1);
107                    tracing::error!(error = %e, lost = buf.len(), "audit sink write failed");
108                }
109                buf.clear();
110            }
111        });
112
113        Self { tx }
114    }
115
116    /// O(1), lock-free, never blocks. Drops (and counts) when the buffer is
117    /// full rather than stalling request handling.
118    #[inline]
119    pub fn record(&self, rec: AuditRecord) {
120        if self.tx.try_send(rec).is_err() {
121            metrics::counter!("audit_dropped_total").increment(1);
122        }
123    }
124}
125
/// Encodes `bytes` as lowercase hexadecimal, two digits per input byte.
///
/// Returns an empty string for empty input.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    // Output length is known up front, so allocate exactly once instead of
    // building (and discarding) a temporary `String` per byte.
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
129
130// ─── Macro support ────────────────────────────────────────────────────────────
131
132/// Called by the `#[AuditLog]` expansion after the response is produced.
133/// Maps HTTP status → outcome: 2xx/3xx = Success, 401/403 = Denied,
134/// everything else = Error.
135#[doc(hidden)]
136pub fn emit_route_audit(
137    ctx: &RequestContext,
138    action: &'static str,
139    resource: &'static str,
140    status: u16,
141) {
142    let Some(pipeline) = ctx.try_inject::<AuditPipeline>() else {
143        return;
144    };
145
146    let outcome = match status {
147        200..=399 => AuditOutcome::Success,
148        401 | 403 => AuditOutcome::Denied,
149        _ => AuditOutcome::Error,
150    };
151
152    let claims = ctx.claims();
153    pipeline.record(AuditRecord {
154        timestamp_ms: SystemTime::now()
155            .duration_since(UNIX_EPOCH)
156            .map(|d| d.as_millis() as u64)
157            .unwrap_or(0),
158        action,
159        resource,
160        actor_sub: claims
161            .and_then(|c| c.get("sub"))
162            .and_then(|v| v.as_str())
163            .map(str::to_owned),
164        actor_role: claims
165            .and_then(|c| c.get("role"))
166            .and_then(|v| v.as_str())
167            .map(str::to_owned),
168        tenant: ctx.tenant().map(|t| t.id.as_str().to_owned()),
169        trace_id: ctx.trace_id_hex(),
170        method: ctx.method().to_string(),
171        path: ctx.path().to_owned(),
172        outcome,
173        status,
174        prev_hash: String::new(), // assigned by the pipeline worker
175    });
176}