1use deadpool_postgres::Pool;
7
8use super::*;
9
/// Audit-log backend that persists events to a PostgreSQL `audit_log` table
/// through a `deadpool_postgres` connection pool.
///
/// Construct via [`PostgresAuditBackend::new`], which also creates the table
/// and its indexes if they do not exist yet.
#[derive(Clone)]
pub struct PostgresAuditBackend {
    // Connection pool; every operation checks a client out of it on demand.
    pool: Pool,
}
22
23impl PostgresAuditBackend {
24 pub async fn new(pool: Pool) -> AuditResult<Self> {
34 Self::ensure_table_exists(&pool).await?;
36 Ok(Self { pool })
37 }
38
39 async fn ensure_table_exists(pool: &Pool) -> AuditResult<()> {
41 let client = pool
42 .get()
43 .await
44 .map_err(|e| AuditError::DatabaseError(format!("Failed to get connection: {}", e)))?;
45
46 let create_table_sql = r"
48 CREATE TABLE IF NOT EXISTS audit_log (
49 id UUID PRIMARY KEY,
50 timestamp TIMESTAMPTZ NOT NULL,
51 event_type VARCHAR(255) NOT NULL,
52 user_id VARCHAR(255) NOT NULL,
53 username VARCHAR(255) NOT NULL,
54 ip_address VARCHAR(45) NOT NULL,
55 resource_type VARCHAR(255) NOT NULL,
56 resource_id VARCHAR(255),
57 action VARCHAR(255) NOT NULL,
58 before_state JSONB,
59 after_state JSONB,
60 status VARCHAR(32) NOT NULL,
61 error_message TEXT,
62 tenant_id VARCHAR(255),
63 metadata JSONB NOT NULL DEFAULT '{}'::JSONB
64 )
65 ";
66
67 client
68 .execute(create_table_sql, &[])
69 .await
70 .map_err(|e| AuditError::DatabaseError(format!("Failed to create table: {}", e)))?;
71
72 Self::ensure_indexes(&client).await?;
74
75 Ok(())
76 }
77
78 async fn ensure_indexes(client: &deadpool_postgres::Object) -> AuditResult<()> {
80 let indexes = vec![
81 "CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log (timestamp DESC)",
83 "CREATE INDEX IF NOT EXISTS idx_audit_user_id ON audit_log (user_id)",
85 "CREATE INDEX IF NOT EXISTS idx_audit_event_type ON audit_log (event_type)",
87 "CREATE INDEX IF NOT EXISTS idx_audit_tenant_id ON audit_log (tenant_id) WHERE tenant_id IS NOT NULL",
89 "CREATE INDEX IF NOT EXISTS idx_audit_tenant_time ON audit_log (tenant_id, timestamp DESC) WHERE tenant_id IS NOT NULL",
91 "CREATE INDEX IF NOT EXISTS idx_audit_user_time ON audit_log (user_id, timestamp DESC)",
92 "CREATE INDEX IF NOT EXISTS idx_audit_status ON audit_log (status) WHERE status != 'success'",
94 ];
95
96 for index_sql in indexes {
97 client
98 .execute(index_sql, &[])
99 .await
100 .map_err(|e| AuditError::DatabaseError(format!("Failed to create index: {}", e)))?;
101 }
102
103 Ok(())
104 }
105
106 fn parse_uuid(id: &str) -> AuditResult<uuid::Uuid> {
108 uuid::Uuid::parse_str(id)
109 .map_err(|e| AuditError::DatabaseError(format!("Invalid UUID: {}", e)))
110 }
111}
112
113#[async_trait::async_trait]
114impl AuditBackend for PostgresAuditBackend {
115 async fn log_event(&self, event: AuditEvent) -> AuditResult<()> {
117 event.validate()?;
119
120 let client =
121 self.pool.get().await.map_err(|e| {
122 AuditError::DatabaseError(format!("Failed to get connection: {}", e))
123 })?;
124
125 let event_id = Self::parse_uuid(&event.id)?;
126 let timestamp = chrono::DateTime::parse_from_rfc3339(&event.timestamp)
127 .map_err(|e| AuditError::DatabaseError(format!("Invalid timestamp format: {}", e)))?
128 .with_timezone(&chrono::Utc);
129
130 let insert_sql = r"
131 INSERT INTO audit_log (
132 id, timestamp, event_type, user_id, username, ip_address,
133 resource_type, resource_id, action, before_state, after_state,
134 status, error_message, tenant_id, metadata
135 ) VALUES (
136 $1, $2, $3, $4, $5, $6,
137 $7, $8, $9, $10, $11,
138 $12, $13, $14, $15
139 )
140 ";
141
142 client
143 .execute(
144 insert_sql,
145 &[
146 &event_id,
147 ×tamp,
148 &event.event_type,
149 &event.user_id,
150 &event.username,
151 &event.ip_address,
152 &event.resource_type,
153 &event.resource_id,
154 &event.action,
155 &event.before_state,
156 &event.after_state,
157 &event.status,
158 &event.error_message,
159 &event.tenant_id,
160 &event.metadata,
161 ],
162 )
163 .await
164 .map_err(|e| AuditError::DatabaseError(format!("Failed to insert event: {}", e)))?;
165
166 Ok(())
167 }
168
169 async fn query_events(&self, filters: AuditQueryFilters) -> AuditResult<Vec<AuditEvent>> {
171 let client =
172 self.pool.get().await.map_err(|e| {
173 AuditError::DatabaseError(format!("Failed to get connection: {}", e))
174 })?;
175
176 let mut query = "SELECT id, timestamp, event_type, user_id, username, ip_address, \
178 resource_type, resource_id, action, before_state, after_state, \
179 status, error_message, tenant_id, metadata \
180 FROM audit_log"
181 .to_string();
182
183 let mut where_parts = vec![];
185
186 if filters.event_type.is_some() {
187 where_parts.push("event_type = $1".to_string());
188 }
189 if filters.user_id.is_some() {
190 where_parts.push(format!("user_id = ${}", where_parts.len() + 1));
191 }
192 if filters.resource_type.is_some() {
193 where_parts.push(format!("resource_type = ${}", where_parts.len() + 1));
194 }
195 if filters.status.is_some() {
196 where_parts.push(format!("status = ${}", where_parts.len() + 1));
197 }
198 if filters.tenant_id.is_some() {
199 where_parts.push(format!("tenant_id = ${}", where_parts.len() + 1));
200 }
201 if filters.start_time.is_some() {
202 where_parts.push(format!("timestamp >= ${}", where_parts.len() + 1));
203 }
204 if filters.end_time.is_some() {
205 where_parts.push(format!("timestamp <= ${}", where_parts.len() + 1));
206 }
207
208 if !where_parts.is_empty() {
209 query.push_str(" WHERE ");
210 query.push_str(&where_parts.join(" AND "));
211 }
212
213 query.push_str(" ORDER BY timestamp DESC");
214
215 let limit = filters.limit.unwrap_or(100);
216 let offset = filters.offset.unwrap_or(0);
217 query.push_str(&format!(" LIMIT {} OFFSET {}", limit, offset));
218
219 let mut param_strs: Vec<String> = vec![];
221
222 if let Some(ref val) = filters.event_type {
223 param_strs.push(val.clone());
224 }
225 if let Some(ref val) = filters.user_id {
226 param_strs.push(val.clone());
227 }
228 if let Some(ref val) = filters.resource_type {
229 param_strs.push(val.clone());
230 }
231 if let Some(ref val) = filters.status {
232 param_strs.push(val.clone());
233 }
234 if let Some(ref val) = filters.tenant_id {
235 param_strs.push(val.clone());
236 }
237 if let Some(ref val) = filters.start_time {
238 param_strs.push(val.clone());
239 }
240 if let Some(ref val) = filters.end_time {
241 param_strs.push(val.clone());
242 }
243
244 let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = param_strs
246 .iter()
247 .map(|s| s as &(dyn tokio_postgres::types::ToSql + Sync))
248 .collect();
249
250 let rows = client
251 .query(query.as_str(), params.as_slice())
252 .await
253 .map_err(|e| AuditError::DatabaseError(format!("Query failed: {}", e)))?;
254
255 let mut events = vec![];
256 for row in rows {
257 let id: uuid::Uuid = row.get(0);
258 let timestamp: chrono::DateTime<chrono::Utc> = row.get(1);
259 let event_type: String = row.get(2);
260 let user_id: String = row.get(3);
261 let username: String = row.get(4);
262 let ip_address: String = row.get(5);
263 let resource_type: String = row.get(6);
264 let resource_id: Option<String> = row.get(7);
265 let action: String = row.get(8);
266 let before_state: Option<serde_json::Value> = row.get(9);
267 let after_state: Option<serde_json::Value> = row.get(10);
268 let status: String = row.get(11);
269 let error_message: Option<String> = row.get(12);
270 let tenant_id: Option<String> = row.get(13);
271 let metadata: serde_json::Value = row.get(14);
272
273 events.push(AuditEvent {
274 id: id.to_string(),
275 timestamp: timestamp.to_rfc3339(),
276 event_type,
277 user_id,
278 username,
279 ip_address,
280 resource_type,
281 resource_id,
282 action,
283 before_state,
284 after_state,
285 status,
286 error_message,
287 tenant_id,
288 metadata,
289 });
290 }
291
292 Ok(events)
293 }
294}
295
296pub use super::AuditBackend;