arcly_http/observability/audit.rs
1//! Compliance-grade audit trail.
2//!
3//! Answers the regulator's question — *who did what, to which resource, when,
4//! from where, and did it succeed* — for every annotated mutation, on a
5//! tamper-evident (hash-chained) append-only stream.
6//!
7//! ## Hot-path contract
8//!
9//! [`AuditPipeline::record`] is a single `try_send` onto a bounded MPSC
10//! channel: O(1), lock-free, never blocks the request. A background worker
11//! (spawned once at construction) batches records, computes the hash chain,
12//! and flushes to the app-provided [`AuditSink`] (Postgres append-only table,
13//! Kafka, S3 WORM bucket, …). If the channel is full the record is dropped
14//! and `audit_dropped_total` is incremented — alert on it; silently blocking
15//! requests on a slow sink would be the worse failure mode.
16//!
17//! ## Usage
18//!
19//! ```ignore
20//! // boot:
21//! ctx.provide(AuditPipeline::new(Arc::new(PgAuditSink::new(pool)), 8192, 64));
22//!
23//! // handler — declarative:
24//! #[Delete("/:id", status(204), security("bearer"))]
25//! #[AuditLog(action = "user.delete", resource = "user")]
26//! async fn delete_user(ctx: RequestContext, #[Param("id")] id: u64) -> Result<Json<Value>, HttpException> { /* ... */ }
27//! ```
28
29use std::sync::Arc;
30use std::time::{SystemTime, UNIX_EPOCH};
31
32use futures::future::BoxFuture;
33use sha2::{Digest, Sha256};
34use tokio::sync::mpsc;
35
36use crate::web::context::RequestContext;
37
38// ─── Record ───────────────────────────────────────────────────────────────────
39
40#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)]
41#[serde(rename_all = "snake_case")]
42pub enum AuditOutcome {
43 Success,
44 Denied,
45 Error,
46}
47
48/// One immutable audit entry. `prev_hash` chains each record to its
49/// predecessor (SHA-256), so post-hoc tampering breaks the chain visibly.
50#[derive(Clone, Debug, serde::Serialize)]
51pub struct AuditRecord {
52 pub timestamp_ms: u64,
53 pub action: &'static str,
54 pub resource: &'static str,
55 pub actor_sub: Option<String>,
56 pub actor_role: Option<String>,
57 pub tenant: Option<String>,
58 pub trace_id: String,
59 pub method: String,
60 pub path: String,
61 pub outcome: AuditOutcome,
62 pub status: u16,
63 /// Filled by the pipeline worker — hash of the previous record's
64 /// serialized form. Empty for the first record after boot.
65 pub prev_hash: String,
66}
67
68// ─── Sink ─────────────────────────────────────────────────────────────────────
69
70/// Durable destination, implemented by the app. Must be append-only in
71/// production deployments — the framework never updates or deletes records.
72pub trait AuditSink: Send + Sync + 'static {
73 fn write_batch<'a>(
74 &'a self,
75 records: &'a [AuditRecord],
76 ) -> BoxFuture<'a, Result<(), crate::messaging::BoxError>>;
77}
78
79// ─── Pipeline ─────────────────────────────────────────────────────────────────
80
81/// Lock-free front door for audit records.
82///
83/// Not `#[Injectable]` — provide via `ctx.provide(AuditPipeline::new(...))`.
84/// When absent from the DI container, `#[AuditLog]` routes are a no-op.
85pub struct AuditPipeline {
86 tx: mpsc::Sender<AuditRecord>,
87}
88
89impl AuditPipeline {
90 /// `capacity` bounds hot-path memory; `flush_batch` caps records per sink
91 /// write. The worker drains greedily, so latency under light load is one
92 /// scheduler tick, not a timer.
93 pub fn new(sink: Arc<dyn AuditSink>, capacity: usize, flush_batch: usize) -> Self {
94 let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity);
95
96 tokio::spawn(async move {
97 let mut buf: Vec<AuditRecord> = Vec::with_capacity(flush_batch);
98 let mut prev_hash = String::new();
99
100 while rx.recv_many(&mut buf, flush_batch).await > 0 {
101 // Hash-chain inside the single worker — strictly ordered,
102 // no cross-thread coordination needed.
103 for rec in buf.iter_mut() {
104 rec.prev_hash = std::mem::take(&mut prev_hash);
105 let serialized = serde_json::to_vec(&rec).unwrap_or_default();
106 prev_hash = hex(&Sha256::digest(&serialized));
107 }
108 if let Err(e) = sink.write_batch(&buf).await {
109 metrics::counter!("audit_sink_errors_total").increment(1);
110 tracing::error!(error = %e, lost = buf.len(), "audit sink write failed");
111 }
112 buf.clear();
113 }
114 });
115
116 Self { tx }
117 }
118
119 /// O(1), lock-free, never blocks. Drops (and counts) when the buffer is
120 /// full rather than stalling request handling.
121 #[inline]
122 pub fn record(&self, rec: AuditRecord) {
123 if self.tx.try_send(rec).is_err() {
124 metrics::counter!("audit_dropped_total").increment(1);
125 }
126 }
127}
128
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// Writes into one pre-sized buffer instead of building a temporary
/// `String` per byte.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble; both indices are always < 16.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
132
133// ─── Macro support ────────────────────────────────────────────────────────────
134
135/// Called by the `#[AuditLog]` expansion after the response is produced.
136/// Maps HTTP status → outcome: 2xx/3xx = Success, 401/403 = Denied,
137/// everything else = Error.
138#[doc(hidden)]
139pub fn emit_route_audit(
140 ctx: &RequestContext,
141 action: &'static str,
142 resource: &'static str,
143 status: u16,
144) {
145 let Some(pipeline) = ctx.try_inject::<AuditPipeline>() else {
146 return;
147 };
148
149 let outcome = match status {
150 200..=399 => AuditOutcome::Success,
151 401 | 403 => AuditOutcome::Denied,
152 _ => AuditOutcome::Error,
153 };
154
155 let claims = ctx.claims();
156 pipeline.record(AuditRecord {
157 timestamp_ms: SystemTime::now()
158 .duration_since(UNIX_EPOCH)
159 .map(|d| d.as_millis() as u64)
160 .unwrap_or(0),
161 action,
162 resource,
163 actor_sub: claims
164 .and_then(|c| c.get("sub"))
165 .and_then(|v| v.as_str())
166 .map(str::to_owned),
167 actor_role: claims
168 .and_then(|c| c.get("role"))
169 .and_then(|v| v.as_str())
170 .map(str::to_owned),
171 tenant: ctx.tenant().map(|t| t.id.as_str().to_owned()),
172 trace_id: ctx.trace_id_hex(),
173 method: ctx.method().to_string(),
174 path: ctx.path().to_owned(),
175 outcome,
176 status,
177 prev_hash: String::new(), // assigned by the pipeline worker
178 });
179}