Skip to main content

pylon_runtime/
audit_backend.rs

1//! Persistent audit-log backends. Append-only by API; SQL-level
2//! tampering is a separate concern (DB user perms, immudb, S3
3//! delivery for archival).
4//!
5//! Schema is intentionally flat — one row per event, JSON for the
6//! metadata bag — so SIEM pipelines (Datadog, Splunk, Loki) can
7//! parse with one query.
8
9use std::sync::{Arc, Mutex};
10
11use pylon_auth::audit::{AuditAction, AuditBackend, AuditEvent};
12use rusqlite::Connection;
13
/// Table that the SQLite backend creates, writes to, and queries.
const SQLITE_TABLE: &str = "_pylon_audit_events";
/// Table that the Postgres backend creates, writes to, and queries.
const PG_TABLE: &str = "_pylon_audit_events";
16
17// ---------------------------------------------------------------------------
18// SQLite
19// ---------------------------------------------------------------------------
20
21pub struct SqliteAuditBackend {
22    conn: Arc<Mutex<Connection>>,
23}
24
25impl SqliteAuditBackend {
26    pub fn open(path: &str) -> Result<Self, String> {
27        let conn = Connection::open(path).map_err(|e| format!("open: {e}"))?;
28        Self::from_connection(conn)
29    }
30    pub fn in_memory() -> Result<Self, String> {
31        let conn = Connection::open_in_memory().map_err(|e| format!("open: {e}"))?;
32        Self::from_connection(conn)
33    }
34    fn from_connection(conn: Connection) -> Result<Self, String> {
35        conn.execute_batch(&format!(
36            "CREATE TABLE IF NOT EXISTS {SQLITE_TABLE} (
37                id TEXT PRIMARY KEY,
38                created_at INTEGER NOT NULL,
39                action TEXT NOT NULL,
40                user_id TEXT,
41                actor_id TEXT,
42                tenant_id TEXT,
43                ip TEXT,
44                user_agent TEXT,
45                success INTEGER NOT NULL,
46                reason TEXT,
47                metadata_json TEXT NOT NULL DEFAULT '{{}}'
48            );
49            CREATE INDEX IF NOT EXISTS {SQLITE_TABLE}_tenant_idx
50                ON {SQLITE_TABLE}(tenant_id, created_at DESC);
51            CREATE INDEX IF NOT EXISTS {SQLITE_TABLE}_user_idx
52                ON {SQLITE_TABLE}(user_id, created_at DESC);
53            CREATE INDEX IF NOT EXISTS {SQLITE_TABLE}_actor_idx
54                ON {SQLITE_TABLE}(actor_id, created_at DESC);"
55        ))
56        .map_err(|e| format!("init schema: {e}"))?;
57        Ok(Self {
58            conn: Arc::new(Mutex::new(conn)),
59        })
60    }
61}
62
63impl AuditBackend for SqliteAuditBackend {
64    fn append(&self, e: &AuditEvent) {
65        if let Ok(c) = self.conn.lock() {
66            let metadata_json = serde_json::to_string(&e.metadata).unwrap_or_else(|_| "{}".into());
67            let _ = c.execute(
68                &format!(
69                    "INSERT INTO {SQLITE_TABLE}
70                       (id, created_at, action, user_id, actor_id, tenant_id, ip,
71                        user_agent, success, reason, metadata_json)
72                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"
73                ),
74                rusqlite::params![
75                    e.id,
76                    e.created_at as i64,
77                    e.action.as_str(),
78                    e.user_id,
79                    e.actor_id,
80                    e.tenant_id,
81                    e.ip,
82                    e.user_agent,
83                    if e.success { 1i64 } else { 0 },
84                    e.reason,
85                    metadata_json,
86                ],
87            );
88        }
89    }
90
91    fn find_for_tenant(&self, tenant_id: &str, limit: usize) -> Vec<AuditEvent> {
92        let Ok(c) = self.conn.lock() else {
93            return vec![];
94        };
95        // Cap limit at 10k to defeat a runaway query parameter from
96        // an admin UI bug or an attacker probing for memory pressure.
97        let bounded = limit.min(10_000);
98        let mut stmt = match c.prepare(&format!(
99            "SELECT id, created_at, action, user_id, actor_id, tenant_id, ip,
100                    user_agent, success, reason, metadata_json
101             FROM {SQLITE_TABLE}
102             WHERE tenant_id = ?1
103             ORDER BY created_at DESC
104             LIMIT ?2"
105        )) {
106            Ok(s) => s,
107            Err(_) => return vec![],
108        };
109        let iter = match stmt.query_map(rusqlite::params![tenant_id, bounded as i64], row_to_event)
110        {
111            Ok(it) => it,
112            Err(_) => return vec![],
113        };
114        iter.filter_map(|r| r.ok()).collect()
115    }
116
117    fn find_for_user(&self, user_id: &str, limit: usize) -> Vec<AuditEvent> {
118        let Ok(c) = self.conn.lock() else {
119            return vec![];
120        };
121        let bounded = limit.min(10_000);
122        let mut stmt = match c.prepare(&format!(
123            "SELECT id, created_at, action, user_id, actor_id, tenant_id, ip,
124                    user_agent, success, reason, metadata_json
125             FROM {SQLITE_TABLE}
126             WHERE user_id = ?1 OR actor_id = ?1
127             ORDER BY created_at DESC
128             LIMIT ?2"
129        )) {
130            Ok(s) => s,
131            Err(_) => return vec![],
132        };
133        let iter = match stmt.query_map(rusqlite::params![user_id, bounded as i64], row_to_event) {
134            Ok(it) => it,
135            Err(_) => return vec![],
136        };
137        iter.filter_map(|r| r.ok()).collect()
138    }
139}
140
141fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<AuditEvent> {
142    let action_str: String = row.get(2)?;
143    let metadata_json: String = row.get(10)?;
144    let metadata = serde_json::from_str(&metadata_json).unwrap_or_default();
145    let success: i64 = row.get(8)?;
146    Ok(AuditEvent {
147        id: row.get(0)?,
148        created_at: row.get::<_, i64>(1)? as u64,
149        action: parse_action(&action_str),
150        user_id: row.get(3)?,
151        actor_id: row.get(4)?,
152        tenant_id: row.get(5)?,
153        ip: row.get(6)?,
154        user_agent: row.get(7)?,
155        success: success != 0,
156        reason: row.get(9)?,
157        metadata,
158    })
159}
160
161fn parse_action(s: &str) -> AuditAction {
162    match s {
163        "sign_in" => AuditAction::SignIn,
164        "sign_out" => AuditAction::SignOut,
165        "sign_in_failed" => AuditAction::SignInFailed,
166        "sign_up" => AuditAction::SignUp,
167        "password_change" => AuditAction::PasswordChange,
168        "password_reset" => AuditAction::PasswordReset,
169        "email_change" => AuditAction::EmailChange,
170        "totp_enroll" => AuditAction::TotpEnroll,
171        "totp_disable" => AuditAction::TotpDisable,
172        "totp_backup_codes_regenerate" => AuditAction::TotpBackupCodesRegenerate,
173        "passkey_register" => AuditAction::PasskeyRegister,
174        "passkey_revoke" => AuditAction::PasskeyRevoke,
175        "api_key_create" => AuditAction::ApiKeyCreate,
176        "api_key_revoke" => AuditAction::ApiKeyRevoke,
177        "oauth_link" => AuditAction::OauthLink,
178        "oauth_unlink" => AuditAction::OauthUnlink,
179        "org_create" => AuditAction::OrgCreate,
180        "org_delete" => AuditAction::OrgDelete,
181        "org_invite_send" => AuditAction::OrgInviteSend,
182        "org_invite_accept" => AuditAction::OrgInviteAccept,
183        "org_member_remove" => AuditAction::OrgMemberRemove,
184        "org_role_change" => AuditAction::OrgRoleChange,
185        "account_delete" => AuditAction::AccountDelete,
186        other => AuditAction::Custom(other.to_string()),
187    }
188}
189
190// ---------------------------------------------------------------------------
191// Postgres
192// ---------------------------------------------------------------------------
193
194pub use pg::PostgresAuditBackend;
195
196mod pg {
197    use super::*;
198    use postgres::Client;
199
200    pub struct PostgresAuditBackend {
201        client: Mutex<Client>,
202    }
203
204    impl PostgresAuditBackend {
205        pub fn connect(url: &str) -> Result<Self, String> {
206            let mut client = pylon_storage::postgres::live::connect_pg(url)?;
207            client
208                .batch_execute(&format!(
209                    "CREATE TABLE IF NOT EXISTS {PG_TABLE} (
210                        id TEXT PRIMARY KEY,
211                        created_at BIGINT NOT NULL,
212                        action TEXT NOT NULL,
213                        user_id TEXT,
214                        actor_id TEXT,
215                        tenant_id TEXT,
216                        ip TEXT,
217                        user_agent TEXT,
218                        success BOOLEAN NOT NULL,
219                        reason TEXT,
220                        metadata_json TEXT NOT NULL DEFAULT '{{}}'
221                    );
222                    CREATE INDEX IF NOT EXISTS {PG_TABLE}_tenant_idx
223                        ON {PG_TABLE}(tenant_id, created_at DESC);
224                    CREATE INDEX IF NOT EXISTS {PG_TABLE}_user_idx
225                        ON {PG_TABLE}(user_id, created_at DESC);
226                    CREATE INDEX IF NOT EXISTS {PG_TABLE}_actor_idx
227                        ON {PG_TABLE}(actor_id, created_at DESC);"
228                ))
229                .map_err(|e| format!("PG init schema: {e}"))?;
230            Ok(Self {
231                client: Mutex::new(client),
232            })
233        }
234    }
235
236    impl AuditBackend for PostgresAuditBackend {
237        fn append(&self, e: &AuditEvent) {
238            if let Ok(mut c) = self.client.lock() {
239                let metadata_json =
240                    serde_json::to_string(&e.metadata).unwrap_or_else(|_| "{}".into());
241                let _ = c.execute(
242                    &format!(
243                        "INSERT INTO {PG_TABLE}
244                           (id, created_at, action, user_id, actor_id, tenant_id, ip,
245                            user_agent, success, reason, metadata_json)
246                         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
247                    ),
248                    &[
249                        &e.id,
250                        &(e.created_at as i64),
251                        &e.action.as_str(),
252                        &e.user_id,
253                        &e.actor_id,
254                        &e.tenant_id,
255                        &e.ip,
256                        &e.user_agent,
257                        &e.success,
258                        &e.reason,
259                        &metadata_json,
260                    ],
261                );
262            }
263        }
264
265        fn find_for_tenant(&self, tenant_id: &str, limit: usize) -> Vec<AuditEvent> {
266            let Ok(mut c) = self.client.lock() else {
267                return vec![];
268            };
269            let bounded = limit.min(10_000) as i64;
270            let rows = match c.query(
271                &format!(
272                    "SELECT id, created_at, action, user_id, actor_id, tenant_id, ip,
273                            user_agent, success, reason, metadata_json
274                     FROM {PG_TABLE}
275                     WHERE tenant_id = $1
276                     ORDER BY created_at DESC
277                     LIMIT $2"
278                ),
279                &[&tenant_id, &bounded],
280            ) {
281                Ok(r) => r,
282                Err(_) => return vec![],
283            };
284            rows.iter().map(pg_row_to_event).collect()
285        }
286
287        fn find_for_user(&self, user_id: &str, limit: usize) -> Vec<AuditEvent> {
288            let Ok(mut c) = self.client.lock() else {
289                return vec![];
290            };
291            let bounded = limit.min(10_000) as i64;
292            let rows = match c.query(
293                &format!(
294                    "SELECT id, created_at, action, user_id, actor_id, tenant_id, ip,
295                            user_agent, success, reason, metadata_json
296                     FROM {PG_TABLE}
297                     WHERE user_id = $1 OR actor_id = $1
298                     ORDER BY created_at DESC
299                     LIMIT $2"
300                ),
301                &[&user_id, &bounded],
302            ) {
303                Ok(r) => r,
304                Err(_) => return vec![],
305            };
306            rows.iter().map(pg_row_to_event).collect()
307        }
308    }
309
310    fn pg_row_to_event(row: &postgres::Row) -> AuditEvent {
311        let action_str: String = row.get(2);
312        let metadata_json: String = row.get(10);
313        let metadata = serde_json::from_str(&metadata_json).unwrap_or_default();
314        AuditEvent {
315            id: row.get(0),
316            created_at: row.get::<_, i64>(1) as u64,
317            action: parse_action(&action_str),
318            user_id: row.get(3),
319            actor_id: row.get(4),
320            tenant_id: row.get(5),
321            ip: row.get(6),
322            user_agent: row.get(7),
323            success: row.get(8),
324            reason: row.get(9),
325            metadata,
326        }
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use pylon_auth::audit::{AuditAction, AuditEventBuilder};

    /// Builds a successful sign-in event for user `u` in tenant `t` with an
    /// explicit timestamp, so tests control ordering and ids stay unique.
    fn event_at(ts: u64) -> AuditEvent {
        AuditEvent {
            id: format!("evt_{ts}"),
            created_at: ts,
            action: AuditAction::SignIn,
            user_id: Some("u".into()),
            actor_id: None,
            tenant_id: Some("t".into()),
            ip: None,
            user_agent: None,
            success: true,
            reason: None,
            metadata: std::collections::HashMap::new(),
        }
    }

    #[test]
    fn sqlite_round_trip() {
        let b = SqliteAuditBackend::in_memory().unwrap();
        let e = AuditEventBuilder::new(AuditAction::SignIn)
            .user("u1")
            .tenant("t1")
            .ip("1.2.3.4")
            .user_agent("Test/1.0")
            .meta("method", "password")
            .build();
        b.append(&e);
        let got = b.find_for_user("u1", 10);
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].user_id.as_deref(), Some("u1"));
        assert_eq!(
            got[0].metadata.get("method").map(|s| s.as_str()),
            Some("password")
        );
        assert!(got[0].success);
    }

    #[test]
    fn sqlite_tenant_isolation() {
        let b = SqliteAuditBackend::in_memory().unwrap();
        b.append(
            &AuditEventBuilder::new(AuditAction::SignIn)
                .tenant("a")
                .user("u1")
                .build(),
        );
        b.append(
            &AuditEventBuilder::new(AuditAction::SignIn)
                .tenant("b")
                .user("u2")
                .build(),
        );
        assert_eq!(b.find_for_tenant("a", 10).len(), 1);
        assert_eq!(b.find_for_tenant("b", 10).len(), 1);
        assert_eq!(b.find_for_tenant("c", 10).len(), 0);
    }

    #[test]
    fn limit_capped_at_10k() {
        let b = SqliteAuditBackend::in_memory().unwrap();
        // An oversized limit must not break the query on an empty table.
        assert!(b.find_for_tenant("t", usize::MAX).is_empty());

        // One row more than the cap: exactly 10,000 must come back.
        for ts in 0..=10_000u64 {
            b.append(&event_at(ts));
        }
        let got = b.find_for_tenant("t", usize::MAX);
        assert_eq!(got.len(), 10_000);
        // Newest first, so the single dropped row is the oldest (ts = 0).
        assert_eq!(got.first().map(|e| e.created_at), Some(10_000));
        assert_eq!(got.last().map(|e| e.created_at), Some(1));
        // The user lookup applies the same cap.
        assert_eq!(b.find_for_user("u", usize::MAX).len(), 10_000);
    }

    #[test]
    fn failed_event_persists_with_reason() {
        let b = SqliteAuditBackend::in_memory().unwrap();
        b.append(
            &AuditEventBuilder::new(AuditAction::SignInFailed)
                .user("u1")
                .failed("WRONG_PASSWORD")
                .build(),
        );
        let got = b.find_for_user("u1", 10);
        // Check the length first so a lost write fails with a clear message
        // instead of an index-out-of-bounds panic.
        assert_eq!(got.len(), 1);
        assert!(!got[0].success);
        assert_eq!(got[0].reason.as_deref(), Some("WRONG_PASSWORD"));
    }

    #[test]
    fn ordering_is_newest_first_via_index() {
        let b = SqliteAuditBackend::in_memory().unwrap();
        // Insert in scrambled time order — the DESC ORDER BY must
        // return them sorted regardless of insertion order.
        for ts in [200u64, 100, 300, 50] {
            b.append(&event_at(ts));
        }
        let got = b.find_for_tenant("t", 10);
        let timestamps: Vec<u64> = got.iter().map(|e| e.created_at).collect();
        assert_eq!(timestamps, vec![300, 200, 100, 50]);
    }
}