Skip to main content

cratestack_sqlx/
audit.rs

1//! Postgres-backed audit log.
2//!
3//! Audit rows are written inside the same transaction as the mutation they
4//! describe, guaranteeing that you can never see a committed row whose audit
5//! entry didn't also commit. Downstream fan-out (Kafka, Redis pubsub) goes
6//! through [`cratestack_core::AuditSink`]; this module handles the canonical
7//! database record.
8
9use cratestack_core::{AuditActor, AuditEvent, AuditOperation, CoolContext, CoolError};
10
11use crate::ModelDescriptor;
12
/// DDL for the audit log table and its supporting indexes.
///
/// Banks typically run migrations through their own tooling — this DDL is
/// exposed so the [`crate::SqlxRuntime`] can idempotently ensure the table
/// exists during bootstrap. Every object is created with `IF NOT EXISTS`, so
/// re-running it against an already-migrated database is a no-op.
///
/// `ensure_audit_table` executes this text by splitting it on `;` and running
/// each piece separately. A `;` must therefore never appear *inside* a
/// statement (for example in a string literal, a `CHECK` expression or a
/// function body), or that statement would be cut in half.
///
/// Columns:
/// * `event_id` through `occurred_at` are written by `enqueue_audit_event`,
///   inside the mutation's own transaction.
/// * `delivered_at`, `attempts` and `last_error` are not written by this
///   module. NOTE(review): presumably maintained by a downstream delivery
///   worker; the partial `cratestack_audit_undelivered_idx` index covers rows
///   whose `delivered_at IS NULL` — confirm against the relay implementation.
pub const AUDIT_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS cratestack_audit (
    event_id UUID PRIMARY KEY,
    schema_name TEXT NOT NULL,
    model TEXT NOT NULL,
    operation TEXT NOT NULL,
    primary_key JSONB NOT NULL,
    actor JSONB NOT NULL,
    tenant TEXT,
    before JSONB,
    after JSONB,
    request_id TEXT,
    occurred_at TIMESTAMPTZ NOT NULL,
    delivered_at TIMESTAMPTZ,
    attempts BIGINT NOT NULL DEFAULT 0,
    last_error TEXT
);

CREATE INDEX IF NOT EXISTS cratestack_audit_model_idx
    ON cratestack_audit (schema_name, model, occurred_at DESC);

CREATE INDEX IF NOT EXISTS cratestack_audit_tenant_idx
    ON cratestack_audit (tenant, occurred_at DESC)
    WHERE tenant IS NOT NULL;

CREATE INDEX IF NOT EXISTS cratestack_audit_undelivered_idx
    ON cratestack_audit (occurred_at)
    WHERE delivered_at IS NULL;
"#;
45
46pub(crate) async fn ensure_audit_table(pool: &sqlx::PgPool) -> Result<(), CoolError> {
47    // sqlx prepared statements accept only one statement per query, so the
48    // multi-statement DDL (table + indexes) is split on `;` and executed
49    // sequentially. Sub-statements are idempotent (`CREATE ... IF NOT
50    // EXISTS`), so this stays safe under concurrent first-runs.
51    for statement in AUDIT_TABLE_DDL
52        .split(';')
53        .map(str::trim)
54        .filter(|s| !s.is_empty())
55    {
56        sqlx::query(statement)
57            .execute(pool)
58            .await
59            .map_err(|error| CoolError::Database(error.to_string()))?;
60    }
61    Ok(())
62}
63
64/// Persist an audit event into the `cratestack_audit` table. Designed to run
65/// inside the same transaction as the mutation it describes.
66pub(crate) async fn enqueue_audit_event<'e, E>(
67    executor: E,
68    event: &AuditEvent,
69) -> Result<(), CoolError>
70where
71    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
72{
73    let actor = serde_json::to_value(&event.actor)
74        .map_err(|error| CoolError::Codec(format!("encode audit actor: {error}")))?;
75    sqlx::query(
76        "INSERT INTO cratestack_audit (\
77            event_id, schema_name, model, operation, primary_key, actor, \
78            tenant, before, after, request_id, occurred_at\
79         ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
80    )
81    .bind(event.event_id)
82    .bind(&event.schema_name)
83    .bind(&event.model)
84    .bind(event.operation.as_str())
85    .bind(&event.primary_key)
86    .bind(actor)
87    .bind(event.tenant.as_deref())
88    .bind(event.before.as_ref())
89    .bind(event.after.as_ref())
90    .bind(event.request_id.as_deref())
91    .bind(event.occurred_at)
92    .execute(executor)
93    .await
94    .map(|_| ())
95    .map_err(|error| CoolError::Database(error.to_string()))
96}
97
98/// Derive an [`AuditActor`] from the [`CoolContext`] active at mutation time.
99/// Banks generally want the principal's id, primary claims, and source IP if
100/// the transport recorded one.
101pub(crate) fn actor_from_context(ctx: &CoolContext) -> AuditActor {
102    AuditActor {
103        id: ctx.principal_actor_id().map(|s| s.to_owned()),
104        claims: ctx.audit_claims_snapshot(),
105        ip: ctx.client_ip().map(|s| s.to_owned()),
106    }
107}
108
109/// Build an `AuditEvent` for a mutation just performed inside a transaction.
110/// The caller passes the JSON snapshot(s) it has on hand — `before` for
111/// updates and deletes, `after` for creates and updates — so this helper
112/// stays decoupled from the SQL-row decoding path.
113pub(crate) fn build_audit_event<M, PK>(
114    descriptor: &'static ModelDescriptor<M, PK>,
115    operation: AuditOperation,
116    before: Option<serde_json::Value>,
117    after: Option<serde_json::Value>,
118    ctx: &CoolContext,
119) -> AuditEvent {
120    let primary_key = after
121        .as_ref()
122        .or(before.as_ref())
123        .map(|snapshot| primary_key_from_snapshot(snapshot, descriptor.primary_key))
124        .unwrap_or(serde_json::Value::Null);
125    let before = before.map(|mut snapshot| {
126        redact_snapshot(
127            &mut snapshot,
128            descriptor.pii_columns,
129            descriptor.sensitive_columns,
130        );
131        snapshot
132    });
133    let after = after.map(|mut snapshot| {
134        redact_snapshot(
135            &mut snapshot,
136            descriptor.pii_columns,
137            descriptor.sensitive_columns,
138        );
139        snapshot
140    });
141    AuditEvent {
142        event_id: uuid::Uuid::new_v4(),
143        // `ModelDescriptor.schema_name` historically holds the model's Rust
144        // ident; we surface it as `model`. A separate `schema_name` will be
145        // wired in when the parser exposes a schema-wide label.
146        schema_name: String::new(),
147        model: descriptor.schema_name.to_owned(),
148        operation,
149        primary_key,
150        actor: actor_from_context(ctx),
151        tenant: ctx.tenant_id().map(|s| s.to_owned()),
152        before,
153        after,
154        request_id: ctx.request_id().map(|s| s.to_owned()),
155        occurred_at: chrono::Utc::now(),
156    }
157}
158
159/// Replace values of PII/sensitive columns in a JSON snapshot with a fixed
160/// marker. Banks need the audit log to record THAT a field changed without
161/// retaining the actual value (PAN, SSN, address); the marker lets a human
162/// reviewer see the column shifted while keeping the data out of long-term
163/// logs.
164pub fn redact_snapshot(
165    snapshot: &mut serde_json::Value,
166    pii_columns: &[&str],
167    sensitive_columns: &[&str],
168) {
169    let Some(map) = snapshot.as_object_mut() else {
170        return;
171    };
172    for col in pii_columns {
173        if let Some(slot) = map.get_mut(*col) {
174            *slot = serde_json::Value::String("[redacted-pii]".to_owned());
175        }
176        let camel = snake_to_camel(col);
177        if camel != *col {
178            if let Some(slot) = map.get_mut(&camel) {
179                *slot = serde_json::Value::String("[redacted-pii]".to_owned());
180            }
181        }
182    }
183    for col in sensitive_columns {
184        if let Some(slot) = map.get_mut(*col) {
185            *slot = serde_json::Value::String("[redacted-sensitive]".to_owned());
186        }
187        let camel = snake_to_camel(col);
188        if camel != *col {
189            if let Some(slot) = map.get_mut(&camel) {
190                *slot = serde_json::Value::String("[redacted-sensitive]".to_owned());
191            }
192        }
193    }
194}
195
196/// Fetch the current state of a row for audit purposes, taking a row-level
197/// lock so concurrent mutations cannot race us. Bypasses read policies —
198/// audit reflects the actual database state, not the caller's filtered view.
199pub(crate) async fn fetch_for_audit<'e, E, M, PK>(
200    executor: E,
201    descriptor: &'static ModelDescriptor<M, PK>,
202    id: PK,
203) -> Result<Option<M>, CoolError>
204where
205    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
206    for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow>,
207    PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
208{
209    let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("SELECT ");
210    query.push(descriptor.select_projection());
211    query.push(" FROM ").push(descriptor.table_name);
212    query
213        .push(" WHERE ")
214        .push(descriptor.primary_key)
215        .push(" = ");
216    query.push_bind(id);
217    query.push(" FOR UPDATE");
218    query
219        .build_query_as::<M>()
220        .fetch_optional(executor)
221        .await
222        .map_err(|error| CoolError::Database(error.to_string()))
223}
224
225/// Convert a model into the JSON snapshot used by the audit log. Returns
226/// `None` if the model isn't serializable; that should never happen for
227/// generated models which derive `Serialize`, but we degrade gracefully
228/// rather than panic.
229pub fn snapshot_model<T>(model: &T) -> Option<serde_json::Value>
230where
231    T: serde::Serialize,
232{
233    serde_json::to_value(model).ok()
234}
235
236/// Extract the primary-key field from a serialized model snapshot. Used to
237/// stamp audit events with a stable identifier even when the schema doesn't
238/// surface the PK column verbatim in the response (e.g. policy-stripped).
239pub fn primary_key_from_snapshot(
240    snapshot: &serde_json::Value,
241    primary_key_column: &str,
242) -> serde_json::Value {
243    if let Some(map) = snapshot.as_object() {
244        if let Some(value) = map.get(primary_key_column) {
245            return value.clone();
246        }
247        // Try snake/camel transposition — the SQL column name might differ
248        // from the JSON key emitted by the serializer.
249        let camel = snake_to_camel(primary_key_column);
250        if let Some(value) = map.get(&camel) {
251            return value.clone();
252        }
253    }
254    serde_json::Value::Null
255}
256
/// Convert a `snake_case` identifier to the key that
/// `#[serde(rename_all = "camelCase")]` would emit for it.
///
/// [`primary_key_from_snapshot`] and [`redact_snapshot`] use this to find the
/// JSON key that corresponds to an SQL column name, so it mirrors serde's
/// camelCase rule:
///
/// * every `_` is dropped, and the character after it is ASCII-uppercased;
/// * the first emitted character is ASCII-lowercased, so a leading
///   underscore (`_id`) maps to `id` rather than `Id`;
/// * non-ASCII characters pass through unchanged, because serde only adjusts
///   ASCII case.
///
/// Empty input, or input made only of underscores, yields an empty string.
fn snake_to_camel(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    let mut capitalize_next = false;
    for ch in input.chars() {
        if ch == '_' {
            capitalize_next = true;
            continue;
        }
        let mapped = if out.is_empty() {
            // camelCase always starts lowercase, even after leading `_`s.
            ch.to_ascii_lowercase()
        } else if capitalize_next {
            ch.to_ascii_uppercase()
        } else {
            ch
        };
        out.push(mapped);
        capitalize_next = false;
    }
    out
}
272
#[cfg(test)]
mod tests {
    // Unit tests for snapshot helpers, redaction and the audit DDL text.
    use super::*;
    use serde_json::json;

    #[test]
    fn extracts_primary_key_by_snake_case_column() {
        let snapshot = json!({ "user_id": 42, "balance": "10.00" });
        let pk = primary_key_from_snapshot(&snapshot, "user_id");
        assert_eq!(pk, json!(42));
    }

    #[test]
    fn extracts_primary_key_via_camel_case_fallback() {
        let snapshot = json!({ "userId": 42, "balance": "10.00" });
        let pk = primary_key_from_snapshot(&snapshot, "user_id");
        assert_eq!(pk, json!(42));
    }

    #[test]
    fn primary_key_prefers_exact_column_over_camel_case() {
        let snapshot = json!({ "user_id": 1, "userId": 2 });
        let pk = primary_key_from_snapshot(&snapshot, "user_id");
        assert_eq!(pk, json!(1));
    }

    #[test]
    fn returns_null_when_primary_key_absent() {
        let snapshot = json!({ "balance": "10.00" });
        let pk = primary_key_from_snapshot(&snapshot, "user_id");
        assert_eq!(pk, serde_json::Value::Null);
    }

    #[test]
    fn returns_null_for_non_object_snapshot() {
        let pk = primary_key_from_snapshot(&json!([1, 2]), "id");
        assert_eq!(pk, serde_json::Value::Null);
    }

    #[test]
    fn snapshot_round_trip_preserves_strings_and_numbers() {
        let snap = snapshot_model(&json!({
            "amount": "12.34",
            "currency": "USD",
            "minor_units": 1234,
        }))
        .expect("serializable");
        assert_eq!(snap["amount"], json!("12.34"));
        assert_eq!(snap["currency"], json!("USD"));
        assert_eq!(snap["minor_units"], json!(1234));
    }

    #[test]
    fn audit_operation_string_is_lowercase() {
        assert_eq!(AuditOperation::Create.as_str(), "create");
        assert_eq!(AuditOperation::Update.as_str(), "update");
        assert_eq!(AuditOperation::Delete.as_str(), "delete");
    }

    #[test]
    fn audit_ddl_splits_into_one_statement_per_object() {
        // Mirrors the splitting in `ensure_audit_table`. A `;` inside a
        // statement would change the count or the prefixes.
        let statements: Vec<&str> = AUDIT_TABLE_DDL
            .split(';')
            .map(str::trim)
            .filter(|s| !s.is_empty())
            .collect();
        assert_eq!(statements.len(), 4);
        assert!(statements[0].starts_with("CREATE TABLE IF NOT EXISTS cratestack_audit"));
        assert!(statements[1..]
            .iter()
            .all(|s| s.starts_with("CREATE INDEX IF NOT EXISTS")));
    }

    #[test]
    fn snake_to_camel_converts_common_column_names() {
        assert_eq!(snake_to_camel("user_id"), "userId");
        assert_eq!(snake_to_camel("email"), "email");
        assert_eq!(snake_to_camel("address_line_1"), "addressLine1");
    }

    #[test]
    fn redacts_pii_columns_with_canned_marker() {
        let mut snap = json!({
            "id": 1,
            "email": "alice@example.com",
            "balance": "10.00",
        });
        redact_snapshot(&mut snap, &["email"], &[]);
        assert_eq!(snap["email"], json!("[redacted-pii]"));
        assert_eq!(snap["balance"], json!("10.00"));
    }

    #[test]
    fn redacts_sensitive_columns_with_distinct_marker() {
        let mut snap = json!({
            "id": 1,
            "risk_score": 87,
        });
        redact_snapshot(&mut snap, &[], &["risk_score"]);
        assert_eq!(snap["risk_score"], json!("[redacted-sensitive]"));
    }

    #[test]
    fn sensitive_marker_wins_when_column_is_in_both_lists() {
        let mut snap = json!({ "ssn": "123-45-6789" });
        redact_snapshot(&mut snap, &["ssn"], &["ssn"]);
        assert_eq!(snap["ssn"], json!("[redacted-sensitive]"));
    }

    #[test]
    fn redaction_handles_camel_case_keys() {
        let mut snap = json!({
            "id": 1,
            "primaryEmail": "x@y.com",
        });
        redact_snapshot(&mut snap, &["primary_email"], &[]);
        assert_eq!(snap["primaryEmail"], json!("[redacted-pii]"));
    }

    #[test]
    fn redaction_covers_both_snake_and_camel_keys_when_both_present() {
        let mut snap = json!({
            "primary_email": "x@y.com",
            "primaryEmail": "x@y.com",
        });
        redact_snapshot(&mut snap, &["primary_email"], &[]);
        assert_eq!(snap["primary_email"], json!("[redacted-pii]"));
        assert_eq!(snap["primaryEmail"], json!("[redacted-pii]"));
    }

    #[test]
    fn redaction_is_noop_for_absent_columns() {
        let mut snap = json!({ "id": 1 });
        redact_snapshot(&mut snap, &["email"], &["risk_score"]);
        assert_eq!(snap, json!({ "id": 1 }));
    }

    #[test]
    fn redaction_leaves_non_object_snapshots_untouched() {
        let mut snap = json!(["alice@example.com"]);
        redact_snapshot(&mut snap, &["email"], &[]);
        assert_eq!(snap, json!(["alice@example.com"]));
    }
}