Skip to main content

fraiseql_core/audit/
postgres_backend.rs

1//! PostgreSQL audit backend
2//!
3//! Stores audit events in PostgreSQL with full-text search, JSONB storage,
4//! and performance-optimized indexes.
5
6use deadpool_postgres::Pool;
7
8use super::*;
9
10/// PostgreSQL audit backend for persistent, queryable audit logs.
11///
12/// Features:
13/// - JSONB columns for metadata and state snapshots
14/// - Optimized indexes for common query patterns
15/// - Multi-tenancy support with tenant_id isolation
16/// - Connection pooling via deadpool-postgres
17#[derive(Clone)]
18pub struct PostgresAuditBackend {
19    /// Connection pool for database access
20    pool: Pool,
21}
22
23impl PostgresAuditBackend {
24    /// Create a new PostgreSQL audit backend.
25    ///
26    /// # Arguments
27    ///
28    /// * `pool` - Database connection pool
29    ///
30    /// # Errors
31    ///
32    /// Returns error if table creation fails
33    pub async fn new(pool: Pool) -> AuditResult<Self> {
34        // Ensure audit_log table exists with proper schema
35        Self::ensure_table_exists(&pool).await?;
36        Ok(Self { pool })
37    }
38
39    /// Ensure audit_log table exists with proper schema and indexes.
40    async fn ensure_table_exists(pool: &Pool) -> AuditResult<()> {
41        let client = pool
42            .get()
43            .await
44            .map_err(|e| AuditError::DatabaseError(format!("Failed to get connection: {}", e)))?;
45
46        // Create audit_log table if not exists
47        let create_table_sql = r"
48            CREATE TABLE IF NOT EXISTS audit_log (
49                id UUID PRIMARY KEY,
50                timestamp TIMESTAMPTZ NOT NULL,
51                event_type VARCHAR(255) NOT NULL,
52                user_id VARCHAR(255) NOT NULL,
53                username VARCHAR(255) NOT NULL,
54                ip_address VARCHAR(45) NOT NULL,
55                resource_type VARCHAR(255) NOT NULL,
56                resource_id VARCHAR(255),
57                action VARCHAR(255) NOT NULL,
58                before_state JSONB,
59                after_state JSONB,
60                status VARCHAR(32) NOT NULL,
61                error_message TEXT,
62                tenant_id VARCHAR(255),
63                metadata JSONB NOT NULL DEFAULT '{}'::JSONB
64            )
65        ";
66
67        client
68            .execute(create_table_sql, &[])
69            .await
70            .map_err(|e| AuditError::DatabaseError(format!("Failed to create table: {}", e)))?;
71
72        // Create indexes for performance
73        Self::ensure_indexes(&client).await?;
74
75        Ok(())
76    }
77
78    /// Create performance indexes if they don't exist.
79    async fn ensure_indexes(client: &deadpool_postgres::Object) -> AuditResult<()> {
80        let indexes = vec![
81            // Index on timestamp for time range queries (descending for recent-first)
82            "CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log (timestamp DESC)",
83            // Index on user_id for user-specific audits
84            "CREATE INDEX IF NOT EXISTS idx_audit_user_id ON audit_log (user_id)",
85            // Index on event_type for event filtering
86            "CREATE INDEX IF NOT EXISTS idx_audit_event_type ON audit_log (event_type)",
87            // Partial index on tenant_id (only non-null for efficiency)
88            "CREATE INDEX IF NOT EXISTS idx_audit_tenant_id ON audit_log (tenant_id) WHERE tenant_id IS NOT NULL",
89            // Composite indexes for common query patterns
90            "CREATE INDEX IF NOT EXISTS idx_audit_tenant_time ON audit_log (tenant_id, timestamp DESC) WHERE tenant_id IS NOT NULL",
91            "CREATE INDEX IF NOT EXISTS idx_audit_user_time ON audit_log (user_id, timestamp DESC)",
92            // Index on status for failure/denied queries
93            "CREATE INDEX IF NOT EXISTS idx_audit_status ON audit_log (status) WHERE status != 'success'",
94        ];
95
96        for index_sql in indexes {
97            client
98                .execute(index_sql, &[])
99                .await
100                .map_err(|e| AuditError::DatabaseError(format!("Failed to create index: {}", e)))?;
101        }
102
103        Ok(())
104    }
105
106    /// Convert UUID from string to bytes for PostgreSQL UUID type.
107    fn parse_uuid(id: &str) -> AuditResult<uuid::Uuid> {
108        uuid::Uuid::parse_str(id)
109            .map_err(|e| AuditError::DatabaseError(format!("Invalid UUID: {}", e)))
110    }
111}
112
113#[async_trait::async_trait]
114impl AuditBackend for PostgresAuditBackend {
115    /// Log an audit event to PostgreSQL.
116    async fn log_event(&self, event: AuditEvent) -> AuditResult<()> {
117        // Validate event before logging
118        event.validate()?;
119
120        let client =
121            self.pool.get().await.map_err(|e| {
122                AuditError::DatabaseError(format!("Failed to get connection: {}", e))
123            })?;
124
125        let event_id = Self::parse_uuid(&event.id)?;
126        let timestamp = chrono::DateTime::parse_from_rfc3339(&event.timestamp)
127            .map_err(|e| AuditError::DatabaseError(format!("Invalid timestamp format: {}", e)))?
128            .with_timezone(&chrono::Utc);
129
130        let insert_sql = r"
131            INSERT INTO audit_log (
132                id, timestamp, event_type, user_id, username, ip_address,
133                resource_type, resource_id, action, before_state, after_state,
134                status, error_message, tenant_id, metadata
135            ) VALUES (
136                $1, $2, $3, $4, $5, $6,
137                $7, $8, $9, $10, $11,
138                $12, $13, $14, $15
139            )
140        ";
141
142        client
143            .execute(
144                insert_sql,
145                &[
146                    &event_id,
147                    &timestamp,
148                    &event.event_type,
149                    &event.user_id,
150                    &event.username,
151                    &event.ip_address,
152                    &event.resource_type,
153                    &event.resource_id,
154                    &event.action,
155                    &event.before_state,
156                    &event.after_state,
157                    &event.status,
158                    &event.error_message,
159                    &event.tenant_id,
160                    &event.metadata,
161                ],
162            )
163            .await
164            .map_err(|e| AuditError::DatabaseError(format!("Failed to insert event: {}", e)))?;
165
166        Ok(())
167    }
168
169    /// Query audit events from PostgreSQL with filters.
170    async fn query_events(&self, filters: AuditQueryFilters) -> AuditResult<Vec<AuditEvent>> {
171        let client =
172            self.pool.get().await.map_err(|e| {
173                AuditError::DatabaseError(format!("Failed to get connection: {}", e))
174            })?;
175
176        // Start with base query
177        let mut query = "SELECT id, timestamp, event_type, user_id, username, ip_address, \
178                         resource_type, resource_id, action, before_state, after_state, \
179                         status, error_message, tenant_id, metadata \
180                         FROM audit_log"
181            .to_string();
182
183        // Build WHERE clause with filters
184        let mut where_parts = vec![];
185
186        if filters.event_type.is_some() {
187            where_parts.push("event_type = $1".to_string());
188        }
189        if filters.user_id.is_some() {
190            where_parts.push(format!("user_id = ${}", where_parts.len() + 1));
191        }
192        if filters.resource_type.is_some() {
193            where_parts.push(format!("resource_type = ${}", where_parts.len() + 1));
194        }
195        if filters.status.is_some() {
196            where_parts.push(format!("status = ${}", where_parts.len() + 1));
197        }
198        if filters.tenant_id.is_some() {
199            where_parts.push(format!("tenant_id = ${}", where_parts.len() + 1));
200        }
201        if filters.start_time.is_some() {
202            where_parts.push(format!("timestamp >= ${}", where_parts.len() + 1));
203        }
204        if filters.end_time.is_some() {
205            where_parts.push(format!("timestamp <= ${}", where_parts.len() + 1));
206        }
207
208        if !where_parts.is_empty() {
209            query.push_str(" WHERE ");
210            query.push_str(&where_parts.join(" AND "));
211        }
212
213        query.push_str(" ORDER BY timestamp DESC");
214
215        let limit = filters.limit.unwrap_or(100);
216        let offset = filters.offset.unwrap_or(0);
217        query.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
218
219        // Build parameter vector in correct order
220        let mut param_strs: Vec<String> = vec![];
221
222        if let Some(ref val) = filters.event_type {
223            param_strs.push(val.clone());
224        }
225        if let Some(ref val) = filters.user_id {
226            param_strs.push(val.clone());
227        }
228        if let Some(ref val) = filters.resource_type {
229            param_strs.push(val.clone());
230        }
231        if let Some(ref val) = filters.status {
232            param_strs.push(val.clone());
233        }
234        if let Some(ref val) = filters.tenant_id {
235            param_strs.push(val.clone());
236        }
237        if let Some(ref val) = filters.start_time {
238            param_strs.push(val.clone());
239        }
240        if let Some(ref val) = filters.end_time {
241            param_strs.push(val.clone());
242        }
243
244        // Convert owned strings to references for query parameters
245        let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = param_strs
246            .iter()
247            .map(|s| s as &(dyn tokio_postgres::types::ToSql + Sync))
248            .collect();
249
250        let rows = client
251            .query(query.as_str(), params.as_slice())
252            .await
253            .map_err(|e| AuditError::DatabaseError(format!("Query failed: {}", e)))?;
254
255        let mut events = vec![];
256        for row in rows {
257            let id: uuid::Uuid = row.get(0);
258            let timestamp: chrono::DateTime<chrono::Utc> = row.get(1);
259            let event_type: String = row.get(2);
260            let user_id: String = row.get(3);
261            let username: String = row.get(4);
262            let ip_address: String = row.get(5);
263            let resource_type: String = row.get(6);
264            let resource_id: Option<String> = row.get(7);
265            let action: String = row.get(8);
266            let before_state: Option<serde_json::Value> = row.get(9);
267            let after_state: Option<serde_json::Value> = row.get(10);
268            let status: String = row.get(11);
269            let error_message: Option<String> = row.get(12);
270            let tenant_id: Option<String> = row.get(13);
271            let metadata: serde_json::Value = row.get(14);
272
273            events.push(AuditEvent {
274                id: id.to_string(),
275                timestamp: timestamp.to_rfc3339(),
276                event_type,
277                user_id,
278                username,
279                ip_address,
280                resource_type,
281                resource_id,
282                action,
283                before_state,
284                after_state,
285                status,
286                error_message,
287                tenant_id,
288                metadata,
289            });
290        }
291
292        Ok(events)
293    }
294}
295
296// Re-export for convenience
297pub use super::AuditBackend;