Skip to main content

perfgate_server/storage/
postgres.rs

1//! PostgreSQL storage implementation for persistent baseline storage.
2//!
3//! This module provides a robust, asynchronous PostgreSQL backend for storing
4//! and querying perfgate baseline records using sqlx.
5
6use super::{ArtifactStore, BaselineStore, StorageHealth};
7use crate::error::StoreError;
8use crate::models::{
9    BaselineRecord, BaselineSource, BaselineVersion, ListBaselinesQuery, ListBaselinesResponse,
10    ListVerdictsQuery, ListVerdictsResponse, PaginationInfo, VerdictRecord,
11};
12use async_trait::async_trait;
13use perfgate_types::VerdictStatus;
14use sqlx::{PgPool, Row, postgres::PgPoolOptions};
15use std::sync::Arc;
16use std::time::Duration;
17
18/// PostgreSQL storage backend for baselines.
19#[derive(Debug, Clone)]
20pub struct PostgresStore {
21    pool: PgPool,
22    artifacts: Option<Arc<dyn ArtifactStore>>,
23}
24
25impl PostgresStore {
26    /// Creates a new PostgreSQL storage backend and runs initial schema migrations.
27    pub async fn new(
28        url: &str,
29        artifacts: Option<Arc<dyn ArtifactStore>>,
30    ) -> Result<Self, StoreError> {
31        let pool = PgPoolOptions::new()
32            .max_connections(10)
33            .acquire_timeout(Duration::from_secs(5))
34            .connect(url)
35            .await
36            .map_err(|e| StoreError::ConnectionError(e.to_string()))?;
37
38        let store = Self { pool, artifacts };
39        store.init_schema().await?;
40        Ok(store)
41    }
42
43    async fn init_schema(&self) -> Result<(), StoreError> {
44        let sql = r#"
45            CREATE TABLE IF NOT EXISTS baselines (
46                id VARCHAR(26) PRIMARY KEY,
47                project VARCHAR(255) NOT NULL,
48                benchmark VARCHAR(255) NOT NULL,
49                version VARCHAR(64) NOT NULL,
50                schema_id VARCHAR(64) NOT NULL,
51                git_ref VARCHAR(255),
52                git_sha VARCHAR(40),
53                receipt JSONB,
54                artifact_path TEXT,
55                metadata JSONB NOT NULL,
56                tags JSONB NOT NULL,
57                created_at TIMESTAMPTZ NOT NULL,
58                updated_at TIMESTAMPTZ NOT NULL,
59                content_hash VARCHAR(64) NOT NULL,
60                source VARCHAR(32) NOT NULL,
61                deleted BOOLEAN NOT NULL DEFAULT FALSE,
62                UNIQUE (project, benchmark, version)
63            );
64            
65            CREATE INDEX IF NOT EXISTS idx_baselines_project_benchmark 
66            ON baselines(project, benchmark);
67
68            CREATE TABLE IF NOT EXISTS verdicts (
69                id VARCHAR(26) PRIMARY KEY,
70                schema_id VARCHAR(64) NOT NULL,
71                project VARCHAR(255) NOT NULL,
72                benchmark VARCHAR(255) NOT NULL,
73                run_id VARCHAR(255) NOT NULL,
74                status VARCHAR(32) NOT NULL,
75                counts JSONB NOT NULL,
76                reasons JSONB NOT NULL,
77                git_ref VARCHAR(255),
78                git_sha VARCHAR(40),
79                created_at TIMESTAMPTZ NOT NULL
80            );
81
82            CREATE INDEX IF NOT EXISTS idx_verdicts_project_benchmark
83            ON verdicts(project, benchmark);
84
85            CREATE INDEX IF NOT EXISTS idx_verdicts_created_at
86            ON verdicts(created_at);
87        "#;
88
89        sqlx::query(sql)
90            .execute(&self.pool)
91            .await
92            .map_err(|e| StoreError::ConnectionError(format!("Failed to init schema: {}", e)))?;
93
94        Ok(())
95    }
96
97    async fn store_artifact(&self, record: &BaselineRecord) -> Result<Option<String>, StoreError> {
98        if let Some(store) = &self.artifacts {
99            let path = format!(
100                "{}/{}/{}.json",
101                record.project, record.benchmark, record.version
102            );
103            let data =
104                serde_json::to_vec(&record.receipt).map_err(StoreError::SerializationError)?;
105            store.put(&path, data).await?;
106            Ok(Some(path))
107        } else {
108            Ok(None)
109        }
110    }
111
112    fn row_to_record(
113        row: sqlx::postgres::PgRow,
114    ) -> Result<(BaselineRecord, Option<String>), StoreError> {
115        let artifact_path: Option<String> = row.get("artifact_path");
116
117        let receipt = if let Some(receipt_json) = row.get::<Option<serde_json::Value>, _>("receipt")
118        {
119            serde_json::from_value(receipt_json).map_err(StoreError::SerializationError)?
120        } else {
121            // Placeholder, will be loaded from artifact store if needed
122            serde_json::from_value(serde_json::json!({
123                "schema": "perfgate.run.v1",
124                "tool": {"name": "placeholder", "version": "0"},
125                "run": {
126                    "id": "placeholder",
127                    "started_at": "1970-01-01T00:00:00Z",
128                    "ended_at": "1970-01-01T00:00:00Z",
129                    "host": {"os": "unknown", "arch": "unknown"}
130                },
131                "bench": {
132                    "name": "placeholder",
133                    "command": [],
134                    "repeat": 0,
135                    "warmup": 0
136                },
137                "samples": [],
138                "stats": {
139                    "wall_ms": {"median": 0, "min": 0, "max": 0}
140                }
141            }))
142            .unwrap()
143        };
144
145        let metadata_json: serde_json::Value = row.get("metadata");
146        let metadata =
147            serde_json::from_value(metadata_json).map_err(StoreError::SerializationError)?;
148
149        let tags_json: serde_json::Value = row.get("tags");
150        let tags = serde_json::from_value(tags_json).map_err(StoreError::SerializationError)?;
151
152        let source_str: String = row.get("source");
153        let source = serde_json::from_value(serde_json::Value::String(source_str))
154            .unwrap_or(BaselineSource::Upload);
155
156        let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
157        let updated_at: chrono::DateTime<chrono::Utc> = row.get("updated_at");
158
159        Ok((
160            BaselineRecord {
161                schema: row.get("schema_id"),
162                id: row.get("id"),
163                project: row.get("project"),
164                benchmark: row.get("benchmark"),
165                version: row.get("version"),
166                git_ref: row.get("git_ref"),
167                git_sha: row.get("git_sha"),
168                receipt,
169                metadata,
170                tags,
171                created_at,
172                updated_at,
173                content_hash: row.get("content_hash"),
174                source,
175                deleted: row.get("deleted"),
176            },
177            artifact_path,
178        ))
179    }
180
181    async fn load_artifact(
182        &self,
183        path: Option<String>,
184        mut record: BaselineRecord,
185    ) -> Result<BaselineRecord, StoreError> {
186        if let (Some(store), Some(path)) = (&self.artifacts, path) {
187            let data = store.get(&path).await?;
188            record.receipt =
189                serde_json::from_slice(&data).map_err(StoreError::SerializationError)?;
190        }
191        Ok(record)
192    }
193}
194
195#[async_trait]
196impl BaselineStore for PostgresStore {
197    async fn create(&self, record: &BaselineRecord) -> Result<(), StoreError> {
198        let artifact_path = self.store_artifact(record).await?;
199
200        let receipt_json = if artifact_path.is_none() {
201            Some(serde_json::to_value(&record.receipt).map_err(StoreError::SerializationError)?)
202        } else {
203            None
204        };
205
206        let metadata_json =
207            serde_json::to_value(&record.metadata).map_err(StoreError::SerializationError)?;
208        let tags_json =
209            serde_json::to_value(&record.tags).map_err(StoreError::SerializationError)?;
210        let source_json =
211            serde_json::to_value(&record.source).map_err(StoreError::SerializationError)?;
212        let source_str = source_json.as_str().unwrap_or("upload");
213
214        let sql = r#"
215            INSERT INTO baselines (
216                id, project, benchmark, version, schema_id, 
217                git_ref, git_sha, receipt, artifact_path, metadata, tags,
218                created_at, updated_at, content_hash, source, deleted
219            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
220        "#;
221
222        let result = sqlx::query(sql)
223            .bind(&record.id)
224            .bind(&record.project)
225            .bind(&record.benchmark)
226            .bind(&record.version)
227            .bind(&record.schema)
228            .bind(&record.git_ref)
229            .bind(&record.git_sha)
230            .bind(receipt_json)
231            .bind(artifact_path)
232            .bind(metadata_json)
233            .bind(tags_json)
234            .bind(record.created_at)
235            .bind(record.updated_at)
236            .bind(&record.content_hash)
237            .bind(source_str)
238            .bind(record.deleted)
239            .execute(&self.pool)
240            .await;
241
242        match result {
243            Ok(_) => Ok(()),
244            Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Err(
245                StoreError::already_exists(&record.project, &record.benchmark, &record.version),
246            ),
247            Err(e) => Err(StoreError::QueryError(e.to_string())),
248        }
249    }
250
251    async fn get(
252        &self,
253        project: &str,
254        benchmark: &str,
255        version: &str,
256    ) -> Result<Option<BaselineRecord>, StoreError> {
257        let sql = "SELECT * FROM baselines WHERE project = $1 AND benchmark = $2 AND version = $3 AND deleted = FALSE";
258
259        let row_opt = sqlx::query(sql)
260            .bind(project)
261            .bind(benchmark)
262            .bind(version)
263            .fetch_optional(&self.pool)
264            .await
265            .map_err(|e| StoreError::QueryError(e.to_string()))?;
266
267        match row_opt {
268            Some(row) => {
269                let (record, artifact_path) = Self::row_to_record(row)?;
270                Ok(Some(self.load_artifact(artifact_path, record).await?))
271            }
272            None => Ok(None),
273        }
274    }
275
276    async fn get_latest(
277        &self,
278        project: &str,
279        benchmark: &str,
280    ) -> Result<Option<BaselineRecord>, StoreError> {
281        let sql = "SELECT * FROM baselines WHERE project = $1 AND benchmark = $2 AND deleted = FALSE ORDER BY created_at DESC LIMIT 1";
282
283        let row_opt = sqlx::query(sql)
284            .bind(project)
285            .bind(benchmark)
286            .fetch_optional(&self.pool)
287            .await
288            .map_err(|e| StoreError::QueryError(e.to_string()))?;
289
290        match row_opt {
291            Some(row) => {
292                let (record, artifact_path) = Self::row_to_record(row)?;
293                Ok(Some(self.load_artifact(artifact_path, record).await?))
294            }
295            None => Ok(None),
296        }
297    }
298
299    async fn list(
300        &self,
301        project: &str,
302        query: &ListBaselinesQuery,
303    ) -> Result<ListBaselinesResponse, StoreError> {
304        let mut sql =
305            String::from("SELECT * FROM baselines WHERE project = $1 AND deleted = FALSE");
306
307        if let Some(bench) = &query.benchmark {
308            sql.push_str(" AND benchmark = '");
309            sql.push_str(&bench.replace('\'', "''"));
310            sql.push('\'');
311        }
312
313        sql.push_str(" ORDER BY created_at DESC");
314
315        let limit = query.limit.min(100) as i64;
316        sql.push_str(&format!(" LIMIT {}", limit + 1));
317
318        let offset = query.offset as i64;
319        sql.push_str(&format!(" OFFSET {}", offset));
320
321        let rows = sqlx::query(&sql)
322            .bind(project)
323            .fetch_all(&self.pool)
324            .await
325            .map_err(|e| StoreError::QueryError(e.to_string()))?;
326
327        let has_more = rows.len() > limit as usize;
328        let take_count = if has_more { limit as usize } else { rows.len() };
329
330        let mut baselines = Vec::with_capacity(take_count);
331        for row in rows.into_iter().take(take_count) {
332            let (mut record, artifact_path) = Self::row_to_record(row)?;
333            if query.include_receipt {
334                record = self.load_artifact(artifact_path, record).await?;
335            }
336            baselines.push(record.into());
337        }
338
339        // Determine total count
340        let count_sql = "SELECT COUNT(*) FROM baselines WHERE project = $1 AND deleted = FALSE";
341        let mut count_query = String::from(count_sql);
342        if let Some(bench) = &query.benchmark {
343            count_query.push_str(" AND benchmark = '");
344            count_query.push_str(&bench.replace('\'', "''"));
345            count_query.push('\'');
346        }
347        let total_row = sqlx::query(&count_query)
348            .bind(project)
349            .fetch_one(&self.pool)
350            .await
351            .map_err(|e| StoreError::QueryError(e.to_string()))?;
352
353        let total: i64 = total_row.get(0);
354
355        let pagination = PaginationInfo {
356            limit: limit as u32,
357            offset: query.offset,
358            total: total as u64,
359            has_more,
360        };
361
362        Ok(ListBaselinesResponse {
363            baselines,
364            pagination,
365        })
366    }
367
368    async fn update(&self, record: &BaselineRecord) -> Result<(), StoreError> {
369        let receipt_json =
370            serde_json::to_value(&record.receipt).map_err(StoreError::SerializationError)?;
371        let metadata_json =
372            serde_json::to_value(&record.metadata).map_err(StoreError::SerializationError)?;
373        let tags_json =
374            serde_json::to_value(&record.tags).map_err(StoreError::SerializationError)?;
375
376        let sql = r#"
377            UPDATE baselines 
378            SET schema_id = $1, git_ref = $2, git_sha = $3, receipt = $4, 
379                metadata = $5, tags = $6, updated_at = $7, content_hash = $8
380            WHERE project = $9 AND benchmark = $10 AND version = $11 AND deleted = FALSE
381        "#;
382
383        let result = sqlx::query(sql)
384            .bind(&record.schema)
385            .bind(&record.git_ref)
386            .bind(&record.git_sha)
387            .bind(receipt_json)
388            .bind(metadata_json)
389            .bind(tags_json)
390            .bind(record.updated_at)
391            .bind(&record.content_hash)
392            .bind(&record.project)
393            .bind(&record.benchmark)
394            .bind(&record.version)
395            .execute(&self.pool)
396            .await
397            .map_err(|e| StoreError::QueryError(e.to_string()))?;
398
399        if result.rows_affected() == 0 {
400            return Err(StoreError::not_found(
401                &record.project,
402                &record.benchmark,
403                &record.version,
404            ));
405        }
406
407        Ok(())
408    }
409
410    async fn delete(
411        &self,
412        project: &str,
413        benchmark: &str,
414        version: &str,
415    ) -> Result<bool, StoreError> {
416        let sql = "UPDATE baselines SET deleted = TRUE, updated_at = NOW() WHERE project = $1 AND benchmark = $2 AND version = $3 AND deleted = FALSE";
417
418        let result = sqlx::query(sql)
419            .bind(project)
420            .bind(benchmark)
421            .bind(version)
422            .execute(&self.pool)
423            .await
424            .map_err(|e| StoreError::QueryError(e.to_string()))?;
425
426        Ok(result.rows_affected() > 0)
427    }
428
429    async fn hard_delete(
430        &self,
431        project: &str,
432        benchmark: &str,
433        version: &str,
434    ) -> Result<bool, StoreError> {
435        let sql = "DELETE FROM baselines WHERE project = $1 AND benchmark = $2 AND version = $3";
436
437        let result = sqlx::query(sql)
438            .bind(project)
439            .bind(benchmark)
440            .bind(version)
441            .execute(&self.pool)
442            .await
443            .map_err(|e| StoreError::QueryError(e.to_string()))?;
444
445        Ok(result.rows_affected() > 0)
446    }
447
448    async fn list_versions(
449        &self,
450        project: &str,
451        benchmark: &str,
452    ) -> Result<Vec<BaselineVersion>, StoreError> {
453        let sql = "SELECT version, created_at, git_ref, git_sha, source FROM baselines WHERE project = $1 AND benchmark = $2 AND deleted = FALSE ORDER BY created_at DESC";
454
455        let rows = sqlx::query(sql)
456            .bind(project)
457            .bind(benchmark)
458            .fetch_all(&self.pool)
459            .await
460            .map_err(|e| StoreError::QueryError(e.to_string()))?;
461
462        let mut versions = Vec::with_capacity(rows.len());
463        for row in rows {
464            let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
465            let source_str: String = row.get("source");
466            let source = serde_json::from_value(serde_json::Value::String(source_str))
467                .unwrap_or(BaselineSource::Upload);
468
469            versions.push(BaselineVersion {
470                version: row.get("version"),
471                created_at,
472                git_ref: row.get("git_ref"),
473                git_sha: row.get("git_sha"),
474                created_by: None,
475                is_current: false, // Could be determined by checking if it's the latest
476                source,
477            });
478        }
479
480        if let Some(first) = versions.first_mut() {
481            first.is_current = true;
482        }
483
484        Ok(versions)
485    }
486
487    async fn health_check(&self) -> Result<StorageHealth, StoreError> {
488        match sqlx::query("SELECT 1").execute(&self.pool).await {
489            Ok(_) => Ok(StorageHealth::Healthy),
490            Err(_) => Ok(StorageHealth::Unhealthy),
491        }
492    }
493
494    fn backend_type(&self) -> &'static str {
495        "postgres"
496    }
497
498    async fn create_verdict(&self, record: &VerdictRecord) -> Result<(), StoreError> {
499        let sql = r#"
500            INSERT INTO verdicts (
501                id, schema_id, project, benchmark, run_id, status, counts, reasons,
502                git_ref, git_sha, created_at
503            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
504        "#;
505
506        let counts_json =
507            serde_json::to_value(&record.counts).map_err(StoreError::SerializationError)?;
508        let reasons_json =
509            serde_json::to_value(&record.reasons).map_err(StoreError::SerializationError)?;
510        let status_str = record.status.as_str();
511
512        sqlx::query(sql)
513            .bind(&record.id)
514            .bind(&record.schema)
515            .bind(&record.project)
516            .bind(&record.benchmark)
517            .bind(&record.run_id)
518            .bind(status_str)
519            .bind(counts_json)
520            .bind(reasons_json)
521            .bind(&record.git_ref)
522            .bind(&record.git_sha)
523            .bind(record.created_at)
524            .execute(&self.pool)
525            .await
526            .map_err(|e| StoreError::QueryError(e.to_string()))?;
527
528        Ok(())
529    }
530
531    async fn list_verdicts(
532        &self,
533        project: &str,
534        query: &ListVerdictsQuery,
535    ) -> Result<ListVerdictsResponse, StoreError> {
536        let mut sql = "SELECT * FROM verdicts WHERE project = $1".to_string();
537        let mut params_count = 1;
538
539        if let Some(_bench) = &query.benchmark {
540            params_count += 1;
541            sql.push_str(&format!(" AND benchmark = ${}", params_count));
542        }
543
544        if let Some(_status) = &query.status {
545            params_count += 1;
546            sql.push_str(&format!(" AND status = ${}", params_count));
547        }
548
549        if let Some(_since) = &query.since {
550            params_count += 1;
551            sql.push_str(&format!(" AND created_at >= ${}", params_count));
552        }
553
554        if let Some(_until) = &query.until {
555            params_count += 1;
556            sql.push_str(&format!(" AND created_at <= ${}", params_count));
557        }
558
559        sql.push_str(" ORDER BY created_at DESC");
560
561        // Limit and offset
562        params_count += 1;
563        sql.push_str(&format!(" LIMIT ${}", params_count));
564        params_count += 1;
565        sql.push_str(&format!(" OFFSET ${}", params_count));
566
567        let mut q = sqlx::query(&sql).bind(project);
568
569        if let Some(bench) = &query.benchmark {
570            q = q.bind(bench);
571        }
572        if let Some(status) = &query.status {
573            q = q.bind(status.as_str());
574        }
575        if let Some(since) = &query.since {
576            q = q.bind(since);
577        }
578        if let Some(until) = &query.until {
579            q = q.bind(until);
580        }
581
582        q = q.bind(query.limit as i64);
583        q = q.bind(query.offset as i64);
584
585        let rows = q
586            .fetch_all(&self.pool)
587            .await
588            .map_err(|e| StoreError::QueryError(e.to_string()))?;
589
590        let mut verdicts = Vec::with_capacity(rows.len());
591        for row in rows {
592            verdicts.push(self.row_to_verdict(row)?);
593        }
594
595        // For total count
596        let count_sql = "SELECT COUNT(*) FROM verdicts WHERE project = $1";
597        let total: i64 = sqlx::query_scalar(count_sql)
598            .bind(project)
599            .fetch_one(&self.pool)
600            .await
601            .map_err(|e| StoreError::QueryError(e.to_string()))?;
602
603        Ok(ListVerdictsResponse {
604            verdicts,
605            pagination: PaginationInfo {
606                total: total as u64,
607                offset: query.offset,
608                limit: query.limit,
609                has_more: (query.offset + query.limit as u64) < total as u64,
610            },
611        })
612    }
613}
614
615impl PostgresStore {
616    fn row_to_verdict(&self, row: sqlx::postgres::PgRow) -> Result<VerdictRecord, StoreError> {
617        let status_str: String = row.get("status");
618        let status = match status_str.as_str() {
619            "pass" => VerdictStatus::Pass,
620            "warn" => VerdictStatus::Warn,
621            "fail" => VerdictStatus::Fail,
622            "skip" => VerdictStatus::Skip,
623            _ => VerdictStatus::Pass, // Default fallback
624        };
625
626        let counts_json: serde_json::Value = row.get("counts");
627        let counts = serde_json::from_value(counts_json).map_err(StoreError::SerializationError)?;
628
629        let reasons_json: serde_json::Value = row.get("reasons");
630        let reasons =
631            serde_json::from_value(reasons_json).map_err(StoreError::SerializationError)?;
632
633        Ok(VerdictRecord {
634            schema: row.get("schema_id"), // Wait, I didn't add schema_id to verdicts table
635            id: row.get("id"),
636            project: row.get("project"),
637            benchmark: row.get("benchmark"),
638            run_id: row.get("run_id"),
639            status,
640            counts,
641            reasons,
642            git_ref: row.get("git_ref"),
643            git_sha: row.get("git_sha"),
644            created_at: row.get("created_at"),
645        })
646    }
647}