Skip to main content

perfgate_server/storage/
postgres.rs

1//! PostgreSQL storage implementation for persistent baseline storage.
2//!
3//! This module provides a robust, asynchronous PostgreSQL backend for storing
4//! and querying perfgate baseline records using sqlx.
5
6use super::{ArtifactStore, AuditStore, BaselineStore, StorageHealth};
7use crate::error::StoreError;
8use crate::models::{
9    AuditAction, AuditEvent, AuditResourceType, BaselineRecord, BaselineSource, BaselineVersion,
10    ListAuditEventsQuery, ListAuditEventsResponse, ListBaselinesQuery, ListBaselinesResponse,
11    ListVerdictsQuery, ListVerdictsResponse, PaginationInfo, VerdictRecord,
12};
13use crate::server::PostgresPoolConfig;
14use async_trait::async_trait;
15use perfgate_types::VerdictStatus;
16use sqlx::{Connection, Executor, PgPool, Row, postgres::PgPoolOptions};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{info, warn};
20
21/// PostgreSQL storage backend for baselines.
22#[derive(Debug, Clone)]
23pub struct PostgresStore {
24    pool: PgPool,
25    artifacts: Option<Arc<dyn ArtifactStore>>,
26}
27
28/// Maximum number of retry attempts for transient connection failures.
29const MAX_RETRIES: u32 = 3;
30
31/// Initial backoff delay for connection retries.
32const INITIAL_BACKOFF: Duration = Duration::from_millis(250);
33
34/// Returns `true` when an sqlx error looks transient (connection refused,
35/// timeout, reset, etc.) -- i.e. worth retrying.
36fn is_transient(err: &sqlx::Error) -> bool {
37    match err {
38        sqlx::Error::PoolTimedOut => true,
39        sqlx::Error::PoolClosed => false,
40        sqlx::Error::Io(_) => true,
41        sqlx::Error::Database(db_err) => {
42            // Postgres error codes starting with 08 are connection exceptions.
43            // Class 57P covers operator intervention: 57P01 (admin_shutdown),
44            // 57P02 (crash_shutdown), 57P03 (cannot_connect_now).
45            db_err
46                .code()
47                .map(|c| c.starts_with("08") || c.starts_with("57P"))
48                .unwrap_or(false)
49        }
50        _ => {
51            let msg = err.to_string().to_lowercase();
52            msg.contains("connection refused")
53                || msg.contains("connection reset")
54                || msg.contains("broken pipe")
55                || msg.contains("timed out")
56        }
57    }
58}
59
60impl PostgresStore {
61    /// Creates a new PostgreSQL storage backend and runs initial schema migrations.
62    ///
63    /// The connection pool is configured according to the supplied
64    /// [`PostgresPoolConfig`]. An `after_connect` callback sets
65    /// `statement_timeout` on every new connection, and a `before_acquire`
66    /// callback pings the connection to verify it is still alive.
67    pub async fn new(
68        url: &str,
69        artifacts: Option<Arc<dyn ArtifactStore>>,
70        pool_config: &PostgresPoolConfig,
71    ) -> Result<Self, StoreError> {
72        let stmt_timeout_ms = pool_config.statement_timeout.as_millis() as u64;
73
74        let pool = PgPoolOptions::new()
75            .max_connections(pool_config.max_connections)
76            .min_connections(pool_config.min_connections)
77            .idle_timeout(pool_config.idle_timeout)
78            .max_lifetime(pool_config.max_lifetime)
79            .acquire_timeout(pool_config.acquire_timeout)
80            .after_connect(move |conn, _meta| {
81                Box::pin(async move {
82                    conn.execute(format!("SET statement_timeout = '{}'", stmt_timeout_ms).as_str())
83                        .await?;
84                    Ok(())
85                })
86            })
87            .before_acquire(|conn, _meta| {
88                Box::pin(async move {
89                    // Quick ping to verify the connection is alive.
90                    conn.ping().await?;
91                    Ok(true)
92                })
93            })
94            .connect(url)
95            .await
96            .map_err(|e| StoreError::ConnectionError(e.to_string()))?;
97
98        info!(
99            max = pool_config.max_connections,
100            min = pool_config.min_connections,
101            idle_timeout_s = pool_config.idle_timeout.as_secs(),
102            max_lifetime_s = pool_config.max_lifetime.as_secs(),
103            acquire_timeout_s = pool_config.acquire_timeout.as_secs(),
104            statement_timeout_ms = stmt_timeout_ms,
105            "PostgreSQL connection pool configured"
106        );
107
108        let store = Self { pool, artifacts };
109        store.init_schema().await?;
110        Ok(store)
111    }
112
113    /// Returns current pool metrics (idle, active, max connections).
114    fn pg_pool_metrics(&self) -> crate::models::PoolMetrics {
115        let size = self.pool.size();
116        let num_idle = self.pool.num_idle() as u32;
117        crate::models::PoolMetrics {
118            idle: num_idle,
119            active: size.saturating_sub(num_idle),
120            max: self.pool.options().get_max_connections(),
121        }
122    }
123
124    /// Executes a query with automatic retry on transient errors.
125    ///
126    /// The closure receives a reference to the pool and should return the
127    /// query result. On transient failure the call is retried up to
128    /// [`MAX_RETRIES`] times with exponential backoff.
129    pub(crate) async fn with_retry<F, Fut, T>(&self, operation: F) -> Result<T, StoreError>
130    where
131        F: Fn(PgPool) -> Fut,
132        Fut: std::future::Future<Output = Result<T, sqlx::Error>>,
133    {
134        let mut last_err = None;
135        let mut backoff = INITIAL_BACKOFF;
136
137        for attempt in 0..=MAX_RETRIES {
138            match operation(self.pool.clone()).await {
139                Ok(val) => return Ok(val),
140                Err(e) if is_transient(&e) && attempt < MAX_RETRIES => {
141                    warn!(
142                        attempt = attempt + 1,
143                        max = MAX_RETRIES,
144                        backoff_ms = backoff.as_millis() as u64,
145                        error = %e,
146                        "Transient database error, retrying"
147                    );
148                    tokio::time::sleep(backoff).await;
149                    backoff *= 2;
150                    last_err = Some(e);
151                }
152                Err(e) => {
153                    return Err(StoreError::QueryError(e.to_string()));
154                }
155            }
156        }
157
158        Err(StoreError::QueryError(
159            last_err
160                .map(|e| e.to_string())
161                .unwrap_or_else(|| "Unknown error after retries".to_string()),
162        ))
163    }
164
165    async fn init_schema(&self) -> Result<(), StoreError> {
166        let sql = r#"
167            CREATE TABLE IF NOT EXISTS baselines (
168                id VARCHAR(26) PRIMARY KEY,
169                project VARCHAR(255) NOT NULL,
170                benchmark VARCHAR(255) NOT NULL,
171                version VARCHAR(64) NOT NULL,
172                schema_id VARCHAR(64) NOT NULL,
173                git_ref VARCHAR(255),
174                git_sha VARCHAR(40),
175                receipt JSONB,
176                artifact_path TEXT,
177                metadata JSONB NOT NULL,
178                tags JSONB NOT NULL,
179                created_at TIMESTAMPTZ NOT NULL,
180                updated_at TIMESTAMPTZ NOT NULL,
181                content_hash VARCHAR(64) NOT NULL,
182                source VARCHAR(32) NOT NULL,
183                deleted BOOLEAN NOT NULL DEFAULT FALSE,
184                UNIQUE (project, benchmark, version)
185            );
186            
187            CREATE INDEX IF NOT EXISTS idx_baselines_project_benchmark 
188            ON baselines(project, benchmark);
189
190            CREATE TABLE IF NOT EXISTS verdicts (
191                id VARCHAR(26) PRIMARY KEY,
192                schema_id VARCHAR(64) NOT NULL,
193                project VARCHAR(255) NOT NULL,
194                benchmark VARCHAR(255) NOT NULL,
195                run_id VARCHAR(255) NOT NULL,
196                status VARCHAR(32) NOT NULL,
197                counts JSONB NOT NULL,
198                reasons JSONB NOT NULL,
199                git_ref VARCHAR(255),
200                git_sha VARCHAR(40),
201                created_at TIMESTAMPTZ NOT NULL
202            );
203
204            CREATE INDEX IF NOT EXISTS idx_verdicts_project_benchmark
205            ON verdicts(project, benchmark);
206
207            CREATE INDEX IF NOT EXISTS idx_verdicts_created_at
208            ON verdicts(created_at);
209
210            CREATE TABLE IF NOT EXISTS audit_events (
211                id VARCHAR(64) PRIMARY KEY,
212                timestamp TIMESTAMPTZ NOT NULL,
213                actor VARCHAR(255) NOT NULL,
214                action VARCHAR(32) NOT NULL,
215                resource_type VARCHAR(32) NOT NULL,
216                resource_id VARCHAR(255) NOT NULL,
217                project VARCHAR(255) NOT NULL,
218                metadata JSONB NOT NULL DEFAULT '{}'
219            );
220
221            CREATE INDEX IF NOT EXISTS idx_audit_events_project
222            ON audit_events(project);
223
224            CREATE INDEX IF NOT EXISTS idx_audit_events_timestamp
225            ON audit_events(timestamp DESC);
226
227            CREATE INDEX IF NOT EXISTS idx_audit_events_action
228            ON audit_events(action);
229        "#;
230
231        sqlx::query(sql)
232            .execute(&self.pool)
233            .await
234            .map_err(|e| StoreError::ConnectionError(format!("Failed to init schema: {}", e)))?;
235
236        Ok(())
237    }
238
239    async fn store_artifact(&self, record: &BaselineRecord) -> Result<Option<String>, StoreError> {
240        if let Some(store) = &self.artifacts {
241            let path = format!(
242                "{}/{}/{}.json",
243                record.project, record.benchmark, record.version
244            );
245            let data =
246                serde_json::to_vec(&record.receipt).map_err(StoreError::SerializationError)?;
247            store.put(&path, data).await?;
248            Ok(Some(path))
249        } else {
250            Ok(None)
251        }
252    }
253
254    fn row_to_record(
255        row: sqlx::postgres::PgRow,
256    ) -> Result<(BaselineRecord, Option<String>), StoreError> {
257        let artifact_path: Option<String> = row.get("artifact_path");
258
259        let receipt = if let Some(receipt_json) = row.get::<Option<serde_json::Value>, _>("receipt")
260        {
261            serde_json::from_value(receipt_json).map_err(StoreError::SerializationError)?
262        } else {
263            // Placeholder, will be loaded from artifact store if needed
264            serde_json::from_value(serde_json::json!({
265                "schema": "perfgate.run.v1",
266                "tool": {"name": "placeholder", "version": "0"},
267                "run": {
268                    "id": "placeholder",
269                    "started_at": "1970-01-01T00:00:00Z",
270                    "ended_at": "1970-01-01T00:00:00Z",
271                    "host": {"os": "unknown", "arch": "unknown"}
272                },
273                "bench": {
274                    "name": "placeholder",
275                    "command": [],
276                    "repeat": 0,
277                    "warmup": 0
278                },
279                "samples": [],
280                "stats": {
281                    "wall_ms": {"median": 0, "min": 0, "max": 0}
282                }
283            }))
284            .unwrap()
285        };
286
287        let metadata_json: serde_json::Value = row.get("metadata");
288        let metadata =
289            serde_json::from_value(metadata_json).map_err(StoreError::SerializationError)?;
290
291        let tags_json: serde_json::Value = row.get("tags");
292        let tags = serde_json::from_value(tags_json).map_err(StoreError::SerializationError)?;
293
294        let source_str: String = row.get("source");
295        let source = serde_json::from_value(serde_json::Value::String(source_str))
296            .unwrap_or(BaselineSource::Upload);
297
298        let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
299        let updated_at: chrono::DateTime<chrono::Utc> = row.get("updated_at");
300
301        Ok((
302            BaselineRecord {
303                schema: row.get("schema_id"),
304                id: row.get("id"),
305                project: row.get("project"),
306                benchmark: row.get("benchmark"),
307                version: row.get("version"),
308                git_ref: row.get("git_ref"),
309                git_sha: row.get("git_sha"),
310                receipt,
311                metadata,
312                tags,
313                created_at,
314                updated_at,
315                content_hash: row.get("content_hash"),
316                source,
317                deleted: row.get("deleted"),
318            },
319            artifact_path,
320        ))
321    }
322
323    async fn load_artifact(
324        &self,
325        path: Option<String>,
326        mut record: BaselineRecord,
327    ) -> Result<BaselineRecord, StoreError> {
328        if let (Some(store), Some(path)) = (&self.artifacts, path) {
329            let data = store.get(&path).await?;
330            record.receipt =
331                serde_json::from_slice(&data).map_err(StoreError::SerializationError)?;
332        }
333        Ok(record)
334    }
335}
336
337#[async_trait]
338impl BaselineStore for PostgresStore {
339    async fn create(&self, record: &BaselineRecord) -> Result<(), StoreError> {
340        let artifact_path = self.store_artifact(record).await?;
341
342        let receipt_json = if artifact_path.is_none() {
343            Some(serde_json::to_value(&record.receipt).map_err(StoreError::SerializationError)?)
344        } else {
345            None
346        };
347
348        let metadata_json =
349            serde_json::to_value(&record.metadata).map_err(StoreError::SerializationError)?;
350        let tags_json =
351            serde_json::to_value(&record.tags).map_err(StoreError::SerializationError)?;
352        let source_json =
353            serde_json::to_value(&record.source).map_err(StoreError::SerializationError)?;
354        let source_str = source_json.as_str().unwrap_or("upload");
355
356        let sql = r#"
357            INSERT INTO baselines (
358                id, project, benchmark, version, schema_id, 
359                git_ref, git_sha, receipt, artifact_path, metadata, tags,
360                created_at, updated_at, content_hash, source, deleted
361            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
362        "#;
363
364        let result = sqlx::query(sql)
365            .bind(&record.id)
366            .bind(&record.project)
367            .bind(&record.benchmark)
368            .bind(&record.version)
369            .bind(&record.schema)
370            .bind(&record.git_ref)
371            .bind(&record.git_sha)
372            .bind(receipt_json)
373            .bind(artifact_path)
374            .bind(metadata_json)
375            .bind(tags_json)
376            .bind(record.created_at)
377            .bind(record.updated_at)
378            .bind(&record.content_hash)
379            .bind(source_str)
380            .bind(record.deleted)
381            .execute(&self.pool)
382            .await;
383
384        match result {
385            Ok(_) => Ok(()),
386            Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Err(
387                StoreError::already_exists(&record.project, &record.benchmark, &record.version),
388            ),
389            Err(e) => Err(StoreError::QueryError(e.to_string())),
390        }
391    }
392
393    async fn get(
394        &self,
395        project: &str,
396        benchmark: &str,
397        version: &str,
398    ) -> Result<Option<BaselineRecord>, StoreError> {
399        let sql = "SELECT * FROM baselines WHERE project = $1 AND benchmark = $2 AND version = $3 AND deleted = FALSE";
400
401        let row_opt = sqlx::query(sql)
402            .bind(project)
403            .bind(benchmark)
404            .bind(version)
405            .fetch_optional(&self.pool)
406            .await
407            .map_err(|e| StoreError::QueryError(e.to_string()))?;
408
409        match row_opt {
410            Some(row) => {
411                let (record, artifact_path) = Self::row_to_record(row)?;
412                Ok(Some(self.load_artifact(artifact_path, record).await?))
413            }
414            None => Ok(None),
415        }
416    }
417
418    async fn get_latest(
419        &self,
420        project: &str,
421        benchmark: &str,
422    ) -> Result<Option<BaselineRecord>, StoreError> {
423        let sql = "SELECT * FROM baselines WHERE project = $1 AND benchmark = $2 AND deleted = FALSE ORDER BY created_at DESC LIMIT 1";
424
425        let row_opt = sqlx::query(sql)
426            .bind(project)
427            .bind(benchmark)
428            .fetch_optional(&self.pool)
429            .await
430            .map_err(|e| StoreError::QueryError(e.to_string()))?;
431
432        match row_opt {
433            Some(row) => {
434                let (record, artifact_path) = Self::row_to_record(row)?;
435                Ok(Some(self.load_artifact(artifact_path, record).await?))
436            }
437            None => Ok(None),
438        }
439    }
440
441    async fn list(
442        &self,
443        project: &str,
444        query: &ListBaselinesQuery,
445    ) -> Result<ListBaselinesResponse, StoreError> {
446        let mut sql =
447            String::from("SELECT * FROM baselines WHERE project = $1 AND deleted = FALSE");
448
449        if let Some(bench) = &query.benchmark {
450            sql.push_str(" AND benchmark = '");
451            sql.push_str(&bench.replace('\'', "''"));
452            sql.push('\'');
453        }
454
455        sql.push_str(" ORDER BY created_at DESC");
456
457        let limit = query.limit.min(100) as i64;
458        sql.push_str(&format!(" LIMIT {}", limit + 1));
459
460        let offset = query.offset as i64;
461        sql.push_str(&format!(" OFFSET {}", offset));
462
463        let rows = sqlx::query(&sql)
464            .bind(project)
465            .fetch_all(&self.pool)
466            .await
467            .map_err(|e| StoreError::QueryError(e.to_string()))?;
468
469        let has_more = rows.len() > limit as usize;
470        let take_count = if has_more { limit as usize } else { rows.len() };
471
472        let mut baselines = Vec::with_capacity(take_count);
473        for row in rows.into_iter().take(take_count) {
474            let (mut record, artifact_path) = Self::row_to_record(row)?;
475            if query.include_receipt {
476                record = self.load_artifact(artifact_path, record).await?;
477            }
478            baselines.push(record.into());
479        }
480
481        // Determine total count
482        let count_sql = "SELECT COUNT(*) FROM baselines WHERE project = $1 AND deleted = FALSE";
483        let mut count_query = String::from(count_sql);
484        if let Some(bench) = &query.benchmark {
485            count_query.push_str(" AND benchmark = '");
486            count_query.push_str(&bench.replace('\'', "''"));
487            count_query.push('\'');
488        }
489        let total_row = sqlx::query(&count_query)
490            .bind(project)
491            .fetch_one(&self.pool)
492            .await
493            .map_err(|e| StoreError::QueryError(e.to_string()))?;
494
495        let total: i64 = total_row.get(0);
496
497        let pagination = PaginationInfo {
498            limit: limit as u32,
499            offset: query.offset,
500            total: total as u64,
501            has_more,
502        };
503
504        Ok(ListBaselinesResponse {
505            baselines,
506            pagination,
507        })
508    }
509
510    async fn update(&self, record: &BaselineRecord) -> Result<(), StoreError> {
511        let receipt_json =
512            serde_json::to_value(&record.receipt).map_err(StoreError::SerializationError)?;
513        let metadata_json =
514            serde_json::to_value(&record.metadata).map_err(StoreError::SerializationError)?;
515        let tags_json =
516            serde_json::to_value(&record.tags).map_err(StoreError::SerializationError)?;
517
518        let sql = r#"
519            UPDATE baselines 
520            SET schema_id = $1, git_ref = $2, git_sha = $3, receipt = $4, 
521                metadata = $5, tags = $6, updated_at = $7, content_hash = $8
522            WHERE project = $9 AND benchmark = $10 AND version = $11 AND deleted = FALSE
523        "#;
524
525        let result = sqlx::query(sql)
526            .bind(&record.schema)
527            .bind(&record.git_ref)
528            .bind(&record.git_sha)
529            .bind(receipt_json)
530            .bind(metadata_json)
531            .bind(tags_json)
532            .bind(record.updated_at)
533            .bind(&record.content_hash)
534            .bind(&record.project)
535            .bind(&record.benchmark)
536            .bind(&record.version)
537            .execute(&self.pool)
538            .await
539            .map_err(|e| StoreError::QueryError(e.to_string()))?;
540
541        if result.rows_affected() == 0 {
542            return Err(StoreError::not_found(
543                &record.project,
544                &record.benchmark,
545                &record.version,
546            ));
547        }
548
549        Ok(())
550    }
551
552    async fn delete(
553        &self,
554        project: &str,
555        benchmark: &str,
556        version: &str,
557    ) -> Result<bool, StoreError> {
558        let sql = "UPDATE baselines SET deleted = TRUE, updated_at = NOW() WHERE project = $1 AND benchmark = $2 AND version = $3 AND deleted = FALSE";
559
560        let result = sqlx::query(sql)
561            .bind(project)
562            .bind(benchmark)
563            .bind(version)
564            .execute(&self.pool)
565            .await
566            .map_err(|e| StoreError::QueryError(e.to_string()))?;
567
568        Ok(result.rows_affected() > 0)
569    }
570
571    async fn hard_delete(
572        &self,
573        project: &str,
574        benchmark: &str,
575        version: &str,
576    ) -> Result<bool, StoreError> {
577        let sql = "DELETE FROM baselines WHERE project = $1 AND benchmark = $2 AND version = $3";
578
579        let result = sqlx::query(sql)
580            .bind(project)
581            .bind(benchmark)
582            .bind(version)
583            .execute(&self.pool)
584            .await
585            .map_err(|e| StoreError::QueryError(e.to_string()))?;
586
587        Ok(result.rows_affected() > 0)
588    }
589
590    async fn list_versions(
591        &self,
592        project: &str,
593        benchmark: &str,
594    ) -> Result<Vec<BaselineVersion>, StoreError> {
595        let sql = "SELECT version, created_at, git_ref, git_sha, source FROM baselines WHERE project = $1 AND benchmark = $2 AND deleted = FALSE ORDER BY created_at DESC";
596
597        let rows = sqlx::query(sql)
598            .bind(project)
599            .bind(benchmark)
600            .fetch_all(&self.pool)
601            .await
602            .map_err(|e| StoreError::QueryError(e.to_string()))?;
603
604        let mut versions = Vec::with_capacity(rows.len());
605        for row in rows {
606            let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
607            let source_str: String = row.get("source");
608            let source = serde_json::from_value(serde_json::Value::String(source_str))
609                .unwrap_or(BaselineSource::Upload);
610
611            versions.push(BaselineVersion {
612                version: row.get("version"),
613                created_at,
614                git_ref: row.get("git_ref"),
615                git_sha: row.get("git_sha"),
616                created_by: None,
617                is_current: false, // Could be determined by checking if it's the latest
618                source,
619            });
620        }
621
622        if let Some(first) = versions.first_mut() {
623            first.is_current = true;
624        }
625
626        Ok(versions)
627    }
628
629    async fn health_check(&self) -> Result<StorageHealth, StoreError> {
630        match self
631            .with_retry(|pool| async move { sqlx::query("SELECT 1").execute(&pool).await })
632            .await
633        {
634            Ok(_) => Ok(StorageHealth::Healthy),
635            Err(_) => Ok(StorageHealth::Unhealthy),
636        }
637    }
638
639    fn backend_type(&self) -> &'static str {
640        "postgres"
641    }
642
643    fn pool_metrics(&self) -> Option<crate::models::PoolMetrics> {
644        Some(self.pg_pool_metrics())
645    }
646
647    async fn create_verdict(&self, record: &VerdictRecord) -> Result<(), StoreError> {
648        let sql = r#"
649            INSERT INTO verdicts (
650                id, schema_id, project, benchmark, run_id, status, counts, reasons,
651                git_ref, git_sha, created_at
652            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
653        "#;
654
655        let counts_json =
656            serde_json::to_value(&record.counts).map_err(StoreError::SerializationError)?;
657        let reasons_json =
658            serde_json::to_value(&record.reasons).map_err(StoreError::SerializationError)?;
659        let status_str = record.status.as_str();
660
661        sqlx::query(sql)
662            .bind(&record.id)
663            .bind(&record.schema)
664            .bind(&record.project)
665            .bind(&record.benchmark)
666            .bind(&record.run_id)
667            .bind(status_str)
668            .bind(counts_json)
669            .bind(reasons_json)
670            .bind(&record.git_ref)
671            .bind(&record.git_sha)
672            .bind(record.created_at)
673            .execute(&self.pool)
674            .await
675            .map_err(|e| StoreError::QueryError(e.to_string()))?;
676
677        Ok(())
678    }
679
680    async fn list_verdicts(
681        &self,
682        project: &str,
683        query: &ListVerdictsQuery,
684    ) -> Result<ListVerdictsResponse, StoreError> {
685        let mut sql = "SELECT * FROM verdicts WHERE project = $1".to_string();
686        let mut params_count = 1;
687
688        if let Some(_bench) = &query.benchmark {
689            params_count += 1;
690            sql.push_str(&format!(" AND benchmark = ${}", params_count));
691        }
692
693        if let Some(_status) = &query.status {
694            params_count += 1;
695            sql.push_str(&format!(" AND status = ${}", params_count));
696        }
697
698        if let Some(_since) = &query.since {
699            params_count += 1;
700            sql.push_str(&format!(" AND created_at >= ${}", params_count));
701        }
702
703        if let Some(_until) = &query.until {
704            params_count += 1;
705            sql.push_str(&format!(" AND created_at <= ${}", params_count));
706        }
707
708        sql.push_str(" ORDER BY created_at DESC");
709
710        // Limit and offset
711        params_count += 1;
712        sql.push_str(&format!(" LIMIT ${}", params_count));
713        params_count += 1;
714        sql.push_str(&format!(" OFFSET ${}", params_count));
715
716        let mut q = sqlx::query(&sql).bind(project);
717
718        if let Some(bench) = &query.benchmark {
719            q = q.bind(bench);
720        }
721        if let Some(status) = &query.status {
722            q = q.bind(status.as_str());
723        }
724        if let Some(since) = &query.since {
725            q = q.bind(since);
726        }
727        if let Some(until) = &query.until {
728            q = q.bind(until);
729        }
730
731        q = q.bind(query.limit as i64);
732        q = q.bind(query.offset as i64);
733
734        let rows = q
735            .fetch_all(&self.pool)
736            .await
737            .map_err(|e| StoreError::QueryError(e.to_string()))?;
738
739        let mut verdicts = Vec::with_capacity(rows.len());
740        for row in rows {
741            verdicts.push(self.row_to_verdict(row)?);
742        }
743
744        // For total count
745        let count_sql = "SELECT COUNT(*) FROM verdicts WHERE project = $1";
746        let total: i64 = sqlx::query_scalar(count_sql)
747            .bind(project)
748            .fetch_one(&self.pool)
749            .await
750            .map_err(|e| StoreError::QueryError(e.to_string()))?;
751
752        Ok(ListVerdictsResponse {
753            verdicts,
754            pagination: PaginationInfo {
755                total: total as u64,
756                offset: query.offset,
757                limit: query.limit,
758                has_more: (query.offset + query.limit as u64) < total as u64,
759            },
760        })
761    }
762}
763
764impl PostgresStore {
765    fn row_to_verdict(&self, row: sqlx::postgres::PgRow) -> Result<VerdictRecord, StoreError> {
766        let status_str: String = row.get("status");
767        let status = match status_str.as_str() {
768            "pass" => VerdictStatus::Pass,
769            "warn" => VerdictStatus::Warn,
770            "fail" => VerdictStatus::Fail,
771            "skip" => VerdictStatus::Skip,
772            _ => VerdictStatus::Pass, // Default fallback
773        };
774
775        let counts_json: serde_json::Value = row.get("counts");
776        let counts = serde_json::from_value(counts_json).map_err(StoreError::SerializationError)?;
777
778        let reasons_json: serde_json::Value = row.get("reasons");
779        let reasons =
780            serde_json::from_value(reasons_json).map_err(StoreError::SerializationError)?;
781
782        Ok(VerdictRecord {
783            schema: row.get("schema_id"), // Wait, I didn't add schema_id to verdicts table
784            id: row.get("id"),
785            project: row.get("project"),
786            benchmark: row.get("benchmark"),
787            run_id: row.get("run_id"),
788            status,
789            counts,
790            reasons,
791            git_ref: row.get("git_ref"),
792            git_sha: row.get("git_sha"),
793            created_at: row.get("created_at"),
794        })
795    }
796}
797
798#[async_trait]
799impl AuditStore for PostgresStore {
800    async fn log_event(&self, event: &AuditEvent) -> Result<(), StoreError> {
801        let metadata_json =
802            serde_json::to_value(&event.metadata).map_err(StoreError::SerializationError)?;
803
804        let sql = r#"
805            INSERT INTO audit_events (
806                id, timestamp, actor, action, resource_type, resource_id, project, metadata
807            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
808        "#;
809
810        sqlx::query(sql)
811            .bind(&event.id)
812            .bind(event.timestamp)
813            .bind(&event.actor)
814            .bind(event.action.to_string())
815            .bind(event.resource_type.to_string())
816            .bind(&event.resource_id)
817            .bind(&event.project)
818            .bind(metadata_json)
819            .execute(&self.pool)
820            .await
821            .map_err(|e| StoreError::QueryError(e.to_string()))?;
822
823        Ok(())
824    }
825
826    async fn list_events(
827        &self,
828        query: &ListAuditEventsQuery,
829    ) -> Result<ListAuditEventsResponse, StoreError> {
830        let mut sql = "SELECT * FROM audit_events WHERE TRUE".to_string();
831        let mut params_count = 0;
832
833        if query.project.is_some() {
834            params_count += 1;
835            sql.push_str(&format!(" AND project = ${}", params_count));
836        }
837        if query.action.is_some() {
838            params_count += 1;
839            sql.push_str(&format!(" AND action = ${}", params_count));
840        }
841        if query.resource_type.is_some() {
842            params_count += 1;
843            sql.push_str(&format!(" AND resource_type = ${}", params_count));
844        }
845        if query.actor.is_some() {
846            params_count += 1;
847            sql.push_str(&format!(" AND actor = ${}", params_count));
848        }
849        if query.since.is_some() {
850            params_count += 1;
851            sql.push_str(&format!(" AND timestamp >= ${}", params_count));
852        }
853        if query.until.is_some() {
854            params_count += 1;
855            sql.push_str(&format!(" AND timestamp <= ${}", params_count));
856        }
857
858        sql.push_str(" ORDER BY timestamp DESC");
859
860        params_count += 1;
861        sql.push_str(&format!(" LIMIT ${}", params_count));
862        params_count += 1;
863        sql.push_str(&format!(" OFFSET ${}", params_count));
864
865        let mut q = sqlx::query(&sql);
866
867        if let Some(ref project) = query.project {
868            q = q.bind(project);
869        }
870        if let Some(ref action) = query.action {
871            q = q.bind(action);
872        }
873        if let Some(ref resource_type) = query.resource_type {
874            q = q.bind(resource_type);
875        }
876        if let Some(ref actor) = query.actor {
877            q = q.bind(actor);
878        }
879        if let Some(ref since) = query.since {
880            q = q.bind(since);
881        }
882        if let Some(ref until) = query.until {
883            q = q.bind(until);
884        }
885
886        q = q.bind(query.limit as i64);
887        q = q.bind(query.offset as i64);
888
889        let rows = q
890            .fetch_all(&self.pool)
891            .await
892            .map_err(|e| StoreError::QueryError(e.to_string()))?;
893
894        let mut events = Vec::with_capacity(rows.len());
895        for row in rows {
896            let action_str: String = row.get("action");
897            let action = action_str
898                .parse::<AuditAction>()
899                .unwrap_or(AuditAction::Create);
900
901            let resource_type_str: String = row.get("resource_type");
902            let resource_type = resource_type_str
903                .parse::<AuditResourceType>()
904                .unwrap_or(AuditResourceType::Baseline);
905
906            let metadata_json: serde_json::Value = row.get("metadata");
907
908            events.push(AuditEvent {
909                id: row.get("id"),
910                timestamp: row.get("timestamp"),
911                actor: row.get("actor"),
912                action,
913                resource_type,
914                resource_id: row.get("resource_id"),
915                project: row.get("project"),
916                metadata: metadata_json,
917            });
918        }
919
920        // Total count
921        let count_sql = "SELECT COUNT(*) FROM audit_events WHERE TRUE";
922        let total: i64 = sqlx::query_scalar(count_sql)
923            .fetch_one(&self.pool)
924            .await
925            .map_err(|e| StoreError::QueryError(e.to_string()))?;
926
927        Ok(ListAuditEventsResponse {
928            events,
929            pagination: PaginationInfo {
930                total: total as u64,
931                offset: query.offset,
932                limit: query.limit,
933                has_more: (query.offset + query.limit as u64) < total as u64,
934            },
935        })
936    }
937}
938
939#[cfg(test)]
940mod tests {
941    use super::*;
942
943    #[test]
944    fn test_is_transient_pool_timed_out() {
945        assert!(is_transient(&sqlx::Error::PoolTimedOut));
946    }
947
948    #[test]
949    fn test_is_transient_pool_closed() {
950        assert!(!is_transient(&sqlx::Error::PoolClosed));
951    }
952
953    #[test]
954    fn test_is_transient_io_error() {
955        let err = sqlx::Error::Io(std::io::Error::new(
956            std::io::ErrorKind::ConnectionRefused,
957            "connection refused",
958        ));
959        assert!(is_transient(&err));
960    }
961
962    #[test]
963    fn test_is_transient_non_transient() {
964        let err = sqlx::Error::ColumnNotFound("missing".to_string());
965        assert!(!is_transient(&err));
966    }
967}