1use crate::sqlx;
9
10
11use cratestack_core::{AuditActor, AuditEvent, AuditOperation, CoolContext, CoolError};
12
13use crate::ModelDescriptor;
14
15pub const AUDIT_TABLE_DDL: &str = r#"
19CREATE TABLE IF NOT EXISTS cratestack_audit (
20 event_id UUID PRIMARY KEY,
21 schema_name TEXT NOT NULL,
22 model TEXT NOT NULL,
23 operation TEXT NOT NULL,
24 primary_key JSONB NOT NULL,
25 actor JSONB NOT NULL,
26 tenant TEXT,
27 before JSONB,
28 after JSONB,
29 request_id TEXT,
30 occurred_at TIMESTAMPTZ NOT NULL,
31 delivered_at TIMESTAMPTZ,
32 attempts BIGINT NOT NULL DEFAULT 0,
33 last_error TEXT
34);
35
36CREATE INDEX IF NOT EXISTS cratestack_audit_model_idx
37 ON cratestack_audit (schema_name, model, occurred_at DESC);
38
39CREATE INDEX IF NOT EXISTS cratestack_audit_tenant_idx
40 ON cratestack_audit (tenant, occurred_at DESC)
41 WHERE tenant IS NOT NULL;
42
43CREATE INDEX IF NOT EXISTS cratestack_audit_undelivered_idx
44 ON cratestack_audit (occurred_at)
45 WHERE delivered_at IS NULL;
46"#;
47
48pub(crate) async fn ensure_audit_table(pool: &sqlx::PgPool) -> Result<(), CoolError> {
49 for statement in AUDIT_TABLE_DDL
54 .split(';')
55 .map(str::trim)
56 .filter(|s| !s.is_empty())
57 {
58 sqlx::query(statement)
59 .execute(pool)
60 .await
61 .map_err(|error| CoolError::Database(error.to_string()))?;
62 }
63 Ok(())
64}
65
66pub(crate) async fn enqueue_audit_event<'e, E>(
69 executor: E,
70 event: &AuditEvent,
71) -> Result<(), CoolError>
72where
73 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
74{
75 let actor = serde_json::to_value(&event.actor)
76 .map_err(|error| CoolError::Codec(format!("encode audit actor: {error}")))?;
77 sqlx::query(
78 "INSERT INTO cratestack_audit (\
79 event_id, schema_name, model, operation, primary_key, actor, \
80 tenant, before, after, request_id, occurred_at\
81 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
82 )
83 .bind(event.event_id)
84 .bind(&event.schema_name)
85 .bind(&event.model)
86 .bind(event.operation.as_str())
87 .bind(&event.primary_key)
88 .bind(actor)
89 .bind(event.tenant.as_deref())
90 .bind(event.before.as_ref())
91 .bind(event.after.as_ref())
92 .bind(event.request_id.as_deref())
93 .bind(event.occurred_at)
94 .execute(executor)
95 .await
96 .map(|_| ())
97 .map_err(|error| CoolError::Database(error.to_string()))
98}
99
100pub(crate) fn actor_from_context(ctx: &CoolContext) -> AuditActor {
104 AuditActor {
105 id: ctx.principal_actor_id().map(|s| s.to_owned()),
106 claims: ctx.audit_claims_snapshot(),
107 ip: ctx.client_ip().map(|s| s.to_owned()),
108 }
109}
110
111pub(crate) fn build_audit_event<M, PK>(
116 descriptor: &'static ModelDescriptor<M, PK>,
117 operation: AuditOperation,
118 before: Option<serde_json::Value>,
119 after: Option<serde_json::Value>,
120 ctx: &CoolContext,
121) -> AuditEvent {
122 let primary_key = after
123 .as_ref()
124 .or(before.as_ref())
125 .map(|snapshot| primary_key_from_snapshot(snapshot, descriptor.primary_key))
126 .unwrap_or(serde_json::Value::Null);
127 let before = before.map(|mut snapshot| {
128 redact_snapshot(
129 &mut snapshot,
130 descriptor.pii_columns,
131 descriptor.sensitive_columns,
132 );
133 snapshot
134 });
135 let after = after.map(|mut snapshot| {
136 redact_snapshot(
137 &mut snapshot,
138 descriptor.pii_columns,
139 descriptor.sensitive_columns,
140 );
141 snapshot
142 });
143 AuditEvent {
144 event_id: uuid::Uuid::new_v4(),
145 schema_name: String::new(),
149 model: descriptor.schema_name.to_owned(),
150 operation,
151 primary_key,
152 actor: actor_from_context(ctx),
153 tenant: ctx.tenant_id().map(|s| s.to_owned()),
154 before,
155 after,
156 request_id: ctx.request_id().map(|s| s.to_owned()),
157 occurred_at: chrono::Utc::now(),
158 }
159}
160
161pub fn redact_snapshot(
167 snapshot: &mut serde_json::Value,
168 pii_columns: &[&str],
169 sensitive_columns: &[&str],
170) {
171 let Some(map) = snapshot.as_object_mut() else {
172 return;
173 };
174 for col in pii_columns {
175 if let Some(slot) = map.get_mut(*col) {
176 *slot = serde_json::Value::String("[redacted-pii]".to_owned());
177 }
178 let camel = snake_to_camel(col);
179 if camel != *col {
180 if let Some(slot) = map.get_mut(&camel) {
181 *slot = serde_json::Value::String("[redacted-pii]".to_owned());
182 }
183 }
184 }
185 for col in sensitive_columns {
186 if let Some(slot) = map.get_mut(*col) {
187 *slot = serde_json::Value::String("[redacted-sensitive]".to_owned());
188 }
189 let camel = snake_to_camel(col);
190 if camel != *col {
191 if let Some(slot) = map.get_mut(&camel) {
192 *slot = serde_json::Value::String("[redacted-sensitive]".to_owned());
193 }
194 }
195 }
196}
197
198pub(crate) async fn fetch_for_audit<'e, E, M, PK>(
202 executor: E,
203 descriptor: &'static ModelDescriptor<M, PK>,
204 id: PK,
205) -> Result<Option<M>, CoolError>
206where
207 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
208 for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow>,
209 PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
210{
211 let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("SELECT ");
212 query.push(descriptor.select_projection());
213 query.push(" FROM ").push(descriptor.table_name);
214 query
215 .push(" WHERE ")
216 .push(descriptor.primary_key)
217 .push(" = ");
218 query.push_bind(id);
219 query.push(" FOR UPDATE");
220 query
221 .build_query_as::<M>()
222 .fetch_optional(executor)
223 .await
224 .map_err(|error| CoolError::Database(error.to_string()))
225}
226
227pub fn snapshot_model<T>(model: &T) -> Option<serde_json::Value>
232where
233 T: serde::Serialize,
234{
235 serde_json::to_value(model).ok()
236}
237
238pub fn primary_key_from_snapshot(
242 snapshot: &serde_json::Value,
243 primary_key_column: &str,
244) -> serde_json::Value {
245 if let Some(map) = snapshot.as_object() {
246 if let Some(value) = map.get(primary_key_column) {
247 return value.clone();
248 }
249 let camel = snake_to_camel(primary_key_column);
252 if let Some(value) = map.get(&camel) {
253 return value.clone();
254 }
255 }
256 serde_json::Value::Null
257}
258
259fn snake_to_camel(input: &str) -> String {
260 let mut out = String::with_capacity(input.len());
261 let mut upper = false;
262 for ch in input.chars() {
263 if ch == '_' {
264 upper = true;
265 } else if upper {
266 out.extend(ch.to_uppercase());
267 upper = false;
268 } else {
269 out.push(ch);
270 }
271 }
272 out
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278 use serde_json::json;
279
280 #[test]
281 fn extracts_primary_key_by_snake_case_column() {
282 let snapshot = json!({ "user_id": 42, "balance": "10.00" });
283 let pk = primary_key_from_snapshot(&snapshot, "user_id");
284 assert_eq!(pk, json!(42));
285 }
286
287 #[test]
288 fn extracts_primary_key_via_camel_case_fallback() {
289 let snapshot = json!({ "userId": 42, "balance": "10.00" });
290 let pk = primary_key_from_snapshot(&snapshot, "user_id");
291 assert_eq!(pk, json!(42));
292 }
293
294 #[test]
295 fn returns_null_when_primary_key_absent() {
296 let snapshot = json!({ "balance": "10.00" });
297 let pk = primary_key_from_snapshot(&snapshot, "user_id");
298 assert_eq!(pk, serde_json::Value::Null);
299 }
300
301 #[test]
302 fn snapshot_round_trip_preserves_strings_and_numbers() {
303 let snap =
304 snapshot_model(&json!({ "amount": "12.34", "currency": "USD" })).expect("serializable");
305 assert_eq!(snap["amount"], json!("12.34"));
306 assert_eq!(snap["currency"], json!("USD"));
307 }
308
309 #[test]
310 fn audit_operation_string_is_lowercase() {
311 assert_eq!(AuditOperation::Create.as_str(), "create");
312 assert_eq!(AuditOperation::Update.as_str(), "update");
313 assert_eq!(AuditOperation::Delete.as_str(), "delete");
314 }
315
316 #[test]
317 fn redacts_pii_columns_with_canned_marker() {
318 let mut snap = json!({
319 "id": 1,
320 "email": "alice@example.com",
321 "balance": "10.00",
322 });
323 redact_snapshot(&mut snap, &["email"], &[]);
324 assert_eq!(snap["email"], json!("[redacted-pii]"));
325 assert_eq!(snap["balance"], json!("10.00"));
326 }
327
328 #[test]
329 fn redacts_sensitive_columns_with_distinct_marker() {
330 let mut snap = json!({
331 "id": 1,
332 "risk_score": 87,
333 });
334 redact_snapshot(&mut snap, &[], &["risk_score"]);
335 assert_eq!(snap["risk_score"], json!("[redacted-sensitive]"));
336 }
337
338 #[test]
339 fn redaction_handles_camel_case_keys() {
340 let mut snap = json!({
341 "id": 1,
342 "primaryEmail": "x@y.com",
343 });
344 redact_snapshot(&mut snap, &["primary_email"], &[]);
345 assert_eq!(snap["primaryEmail"], json!("[redacted-pii]"));
346 }
347
348 #[test]
349 fn redaction_is_noop_for_absent_columns() {
350 let mut snap = json!({ "id": 1 });
351 redact_snapshot(&mut snap, &["email"], &["risk_score"]);
352 assert_eq!(snap, json!({ "id": 1 }));
353 }
354}