1use std::sync::{Arc, Mutex};
10
11use pylon_auth::audit::{AuditAction, AuditBackend, AuditEvent};
12use rusqlite::Connection;
13
14const SQLITE_TABLE: &str = "_pylon_audit_events";
15const PG_TABLE: &str = "_pylon_audit_events";
16
17pub struct SqliteAuditBackend {
22 conn: Arc<Mutex<Connection>>,
23}
24
25impl SqliteAuditBackend {
26 pub fn open(path: &str) -> Result<Self, String> {
27 let conn = Connection::open(path).map_err(|e| format!("open: {e}"))?;
28 Self::from_connection(conn)
29 }
30 pub fn in_memory() -> Result<Self, String> {
31 let conn = Connection::open_in_memory().map_err(|e| format!("open: {e}"))?;
32 Self::from_connection(conn)
33 }
34 fn from_connection(conn: Connection) -> Result<Self, String> {
35 conn.execute_batch(&format!(
36 "CREATE TABLE IF NOT EXISTS {SQLITE_TABLE} (
37 id TEXT PRIMARY KEY,
38 created_at INTEGER NOT NULL,
39 action TEXT NOT NULL,
40 user_id TEXT,
41 actor_id TEXT,
42 tenant_id TEXT,
43 ip TEXT,
44 user_agent TEXT,
45 success INTEGER NOT NULL,
46 reason TEXT,
47 metadata_json TEXT NOT NULL DEFAULT '{{}}'
48 );
49 CREATE INDEX IF NOT EXISTS {SQLITE_TABLE}_tenant_idx
50 ON {SQLITE_TABLE}(tenant_id, created_at DESC);
51 CREATE INDEX IF NOT EXISTS {SQLITE_TABLE}_user_idx
52 ON {SQLITE_TABLE}(user_id, created_at DESC);
53 CREATE INDEX IF NOT EXISTS {SQLITE_TABLE}_actor_idx
54 ON {SQLITE_TABLE}(actor_id, created_at DESC);"
55 ))
56 .map_err(|e| format!("init schema: {e}"))?;
57 Ok(Self {
58 conn: Arc::new(Mutex::new(conn)),
59 })
60 }
61}
62
63impl AuditBackend for SqliteAuditBackend {
64 fn append(&self, e: &AuditEvent) {
65 if let Ok(c) = self.conn.lock() {
66 let metadata_json = serde_json::to_string(&e.metadata).unwrap_or_else(|_| "{}".into());
67 let _ = c.execute(
68 &format!(
69 "INSERT INTO {SQLITE_TABLE}
70 (id, created_at, action, user_id, actor_id, tenant_id, ip,
71 user_agent, success, reason, metadata_json)
72 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"
73 ),
74 rusqlite::params![
75 e.id,
76 e.created_at as i64,
77 e.action.as_str(),
78 e.user_id,
79 e.actor_id,
80 e.tenant_id,
81 e.ip,
82 e.user_agent,
83 if e.success { 1i64 } else { 0 },
84 e.reason,
85 metadata_json,
86 ],
87 );
88 }
89 }
90
91 fn find_for_tenant(&self, tenant_id: &str, limit: usize) -> Vec<AuditEvent> {
92 let Ok(c) = self.conn.lock() else {
93 return vec![];
94 };
95 let bounded = limit.min(10_000);
98 let mut stmt = match c.prepare(&format!(
99 "SELECT id, created_at, action, user_id, actor_id, tenant_id, ip,
100 user_agent, success, reason, metadata_json
101 FROM {SQLITE_TABLE}
102 WHERE tenant_id = ?1
103 ORDER BY created_at DESC
104 LIMIT ?2"
105 )) {
106 Ok(s) => s,
107 Err(_) => return vec![],
108 };
109 let iter = match stmt.query_map(rusqlite::params![tenant_id, bounded as i64], row_to_event)
110 {
111 Ok(it) => it,
112 Err(_) => return vec![],
113 };
114 iter.filter_map(|r| r.ok()).collect()
115 }
116
117 fn find_for_user(&self, user_id: &str, limit: usize) -> Vec<AuditEvent> {
118 let Ok(c) = self.conn.lock() else {
119 return vec![];
120 };
121 let bounded = limit.min(10_000);
122 let mut stmt = match c.prepare(&format!(
123 "SELECT id, created_at, action, user_id, actor_id, tenant_id, ip,
124 user_agent, success, reason, metadata_json
125 FROM {SQLITE_TABLE}
126 WHERE user_id = ?1 OR actor_id = ?1
127 ORDER BY created_at DESC
128 LIMIT ?2"
129 )) {
130 Ok(s) => s,
131 Err(_) => return vec![],
132 };
133 let iter = match stmt.query_map(rusqlite::params![user_id, bounded as i64], row_to_event) {
134 Ok(it) => it,
135 Err(_) => return vec![],
136 };
137 iter.filter_map(|r| r.ok()).collect()
138 }
139}
140
141fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<AuditEvent> {
142 let action_str: String = row.get(2)?;
143 let metadata_json: String = row.get(10)?;
144 let metadata = serde_json::from_str(&metadata_json).unwrap_or_default();
145 let success: i64 = row.get(8)?;
146 Ok(AuditEvent {
147 id: row.get(0)?,
148 created_at: row.get::<_, i64>(1)? as u64,
149 action: parse_action(&action_str),
150 user_id: row.get(3)?,
151 actor_id: row.get(4)?,
152 tenant_id: row.get(5)?,
153 ip: row.get(6)?,
154 user_agent: row.get(7)?,
155 success: success != 0,
156 reason: row.get(9)?,
157 metadata,
158 })
159}
160
161fn parse_action(s: &str) -> AuditAction {
162 match s {
163 "sign_in" => AuditAction::SignIn,
164 "sign_out" => AuditAction::SignOut,
165 "sign_in_failed" => AuditAction::SignInFailed,
166 "sign_up" => AuditAction::SignUp,
167 "password_change" => AuditAction::PasswordChange,
168 "password_reset" => AuditAction::PasswordReset,
169 "email_change" => AuditAction::EmailChange,
170 "totp_enroll" => AuditAction::TotpEnroll,
171 "totp_disable" => AuditAction::TotpDisable,
172 "totp_backup_codes_regenerate" => AuditAction::TotpBackupCodesRegenerate,
173 "passkey_register" => AuditAction::PasskeyRegister,
174 "passkey_revoke" => AuditAction::PasskeyRevoke,
175 "api_key_create" => AuditAction::ApiKeyCreate,
176 "api_key_revoke" => AuditAction::ApiKeyRevoke,
177 "oauth_link" => AuditAction::OauthLink,
178 "oauth_unlink" => AuditAction::OauthUnlink,
179 "org_create" => AuditAction::OrgCreate,
180 "org_delete" => AuditAction::OrgDelete,
181 "org_invite_send" => AuditAction::OrgInviteSend,
182 "org_invite_accept" => AuditAction::OrgInviteAccept,
183 "org_member_remove" => AuditAction::OrgMemberRemove,
184 "org_role_change" => AuditAction::OrgRoleChange,
185 "account_delete" => AuditAction::AccountDelete,
186 other => AuditAction::Custom(other.to_string()),
187 }
188}
189
190pub use pg::PostgresAuditBackend;
195
196mod pg {
197 use super::*;
198 use postgres::Client;
199
200 pub struct PostgresAuditBackend {
201 client: Mutex<Client>,
202 }
203
204 impl PostgresAuditBackend {
205 pub fn connect(url: &str) -> Result<Self, String> {
206 let mut client = pylon_storage::postgres::live::connect_pg(url)?;
207 client
208 .batch_execute(&format!(
209 "CREATE TABLE IF NOT EXISTS {PG_TABLE} (
210 id TEXT PRIMARY KEY,
211 created_at BIGINT NOT NULL,
212 action TEXT NOT NULL,
213 user_id TEXT,
214 actor_id TEXT,
215 tenant_id TEXT,
216 ip TEXT,
217 user_agent TEXT,
218 success BOOLEAN NOT NULL,
219 reason TEXT,
220 metadata_json TEXT NOT NULL DEFAULT '{{}}'
221 );
222 CREATE INDEX IF NOT EXISTS {PG_TABLE}_tenant_idx
223 ON {PG_TABLE}(tenant_id, created_at DESC);
224 CREATE INDEX IF NOT EXISTS {PG_TABLE}_user_idx
225 ON {PG_TABLE}(user_id, created_at DESC);
226 CREATE INDEX IF NOT EXISTS {PG_TABLE}_actor_idx
227 ON {PG_TABLE}(actor_id, created_at DESC);"
228 ))
229 .map_err(|e| format!("PG init schema: {e}"))?;
230 Ok(Self {
231 client: Mutex::new(client),
232 })
233 }
234 }
235
236 impl AuditBackend for PostgresAuditBackend {
237 fn append(&self, e: &AuditEvent) {
238 if let Ok(mut c) = self.client.lock() {
239 let metadata_json =
240 serde_json::to_string(&e.metadata).unwrap_or_else(|_| "{}".into());
241 let _ = c.execute(
242 &format!(
243 "INSERT INTO {PG_TABLE}
244 (id, created_at, action, user_id, actor_id, tenant_id, ip,
245 user_agent, success, reason, metadata_json)
246 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
247 ),
248 &[
249 &e.id,
250 &(e.created_at as i64),
251 &e.action.as_str(),
252 &e.user_id,
253 &e.actor_id,
254 &e.tenant_id,
255 &e.ip,
256 &e.user_agent,
257 &e.success,
258 &e.reason,
259 &metadata_json,
260 ],
261 );
262 }
263 }
264
265 fn find_for_tenant(&self, tenant_id: &str, limit: usize) -> Vec<AuditEvent> {
266 let Ok(mut c) = self.client.lock() else {
267 return vec![];
268 };
269 let bounded = limit.min(10_000) as i64;
270 let rows = match c.query(
271 &format!(
272 "SELECT id, created_at, action, user_id, actor_id, tenant_id, ip,
273 user_agent, success, reason, metadata_json
274 FROM {PG_TABLE}
275 WHERE tenant_id = $1
276 ORDER BY created_at DESC
277 LIMIT $2"
278 ),
279 &[&tenant_id, &bounded],
280 ) {
281 Ok(r) => r,
282 Err(_) => return vec![],
283 };
284 rows.iter().map(pg_row_to_event).collect()
285 }
286
287 fn find_for_user(&self, user_id: &str, limit: usize) -> Vec<AuditEvent> {
288 let Ok(mut c) = self.client.lock() else {
289 return vec![];
290 };
291 let bounded = limit.min(10_000) as i64;
292 let rows = match c.query(
293 &format!(
294 "SELECT id, created_at, action, user_id, actor_id, tenant_id, ip,
295 user_agent, success, reason, metadata_json
296 FROM {PG_TABLE}
297 WHERE user_id = $1 OR actor_id = $1
298 ORDER BY created_at DESC
299 LIMIT $2"
300 ),
301 &[&user_id, &bounded],
302 ) {
303 Ok(r) => r,
304 Err(_) => return vec![],
305 };
306 rows.iter().map(pg_row_to_event).collect()
307 }
308 }
309
310 fn pg_row_to_event(row: &postgres::Row) -> AuditEvent {
311 let action_str: String = row.get(2);
312 let metadata_json: String = row.get(10);
313 let metadata = serde_json::from_str(&metadata_json).unwrap_or_default();
314 AuditEvent {
315 id: row.get(0),
316 created_at: row.get::<_, i64>(1) as u64,
317 action: parse_action(&action_str),
318 user_id: row.get(3),
319 actor_id: row.get(4),
320 tenant_id: row.get(5),
321 ip: row.get(6),
322 user_agent: row.get(7),
323 success: row.get(8),
324 reason: row.get(9),
325 metadata,
326 }
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333 use pylon_auth::audit::{AuditAction, AuditEventBuilder};
334
335 #[test]
336 fn sqlite_round_trip() {
337 let b = SqliteAuditBackend::in_memory().unwrap();
338 let e = AuditEventBuilder::new(AuditAction::SignIn)
339 .user("u1")
340 .tenant("t1")
341 .ip("1.2.3.4")
342 .user_agent("Test/1.0")
343 .meta("method", "password")
344 .build();
345 b.append(&e);
346 let got = b.find_for_user("u1", 10);
347 assert_eq!(got.len(), 1);
348 assert_eq!(got[0].user_id.as_deref(), Some("u1"));
349 assert_eq!(
350 got[0].metadata.get("method").map(|s| s.as_str()),
351 Some("password")
352 );
353 assert!(got[0].success);
354 }
355
356 #[test]
357 fn sqlite_tenant_isolation() {
358 let b = SqliteAuditBackend::in_memory().unwrap();
359 b.append(
360 &AuditEventBuilder::new(AuditAction::SignIn)
361 .tenant("a")
362 .user("u1")
363 .build(),
364 );
365 b.append(
366 &AuditEventBuilder::new(AuditAction::SignIn)
367 .tenant("b")
368 .user("u2")
369 .build(),
370 );
371 assert_eq!(b.find_for_tenant("a", 10).len(), 1);
372 assert_eq!(b.find_for_tenant("b", 10).len(), 1);
373 assert_eq!(b.find_for_tenant("c", 10).len(), 0);
374 }
375
376 #[test]
377 fn limit_capped_at_10k() {
378 let b = SqliteAuditBackend::in_memory().unwrap();
379 let _ = b.find_for_tenant("t", usize::MAX);
383 }
384
385 #[test]
386 fn failed_event_persists_with_reason() {
387 let b = SqliteAuditBackend::in_memory().unwrap();
388 b.append(
389 &AuditEventBuilder::new(AuditAction::SignInFailed)
390 .user("u1")
391 .failed("WRONG_PASSWORD")
392 .build(),
393 );
394 let got = b.find_for_user("u1", 10);
395 assert!(!got[0].success);
396 assert_eq!(got[0].reason.as_deref(), Some("WRONG_PASSWORD"));
397 }
398
399 #[test]
400 fn ordering_is_newest_first_via_index() {
401 let b = SqliteAuditBackend::in_memory().unwrap();
402 for ts in [200u64, 100, 300, 50] {
405 b.append(&AuditEvent {
406 id: format!("evt_{ts}"),
407 created_at: ts,
408 action: AuditAction::SignIn,
409 user_id: Some("u".into()),
410 actor_id: None,
411 tenant_id: Some("t".into()),
412 ip: None,
413 user_agent: None,
414 success: true,
415 reason: None,
416 metadata: std::collections::HashMap::new(),
417 });
418 }
419 let got = b.find_for_tenant("t", 10);
420 let timestamps: Vec<u64> = got.iter().map(|e| e.created_at).collect();
421 assert_eq!(timestamps, vec![300, 200, 100, 50]);
422 }
423}