1use super::{ArtifactStore, AuditStore, BaselineStore, StorageHealth};
7use crate::error::StoreError;
8use crate::models::{
9 AuditAction, AuditEvent, AuditResourceType, BaselineRecord, BaselineSource, BaselineVersion,
10 ListAuditEventsQuery, ListAuditEventsResponse, ListBaselinesQuery, ListBaselinesResponse,
11 ListVerdictsQuery, ListVerdictsResponse, PaginationInfo, VerdictRecord,
12};
13use crate::server::PostgresPoolConfig;
14use async_trait::async_trait;
15use perfgate_types::VerdictStatus;
16use sqlx::{Connection, Executor, PgPool, Row, postgres::PgPoolOptions};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{info, warn};
20
/// PostgreSQL-backed store for baselines, verdicts, and audit events.
///
/// Cloning is cheap: `PgPool` is internally reference-counted and the
/// artifact store is shared behind an `Arc`.
#[derive(Debug, Clone)]
pub struct PostgresStore {
    // Shared sqlx connection pool; all queries go through it.
    pool: PgPool,
    // Optional blob store for receipts. When present, `create` offloads the
    // receipt there and stores only the path (see `store_artifact`).
    artifacts: Option<Arc<dyn ArtifactStore>>,
}
27
/// Maximum number of retries attempted by `with_retry` for transient errors.
const MAX_RETRIES: u32 = 3;

/// Backoff before the first retry; doubled after each subsequent attempt.
const INITIAL_BACKOFF: Duration = Duration::from_millis(250);
33
34fn is_transient(err: &sqlx::Error) -> bool {
37 match err {
38 sqlx::Error::PoolTimedOut => true,
39 sqlx::Error::PoolClosed => false,
40 sqlx::Error::Io(_) => true,
41 sqlx::Error::Database(db_err) => {
42 db_err
46 .code()
47 .map(|c| c.starts_with("08") || c.starts_with("57P"))
48 .unwrap_or(false)
49 }
50 _ => {
51 let msg = err.to_string().to_lowercase();
52 msg.contains("connection refused")
53 || msg.contains("connection reset")
54 || msg.contains("broken pipe")
55 || msg.contains("timed out")
56 }
57 }
58}
59
60impl PostgresStore {
    /// Creates a store backed by the PostgreSQL instance at `url`.
    ///
    /// Builds a connection pool from `pool_config`, installs a server-side
    /// `statement_timeout` on every new connection, pings connections before
    /// handing them out, and creates the schema if it does not exist yet.
    ///
    /// # Errors
    /// Returns `StoreError::ConnectionError` if the pool cannot connect or
    /// schema initialization fails.
    pub async fn new(
        url: &str,
        artifacts: Option<Arc<dyn ArtifactStore>>,
        pool_config: &PostgresPoolConfig,
    ) -> Result<Self, StoreError> {
        // Captured by the 'static `after_connect` closure below.
        let stmt_timeout_ms = pool_config.statement_timeout.as_millis() as u64;

        let pool = PgPoolOptions::new()
            .max_connections(pool_config.max_connections)
            .min_connections(pool_config.min_connections)
            .idle_timeout(pool_config.idle_timeout)
            .max_lifetime(pool_config.max_lifetime)
            .acquire_timeout(pool_config.acquire_timeout)
            .after_connect(move |conn, _meta| {
                Box::pin(async move {
                    // Bound statement runtime server-side so a stuck query
                    // cannot hold a pool slot indefinitely.
                    conn.execute(format!("SET statement_timeout = '{}'", stmt_timeout_ms).as_str())
                        .await?;
                    Ok(())
                })
            })
            .before_acquire(|conn, _meta| {
                Box::pin(async move {
                    // Ping so stale/broken connections are recycled instead
                    // of being handed to a caller.
                    conn.ping().await?;
                    Ok(true)
                })
            })
            .connect(url)
            .await
            .map_err(|e| StoreError::ConnectionError(e.to_string()))?;

        info!(
            max = pool_config.max_connections,
            min = pool_config.min_connections,
            idle_timeout_s = pool_config.idle_timeout.as_secs(),
            max_lifetime_s = pool_config.max_lifetime.as_secs(),
            acquire_timeout_s = pool_config.acquire_timeout.as_secs(),
            statement_timeout_ms = stmt_timeout_ms,
            "PostgreSQL connection pool configured"
        );

        let store = Self { pool, artifacts };
        store.init_schema().await?;
        Ok(store)
    }
112
113 fn pg_pool_metrics(&self) -> crate::models::PoolMetrics {
115 let size = self.pool.size();
116 let num_idle = self.pool.num_idle() as u32;
117 crate::models::PoolMetrics {
118 idle: num_idle,
119 active: size.saturating_sub(num_idle),
120 max: self.pool.options().get_max_connections(),
121 }
122 }
123
124 pub(crate) async fn with_retry<F, Fut, T>(&self, operation: F) -> Result<T, StoreError>
130 where
131 F: Fn(PgPool) -> Fut,
132 Fut: std::future::Future<Output = Result<T, sqlx::Error>>,
133 {
134 let mut last_err = None;
135 let mut backoff = INITIAL_BACKOFF;
136
137 for attempt in 0..=MAX_RETRIES {
138 match operation(self.pool.clone()).await {
139 Ok(val) => return Ok(val),
140 Err(e) if is_transient(&e) && attempt < MAX_RETRIES => {
141 warn!(
142 attempt = attempt + 1,
143 max = MAX_RETRIES,
144 backoff_ms = backoff.as_millis() as u64,
145 error = %e,
146 "Transient database error, retrying"
147 );
148 tokio::time::sleep(backoff).await;
149 backoff *= 2;
150 last_err = Some(e);
151 }
152 Err(e) => {
153 return Err(StoreError::QueryError(e.to_string()));
154 }
155 }
156 }
157
158 Err(StoreError::QueryError(
159 last_err
160 .map(|e| e.to_string())
161 .unwrap_or_else(|| "Unknown error after retries".to_string()),
162 ))
163 }
164
    /// Creates the `baselines`, `verdicts`, and `audit_events` tables and
    /// their indexes if they do not already exist. Idempotent — safe to run
    /// on every startup.
    async fn init_schema(&self) -> Result<(), StoreError> {
        let sql = r#"
        CREATE TABLE IF NOT EXISTS baselines (
            id VARCHAR(26) PRIMARY KEY,
            project VARCHAR(255) NOT NULL,
            benchmark VARCHAR(255) NOT NULL,
            version VARCHAR(64) NOT NULL,
            schema_id VARCHAR(64) NOT NULL,
            git_ref VARCHAR(255),
            git_sha VARCHAR(40),
            receipt JSONB,
            artifact_path TEXT,
            metadata JSONB NOT NULL,
            tags JSONB NOT NULL,
            created_at TIMESTAMPTZ NOT NULL,
            updated_at TIMESTAMPTZ NOT NULL,
            content_hash VARCHAR(64) NOT NULL,
            source VARCHAR(32) NOT NULL,
            deleted BOOLEAN NOT NULL DEFAULT FALSE,
            UNIQUE (project, benchmark, version)
        );

        CREATE INDEX IF NOT EXISTS idx_baselines_project_benchmark
            ON baselines(project, benchmark);

        CREATE TABLE IF NOT EXISTS verdicts (
            id VARCHAR(26) PRIMARY KEY,
            schema_id VARCHAR(64) NOT NULL,
            project VARCHAR(255) NOT NULL,
            benchmark VARCHAR(255) NOT NULL,
            run_id VARCHAR(255) NOT NULL,
            status VARCHAR(32) NOT NULL,
            counts JSONB NOT NULL,
            reasons JSONB NOT NULL,
            git_ref VARCHAR(255),
            git_sha VARCHAR(40),
            created_at TIMESTAMPTZ NOT NULL
        );

        CREATE INDEX IF NOT EXISTS idx_verdicts_project_benchmark
            ON verdicts(project, benchmark);

        CREATE INDEX IF NOT EXISTS idx_verdicts_created_at
            ON verdicts(created_at);

        CREATE TABLE IF NOT EXISTS audit_events (
            id VARCHAR(64) PRIMARY KEY,
            timestamp TIMESTAMPTZ NOT NULL,
            actor VARCHAR(255) NOT NULL,
            action VARCHAR(32) NOT NULL,
            resource_type VARCHAR(32) NOT NULL,
            resource_id VARCHAR(255) NOT NULL,
            project VARCHAR(255) NOT NULL,
            metadata JSONB NOT NULL DEFAULT '{}'
        );

        CREATE INDEX IF NOT EXISTS idx_audit_events_project
            ON audit_events(project);

        CREATE INDEX IF NOT EXISTS idx_audit_events_timestamp
            ON audit_events(timestamp DESC);

        CREATE INDEX IF NOT EXISTS idx_audit_events_action
            ON audit_events(action);
        "#;

        // The whole DDL script runs as one query string; any failure is
        // reported as a connection-level error.
        sqlx::query(sql)
            .execute(&self.pool)
            .await
            .map_err(|e| StoreError::ConnectionError(format!("Failed to init schema: {}", e)))?;

        Ok(())
    }
238
239 async fn store_artifact(&self, record: &BaselineRecord) -> Result<Option<String>, StoreError> {
240 if let Some(store) = &self.artifacts {
241 let path = format!(
242 "{}/{}/{}.json",
243 record.project, record.benchmark, record.version
244 );
245 let data =
246 serde_json::to_vec(&record.receipt).map_err(StoreError::SerializationError)?;
247 store.put(&path, data).await?;
248 Ok(Some(path))
249 } else {
250 Ok(None)
251 }
252 }
253
254 fn row_to_record(
255 row: sqlx::postgres::PgRow,
256 ) -> Result<(BaselineRecord, Option<String>), StoreError> {
257 let artifact_path: Option<String> = row.get("artifact_path");
258
259 let receipt = if let Some(receipt_json) = row.get::<Option<serde_json::Value>, _>("receipt")
260 {
261 serde_json::from_value(receipt_json).map_err(StoreError::SerializationError)?
262 } else {
263 serde_json::from_value(serde_json::json!({
265 "schema": "perfgate.run.v1",
266 "tool": {"name": "placeholder", "version": "0"},
267 "run": {
268 "id": "placeholder",
269 "started_at": "1970-01-01T00:00:00Z",
270 "ended_at": "1970-01-01T00:00:00Z",
271 "host": {"os": "unknown", "arch": "unknown"}
272 },
273 "bench": {
274 "name": "placeholder",
275 "command": [],
276 "repeat": 0,
277 "warmup": 0
278 },
279 "samples": [],
280 "stats": {
281 "wall_ms": {"median": 0, "min": 0, "max": 0}
282 }
283 }))
284 .unwrap()
285 };
286
287 let metadata_json: serde_json::Value = row.get("metadata");
288 let metadata =
289 serde_json::from_value(metadata_json).map_err(StoreError::SerializationError)?;
290
291 let tags_json: serde_json::Value = row.get("tags");
292 let tags = serde_json::from_value(tags_json).map_err(StoreError::SerializationError)?;
293
294 let source_str: String = row.get("source");
295 let source = serde_json::from_value(serde_json::Value::String(source_str))
296 .unwrap_or(BaselineSource::Upload);
297
298 let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
299 let updated_at: chrono::DateTime<chrono::Utc> = row.get("updated_at");
300
301 Ok((
302 BaselineRecord {
303 schema: row.get("schema_id"),
304 id: row.get("id"),
305 project: row.get("project"),
306 benchmark: row.get("benchmark"),
307 version: row.get("version"),
308 git_ref: row.get("git_ref"),
309 git_sha: row.get("git_sha"),
310 receipt,
311 metadata,
312 tags,
313 created_at,
314 updated_at,
315 content_hash: row.get("content_hash"),
316 source,
317 deleted: row.get("deleted"),
318 },
319 artifact_path,
320 ))
321 }
322
323 async fn load_artifact(
324 &self,
325 path: Option<String>,
326 mut record: BaselineRecord,
327 ) -> Result<BaselineRecord, StoreError> {
328 if let (Some(store), Some(path)) = (&self.artifacts, path) {
329 let data = store.get(&path).await?;
330 record.receipt =
331 serde_json::from_slice(&data).map_err(StoreError::SerializationError)?;
332 }
333 Ok(record)
334 }
335}
336
337#[async_trait]
338impl BaselineStore for PostgresStore {
    /// Inserts a new baseline row.
    ///
    /// The receipt is offloaded to the artifact store when one is configured
    /// (only the artifact path is stored in the row); otherwise it is inlined
    /// into the `receipt` JSONB column — the receipt lives in exactly one
    /// place.
    ///
    /// # Errors
    /// Returns `StoreError::already_exists` when a row with the same
    /// (project, benchmark, version) already exists (unique constraint).
    async fn create(&self, record: &BaselineRecord) -> Result<(), StoreError> {
        let artifact_path = self.store_artifact(record).await?;

        // Inline the receipt only when it was NOT written as an artifact.
        let receipt_json = if artifact_path.is_none() {
            Some(serde_json::to_value(&record.receipt).map_err(StoreError::SerializationError)?)
        } else {
            None
        };

        let metadata_json =
            serde_json::to_value(&record.metadata).map_err(StoreError::SerializationError)?;
        let tags_json =
            serde_json::to_value(&record.tags).map_err(StoreError::SerializationError)?;
        // `source` serializes to a JSON string; fall back to "upload" if it
        // is somehow not a string.
        let source_json =
            serde_json::to_value(&record.source).map_err(StoreError::SerializationError)?;
        let source_str = source_json.as_str().unwrap_or("upload");

        let sql = r#"
        INSERT INTO baselines (
            id, project, benchmark, version, schema_id,
            git_ref, git_sha, receipt, artifact_path, metadata, tags,
            created_at, updated_at, content_hash, source, deleted
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
        "#;

        let result = sqlx::query(sql)
            .bind(&record.id)
            .bind(&record.project)
            .bind(&record.benchmark)
            .bind(&record.version)
            .bind(&record.schema)
            .bind(&record.git_ref)
            .bind(&record.git_sha)
            .bind(receipt_json)
            .bind(artifact_path)
            .bind(metadata_json)
            .bind(tags_json)
            .bind(record.created_at)
            .bind(record.updated_at)
            .bind(&record.content_hash)
            .bind(source_str)
            .bind(record.deleted)
            .execute(&self.pool)
            .await;

        match result {
            Ok(_) => Ok(()),
            // Translate the unique-constraint violation into the store's
            // domain error so callers can respond with "already exists".
            Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Err(
                StoreError::already_exists(&record.project, &record.benchmark, &record.version),
            ),
            Err(e) => Err(StoreError::QueryError(e.to_string())),
        }
    }
392
393 async fn get(
394 &self,
395 project: &str,
396 benchmark: &str,
397 version: &str,
398 ) -> Result<Option<BaselineRecord>, StoreError> {
399 let sql = "SELECT * FROM baselines WHERE project = $1 AND benchmark = $2 AND version = $3 AND deleted = FALSE";
400
401 let row_opt = sqlx::query(sql)
402 .bind(project)
403 .bind(benchmark)
404 .bind(version)
405 .fetch_optional(&self.pool)
406 .await
407 .map_err(|e| StoreError::QueryError(e.to_string()))?;
408
409 match row_opt {
410 Some(row) => {
411 let (record, artifact_path) = Self::row_to_record(row)?;
412 Ok(Some(self.load_artifact(artifact_path, record).await?))
413 }
414 None => Ok(None),
415 }
416 }
417
418 async fn get_latest(
419 &self,
420 project: &str,
421 benchmark: &str,
422 ) -> Result<Option<BaselineRecord>, StoreError> {
423 let sql = "SELECT * FROM baselines WHERE project = $1 AND benchmark = $2 AND deleted = FALSE ORDER BY created_at DESC LIMIT 1";
424
425 let row_opt = sqlx::query(sql)
426 .bind(project)
427 .bind(benchmark)
428 .fetch_optional(&self.pool)
429 .await
430 .map_err(|e| StoreError::QueryError(e.to_string()))?;
431
432 match row_opt {
433 Some(row) => {
434 let (record, artifact_path) = Self::row_to_record(row)?;
435 Ok(Some(self.load_artifact(artifact_path, record).await?))
436 }
437 None => Ok(None),
438 }
439 }
440
441 async fn list(
442 &self,
443 project: &str,
444 query: &ListBaselinesQuery,
445 ) -> Result<ListBaselinesResponse, StoreError> {
446 let mut sql =
447 String::from("SELECT * FROM baselines WHERE project = $1 AND deleted = FALSE");
448
449 if let Some(bench) = &query.benchmark {
450 sql.push_str(" AND benchmark = '");
451 sql.push_str(&bench.replace('\'', "''"));
452 sql.push('\'');
453 }
454
455 sql.push_str(" ORDER BY created_at DESC");
456
457 let limit = query.limit.min(100) as i64;
458 sql.push_str(&format!(" LIMIT {}", limit + 1));
459
460 let offset = query.offset as i64;
461 sql.push_str(&format!(" OFFSET {}", offset));
462
463 let rows = sqlx::query(&sql)
464 .bind(project)
465 .fetch_all(&self.pool)
466 .await
467 .map_err(|e| StoreError::QueryError(e.to_string()))?;
468
469 let has_more = rows.len() > limit as usize;
470 let take_count = if has_more { limit as usize } else { rows.len() };
471
472 let mut baselines = Vec::with_capacity(take_count);
473 for row in rows.into_iter().take(take_count) {
474 let (mut record, artifact_path) = Self::row_to_record(row)?;
475 if query.include_receipt {
476 record = self.load_artifact(artifact_path, record).await?;
477 }
478 baselines.push(record.into());
479 }
480
481 let count_sql = "SELECT COUNT(*) FROM baselines WHERE project = $1 AND deleted = FALSE";
483 let mut count_query = String::from(count_sql);
484 if let Some(bench) = &query.benchmark {
485 count_query.push_str(" AND benchmark = '");
486 count_query.push_str(&bench.replace('\'', "''"));
487 count_query.push('\'');
488 }
489 let total_row = sqlx::query(&count_query)
490 .bind(project)
491 .fetch_one(&self.pool)
492 .await
493 .map_err(|e| StoreError::QueryError(e.to_string()))?;
494
495 let total: i64 = total_row.get(0);
496
497 let pagination = PaginationInfo {
498 limit: limit as u32,
499 offset: query.offset,
500 total: total as u64,
501 has_more,
502 };
503
504 Ok(ListBaselinesResponse {
505 baselines,
506 pagination,
507 })
508 }
509
    /// Updates an existing non-deleted baseline identified by
    /// (project, benchmark, version). Identity fields and `created_at` are
    /// never modified.
    ///
    /// NOTE(review): unlike `create`, this always writes the receipt inline
    /// into the `receipt` column and never touches the artifact store —
    /// confirm whether updates should also refresh/replace a stored artifact.
    ///
    /// # Errors
    /// Returns `StoreError::not_found` when no matching row was updated.
    async fn update(&self, record: &BaselineRecord) -> Result<(), StoreError> {
        let receipt_json =
            serde_json::to_value(&record.receipt).map_err(StoreError::SerializationError)?;
        let metadata_json =
            serde_json::to_value(&record.metadata).map_err(StoreError::SerializationError)?;
        let tags_json =
            serde_json::to_value(&record.tags).map_err(StoreError::SerializationError)?;

        let sql = r#"
        UPDATE baselines
        SET schema_id = $1, git_ref = $2, git_sha = $3, receipt = $4,
            metadata = $5, tags = $6, updated_at = $7, content_hash = $8
        WHERE project = $9 AND benchmark = $10 AND version = $11 AND deleted = FALSE
        "#;

        let result = sqlx::query(sql)
            .bind(&record.schema)
            .bind(&record.git_ref)
            .bind(&record.git_sha)
            .bind(receipt_json)
            .bind(metadata_json)
            .bind(tags_json)
            .bind(record.updated_at)
            .bind(&record.content_hash)
            .bind(&record.project)
            .bind(&record.benchmark)
            .bind(&record.version)
            .execute(&self.pool)
            .await
            .map_err(|e| StoreError::QueryError(e.to_string()))?;

        // Zero rows affected means the target baseline does not exist (or is
        // soft-deleted) — surface that as a not-found error.
        if result.rows_affected() == 0 {
            return Err(StoreError::not_found(
                &record.project,
                &record.benchmark,
                &record.version,
            ));
        }

        Ok(())
    }
551
552 async fn delete(
553 &self,
554 project: &str,
555 benchmark: &str,
556 version: &str,
557 ) -> Result<bool, StoreError> {
558 let sql = "UPDATE baselines SET deleted = TRUE, updated_at = NOW() WHERE project = $1 AND benchmark = $2 AND version = $3 AND deleted = FALSE";
559
560 let result = sqlx::query(sql)
561 .bind(project)
562 .bind(benchmark)
563 .bind(version)
564 .execute(&self.pool)
565 .await
566 .map_err(|e| StoreError::QueryError(e.to_string()))?;
567
568 Ok(result.rows_affected() > 0)
569 }
570
571 async fn hard_delete(
572 &self,
573 project: &str,
574 benchmark: &str,
575 version: &str,
576 ) -> Result<bool, StoreError> {
577 let sql = "DELETE FROM baselines WHERE project = $1 AND benchmark = $2 AND version = $3";
578
579 let result = sqlx::query(sql)
580 .bind(project)
581 .bind(benchmark)
582 .bind(version)
583 .execute(&self.pool)
584 .await
585 .map_err(|e| StoreError::QueryError(e.to_string()))?;
586
587 Ok(result.rows_affected() > 0)
588 }
589
590 async fn list_versions(
591 &self,
592 project: &str,
593 benchmark: &str,
594 ) -> Result<Vec<BaselineVersion>, StoreError> {
595 let sql = "SELECT version, created_at, git_ref, git_sha, source FROM baselines WHERE project = $1 AND benchmark = $2 AND deleted = FALSE ORDER BY created_at DESC";
596
597 let rows = sqlx::query(sql)
598 .bind(project)
599 .bind(benchmark)
600 .fetch_all(&self.pool)
601 .await
602 .map_err(|e| StoreError::QueryError(e.to_string()))?;
603
604 let mut versions = Vec::with_capacity(rows.len());
605 for row in rows {
606 let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
607 let source_str: String = row.get("source");
608 let source = serde_json::from_value(serde_json::Value::String(source_str))
609 .unwrap_or(BaselineSource::Upload);
610
611 versions.push(BaselineVersion {
612 version: row.get("version"),
613 created_at,
614 git_ref: row.get("git_ref"),
615 git_sha: row.get("git_sha"),
616 created_by: None,
617 is_current: false, source,
619 });
620 }
621
622 if let Some(first) = versions.first_mut() {
623 first.is_current = true;
624 }
625
626 Ok(versions)
627 }
628
629 async fn health_check(&self) -> Result<StorageHealth, StoreError> {
630 match self
631 .with_retry(|pool| async move { sqlx::query("SELECT 1").execute(&pool).await })
632 .await
633 {
634 Ok(_) => Ok(StorageHealth::Healthy),
635 Err(_) => Ok(StorageHealth::Unhealthy),
636 }
637 }
638
    /// Static identifier for this storage backend.
    fn backend_type(&self) -> &'static str {
        "postgres"
    }
642
    /// Pool utilization metrics; always `Some` for the Postgres backend.
    fn pool_metrics(&self) -> Option<crate::models::PoolMetrics> {
        Some(self.pg_pool_metrics())
    }
646
647 async fn create_verdict(&self, record: &VerdictRecord) -> Result<(), StoreError> {
648 let sql = r#"
649 INSERT INTO verdicts (
650 id, schema_id, project, benchmark, run_id, status, counts, reasons,
651 git_ref, git_sha, created_at
652 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
653 "#;
654
655 let counts_json =
656 serde_json::to_value(&record.counts).map_err(StoreError::SerializationError)?;
657 let reasons_json =
658 serde_json::to_value(&record.reasons).map_err(StoreError::SerializationError)?;
659 let status_str = record.status.as_str();
660
661 sqlx::query(sql)
662 .bind(&record.id)
663 .bind(&record.schema)
664 .bind(&record.project)
665 .bind(&record.benchmark)
666 .bind(&record.run_id)
667 .bind(status_str)
668 .bind(counts_json)
669 .bind(reasons_json)
670 .bind(&record.git_ref)
671 .bind(&record.git_sha)
672 .bind(record.created_at)
673 .execute(&self.pool)
674 .await
675 .map_err(|e| StoreError::QueryError(e.to_string()))?;
676
677 Ok(())
678 }
679
680 async fn list_verdicts(
681 &self,
682 project: &str,
683 query: &ListVerdictsQuery,
684 ) -> Result<ListVerdictsResponse, StoreError> {
685 let mut sql = "SELECT * FROM verdicts WHERE project = $1".to_string();
686 let mut params_count = 1;
687
688 if let Some(_bench) = &query.benchmark {
689 params_count += 1;
690 sql.push_str(&format!(" AND benchmark = ${}", params_count));
691 }
692
693 if let Some(_status) = &query.status {
694 params_count += 1;
695 sql.push_str(&format!(" AND status = ${}", params_count));
696 }
697
698 if let Some(_since) = &query.since {
699 params_count += 1;
700 sql.push_str(&format!(" AND created_at >= ${}", params_count));
701 }
702
703 if let Some(_until) = &query.until {
704 params_count += 1;
705 sql.push_str(&format!(" AND created_at <= ${}", params_count));
706 }
707
708 sql.push_str(" ORDER BY created_at DESC");
709
710 params_count += 1;
712 sql.push_str(&format!(" LIMIT ${}", params_count));
713 params_count += 1;
714 sql.push_str(&format!(" OFFSET ${}", params_count));
715
716 let mut q = sqlx::query(&sql).bind(project);
717
718 if let Some(bench) = &query.benchmark {
719 q = q.bind(bench);
720 }
721 if let Some(status) = &query.status {
722 q = q.bind(status.as_str());
723 }
724 if let Some(since) = &query.since {
725 q = q.bind(since);
726 }
727 if let Some(until) = &query.until {
728 q = q.bind(until);
729 }
730
731 q = q.bind(query.limit as i64);
732 q = q.bind(query.offset as i64);
733
734 let rows = q
735 .fetch_all(&self.pool)
736 .await
737 .map_err(|e| StoreError::QueryError(e.to_string()))?;
738
739 let mut verdicts = Vec::with_capacity(rows.len());
740 for row in rows {
741 verdicts.push(self.row_to_verdict(row)?);
742 }
743
744 let count_sql = "SELECT COUNT(*) FROM verdicts WHERE project = $1";
746 let total: i64 = sqlx::query_scalar(count_sql)
747 .bind(project)
748 .fetch_one(&self.pool)
749 .await
750 .map_err(|e| StoreError::QueryError(e.to_string()))?;
751
752 Ok(ListVerdictsResponse {
753 verdicts,
754 pagination: PaginationInfo {
755 total: total as u64,
756 offset: query.offset,
757 limit: query.limit,
758 has_more: (query.offset + query.limit as u64) < total as u64,
759 },
760 })
761 }
762}
763
764impl PostgresStore {
765 fn row_to_verdict(&self, row: sqlx::postgres::PgRow) -> Result<VerdictRecord, StoreError> {
766 let status_str: String = row.get("status");
767 let status = match status_str.as_str() {
768 "pass" => VerdictStatus::Pass,
769 "warn" => VerdictStatus::Warn,
770 "fail" => VerdictStatus::Fail,
771 "skip" => VerdictStatus::Skip,
772 _ => VerdictStatus::Pass, };
774
775 let counts_json: serde_json::Value = row.get("counts");
776 let counts = serde_json::from_value(counts_json).map_err(StoreError::SerializationError)?;
777
778 let reasons_json: serde_json::Value = row.get("reasons");
779 let reasons =
780 serde_json::from_value(reasons_json).map_err(StoreError::SerializationError)?;
781
782 Ok(VerdictRecord {
783 schema: row.get("schema_id"), id: row.get("id"),
785 project: row.get("project"),
786 benchmark: row.get("benchmark"),
787 run_id: row.get("run_id"),
788 status,
789 counts,
790 reasons,
791 git_ref: row.get("git_ref"),
792 git_sha: row.get("git_sha"),
793 created_at: row.get("created_at"),
794 })
795 }
796}
797
798#[async_trait]
799impl AuditStore for PostgresStore {
800 async fn log_event(&self, event: &AuditEvent) -> Result<(), StoreError> {
801 let metadata_json =
802 serde_json::to_value(&event.metadata).map_err(StoreError::SerializationError)?;
803
804 let sql = r#"
805 INSERT INTO audit_events (
806 id, timestamp, actor, action, resource_type, resource_id, project, metadata
807 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
808 "#;
809
810 sqlx::query(sql)
811 .bind(&event.id)
812 .bind(event.timestamp)
813 .bind(&event.actor)
814 .bind(event.action.to_string())
815 .bind(event.resource_type.to_string())
816 .bind(&event.resource_id)
817 .bind(&event.project)
818 .bind(metadata_json)
819 .execute(&self.pool)
820 .await
821 .map_err(|e| StoreError::QueryError(e.to_string()))?;
822
823 Ok(())
824 }
825
826 async fn list_events(
827 &self,
828 query: &ListAuditEventsQuery,
829 ) -> Result<ListAuditEventsResponse, StoreError> {
830 let mut sql = "SELECT * FROM audit_events WHERE TRUE".to_string();
831 let mut params_count = 0;
832
833 if query.project.is_some() {
834 params_count += 1;
835 sql.push_str(&format!(" AND project = ${}", params_count));
836 }
837 if query.action.is_some() {
838 params_count += 1;
839 sql.push_str(&format!(" AND action = ${}", params_count));
840 }
841 if query.resource_type.is_some() {
842 params_count += 1;
843 sql.push_str(&format!(" AND resource_type = ${}", params_count));
844 }
845 if query.actor.is_some() {
846 params_count += 1;
847 sql.push_str(&format!(" AND actor = ${}", params_count));
848 }
849 if query.since.is_some() {
850 params_count += 1;
851 sql.push_str(&format!(" AND timestamp >= ${}", params_count));
852 }
853 if query.until.is_some() {
854 params_count += 1;
855 sql.push_str(&format!(" AND timestamp <= ${}", params_count));
856 }
857
858 sql.push_str(" ORDER BY timestamp DESC");
859
860 params_count += 1;
861 sql.push_str(&format!(" LIMIT ${}", params_count));
862 params_count += 1;
863 sql.push_str(&format!(" OFFSET ${}", params_count));
864
865 let mut q = sqlx::query(&sql);
866
867 if let Some(ref project) = query.project {
868 q = q.bind(project);
869 }
870 if let Some(ref action) = query.action {
871 q = q.bind(action);
872 }
873 if let Some(ref resource_type) = query.resource_type {
874 q = q.bind(resource_type);
875 }
876 if let Some(ref actor) = query.actor {
877 q = q.bind(actor);
878 }
879 if let Some(ref since) = query.since {
880 q = q.bind(since);
881 }
882 if let Some(ref until) = query.until {
883 q = q.bind(until);
884 }
885
886 q = q.bind(query.limit as i64);
887 q = q.bind(query.offset as i64);
888
889 let rows = q
890 .fetch_all(&self.pool)
891 .await
892 .map_err(|e| StoreError::QueryError(e.to_string()))?;
893
894 let mut events = Vec::with_capacity(rows.len());
895 for row in rows {
896 let action_str: String = row.get("action");
897 let action = action_str
898 .parse::<AuditAction>()
899 .unwrap_or(AuditAction::Create);
900
901 let resource_type_str: String = row.get("resource_type");
902 let resource_type = resource_type_str
903 .parse::<AuditResourceType>()
904 .unwrap_or(AuditResourceType::Baseline);
905
906 let metadata_json: serde_json::Value = row.get("metadata");
907
908 events.push(AuditEvent {
909 id: row.get("id"),
910 timestamp: row.get("timestamp"),
911 actor: row.get("actor"),
912 action,
913 resource_type,
914 resource_id: row.get("resource_id"),
915 project: row.get("project"),
916 metadata: metadata_json,
917 });
918 }
919
920 let count_sql = "SELECT COUNT(*) FROM audit_events WHERE TRUE";
922 let total: i64 = sqlx::query_scalar(count_sql)
923 .fetch_one(&self.pool)
924 .await
925 .map_err(|e| StoreError::QueryError(e.to_string()))?;
926
927 Ok(ListAuditEventsResponse {
928 events,
929 pagination: PaginationInfo {
930 total: total as u64,
931 offset: query.offset,
932 limit: query.limit,
933 has_more: (query.offset + query.limit as u64) < total as u64,
934 },
935 })
936 }
937}
938
#[cfg(test)]
mod tests {
    use super::*;

    // Unit tests for `is_transient`; no database connection required.

    #[test]
    fn test_is_transient_pool_timed_out() {
        // Pool acquire timeouts are retryable.
        assert!(is_transient(&sqlx::Error::PoolTimedOut));
    }

    #[test]
    fn test_is_transient_pool_closed() {
        // A closed pool never recovers, so retrying is pointless.
        assert!(!is_transient(&sqlx::Error::PoolClosed));
    }

    #[test]
    fn test_is_transient_io_error() {
        // Any I/O-level error is classified as transient.
        let err = sqlx::Error::Io(std::io::Error::new(
            std::io::ErrorKind::ConnectionRefused,
            "connection refused",
        ));
        assert!(is_transient(&err));
    }

    #[test]
    fn test_is_transient_non_transient() {
        // Schema-level errors are permanent and must not be retried.
        let err = sqlx::Error::ColumnNotFound("missing".to_string());
        assert!(!is_transient(&err));
    }
}