1use super::{ArtifactStore, BaselineStore, StorageHealth};
7use crate::error::StoreError;
8use crate::models::{
9 BaselineRecord, BaselineSource, BaselineVersion, ListBaselinesQuery, ListBaselinesResponse,
10 ListVerdictsQuery, ListVerdictsResponse, PaginationInfo, VerdictRecord,
11};
12use async_trait::async_trait;
13use perfgate_types::VerdictStatus;
14use sqlx::{PgPool, Row, postgres::PgPoolOptions};
15use std::sync::Arc;
16use std::time::Duration;
17
/// Postgres-backed store for baselines and verdicts.
///
/// Wraps a shared sqlx connection pool (cheap to clone) and an optional
/// artifact store. When `artifacts` is present, baseline receipts are
/// offloaded to it and only an `artifact_path` reference is kept in the
/// `baselines` table; otherwise receipts are stored inline as JSONB.
#[derive(Debug, Clone)]
pub struct PostgresStore {
    // Connection pool; all queries in this module go through it.
    pool: PgPool,
    // Optional external blob store for receipt payloads.
    artifacts: Option<Arc<dyn ArtifactStore>>,
}
24
impl PostgresStore {
    /// Connect to Postgres at `url` and ensure the schema exists.
    ///
    /// `artifacts` optionally supplies an external blob store; when present,
    /// receipts are written there by `store_artifact` instead of inline.
    ///
    /// # Errors
    /// Returns `StoreError::ConnectionError` if the pool cannot connect or
    /// schema initialization fails.
    pub async fn new(
        url: &str,
        artifacts: Option<Arc<dyn ArtifactStore>>,
    ) -> Result<Self, StoreError> {
        // Pool sizing/timeout are fixed here; NOTE(review): consider making
        // these configurable if deployments need different limits.
        let pool = PgPoolOptions::new()
            .max_connections(10)
            .acquire_timeout(Duration::from_secs(5))
            .connect(url)
            .await
            .map_err(|e| StoreError::ConnectionError(e.to_string()))?;

        let store = Self { pool, artifacts };
        store.init_schema().await?;
        Ok(store)
    }

    /// Create the `baselines` and `verdicts` tables and their indexes.
    ///
    /// Idempotent: every statement uses `IF NOT EXISTS`, so this is safe to
    /// run on every startup. Not a migration system — existing tables with an
    /// older shape are left untouched.
    async fn init_schema(&self) -> Result<(), StoreError> {
        let sql = r#"
            CREATE TABLE IF NOT EXISTS baselines (
                id VARCHAR(26) PRIMARY KEY,
                project VARCHAR(255) NOT NULL,
                benchmark VARCHAR(255) NOT NULL,
                version VARCHAR(64) NOT NULL,
                schema_id VARCHAR(64) NOT NULL,
                git_ref VARCHAR(255),
                git_sha VARCHAR(40),
                receipt JSONB,
                artifact_path TEXT,
                metadata JSONB NOT NULL,
                tags JSONB NOT NULL,
                created_at TIMESTAMPTZ NOT NULL,
                updated_at TIMESTAMPTZ NOT NULL,
                content_hash VARCHAR(64) NOT NULL,
                source VARCHAR(32) NOT NULL,
                deleted BOOLEAN NOT NULL DEFAULT FALSE,
                UNIQUE (project, benchmark, version)
            );

            CREATE INDEX IF NOT EXISTS idx_baselines_project_benchmark
                ON baselines(project, benchmark);

            CREATE TABLE IF NOT EXISTS verdicts (
                id VARCHAR(26) PRIMARY KEY,
                schema_id VARCHAR(64) NOT NULL,
                project VARCHAR(255) NOT NULL,
                benchmark VARCHAR(255) NOT NULL,
                run_id VARCHAR(255) NOT NULL,
                status VARCHAR(32) NOT NULL,
                counts JSONB NOT NULL,
                reasons JSONB NOT NULL,
                git_ref VARCHAR(255),
                git_sha VARCHAR(40),
                created_at TIMESTAMPTZ NOT NULL
            );

            CREATE INDEX IF NOT EXISTS idx_verdicts_project_benchmark
                ON verdicts(project, benchmark);

            CREATE INDEX IF NOT EXISTS idx_verdicts_created_at
                ON verdicts(created_at);
        "#;

        sqlx::query(sql)
            .execute(&self.pool)
            .await
            .map_err(|e| StoreError::ConnectionError(format!("Failed to init schema: {}", e)))?;

        Ok(())
    }

    /// Write the record's receipt to the artifact store, if one is configured.
    ///
    /// Returns `Some(path)` (the deterministic `project/benchmark/version.json`
    /// key) when the receipt was offloaded, `None` when there is no artifact
    /// store and the receipt should be stored inline instead.
    async fn store_artifact(&self, record: &BaselineRecord) -> Result<Option<String>, StoreError> {
        if let Some(store) = &self.artifacts {
            // Deterministic path: re-storing the same (project, benchmark,
            // version) overwrites the previous blob.
            let path = format!(
                "{}/{}/{}.json",
                record.project, record.benchmark, record.version
            );
            let data =
                serde_json::to_vec(&record.receipt).map_err(StoreError::SerializationError)?;
            store.put(&path, data).await?;
            Ok(Some(path))
        } else {
            Ok(None)
        }
    }

    /// Convert a `baselines` row into a `BaselineRecord` plus its optional
    /// artifact path.
    ///
    /// When the `receipt` column is NULL (the receipt lives in the artifact
    /// store), a static placeholder receipt is substituted; callers are
    /// expected to hydrate the real receipt via `load_artifact`.
    fn row_to_record(
        row: sqlx::postgres::PgRow,
    ) -> Result<(BaselineRecord, Option<String>), StoreError> {
        let artifact_path: Option<String> = row.get("artifact_path");

        let receipt = if let Some(receipt_json) = row.get::<Option<serde_json::Value>, _>("receipt")
        {
            serde_json::from_value(receipt_json).map_err(StoreError::SerializationError)?
        } else {
            // Placeholder matching the perfgate.run.v1 shape. The unwrap is
            // only safe as long as this static JSON deserializes into the
            // receipt type — NOTE(review): confirm it tracks schema changes.
            serde_json::from_value(serde_json::json!({
                "schema": "perfgate.run.v1",
                "tool": {"name": "placeholder", "version": "0"},
                "run": {
                    "id": "placeholder",
                    "started_at": "1970-01-01T00:00:00Z",
                    "ended_at": "1970-01-01T00:00:00Z",
                    "host": {"os": "unknown", "arch": "unknown"}
                },
                "bench": {
                    "name": "placeholder",
                    "command": [],
                    "repeat": 0,
                    "warmup": 0
                },
                "samples": [],
                "stats": {
                    "wall_ms": {"median": 0, "min": 0, "max": 0}
                }
            }))
            .unwrap()
        };

        let metadata_json: serde_json::Value = row.get("metadata");
        let metadata =
            serde_json::from_value(metadata_json).map_err(StoreError::SerializationError)?;

        let tags_json: serde_json::Value = row.get("tags");
        let tags = serde_json::from_value(tags_json).map_err(StoreError::SerializationError)?;

        // `source` is stored as a bare string; unknown values silently fall
        // back to Upload rather than failing the read.
        let source_str: String = row.get("source");
        let source = serde_json::from_value(serde_json::Value::String(source_str))
            .unwrap_or(BaselineSource::Upload);

        let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
        let updated_at: chrono::DateTime<chrono::Utc> = row.get("updated_at");

        Ok((
            BaselineRecord {
                schema: row.get("schema_id"),
                id: row.get("id"),
                project: row.get("project"),
                benchmark: row.get("benchmark"),
                version: row.get("version"),
                git_ref: row.get("git_ref"),
                git_sha: row.get("git_sha"),
                receipt,
                metadata,
                tags,
                created_at,
                updated_at,
                content_hash: row.get("content_hash"),
                source,
                deleted: row.get("deleted"),
            },
            artifact_path,
        ))
    }

    /// Replace the record's (possibly placeholder) receipt with the payload
    /// stored at `path` in the artifact store.
    ///
    /// A no-op when either the artifact store or the path is absent — in that
    /// case the inline receipt already on `record` is returned unchanged.
    async fn load_artifact(
        &self,
        path: Option<String>,
        mut record: BaselineRecord,
    ) -> Result<BaselineRecord, StoreError> {
        if let (Some(store), Some(path)) = (&self.artifacts, path) {
            let data = store.get(&path).await?;
            record.receipt =
                serde_json::from_slice(&data).map_err(StoreError::SerializationError)?;
        }
        Ok(record)
    }
}
194
195#[async_trait]
196impl BaselineStore for PostgresStore {
197 async fn create(&self, record: &BaselineRecord) -> Result<(), StoreError> {
198 let artifact_path = self.store_artifact(record).await?;
199
200 let receipt_json = if artifact_path.is_none() {
201 Some(serde_json::to_value(&record.receipt).map_err(StoreError::SerializationError)?)
202 } else {
203 None
204 };
205
206 let metadata_json =
207 serde_json::to_value(&record.metadata).map_err(StoreError::SerializationError)?;
208 let tags_json =
209 serde_json::to_value(&record.tags).map_err(StoreError::SerializationError)?;
210 let source_json =
211 serde_json::to_value(&record.source).map_err(StoreError::SerializationError)?;
212 let source_str = source_json.as_str().unwrap_or("upload");
213
214 let sql = r#"
215 INSERT INTO baselines (
216 id, project, benchmark, version, schema_id,
217 git_ref, git_sha, receipt, artifact_path, metadata, tags,
218 created_at, updated_at, content_hash, source, deleted
219 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
220 "#;
221
222 let result = sqlx::query(sql)
223 .bind(&record.id)
224 .bind(&record.project)
225 .bind(&record.benchmark)
226 .bind(&record.version)
227 .bind(&record.schema)
228 .bind(&record.git_ref)
229 .bind(&record.git_sha)
230 .bind(receipt_json)
231 .bind(artifact_path)
232 .bind(metadata_json)
233 .bind(tags_json)
234 .bind(record.created_at)
235 .bind(record.updated_at)
236 .bind(&record.content_hash)
237 .bind(source_str)
238 .bind(record.deleted)
239 .execute(&self.pool)
240 .await;
241
242 match result {
243 Ok(_) => Ok(()),
244 Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Err(
245 StoreError::already_exists(&record.project, &record.benchmark, &record.version),
246 ),
247 Err(e) => Err(StoreError::QueryError(e.to_string())),
248 }
249 }
250
251 async fn get(
252 &self,
253 project: &str,
254 benchmark: &str,
255 version: &str,
256 ) -> Result<Option<BaselineRecord>, StoreError> {
257 let sql = "SELECT * FROM baselines WHERE project = $1 AND benchmark = $2 AND version = $3 AND deleted = FALSE";
258
259 let row_opt = sqlx::query(sql)
260 .bind(project)
261 .bind(benchmark)
262 .bind(version)
263 .fetch_optional(&self.pool)
264 .await
265 .map_err(|e| StoreError::QueryError(e.to_string()))?;
266
267 match row_opt {
268 Some(row) => {
269 let (record, artifact_path) = Self::row_to_record(row)?;
270 Ok(Some(self.load_artifact(artifact_path, record).await?))
271 }
272 None => Ok(None),
273 }
274 }
275
276 async fn get_latest(
277 &self,
278 project: &str,
279 benchmark: &str,
280 ) -> Result<Option<BaselineRecord>, StoreError> {
281 let sql = "SELECT * FROM baselines WHERE project = $1 AND benchmark = $2 AND deleted = FALSE ORDER BY created_at DESC LIMIT 1";
282
283 let row_opt = sqlx::query(sql)
284 .bind(project)
285 .bind(benchmark)
286 .fetch_optional(&self.pool)
287 .await
288 .map_err(|e| StoreError::QueryError(e.to_string()))?;
289
290 match row_opt {
291 Some(row) => {
292 let (record, artifact_path) = Self::row_to_record(row)?;
293 Ok(Some(self.load_artifact(artifact_path, record).await?))
294 }
295 None => Ok(None),
296 }
297 }
298
299 async fn list(
300 &self,
301 project: &str,
302 query: &ListBaselinesQuery,
303 ) -> Result<ListBaselinesResponse, StoreError> {
304 let mut sql =
305 String::from("SELECT * FROM baselines WHERE project = $1 AND deleted = FALSE");
306
307 if let Some(bench) = &query.benchmark {
308 sql.push_str(" AND benchmark = '");
309 sql.push_str(&bench.replace('\'', "''"));
310 sql.push('\'');
311 }
312
313 sql.push_str(" ORDER BY created_at DESC");
314
315 let limit = query.limit.min(100) as i64;
316 sql.push_str(&format!(" LIMIT {}", limit + 1));
317
318 let offset = query.offset as i64;
319 sql.push_str(&format!(" OFFSET {}", offset));
320
321 let rows = sqlx::query(&sql)
322 .bind(project)
323 .fetch_all(&self.pool)
324 .await
325 .map_err(|e| StoreError::QueryError(e.to_string()))?;
326
327 let has_more = rows.len() > limit as usize;
328 let take_count = if has_more { limit as usize } else { rows.len() };
329
330 let mut baselines = Vec::with_capacity(take_count);
331 for row in rows.into_iter().take(take_count) {
332 let (mut record, artifact_path) = Self::row_to_record(row)?;
333 if query.include_receipt {
334 record = self.load_artifact(artifact_path, record).await?;
335 }
336 baselines.push(record.into());
337 }
338
339 let count_sql = "SELECT COUNT(*) FROM baselines WHERE project = $1 AND deleted = FALSE";
341 let mut count_query = String::from(count_sql);
342 if let Some(bench) = &query.benchmark {
343 count_query.push_str(" AND benchmark = '");
344 count_query.push_str(&bench.replace('\'', "''"));
345 count_query.push('\'');
346 }
347 let total_row = sqlx::query(&count_query)
348 .bind(project)
349 .fetch_one(&self.pool)
350 .await
351 .map_err(|e| StoreError::QueryError(e.to_string()))?;
352
353 let total: i64 = total_row.get(0);
354
355 let pagination = PaginationInfo {
356 limit: limit as u32,
357 offset: query.offset,
358 total: total as u64,
359 has_more,
360 };
361
362 Ok(ListBaselinesResponse {
363 baselines,
364 pagination,
365 })
366 }
367
368 async fn update(&self, record: &BaselineRecord) -> Result<(), StoreError> {
369 let receipt_json =
370 serde_json::to_value(&record.receipt).map_err(StoreError::SerializationError)?;
371 let metadata_json =
372 serde_json::to_value(&record.metadata).map_err(StoreError::SerializationError)?;
373 let tags_json =
374 serde_json::to_value(&record.tags).map_err(StoreError::SerializationError)?;
375
376 let sql = r#"
377 UPDATE baselines
378 SET schema_id = $1, git_ref = $2, git_sha = $3, receipt = $4,
379 metadata = $5, tags = $6, updated_at = $7, content_hash = $8
380 WHERE project = $9 AND benchmark = $10 AND version = $11 AND deleted = FALSE
381 "#;
382
383 let result = sqlx::query(sql)
384 .bind(&record.schema)
385 .bind(&record.git_ref)
386 .bind(&record.git_sha)
387 .bind(receipt_json)
388 .bind(metadata_json)
389 .bind(tags_json)
390 .bind(record.updated_at)
391 .bind(&record.content_hash)
392 .bind(&record.project)
393 .bind(&record.benchmark)
394 .bind(&record.version)
395 .execute(&self.pool)
396 .await
397 .map_err(|e| StoreError::QueryError(e.to_string()))?;
398
399 if result.rows_affected() == 0 {
400 return Err(StoreError::not_found(
401 &record.project,
402 &record.benchmark,
403 &record.version,
404 ));
405 }
406
407 Ok(())
408 }
409
410 async fn delete(
411 &self,
412 project: &str,
413 benchmark: &str,
414 version: &str,
415 ) -> Result<bool, StoreError> {
416 let sql = "UPDATE baselines SET deleted = TRUE, updated_at = NOW() WHERE project = $1 AND benchmark = $2 AND version = $3 AND deleted = FALSE";
417
418 let result = sqlx::query(sql)
419 .bind(project)
420 .bind(benchmark)
421 .bind(version)
422 .execute(&self.pool)
423 .await
424 .map_err(|e| StoreError::QueryError(e.to_string()))?;
425
426 Ok(result.rows_affected() > 0)
427 }
428
429 async fn hard_delete(
430 &self,
431 project: &str,
432 benchmark: &str,
433 version: &str,
434 ) -> Result<bool, StoreError> {
435 let sql = "DELETE FROM baselines WHERE project = $1 AND benchmark = $2 AND version = $3";
436
437 let result = sqlx::query(sql)
438 .bind(project)
439 .bind(benchmark)
440 .bind(version)
441 .execute(&self.pool)
442 .await
443 .map_err(|e| StoreError::QueryError(e.to_string()))?;
444
445 Ok(result.rows_affected() > 0)
446 }
447
448 async fn list_versions(
449 &self,
450 project: &str,
451 benchmark: &str,
452 ) -> Result<Vec<BaselineVersion>, StoreError> {
453 let sql = "SELECT version, created_at, git_ref, git_sha, source FROM baselines WHERE project = $1 AND benchmark = $2 AND deleted = FALSE ORDER BY created_at DESC";
454
455 let rows = sqlx::query(sql)
456 .bind(project)
457 .bind(benchmark)
458 .fetch_all(&self.pool)
459 .await
460 .map_err(|e| StoreError::QueryError(e.to_string()))?;
461
462 let mut versions = Vec::with_capacity(rows.len());
463 for row in rows {
464 let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
465 let source_str: String = row.get("source");
466 let source = serde_json::from_value(serde_json::Value::String(source_str))
467 .unwrap_or(BaselineSource::Upload);
468
469 versions.push(BaselineVersion {
470 version: row.get("version"),
471 created_at,
472 git_ref: row.get("git_ref"),
473 git_sha: row.get("git_sha"),
474 created_by: None,
475 is_current: false, source,
477 });
478 }
479
480 if let Some(first) = versions.first_mut() {
481 first.is_current = true;
482 }
483
484 Ok(versions)
485 }
486
487 async fn health_check(&self) -> Result<StorageHealth, StoreError> {
488 match sqlx::query("SELECT 1").execute(&self.pool).await {
489 Ok(_) => Ok(StorageHealth::Healthy),
490 Err(_) => Ok(StorageHealth::Unhealthy),
491 }
492 }
493
494 fn backend_type(&self) -> &'static str {
495 "postgres"
496 }
497
498 async fn create_verdict(&self, record: &VerdictRecord) -> Result<(), StoreError> {
499 let sql = r#"
500 INSERT INTO verdicts (
501 id, schema_id, project, benchmark, run_id, status, counts, reasons,
502 git_ref, git_sha, created_at
503 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
504 "#;
505
506 let counts_json =
507 serde_json::to_value(&record.counts).map_err(StoreError::SerializationError)?;
508 let reasons_json =
509 serde_json::to_value(&record.reasons).map_err(StoreError::SerializationError)?;
510 let status_str = record.status.as_str();
511
512 sqlx::query(sql)
513 .bind(&record.id)
514 .bind(&record.schema)
515 .bind(&record.project)
516 .bind(&record.benchmark)
517 .bind(&record.run_id)
518 .bind(status_str)
519 .bind(counts_json)
520 .bind(reasons_json)
521 .bind(&record.git_ref)
522 .bind(&record.git_sha)
523 .bind(record.created_at)
524 .execute(&self.pool)
525 .await
526 .map_err(|e| StoreError::QueryError(e.to_string()))?;
527
528 Ok(())
529 }
530
531 async fn list_verdicts(
532 &self,
533 project: &str,
534 query: &ListVerdictsQuery,
535 ) -> Result<ListVerdictsResponse, StoreError> {
536 let mut sql = "SELECT * FROM verdicts WHERE project = $1".to_string();
537 let mut params_count = 1;
538
539 if let Some(_bench) = &query.benchmark {
540 params_count += 1;
541 sql.push_str(&format!(" AND benchmark = ${}", params_count));
542 }
543
544 if let Some(_status) = &query.status {
545 params_count += 1;
546 sql.push_str(&format!(" AND status = ${}", params_count));
547 }
548
549 if let Some(_since) = &query.since {
550 params_count += 1;
551 sql.push_str(&format!(" AND created_at >= ${}", params_count));
552 }
553
554 if let Some(_until) = &query.until {
555 params_count += 1;
556 sql.push_str(&format!(" AND created_at <= ${}", params_count));
557 }
558
559 sql.push_str(" ORDER BY created_at DESC");
560
561 params_count += 1;
563 sql.push_str(&format!(" LIMIT ${}", params_count));
564 params_count += 1;
565 sql.push_str(&format!(" OFFSET ${}", params_count));
566
567 let mut q = sqlx::query(&sql).bind(project);
568
569 if let Some(bench) = &query.benchmark {
570 q = q.bind(bench);
571 }
572 if let Some(status) = &query.status {
573 q = q.bind(status.as_str());
574 }
575 if let Some(since) = &query.since {
576 q = q.bind(since);
577 }
578 if let Some(until) = &query.until {
579 q = q.bind(until);
580 }
581
582 q = q.bind(query.limit as i64);
583 q = q.bind(query.offset as i64);
584
585 let rows = q
586 .fetch_all(&self.pool)
587 .await
588 .map_err(|e| StoreError::QueryError(e.to_string()))?;
589
590 let mut verdicts = Vec::with_capacity(rows.len());
591 for row in rows {
592 verdicts.push(self.row_to_verdict(row)?);
593 }
594
595 let count_sql = "SELECT COUNT(*) FROM verdicts WHERE project = $1";
597 let total: i64 = sqlx::query_scalar(count_sql)
598 .bind(project)
599 .fetch_one(&self.pool)
600 .await
601 .map_err(|e| StoreError::QueryError(e.to_string()))?;
602
603 Ok(ListVerdictsResponse {
604 verdicts,
605 pagination: PaginationInfo {
606 total: total as u64,
607 offset: query.offset,
608 limit: query.limit,
609 has_more: (query.offset + query.limit as u64) < total as u64,
610 },
611 })
612 }
613}
614
615impl PostgresStore {
616 fn row_to_verdict(&self, row: sqlx::postgres::PgRow) -> Result<VerdictRecord, StoreError> {
617 let status_str: String = row.get("status");
618 let status = match status_str.as_str() {
619 "pass" => VerdictStatus::Pass,
620 "warn" => VerdictStatus::Warn,
621 "fail" => VerdictStatus::Fail,
622 "skip" => VerdictStatus::Skip,
623 _ => VerdictStatus::Pass, };
625
626 let counts_json: serde_json::Value = row.get("counts");
627 let counts = serde_json::from_value(counts_json).map_err(StoreError::SerializationError)?;
628
629 let reasons_json: serde_json::Value = row.get("reasons");
630 let reasons =
631 serde_json::from_value(reasons_json).map_err(StoreError::SerializationError)?;
632
633 Ok(VerdictRecord {
634 schema: row.get("schema_id"), id: row.get("id"),
636 project: row.get("project"),
637 benchmark: row.get("benchmark"),
638 run_id: row.get("run_id"),
639 status,
640 counts,
641 reasons,
642 git_ref: row.get("git_ref"),
643 git_sha: row.get("git_sha"),
644 created_at: row.get("created_at"),
645 })
646 }
647}