1use cratestack_core::{AuditActor, AuditEvent, AuditOperation, CoolContext, CoolError};
10
11use crate::ModelDescriptor;
12
/// DDL for the `cratestack_audit` table and its three supporting indexes.
///
/// The text is a sequence of `;`-terminated statements, each written with
/// `IF NOT EXISTS`. `ensure_audit_table` in this file splits the text on `;`
/// and runs the statements one at a time, so no statement here may contain a
/// literal `;` inside its body.
///
/// Indexes defined:
/// - `cratestack_audit_model_idx` on `(schema_name, model, occurred_at DESC)`.
/// - `cratestack_audit_tenant_idx` on `(tenant, occurred_at DESC)`, partial on
///   rows where `tenant IS NOT NULL`.
/// - `cratestack_audit_undelivered_idx` on `(occurred_at)`, partial on rows
///   where `delivered_at IS NULL`.
///
/// The `delivered_at`, `attempts` and `last_error` columns are not written by
/// the insert in this file.
// NOTE(review): they look like delivery-tracking state maintained elsewhere — confirm.
pub const AUDIT_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS cratestack_audit (
 event_id UUID PRIMARY KEY,
 schema_name TEXT NOT NULL,
 model TEXT NOT NULL,
 operation TEXT NOT NULL,
 primary_key JSONB NOT NULL,
 actor JSONB NOT NULL,
 tenant TEXT,
 before JSONB,
 after JSONB,
 request_id TEXT,
 occurred_at TIMESTAMPTZ NOT NULL,
 delivered_at TIMESTAMPTZ,
 attempts BIGINT NOT NULL DEFAULT 0,
 last_error TEXT
);

CREATE INDEX IF NOT EXISTS cratestack_audit_model_idx
 ON cratestack_audit (schema_name, model, occurred_at DESC);

CREATE INDEX IF NOT EXISTS cratestack_audit_tenant_idx
 ON cratestack_audit (tenant, occurred_at DESC)
 WHERE tenant IS NOT NULL;

CREATE INDEX IF NOT EXISTS cratestack_audit_undelivered_idx
 ON cratestack_audit (occurred_at)
 WHERE delivered_at IS NULL;
"#;
45
46pub(crate) async fn ensure_audit_table(pool: &sqlx::PgPool) -> Result<(), CoolError> {
47 for statement in AUDIT_TABLE_DDL
52 .split(';')
53 .map(str::trim)
54 .filter(|s| !s.is_empty())
55 {
56 sqlx::query(statement)
57 .execute(pool)
58 .await
59 .map_err(|error| CoolError::Database(error.to_string()))?;
60 }
61 Ok(())
62}
63
64pub(crate) async fn enqueue_audit_event<'e, E>(
67 executor: E,
68 event: &AuditEvent,
69) -> Result<(), CoolError>
70where
71 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
72{
73 let actor = serde_json::to_value(&event.actor)
74 .map_err(|error| CoolError::Codec(format!("encode audit actor: {error}")))?;
75 sqlx::query(
76 "INSERT INTO cratestack_audit (\
77 event_id, schema_name, model, operation, primary_key, actor, \
78 tenant, before, after, request_id, occurred_at\
79 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
80 )
81 .bind(event.event_id)
82 .bind(&event.schema_name)
83 .bind(&event.model)
84 .bind(event.operation.as_str())
85 .bind(&event.primary_key)
86 .bind(actor)
87 .bind(event.tenant.as_deref())
88 .bind(event.before.as_ref())
89 .bind(event.after.as_ref())
90 .bind(event.request_id.as_deref())
91 .bind(event.occurred_at)
92 .execute(executor)
93 .await
94 .map(|_| ())
95 .map_err(|error| CoolError::Database(error.to_string()))
96}
97
98pub(crate) fn actor_from_context(ctx: &CoolContext) -> AuditActor {
102 AuditActor {
103 id: ctx.principal_actor_id().map(|s| s.to_owned()),
104 claims: ctx.audit_claims_snapshot(),
105 ip: ctx.client_ip().map(|s| s.to_owned()),
106 }
107}
108
109pub(crate) fn build_audit_event<M, PK>(
114 descriptor: &'static ModelDescriptor<M, PK>,
115 operation: AuditOperation,
116 before: Option<serde_json::Value>,
117 after: Option<serde_json::Value>,
118 ctx: &CoolContext,
119) -> AuditEvent {
120 let primary_key = after
121 .as_ref()
122 .or(before.as_ref())
123 .map(|snapshot| primary_key_from_snapshot(snapshot, descriptor.primary_key))
124 .unwrap_or(serde_json::Value::Null);
125 let before = before.map(|mut snapshot| {
126 redact_snapshot(
127 &mut snapshot,
128 descriptor.pii_columns,
129 descriptor.sensitive_columns,
130 );
131 snapshot
132 });
133 let after = after.map(|mut snapshot| {
134 redact_snapshot(
135 &mut snapshot,
136 descriptor.pii_columns,
137 descriptor.sensitive_columns,
138 );
139 snapshot
140 });
141 AuditEvent {
142 event_id: uuid::Uuid::new_v4(),
143 schema_name: String::new(),
147 model: descriptor.schema_name.to_owned(),
148 operation,
149 primary_key,
150 actor: actor_from_context(ctx),
151 tenant: ctx.tenant_id().map(|s| s.to_owned()),
152 before,
153 after,
154 request_id: ctx.request_id().map(|s| s.to_owned()),
155 occurred_at: chrono::Utc::now(),
156 }
157}
158
159pub fn redact_snapshot(
165 snapshot: &mut serde_json::Value,
166 pii_columns: &[&str],
167 sensitive_columns: &[&str],
168) {
169 let Some(map) = snapshot.as_object_mut() else {
170 return;
171 };
172 for col in pii_columns {
173 if let Some(slot) = map.get_mut(*col) {
174 *slot = serde_json::Value::String("[redacted-pii]".to_owned());
175 }
176 let camel = snake_to_camel(col);
177 if camel != *col {
178 if let Some(slot) = map.get_mut(&camel) {
179 *slot = serde_json::Value::String("[redacted-pii]".to_owned());
180 }
181 }
182 }
183 for col in sensitive_columns {
184 if let Some(slot) = map.get_mut(*col) {
185 *slot = serde_json::Value::String("[redacted-sensitive]".to_owned());
186 }
187 let camel = snake_to_camel(col);
188 if camel != *col {
189 if let Some(slot) = map.get_mut(&camel) {
190 *slot = serde_json::Value::String("[redacted-sensitive]".to_owned());
191 }
192 }
193 }
194}
195
196pub(crate) async fn fetch_for_audit<'e, E, M, PK>(
200 executor: E,
201 descriptor: &'static ModelDescriptor<M, PK>,
202 id: PK,
203) -> Result<Option<M>, CoolError>
204where
205 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
206 for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow>,
207 PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
208{
209 let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("SELECT ");
210 query.push(descriptor.select_projection());
211 query.push(" FROM ").push(descriptor.table_name);
212 query
213 .push(" WHERE ")
214 .push(descriptor.primary_key)
215 .push(" = ");
216 query.push_bind(id);
217 query.push(" FOR UPDATE");
218 query
219 .build_query_as::<M>()
220 .fetch_optional(executor)
221 .await
222 .map_err(|error| CoolError::Database(error.to_string()))
223}
224
225pub fn snapshot_model<T>(model: &T) -> Option<serde_json::Value>
230where
231 T: serde::Serialize,
232{
233 serde_json::to_value(model).ok()
234}
235
236pub fn primary_key_from_snapshot(
240 snapshot: &serde_json::Value,
241 primary_key_column: &str,
242) -> serde_json::Value {
243 if let Some(map) = snapshot.as_object() {
244 if let Some(value) = map.get(primary_key_column) {
245 return value.clone();
246 }
247 let camel = snake_to_camel(primary_key_column);
250 if let Some(value) = map.get(&camel) {
251 return value.clone();
252 }
253 }
254 serde_json::Value::Null
255}
256
/// Converts a `snake_case` identifier to `camelCase`.
///
/// Every `_` is removed and the character following a run of underscores is
/// uppercased (using full Unicode uppercasing, which may expand to several
/// characters). All other characters are copied unchanged, so input without
/// underscores is returned as-is. A leading underscore therefore capitalizes
/// the first letter, and trailing underscores are simply dropped.
fn snake_to_camel(input: &str) -> String {
    let mut camel = String::with_capacity(input.len());
    for (index, segment) in input.split('_').enumerate() {
        let mut rest = segment.chars();
        // Every segment after the first follows an underscore, so its first
        // character (if any) is the one to capitalize.
        if index > 0 {
            if let Some(first) = rest.next() {
                camel.extend(first.to_uppercase());
            }
        }
        camel.push_str(rest.as_str());
    }
    camel
}
272
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // ---- primary_key_from_snapshot ----

    /// The key is found under the column name exactly as given.
    #[test]
    fn extracts_primary_key_by_snake_case_column() {
        let row = json!({ "user_id": 42, "balance": "10.00" });
        assert_eq!(primary_key_from_snapshot(&row, "user_id"), json!(42));
    }

    /// When only the camelCase key exists, the lookup falls back to it.
    #[test]
    fn extracts_primary_key_via_camel_case_fallback() {
        let row = json!({ "userId": 42, "balance": "10.00" });
        assert_eq!(primary_key_from_snapshot(&row, "user_id"), json!(42));
    }

    /// A snapshot without the key column yields JSON null.
    #[test]
    fn returns_null_when_primary_key_absent() {
        let row = json!({ "balance": "10.00" });
        let extracted = primary_key_from_snapshot(&row, "user_id");
        assert_eq!(extracted, serde_json::Value::Null);
    }

    // ---- snapshot_model ----

    /// String fields survive the trip through `snapshot_model` unchanged.
    #[test]
    fn snapshot_round_trip_preserves_strings_and_numbers() {
        let input = json!({ "amount": "12.34", "currency": "USD" });
        let snapshot = snapshot_model(&input).expect("serializable");
        assert_eq!(snapshot["amount"], json!("12.34"));
        assert_eq!(snapshot["currency"], json!("USD"));
    }

    // ---- AuditOperation ----

    /// The stored operation names are the lowercase variant names.
    #[test]
    fn audit_operation_string_is_lowercase() {
        assert_eq!(AuditOperation::Create.as_str(), "create");
        assert_eq!(AuditOperation::Update.as_str(), "update");
        assert_eq!(AuditOperation::Delete.as_str(), "delete");
    }

    // ---- redact_snapshot ----

    /// PII columns get the PII marker; unlisted columns are untouched.
    #[test]
    fn redacts_pii_columns_with_canned_marker() {
        let mut row = json!({
            "id": 1,
            "email": "alice@example.com",
            "balance": "10.00",
        });
        redact_snapshot(&mut row, &["email"], &[]);
        assert_eq!(row["email"], json!("[redacted-pii]"));
        assert_eq!(row["balance"], json!("10.00"));
    }

    /// Sensitive columns get their own marker, distinct from the PII one.
    #[test]
    fn redacts_sensitive_columns_with_distinct_marker() {
        let mut row = json!({
            "id": 1,
            "risk_score": 87,
        });
        redact_snapshot(&mut row, &[], &["risk_score"]);
        assert_eq!(row["risk_score"], json!("[redacted-sensitive]"));
    }

    /// A snake_case column name also matches its camelCase key.
    #[test]
    fn redaction_handles_camel_case_keys() {
        let mut row = json!({
            "id": 1,
            "primaryEmail": "x@y.com",
        });
        redact_snapshot(&mut row, &["primary_email"], &[]);
        assert_eq!(row["primaryEmail"], json!("[redacted-pii]"));
    }

    /// Listed columns that are missing from the snapshot are not inserted.
    #[test]
    fn redaction_is_noop_for_absent_columns() {
        let mut row = json!({ "id": 1 });
        redact_snapshot(&mut row, &["email"], &["risk_score"]);
        assert_eq!(row, json!({ "id": 1 }));
    }
}