Skip to main content

cratestack_sqlx/
audit.rs

1//! Postgres-backed audit log.
2//!
3//! Audit rows are written inside the same transaction as the mutation they
4//! describe, guaranteeing that you can never see a committed row whose audit
5//! entry didn't also commit. Downstream fan-out (Kafka, Redis pubsub) goes
6//! through [`cratestack_core::AuditSink`]; this module handles the canonical
7//! database record.
8use crate::sqlx;
9
10
11use cratestack_core::{AuditActor, AuditEvent, AuditOperation, CoolContext, CoolError};
12
13use crate::ModelDescriptor;
14
/// DDL for the `cratestack_audit` table and its supporting indexes.
///
/// Every statement uses `IF NOT EXISTS`, so re-applying it to a database that
/// already has these objects is a no-op. Deployments that manage schema with
/// their own migration tooling can apply this text directly; it is also
/// exposed so the [`crate::SqlxRuntime`] can idempotently ensure the table
/// exists during bootstrap.
///
/// `ensure_audit_table` executes this text by splitting it on `;`, so the
/// statements must not contain a `;` anywhere else (e.g. inside a comment or
/// a string default).
///
/// `enqueue_audit_event` writes only the event columns. It never sets
/// `delivered_at`, `attempts`, or `last_error`, so new rows start as `NULL`,
/// `0`, and `NULL` respectively. These columns look intended for a downstream
/// delivery relay (presumably the `cratestack_core::AuditSink` fan-out). That
/// is not confirmed here; check against the relay implementation.
///
/// Index purposes:
/// - `cratestack_audit_model_idx`: per-model history, newest first.
/// - `cratestack_audit_tenant_idx`: partial index; only rows with a tenant,
///   newest first.
/// - `cratestack_audit_undelivered_idx`: partial index; only rows whose
///   `delivered_at` is still `NULL`, oldest first.
pub const AUDIT_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS cratestack_audit (
    event_id UUID PRIMARY KEY,
    schema_name TEXT NOT NULL,
    model TEXT NOT NULL,
    operation TEXT NOT NULL,
    primary_key JSONB NOT NULL,
    actor JSONB NOT NULL,
    tenant TEXT,
    before JSONB,
    after JSONB,
    request_id TEXT,
    occurred_at TIMESTAMPTZ NOT NULL,
    delivered_at TIMESTAMPTZ,
    attempts BIGINT NOT NULL DEFAULT 0,
    last_error TEXT
);

CREATE INDEX IF NOT EXISTS cratestack_audit_model_idx
    ON cratestack_audit (schema_name, model, occurred_at DESC);

CREATE INDEX IF NOT EXISTS cratestack_audit_tenant_idx
    ON cratestack_audit (tenant, occurred_at DESC)
    WHERE tenant IS NOT NULL;

CREATE INDEX IF NOT EXISTS cratestack_audit_undelivered_idx
    ON cratestack_audit (occurred_at)
    WHERE delivered_at IS NULL;
"#;
47
48pub(crate) async fn ensure_audit_table(pool: &sqlx::PgPool) -> Result<(), CoolError> {
49    // sqlx prepared statements accept only one statement per query, so the
50    // multi-statement DDL (table + indexes) is split on `;` and executed
51    // sequentially. Sub-statements are idempotent (`CREATE ... IF NOT
52    // EXISTS`), so this stays safe under concurrent first-runs.
53    for statement in AUDIT_TABLE_DDL
54        .split(';')
55        .map(str::trim)
56        .filter(|s| !s.is_empty())
57    {
58        sqlx::query(statement)
59            .execute(pool)
60            .await
61            .map_err(|error| CoolError::Database(error.to_string()))?;
62    }
63    Ok(())
64}
65
66/// Persist an audit event into the `cratestack_audit` table. Designed to run
67/// inside the same transaction as the mutation it describes.
68pub(crate) async fn enqueue_audit_event<'e, E>(
69    executor: E,
70    event: &AuditEvent,
71) -> Result<(), CoolError>
72where
73    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
74{
75    let actor = serde_json::to_value(&event.actor)
76        .map_err(|error| CoolError::Codec(format!("encode audit actor: {error}")))?;
77    sqlx::query(
78        "INSERT INTO cratestack_audit (\
79            event_id, schema_name, model, operation, primary_key, actor, \
80            tenant, before, after, request_id, occurred_at\
81         ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
82    )
83    .bind(event.event_id)
84    .bind(&event.schema_name)
85    .bind(&event.model)
86    .bind(event.operation.as_str())
87    .bind(&event.primary_key)
88    .bind(actor)
89    .bind(event.tenant.as_deref())
90    .bind(event.before.as_ref())
91    .bind(event.after.as_ref())
92    .bind(event.request_id.as_deref())
93    .bind(event.occurred_at)
94    .execute(executor)
95    .await
96    .map(|_| ())
97    .map_err(|error| CoolError::Database(error.to_string()))
98}
99
100/// Derive an [`AuditActor`] from the [`CoolContext`] active at mutation time.
101/// Banks generally want the principal's id, primary claims, and source IP if
102/// the transport recorded one.
103pub(crate) fn actor_from_context(ctx: &CoolContext) -> AuditActor {
104    AuditActor {
105        id: ctx.principal_actor_id().map(|s| s.to_owned()),
106        claims: ctx.audit_claims_snapshot(),
107        ip: ctx.client_ip().map(|s| s.to_owned()),
108    }
109}
110
111/// Build an `AuditEvent` for a mutation just performed inside a transaction.
112/// The caller passes the JSON snapshot(s) it has on hand — `before` for
113/// updates and deletes, `after` for creates and updates — so this helper
114/// stays decoupled from the SQL-row decoding path.
115pub(crate) fn build_audit_event<M, PK>(
116    descriptor: &'static ModelDescriptor<M, PK>,
117    operation: AuditOperation,
118    before: Option<serde_json::Value>,
119    after: Option<serde_json::Value>,
120    ctx: &CoolContext,
121) -> AuditEvent {
122    let primary_key = after
123        .as_ref()
124        .or(before.as_ref())
125        .map(|snapshot| primary_key_from_snapshot(snapshot, descriptor.primary_key))
126        .unwrap_or(serde_json::Value::Null);
127    let before = before.map(|mut snapshot| {
128        redact_snapshot(
129            &mut snapshot,
130            descriptor.pii_columns,
131            descriptor.sensitive_columns,
132        );
133        snapshot
134    });
135    let after = after.map(|mut snapshot| {
136        redact_snapshot(
137            &mut snapshot,
138            descriptor.pii_columns,
139            descriptor.sensitive_columns,
140        );
141        snapshot
142    });
143    AuditEvent {
144        event_id: uuid::Uuid::new_v4(),
145        // `ModelDescriptor.schema_name` historically holds the model's Rust
146        // ident; we surface it as `model`. A separate `schema_name` will be
147        // wired in when the parser exposes a schema-wide label.
148        schema_name: String::new(),
149        model: descriptor.schema_name.to_owned(),
150        operation,
151        primary_key,
152        actor: actor_from_context(ctx),
153        tenant: ctx.tenant_id().map(|s| s.to_owned()),
154        before,
155        after,
156        request_id: ctx.request_id().map(|s| s.to_owned()),
157        occurred_at: chrono::Utc::now(),
158    }
159}
160
161/// Replace values of PII/sensitive columns in a JSON snapshot with a fixed
162/// marker. Banks need the audit log to record THAT a field changed without
163/// retaining the actual value (PAN, SSN, address); the marker lets a human
164/// reviewer see the column shifted while keeping the data out of long-term
165/// logs.
166pub fn redact_snapshot(
167    snapshot: &mut serde_json::Value,
168    pii_columns: &[&str],
169    sensitive_columns: &[&str],
170) {
171    let Some(map) = snapshot.as_object_mut() else {
172        return;
173    };
174    for col in pii_columns {
175        if let Some(slot) = map.get_mut(*col) {
176            *slot = serde_json::Value::String("[redacted-pii]".to_owned());
177        }
178        let camel = snake_to_camel(col);
179        if camel != *col {
180            if let Some(slot) = map.get_mut(&camel) {
181                *slot = serde_json::Value::String("[redacted-pii]".to_owned());
182            }
183        }
184    }
185    for col in sensitive_columns {
186        if let Some(slot) = map.get_mut(*col) {
187            *slot = serde_json::Value::String("[redacted-sensitive]".to_owned());
188        }
189        let camel = snake_to_camel(col);
190        if camel != *col {
191            if let Some(slot) = map.get_mut(&camel) {
192                *slot = serde_json::Value::String("[redacted-sensitive]".to_owned());
193            }
194        }
195    }
196}
197
198/// Fetch the current state of a row for audit purposes, taking a row-level
199/// lock so concurrent mutations cannot race us. Bypasses read policies —
200/// audit reflects the actual database state, not the caller's filtered view.
201pub(crate) async fn fetch_for_audit<'e, E, M, PK>(
202    executor: E,
203    descriptor: &'static ModelDescriptor<M, PK>,
204    id: PK,
205) -> Result<Option<M>, CoolError>
206where
207    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
208    for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow>,
209    PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
210{
211    let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("SELECT ");
212    query.push(descriptor.select_projection());
213    query.push(" FROM ").push(descriptor.table_name);
214    query
215        .push(" WHERE ")
216        .push(descriptor.primary_key)
217        .push(" = ");
218    query.push_bind(id);
219    query.push(" FOR UPDATE");
220    query
221        .build_query_as::<M>()
222        .fetch_optional(executor)
223        .await
224        .map_err(|error| CoolError::Database(error.to_string()))
225}
226
227/// Convert a model into the JSON snapshot used by the audit log. Returns
228/// `None` if the model isn't serializable; that should never happen for
229/// generated models which derive `Serialize`, but we degrade gracefully
230/// rather than panic.
231pub fn snapshot_model<T>(model: &T) -> Option<serde_json::Value>
232where
233    T: serde::Serialize,
234{
235    serde_json::to_value(model).ok()
236}
237
238/// Extract the primary-key field from a serialized model snapshot. Used to
239/// stamp audit events with a stable identifier even when the schema doesn't
240/// surface the PK column verbatim in the response (e.g. policy-stripped).
241pub fn primary_key_from_snapshot(
242    snapshot: &serde_json::Value,
243    primary_key_column: &str,
244) -> serde_json::Value {
245    if let Some(map) = snapshot.as_object() {
246        if let Some(value) = map.get(primary_key_column) {
247            return value.clone();
248        }
249        // Try snake/camel transposition — the SQL column name might differ
250        // from the JSON key emitted by the serializer.
251        let camel = snake_to_camel(primary_key_column);
252        if let Some(value) = map.get(&camel) {
253            return value.clone();
254        }
255    }
256    serde_json::Value::Null
257}
258
/// Convert a snake_case identifier to camelCase.
///
/// Rules:
/// - Every `_` is removed.
/// - The first character after one or more underscores is upper-cased with
///   Unicode rules, so it may expand to several characters (`ß` → `SS`).
/// - The leading segment keeps its original casing.
///
/// Consequences: `"_id"` becomes `"Id"`, `"a__b"` becomes `"aB"`, and
/// trailing underscores simply disappear.
fn snake_to_camel(input: &str) -> String {
    let mut camel = String::with_capacity(input.len());
    let mut segments = input.split('_');
    // `split` always yields at least one (possibly empty) segment.
    if let Some(head) = segments.next() {
        camel.push_str(head);
    }
    for segment in segments {
        let mut chars = segment.chars();
        if let Some(first) = chars.next() {
            camel.extend(first.to_uppercase());
            camel.push_str(chars.as_str());
        }
    }
    camel
}
274
/// Unit tests for the pure snapshot helpers; none of them touch a database.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn extracts_primary_key_by_snake_case_column() {
        let snapshot = json!({ "user_id": 42, "balance": "10.00" });
        let pk = primary_key_from_snapshot(&snapshot, "user_id");
        assert_eq!(pk, json!(42));
    }

    #[test]
    fn extracts_primary_key_via_camel_case_fallback() {
        let snapshot = json!({ "userId": 42, "balance": "10.00" });
        let pk = primary_key_from_snapshot(&snapshot, "user_id");
        assert_eq!(pk, json!(42));
    }

    #[test]
    fn returns_null_when_primary_key_absent() {
        let snapshot = json!({ "balance": "10.00" });
        let pk = primary_key_from_snapshot(&snapshot, "user_id");
        assert_eq!(pk, serde_json::Value::Null);
    }

    #[test]
    fn primary_key_prefers_exact_column_over_camel_case() {
        let snapshot = json!({ "user_id": 1, "userId": 2 });
        let pk = primary_key_from_snapshot(&snapshot, "user_id");
        assert_eq!(pk, json!(1));
    }

    #[test]
    fn primary_key_lookup_ignores_non_object_snapshots() {
        assert_eq!(
            primary_key_from_snapshot(&json!(42), "id"),
            serde_json::Value::Null
        );
        assert_eq!(
            primary_key_from_snapshot(&json!([{ "id": 1 }]), "id"),
            serde_json::Value::Null
        );
    }

    #[test]
    fn snapshot_round_trip_preserves_strings_and_numbers() {
        let snap =
            snapshot_model(&json!({ "amount": "12.34", "currency": "USD" })).expect("serializable");
        assert_eq!(snap["amount"], json!("12.34"));
        assert_eq!(snap["currency"], json!("USD"));
    }

    #[test]
    fn audit_operation_string_is_lowercase() {
        assert_eq!(AuditOperation::Create.as_str(), "create");
        assert_eq!(AuditOperation::Update.as_str(), "update");
        assert_eq!(AuditOperation::Delete.as_str(), "delete");
    }

    #[test]
    fn redacts_pii_columns_with_canned_marker() {
        let mut snap = json!({
            "id": 1,
            "email": "alice@example.com",
            "balance": "10.00",
        });
        redact_snapshot(&mut snap, &["email"], &[]);
        assert_eq!(snap["email"], json!("[redacted-pii]"));
        assert_eq!(snap["balance"], json!("10.00"));
    }

    #[test]
    fn redacts_sensitive_columns_with_distinct_marker() {
        let mut snap = json!({
            "id": 1,
            "risk_score": 87,
        });
        redact_snapshot(&mut snap, &[], &["risk_score"]);
        assert_eq!(snap["risk_score"], json!("[redacted-sensitive]"));
    }

    #[test]
    fn redaction_handles_camel_case_keys() {
        let mut snap = json!({
            "id": 1,
            "primaryEmail": "x@y.com",
        });
        redact_snapshot(&mut snap, &["primary_email"], &[]);
        assert_eq!(snap["primaryEmail"], json!("[redacted-pii]"));
    }

    #[test]
    fn redaction_covers_both_snake_and_camel_keys() {
        let mut snap = json!({
            "primary_email": "x@y.com",
            "primaryEmail": "x@y.com",
        });
        redact_snapshot(&mut snap, &["primary_email"], &[]);
        assert_eq!(snap["primary_email"], json!("[redacted-pii]"));
        assert_eq!(snap["primaryEmail"], json!("[redacted-pii]"));
    }

    #[test]
    fn sensitive_marker_wins_when_column_is_in_both_lists() {
        let mut snap = json!({ "ssn": "123-45-6789" });
        redact_snapshot(&mut snap, &["ssn"], &["ssn"]);
        assert_eq!(snap["ssn"], json!("[redacted-sensitive]"));
    }

    #[test]
    fn redaction_is_noop_for_absent_columns() {
        let mut snap = json!({ "id": 1 });
        redact_snapshot(&mut snap, &["email"], &["risk_score"]);
        assert_eq!(snap, json!({ "id": 1 }));
    }

    #[test]
    fn redaction_leaves_non_object_snapshots_untouched() {
        let mut snap = json!(["email", "x@y.com"]);
        redact_snapshot(&mut snap, &["email"], &["email"]);
        assert_eq!(snap, json!(["email", "x@y.com"]));
    }

    #[test]
    fn snake_to_camel_handles_edge_cases() {
        assert_eq!(snake_to_camel("user_id"), "userId");
        assert_eq!(snake_to_camel("id"), "id");
        assert_eq!(snake_to_camel(""), "");
        assert_eq!(snake_to_camel("a__b"), "aB");
        assert_eq!(snake_to_camel("trailing_"), "trailing");
    }
}