1use async_trait::async_trait;
4use rusqlite::{OptionalExtension, params};
5use std::path::Path;
6use std::sync::{Arc, Mutex};
7
8use super::{ArtifactStore, AuditStore, BaselineStore, StorageHealth};
9use crate::error::StoreError;
10use crate::models::{
11 AuditAction, AuditEvent, AuditResourceType, BaselineRecord, BaselineSource, BaselineVersion,
12 ListAuditEventsQuery, ListAuditEventsResponse, ListBaselinesQuery, ListBaselinesResponse,
13 ListVerdictsQuery, ListVerdictsResponse, PaginationInfo, VerdictRecord,
14};
15use perfgate_types::{VerdictCounts, VerdictStatus};
16
/// SQLite-backed implementation of the baseline, verdict, and audit stores.
#[derive(Debug)]
pub struct SqliteStore {
    // Database path as given at construction; kept for diagnostics only
    // (the underscore prefix marks it as otherwise unused).
    _path: std::path::PathBuf,

    // Single shared connection; all access is serialized through this mutex.
    conn: Arc<Mutex<rusqlite::Connection>>,

    // Optional external receipt storage. When `None`, receipts are inlined
    // in the `baselines.receipt` column instead.
    artifacts: Option<Arc<dyn ArtifactStore>>,
}
29
30impl SqliteStore {
31 pub fn new<P: AsRef<Path>>(
33 path: P,
34 artifacts: Option<Arc<dyn ArtifactStore>>,
35 ) -> Result<Self, StoreError> {
36 let path = path.as_ref().to_path_buf();
37
38 if let Some(parent) = path.parent().filter(|p| !p.exists()) {
39 std::fs::create_dir_all(parent)?;
40 }
41
42 let is_memory = path.as_os_str() == ":memory:";
43 let conn = rusqlite::Connection::open(&path)?;
44 Self::configure_pragmas(&conn, is_memory)?;
45
46 let store = Self {
47 _path: path,
48 conn: Arc::new(Mutex::new(conn)),
49 artifacts,
50 };
51
52 store.initialize()?;
53 Ok(store)
54 }
55
56 pub fn in_memory() -> Result<Self, StoreError> {
58 let conn = rusqlite::Connection::open_in_memory()?;
59 Self::configure_pragmas(&conn, true)?;
60
61 let store = Self {
62 _path: std::path::PathBuf::from(":memory:"),
63 conn: Arc::new(Mutex::new(conn)),
64 artifacts: None,
65 };
66
67 store.initialize()?;
68 Ok(store)
69 }
70
71 fn configure_pragmas(conn: &rusqlite::Connection, is_memory: bool) -> Result<(), StoreError> {
80 conn.execute_batch("PRAGMA busy_timeout=5000;")?;
81
82 if !is_memory {
88 let mode: String = conn.query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))?;
89 if mode.to_lowercase() != "wal" {
90 return Err(StoreError::Other(format!(
91 "failed to enable WAL journal mode (got '{mode}')"
92 )));
93 }
94 }
95
96 Ok(())
97 }
98
    /// Create tables and indexes if they do not already exist. Idempotent;
    /// called once from each constructor.
    ///
    /// Schema notes:
    /// - `baselines` rows are soft-deleted via the `deleted` flag and are
    ///   unique on (project, benchmark, version);
    /// - a receipt lives either inline in `receipt` or externally at
    ///   `artifact_path`;
    /// - timestamps are stored as text (RFC 3339 per the writers in this
    ///   file), with DESC indexes serving the newest-first list queries.
    fn initialize(&self) -> Result<(), StoreError> {
        let conn = self
            .conn
            .lock()
            .map_err(|e| StoreError::LockError(e.to_string()))?;

        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS baselines (
                id TEXT PRIMARY KEY,
                project TEXT NOT NULL,
                benchmark TEXT NOT NULL,
                version TEXT NOT NULL,
                git_ref TEXT,
                git_sha TEXT,
                receipt TEXT,
                artifact_path TEXT,
                metadata TEXT NOT NULL DEFAULT '{}',
                tags TEXT NOT NULL DEFAULT '[]',
                source TEXT NOT NULL DEFAULT 'upload',
                content_hash TEXT NOT NULL,
                deleted INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                UNIQUE(project, benchmark, version)
            );
            CREATE INDEX IF NOT EXISTS idx_baselines_project_benchmark ON baselines(project, benchmark);
            CREATE INDEX IF NOT EXISTS idx_baselines_created_at ON baselines(created_at DESC);

            CREATE TABLE IF NOT EXISTS verdicts (
                id TEXT PRIMARY KEY,
                schema_id TEXT NOT NULL,
                project TEXT NOT NULL,
                benchmark TEXT NOT NULL,
                run_id TEXT NOT NULL,
                status TEXT NOT NULL,
                counts TEXT NOT NULL,
                reasons TEXT NOT NULL,
                git_ref TEXT,
                git_sha TEXT,
                created_at TEXT NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_verdicts_project_benchmark ON verdicts(project, benchmark);
            CREATE INDEX IF NOT EXISTS idx_verdicts_created_at ON verdicts(created_at DESC);

            CREATE TABLE IF NOT EXISTS audit_events (
                id TEXT PRIMARY KEY,
                timestamp TEXT NOT NULL,
                actor TEXT NOT NULL,
                action TEXT NOT NULL,
                resource_type TEXT NOT NULL,
                resource_id TEXT NOT NULL,
                project TEXT NOT NULL,
                metadata TEXT NOT NULL DEFAULT '{}'
            );
            CREATE INDEX IF NOT EXISTS idx_audit_events_project ON audit_events(project);
            CREATE INDEX IF NOT EXISTS idx_audit_events_timestamp ON audit_events(timestamp DESC);
            CREATE INDEX IF NOT EXISTS idx_audit_events_action ON audit_events(action);
            "#,
        )?;
        Ok(())
    }
161
    /// Map a `baselines` row (SELECT * column order) to a record plus the
    /// optional artifact path. Column order as created by `initialize`:
    /// 0=id, 1=project, 2=benchmark, 3=version, 4=git_ref, 5=git_sha,
    /// 6=receipt, 7=artifact_path, 8=metadata, 9=tags, 10=source,
    /// 11=content_hash, 12=deleted, 13=created_at, 14=updated_at.
    fn row_to_record_tuple(
        row: &rusqlite::Row,
    ) -> Result<(BaselineRecord, Option<String>), rusqlite::Error> {
        let created_at_str: String = row.get(13)?;
        let updated_at_str: String = row.get(14)?;

        // The inline receipt may be absent (stored as an artifact instead)
        // or unparseable; both cases fall back to a placeholder so reads
        // never hard-fail. Callers re-hydrate via `load_artifact`.
        let receipt_json: Option<String> = row.get(6)?;
        let receipt = if let Some(json) = receipt_json {
            serde_json::from_str(&json).unwrap_or_else(|_| Self::placeholder_receipt())
        } else {
            Self::placeholder_receipt()
        };

        let record = BaselineRecord {
            schema: crate::models::BASELINE_SCHEMA_V1.to_string(),
            id: row.get(0)?,
            project: row.get(1)?,
            benchmark: row.get(2)?,
            version: row.get(3)?,
            git_ref: row.get(4)?,
            git_sha: row.get(5)?,
            receipt,
            // Corrupt JSON degrades to empty metadata/tags.
            metadata: serde_json::from_str(&row.get::<_, String>(8)?).unwrap_or_default(),
            tags: serde_json::from_str(&row.get::<_, String>(9)?).unwrap_or_default(),
            // Unparseable timestamps fall back to "now" rather than failing.
            created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
                .map(|dt| dt.with_timezone(&chrono::Utc))
                .unwrap_or_else(|_| chrono::Utc::now()),
            updated_at: chrono::DateTime::parse_from_rfc3339(&updated_at_str)
                .map(|dt| dt.with_timezone(&chrono::Utc))
                .unwrap_or_else(|_| chrono::Utc::now()),
            content_hash: row.get(11)?,
            // Unknown source strings default to Upload.
            source: match row.get::<_, String>(10)?.as_str() {
                "promote" => BaselineSource::Promote,
                "migrate" => BaselineSource::Migrate,
                "rollback" => BaselineSource::Rollback,
                _ => BaselineSource::Upload,
            },
            deleted: row.get::<_, i64>(12)? != 0,
        };

        Ok((record, row.get(7)?))
    }
204
205 fn placeholder_receipt() -> perfgate_types::RunReceipt {
206 serde_json::from_value(serde_json::json!({
207 "schema": "perfgate.run.v1",
208 "tool": {"name": "placeholder", "version": "0"},
209 "run": {
210 "id": "placeholder",
211 "started_at": "1970-01-01T00:00:00Z",
212 "ended_at": "1970-01-01T00:00:00Z",
213 "host": {"os": "unknown", "arch": "unknown"}
214 },
215 "bench": {"name": "placeholder", "command": [], "repeat": 0, "warmup": 0},
216 "samples": [],
217 "stats": {"wall_ms": {"median": 0, "min": 0, "max": 0}}
218 }))
219 .unwrap()
220 }
221
222 async fn store_artifact(&self, record: &BaselineRecord) -> Result<Option<String>, StoreError> {
223 if let Some(store) = &self.artifacts {
224 let path = format!(
225 "{}/{}/{}.json",
226 record.project, record.benchmark, record.version
227 );
228 let data =
229 serde_json::to_vec(&record.receipt).map_err(StoreError::SerializationError)?;
230 store.put(&path, data).await?;
231 Ok(Some(path))
232 } else {
233 Ok(None)
234 }
235 }
236
237 async fn load_artifact(
238 &self,
239 path: Option<String>,
240 mut record: BaselineRecord,
241 ) -> Result<BaselineRecord, StoreError> {
242 if let (Some(store), Some(path)) = (&self.artifacts, path) {
243 let data = store.get(&path).await?;
244 record.receipt =
245 serde_json::from_slice(&data).map_err(StoreError::SerializationError)?;
246 }
247 Ok(record)
248 }
249}
250
251#[async_trait]
252impl BaselineStore for SqliteStore {
    /// Insert a new baseline row. The receipt is written to the artifact
    /// store when one is configured (only its path is stored in SQLite);
    /// otherwise the receipt JSON is inlined in the `receipt` column.
    ///
    /// Returns `StoreError::AlreadyExists` when the
    /// (project, benchmark, version) unique constraint is violated.
    async fn create(&self, record: &BaselineRecord) -> Result<(), StoreError> {
        let artifact_path = self.store_artifact(record).await?;
        // Exactly one of `receipt_json` / `artifact_path` is Some.
        let receipt_json = if artifact_path.is_none() {
            Some(serde_json::to_string(&record.receipt)?)
        } else {
            None
        };

        let conn = self
            .conn
            .lock()
            .map_err(|e| StoreError::LockError(e.to_string()))?;
        conn.execute(
            r#"
            INSERT INTO baselines (
                id, project, benchmark, version, git_ref, git_sha,
                receipt, artifact_path, metadata, tags, source, content_hash,
                deleted, created_at, updated_at
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
            "#,
            params![
                record.id,
                record.project,
                record.benchmark,
                record.version,
                record.git_ref,
                record.git_sha,
                receipt_json,
                artifact_path,
                serde_json::to_string(&record.metadata)?,
                serde_json::to_string(&record.tags)?,
                // Matches the lowercase source strings read back in
                // `row_to_record_tuple` ("upload", "promote", ...).
                format!("{:?}", record.source).to_lowercase(),
                record.content_hash,
                if record.deleted { 1i64 } else { 0i64 },
                record.created_at.to_rfc3339(),
                record.updated_at.to_rfc3339(),
            ],
        )
        // Translate a constraint violation into the domain-level
        // AlreadyExists error; all other SQLite failures pass through.
        .map_err(|e| match &e {
            rusqlite::Error::SqliteFailure(err, _)
                if err.code == rusqlite::ErrorCode::ConstraintViolation =>
            {
                StoreError::AlreadyExists(format!(
                    "project={}, benchmark={}, version={}",
                    record.project, record.benchmark, record.version
                ))
            }
            _ => StoreError::SqliteError(e),
        })?;
        Ok(())
    }
304
305 async fn get(
306 &self,
307 project: &str,
308 benchmark: &str,
309 version: &str,
310 ) -> Result<Option<BaselineRecord>, StoreError> {
311 let res = {
312 let conn = self
313 .conn
314 .lock()
315 .map_err(|e| StoreError::LockError(e.to_string()))?;
316 let mut stmt = conn.prepare(
317 "SELECT * FROM baselines WHERE project = ?1 AND benchmark = ?2 AND version = ?3 AND deleted = 0"
318 )?;
319 stmt.query_row(
320 params![project, benchmark, version],
321 Self::row_to_record_tuple,
322 )
323 .optional()?
324 };
325
326 match res {
327 Some((record, path)) => Ok(Some(self.load_artifact(path, record).await?)),
328 None => Ok(None),
329 }
330 }
331
332 async fn get_latest(
333 &self,
334 project: &str,
335 benchmark: &str,
336 ) -> Result<Option<BaselineRecord>, StoreError> {
337 let res = {
338 let conn = self
339 .conn
340 .lock()
341 .map_err(|e| StoreError::LockError(e.to_string()))?;
342 let mut stmt = conn.prepare(
343 "SELECT * FROM baselines WHERE project = ?1 AND benchmark = ?2 AND deleted = 0 ORDER BY created_at DESC LIMIT 1"
344 )?;
345 stmt.query_row(params![project, benchmark], Self::row_to_record_tuple)
346 .optional()?
347 };
348
349 match res {
350 Some((record, path)) => Ok(Some(self.load_artifact(path, record).await?)),
351 None => Ok(None),
352 }
353 }
354
    /// Page through a project's live baselines, newest first, optionally
    /// filtered by benchmark. Receipts are only loaded from the artifact
    /// store when `query.include_receipt` is set.
    async fn list(
        &self,
        project: &str,
        query: &ListBaselinesQuery,
    ) -> Result<ListBaselinesResponse, StoreError> {
        let (records_with_paths, total) = {
            let conn = self
                .conn
                .lock()
                .map_err(|e| StoreError::LockError(e.to_string()))?;
            let mut sql =
                String::from("SELECT * FROM baselines WHERE project = ?1 AND deleted = 0");
            let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(project.to_string())];

            if let Some(ref b) = query.benchmark {
                sql.push_str(" AND benchmark = ?");
                params.push(Box::new(b.clone()));
            }

            // Count the whole filtered set BEFORE LIMIT/OFFSET are
            // appended, so `total` is the unpaged count.
            let count_sql = format!("SELECT COUNT(*) FROM ({})", sql);
            let total: u64 =
                conn.query_row(&count_sql, rusqlite::params_from_iter(params.iter()), |r| {
                    r.get(0)
                })?;

            sql.push_str(" ORDER BY created_at DESC LIMIT ? OFFSET ?");
            params.push(Box::new(query.limit as i64));
            params.push(Box::new(query.offset as i64));

            let mut stmt = conn.prepare(&sql)?;
            let rows = stmt
                .query_map(
                    rusqlite::params_from_iter(params.iter()),
                    Self::row_to_record_tuple,
                )?
                .collect::<Result<Vec<_>, _>>()?;
            // Scope ends here so the mutex guard drops before the async
            // artifact loads below.
            (rows, total)
        };

        let mut baselines = Vec::with_capacity(records_with_paths.len());
        for (mut record, path) in records_with_paths {
            if query.include_receipt {
                record = self.load_artifact(path, record).await?;
            }
            baselines.push(record.into());
        }

        let count = baselines.len() as u64;

        Ok(ListBaselinesResponse {
            baselines,
            pagination: PaginationInfo {
                total,
                limit: query.limit,
                offset: query.offset,
                // Based on rows actually returned on this page.
                has_more: (query.offset + count) < total,
            },
        })
    }
414
    /// Overwrite the mutable fields of an existing baseline, keyed by
    /// (project, benchmark, version). `id`, `created_at`, and `deleted`
    /// are left untouched.
    ///
    /// NOTE(review): the UPDATE does not filter on `deleted = 0`, so it
    /// also rewrites soft-deleted rows, and it silently no-ops when no
    /// row matches — confirm both behaviors are intended.
    async fn update(&self, record: &BaselineRecord) -> Result<(), StoreError> {
        let artifact_path = self.store_artifact(record).await?;
        // Exactly one of `receipt_json` / `artifact_path` is Some,
        // mirroring `create`.
        let receipt_json = if artifact_path.is_none() {
            Some(serde_json::to_string(&record.receipt)?)
        } else {
            None
        };

        let conn = self
            .conn
            .lock()
            .map_err(|e| StoreError::LockError(e.to_string()))?;
        conn.execute(
            "UPDATE baselines SET git_ref=?1, git_sha=?2, receipt=?3, artifact_path=?4, metadata=?5, tags=?6, source=?7, content_hash=?8, updated_at=?9 WHERE project=?10 AND benchmark=?11 AND version=?12",
            params![
                record.git_ref, record.git_sha, receipt_json, artifact_path,
                serde_json::to_string(&record.metadata)?, serde_json::to_string(&record.tags)?,
                format!("{:?}", record.source).to_lowercase(), record.content_hash,
                record.updated_at.to_rfc3339(), record.project, record.benchmark, record.version
            ]
        )?;
        Ok(())
    }
438
439 async fn delete(
440 &self,
441 project: &str,
442 benchmark: &str,
443 version: &str,
444 ) -> Result<bool, StoreError> {
445 let conn = self
446 .conn
447 .lock()
448 .map_err(|e| StoreError::LockError(e.to_string()))?;
449 let n = conn.execute("UPDATE baselines SET deleted = 1, updated_at = ?1 WHERE project = ?2 AND benchmark = ?3 AND version = ?4 AND deleted = 0",
450 params![chrono::Utc::now().to_rfc3339(), project, benchmark, version])?;
451 Ok(n > 0)
452 }
453
454 async fn hard_delete(
455 &self,
456 project: &str,
457 benchmark: &str,
458 version: &str,
459 ) -> Result<bool, StoreError> {
460 let conn = self
461 .conn
462 .lock()
463 .map_err(|e| StoreError::LockError(e.to_string()))?;
464 let n = conn.execute(
465 "DELETE FROM baselines WHERE project = ?1 AND benchmark = ?2 AND version = ?3",
466 params![project, benchmark, version],
467 )?;
468 Ok(n > 0)
469 }
470
    /// List version metadata (no receipts) for one benchmark, newest
    /// first. The most recent version is marked `is_current`.
    async fn list_versions(
        &self,
        project: &str,
        benchmark: &str,
    ) -> Result<Vec<BaselineVersion>, StoreError> {
        let conn = self
            .conn
            .lock()
            .map_err(|e| StoreError::LockError(e.to_string()))?;
        let mut stmt = conn.prepare("SELECT version, git_ref, git_sha, source, created_at FROM baselines WHERE project = ?1 AND benchmark = ?2 AND deleted = 0 ORDER BY created_at DESC")?;
        let mut versions: Vec<BaselineVersion> = stmt
            .query_map(params![project, benchmark], |row| {
                let created_at_str: String = row.get(4)?;
                Ok(BaselineVersion {
                    version: row.get(0)?,
                    git_ref: row.get(1)?,
                    git_sha: row.get(2)?,
                    // Unparseable timestamps fall back to "now" rather
                    // than failing the whole listing.
                    created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
                        .map(|dt| dt.with_timezone(&chrono::Utc))
                        .unwrap_or_else(|_| chrono::Utc::now()),
                    created_by: None,
                    is_current: false,
                    // Unknown source strings default to Upload.
                    source: match row.get::<_, String>(3)?.as_str() {
                        "promote" => BaselineSource::Promote,
                        "migrate" => BaselineSource::Migrate,
                        "rollback" => BaselineSource::Rollback,
                        _ => BaselineSource::Upload,
                    },
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;
        // Rows are newest-first, so the head is the current version.
        if let Some(first) = versions.first_mut() {
            first.is_current = true;
        }
        Ok(versions)
    }
507
508 async fn health_check(&self) -> Result<StorageHealth, StoreError> {
509 let conn = self
510 .conn
511 .lock()
512 .map_err(|e| StoreError::LockError(e.to_string()))?;
513 match conn.query_row("SELECT 1", [], |_| Ok(())) {
514 Ok(_) => Ok(StorageHealth::Healthy),
515 Err(_) => Ok(StorageHealth::Unhealthy),
516 }
517 }
518
    /// Identifier for this storage backend.
    fn backend_type(&self) -> &'static str {
        "sqlite"
    }
522
523 async fn create_verdict(&self, record: &VerdictRecord) -> Result<(), StoreError> {
524 let conn = self
525 .conn
526 .lock()
527 .map_err(|e| StoreError::LockError(e.to_string()))?;
528
529 let counts_json =
530 serde_json::to_string(&record.counts).map_err(StoreError::SerializationError)?;
531 let reasons_json =
532 serde_json::to_string(&record.reasons).map_err(StoreError::SerializationError)?;
533 let status_str = record.status.as_str();
534 let created_at_str = record.created_at.to_rfc3339();
535
536 conn.execute(
537 r#"
538 INSERT INTO verdicts (
539 id, schema_id, project, benchmark, run_id, status, counts, reasons,
540 git_ref, git_sha, created_at
541 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
542 "#,
543 params![
544 record.id,
545 record.schema,
546 record.project,
547 record.benchmark,
548 record.run_id,
549 status_str,
550 counts_json,
551 reasons_json,
552 record.git_ref,
553 record.git_sha,
554 created_at_str
555 ],
556 )?;
557
558 Ok(())
559 }
560
561 async fn list_verdicts(
562 &self,
563 project: &str,
564 query: &ListVerdictsQuery,
565 ) -> Result<ListVerdictsResponse, StoreError> {
566 let mut sql = "SELECT * FROM verdicts WHERE project = ?".to_string();
567 let mut params_vec: Vec<rusqlite::types::Value> = vec![project.to_string().into()];
568
569 if let Some(bench) = &query.benchmark {
570 sql.push_str(" AND benchmark = ?");
571 params_vec.push(bench.clone().into());
572 }
573
574 if let Some(status) = &query.status {
575 sql.push_str(" AND status = ?");
576 params_vec.push(status.as_str().to_string().into());
577 }
578
579 if let Some(since) = &query.since {
580 sql.push_str(" AND created_at >= ?");
581 params_vec.push(since.to_rfc3339().into());
582 }
583
584 if let Some(until) = &query.until {
585 sql.push_str(" AND created_at <= ?");
586 params_vec.push(until.to_rfc3339().into());
587 }
588
589 sql.push_str(" ORDER BY created_at DESC");
590
591 sql.push_str(" LIMIT ? OFFSET ?");
593 params_vec.push((query.limit as i64).into());
594 params_vec.push((query.offset as i64).into());
595
596 let conn = self
597 .conn
598 .lock()
599 .map_err(|e| StoreError::LockError(e.to_string()))?;
600
601 let mut stmt = conn
602 .prepare(&sql)
603 .map_err(|e| StoreError::QueryError(e.to_string()))?;
604 let rows = stmt
605 .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
606 Self::row_to_verdict(row)
607 })
608 .map_err(|e| StoreError::QueryError(e.to_string()))?;
609
610 let mut verdicts = Vec::new();
611 for row in rows {
612 verdicts.push(row?);
613 }
614
615 let count_sql = "SELECT COUNT(*) FROM verdicts WHERE project = ?";
617 let total: i64 = conn.query_row(count_sql, params![project], |row| row.get(0))?;
618
619 Ok(ListVerdictsResponse {
620 verdicts,
621 pagination: PaginationInfo {
622 total: total as u64,
623 offset: query.offset,
624 limit: query.limit,
625 has_more: (query.offset + query.limit as u64) < total as u64,
626 },
627 })
628 }
629}
630
631impl SqliteStore {
    /// Map a `verdicts` row (SELECT * column order) to a `VerdictRecord`.
    /// Column order as created by `initialize`: 0=id, 1=schema_id,
    /// 2=project, 3=benchmark, 4=run_id, 5=status, 6=counts, 7=reasons,
    /// 8=git_ref, 9=git_sha, 10=created_at.
    fn row_to_verdict(row: &rusqlite::Row) -> Result<VerdictRecord, rusqlite::Error> {
        let status_str: String = row.get(5)?;
        // Unknown status strings degrade to Pass rather than erroring.
        let status = match status_str.as_str() {
            "pass" => VerdictStatus::Pass,
            "warn" => VerdictStatus::Warn,
            "fail" => VerdictStatus::Fail,
            "skip" => VerdictStatus::Skip,
            _ => VerdictStatus::Pass,
        };

        let counts_json: String = row.get(6)?;
        // Corrupt JSON falls back to all-zero counts.
        let counts = serde_json::from_str(&counts_json).unwrap_or(VerdictCounts {
            pass: 0,
            warn: 0,
            fail: 0,
            skip: 0,
        });

        let reasons_json: String = row.get(7)?;
        let reasons = serde_json::from_str(&reasons_json).unwrap_or_default();

        let created_at_str: String = row.get(10)?;
        // Unparseable timestamps fall back to "now".
        let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
            .map(|dt| dt.with_timezone(&chrono::Utc))
            .unwrap_or_else(|_| chrono::Utc::now());

        Ok(VerdictRecord {
            id: row.get(0)?,
            schema: row.get(1)?,
            project: row.get(2)?,
            benchmark: row.get(3)?,
            run_id: row.get(4)?,
            status,
            counts,
            reasons,
            git_ref: row.get(8)?,
            git_sha: row.get(9)?,
            created_at,
        })
    }
672}
673
674#[async_trait]
675impl AuditStore for SqliteStore {
676 async fn log_event(&self, event: &AuditEvent) -> Result<(), StoreError> {
677 let metadata_json =
678 serde_json::to_string(&event.metadata).map_err(StoreError::SerializationError)?;
679
680 let conn = self
681 .conn
682 .lock()
683 .map_err(|e| StoreError::LockError(e.to_string()))?;
684 conn.execute(
685 r#"
686 INSERT INTO audit_events (
687 id, timestamp, actor, action, resource_type, resource_id, project, metadata
688 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
689 "#,
690 params![
691 event.id,
692 event.timestamp.to_rfc3339(),
693 event.actor,
694 event.action.to_string(),
695 event.resource_type.to_string(),
696 event.resource_id,
697 event.project,
698 metadata_json,
699 ],
700 )?;
701
702 Ok(())
703 }
704
    /// Page through audit events, newest first, with optional filters on
    /// project, action, resource type, actor, and a timestamp range.
    async fn list_events(
        &self,
        query: &ListAuditEventsQuery,
    ) -> Result<ListAuditEventsResponse, StoreError> {
        // "WHERE 1=1" lets every filter below append uniformly with "AND".
        let mut sql = "SELECT * FROM audit_events WHERE 1=1".to_string();
        let mut params_vec: Vec<rusqlite::types::Value> = Vec::new();

        if let Some(ref project) = query.project {
            sql.push_str(" AND project = ?");
            params_vec.push(project.clone().into());
        }

        if let Some(ref action) = query.action {
            sql.push_str(" AND action = ?");
            params_vec.push(action.clone().into());
        }

        if let Some(ref resource_type) = query.resource_type {
            sql.push_str(" AND resource_type = ?");
            params_vec.push(resource_type.clone().into());
        }

        if let Some(ref actor) = query.actor {
            sql.push_str(" AND actor = ?");
            params_vec.push(actor.clone().into());
        }

        if let Some(ref since) = query.since {
            sql.push_str(" AND timestamp >= ?");
            params_vec.push(since.to_rfc3339().into());
        }

        if let Some(ref until) = query.until {
            sql.push_str(" AND timestamp <= ?");
            params_vec.push(until.to_rfc3339().into());
        }

        // Snapshot the count query BEFORE LIMIT/OFFSET are appended so the
        // total spans the whole filtered set.
        let count_sql = format!("SELECT COUNT(*) FROM ({})", sql);

        sql.push_str(" ORDER BY timestamp DESC LIMIT ? OFFSET ?");
        params_vec.push((query.limit as i64).into());
        params_vec.push((query.offset as i64).into());

        let conn = self
            .conn
            .lock()
            .map_err(|e| StoreError::LockError(e.to_string()))?;

        // The count query uses every param except the trailing
        // limit/offset pair pushed just above.
        let count_params: Vec<rusqlite::types::Value> =
            params_vec[..params_vec.len().saturating_sub(2)].to_vec();
        let total: i64 = conn
            .query_row(
                &count_sql,
                rusqlite::params_from_iter(count_params.iter()),
                |row| row.get(0),
            )
            .map_err(|e| StoreError::QueryError(e.to_string()))?;

        let mut stmt = conn
            .prepare(&sql)
            .map_err(|e| StoreError::QueryError(e.to_string()))?;
        let rows = stmt
            .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
                Self::row_to_audit_event(row)
            })
            .map_err(|e| StoreError::QueryError(e.to_string()))?;

        let mut events = Vec::new();
        for row in rows {
            events.push(row?);
        }

        Ok(ListAuditEventsResponse {
            events,
            pagination: PaginationInfo {
                total: total as u64,
                offset: query.offset,
                limit: query.limit,
                has_more: (query.offset + query.limit as u64) < total as u64,
            },
        })
    }
789}
790
791impl SqliteStore {
    /// Map an `audit_events` row (SELECT * column order) to an
    /// `AuditEvent`. Column order as created by `initialize`: 0=id,
    /// 1=timestamp, 2=actor, 3=action, 4=resource_type, 5=resource_id,
    /// 6=project, 7=metadata.
    fn row_to_audit_event(row: &rusqlite::Row) -> Result<AuditEvent, rusqlite::Error> {
        let timestamp_str: String = row.get(1)?;
        // Unparseable timestamps fall back to "now" rather than failing.
        let timestamp = chrono::DateTime::parse_from_rfc3339(&timestamp_str)
            .map(|dt| dt.with_timezone(&chrono::Utc))
            .unwrap_or_else(|_| chrono::Utc::now());

        let action_str: String = row.get(3)?;
        // Unknown action strings degrade to Create.
        let action = action_str
            .parse::<AuditAction>()
            .unwrap_or(AuditAction::Create);

        let resource_type_str: String = row.get(4)?;
        // Unknown resource types degrade to Baseline.
        let resource_type = resource_type_str
            .parse::<AuditResourceType>()
            .unwrap_or(AuditResourceType::Baseline);

        let metadata_json: String = row.get(7)?;
        // Corrupt metadata JSON falls back to an empty object.
        let metadata: serde_json::Value = serde_json::from_str(&metadata_json)
            .unwrap_or(serde_json::Value::Object(Default::default()));

        Ok(AuditEvent {
            id: row.get(0)?,
            timestamp,
            actor: row.get(2)?,
            action,
            resource_type,
            resource_id: row.get(5)?,
            project: row.get(6)?,
            metadata,
        })
    }
823}
824
825#[cfg(test)]
826mod tests {
827 use super::*;
828 use crate::models::{BaselineRecordExt, BaselineSource};
829 use perfgate_types::{BenchMeta, HostInfo, RunMeta, RunReceipt, Stats, ToolInfo, U64Summary};
830 use std::collections::BTreeMap;
831 use tempfile::tempdir;
832
    /// Build a minimal but structurally complete `RunReceipt` for the
    /// given benchmark name, used as fixture data by the tests below.
    fn create_test_receipt(name: &str) -> RunReceipt {
        RunReceipt {
            schema: "perfgate.run.v1".to_string(),
            tool: ToolInfo {
                name: "perfgate".to_string(),
                version: "0.3.0".to_string(),
            },
            run: RunMeta {
                id: "test-run-id".to_string(),
                started_at: "2026-01-01T00:00:00Z".to_string(),
                ended_at: "2026-01-01T00:01:00Z".to_string(),
                host: HostInfo {
                    os: "linux".to_string(),
                    arch: "x86_64".to_string(),
                    cpu_count: Some(8),
                    memory_bytes: Some(16 * 1024 * 1024 * 1024),
                    hostname_hash: None,
                },
            },
            bench: BenchMeta {
                name: name.to_string(),
                cwd: None,
                command: vec!["./bench.sh".to_string()],
                repeat: 5,
                warmup: 1,
                work_units: None,
                timeout_ms: None,
            },
            samples: vec![],
            // Only wall-clock stats are populated; every optional metric
            // is left empty.
            stats: Stats {
                wall_ms: U64Summary::new(100, 90, 110),
                cpu_ms: None,
                page_faults: None,
                ctx_switches: None,
                max_rss_kb: None,
                io_read_bytes: None,
                io_write_bytes: None,
                network_packets: None,
                energy_uj: None,
                binary_bytes: None,
                throughput_per_s: None,
            },
        }
    }
877
    /// Build a `BaselineRecord` fixture wrapping `create_test_receipt`,
    /// with fixed git metadata and a single "test" tag.
    fn create_test_record(project: &str, benchmark: &str, version: &str) -> BaselineRecord {
        BaselineRecord::new(
            project.to_string(),
            benchmark.to_string(),
            version.to_string(),
            create_test_receipt(benchmark),
            Some("refs/heads/main".to_string()),
            Some("abc123".to_string()),
            BTreeMap::new(),
            vec!["test".to_string()],
            BaselineSource::Upload,
        )
    }
891
892 #[tokio::test(flavor = "multi_thread")]
893 async fn test_in_memory_database() {
894 let store = SqliteStore::in_memory().unwrap();
895 let record = create_test_record("my-project", "my-bench", "v1.0.0");
896 store.create(&record).await.unwrap();
897 let retrieved = store.get("my-project", "my-bench", "v1.0.0").await.unwrap();
898 assert!(retrieved.is_some());
899 let retrieved = retrieved.unwrap();
900 assert_eq!(retrieved.project, "my-project");
901 }
902
    /// Verifies data written by one store instance survives reopening the
    /// same database file with a second instance.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_persistent_database() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        // First instance writes, then is dropped (closing the connection).
        {
            let store = SqliteStore::new(&db_path, None).unwrap();
            let record = create_test_record("my-project", "my-bench", "v1.0.0");
            store.create(&record).await.unwrap();
        }
        // Second instance reopens the same file and must see the row.
        {
            let store = SqliteStore::new(&db_path, None).unwrap();
            let retrieved = store.get("my-project", "my-bench", "v1.0.0").await.unwrap();
            assert!(retrieved.is_some());
        }
    }
918
    /// Verifies `configure_pragmas` took effect on an on-disk database:
    /// WAL journaling and the 5s busy timeout.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_wal_mode_enabled() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("wal_test.db");
        let store = SqliteStore::new(&db_path, None).unwrap();

        let conn = store.conn.lock().unwrap();
        let journal_mode: String = conn
            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
            .unwrap();
        assert_eq!(journal_mode.to_lowercase(), "wal");

        let busy_timeout: i64 = conn
            .query_row("PRAGMA busy_timeout", [], |row| row.get(0))
            .unwrap();
        assert_eq!(busy_timeout, 5000);
    }
936
    /// Logs one audit event and exercises listing with no filter, a
    /// matching/non-matching project filter, and a matching/non-matching
    /// action filter.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_audit_log_event_and_list() {
        use crate::models::{ListAuditEventsQuery, generate_ulid};

        let store = SqliteStore::in_memory().unwrap();

        let event = AuditEvent {
            id: generate_ulid(),
            timestamp: chrono::Utc::now(),
            actor: "key-abc".to_string(),
            action: AuditAction::Create,
            resource_type: AuditResourceType::Baseline,
            resource_id: "my-project/my-bench/v1".to_string(),
            project: "my-project".to_string(),
            metadata: serde_json::json!({"benchmark": "my-bench"}),
        };

        store.log_event(&event).await.unwrap();

        // Unfiltered listing returns the event with fields intact.
        let query = ListAuditEventsQuery::default();
        let result = store.list_events(&query).await.unwrap();
        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].actor, "key-abc");
        assert_eq!(result.events[0].action, AuditAction::Create);
        assert_eq!(result.events[0].resource_type, AuditResourceType::Baseline);

        // Matching project filter still finds it.
        let query = ListAuditEventsQuery {
            project: Some("my-project".to_string()),
            ..Default::default()
        };
        let result = store.list_events(&query).await.unwrap();
        assert_eq!(result.events.len(), 1);

        // Non-matching project filter excludes it.
        let query = ListAuditEventsQuery {
            project: Some("other-project".to_string()),
            ..Default::default()
        };
        let result = store.list_events(&query).await.unwrap();
        assert_eq!(result.events.len(), 0);

        // Matching action filter finds it.
        let query = ListAuditEventsQuery {
            action: Some("create".to_string()),
            ..Default::default()
        };
        let result = store.list_events(&query).await.unwrap();
        assert_eq!(result.events.len(), 1);

        // Non-matching action filter excludes it.
        let query = ListAuditEventsQuery {
            action: Some("delete".to_string()),
            ..Default::default()
        };
        let result = store.list_events(&query).await.unwrap();
        assert_eq!(result.events.len(), 0);
    }
995
996 #[tokio::test(flavor = "multi_thread")]
997 async fn test_audit_log_multiple_events() {
998 use crate::models::{ListAuditEventsQuery, generate_ulid};
999
1000 let store = SqliteStore::in_memory().unwrap();
1001
1002 for i in 0..5 {
1003 let event = AuditEvent {
1004 id: generate_ulid(),
1005 timestamp: chrono::Utc::now(),
1006 actor: format!("key-{}", i),
1007 action: if i % 2 == 0 {
1008 AuditAction::Create
1009 } else {
1010 AuditAction::Delete
1011 },
1012 resource_type: AuditResourceType::Baseline,
1013 resource_id: format!("resource-{}", i),
1014 project: "proj".to_string(),
1015 metadata: serde_json::json!({}),
1016 };
1017 store.log_event(&event).await.unwrap();
1018 }
1019
1020 let query = ListAuditEventsQuery::default();
1021 let result = store.list_events(&query).await.unwrap();
1022 assert_eq!(result.events.len(), 5);
1023 assert_eq!(result.pagination.total, 5);
1024
1025 let query = ListAuditEventsQuery {
1027 limit: 2,
1028 ..Default::default()
1029 };
1030 let result = store.list_events(&query).await.unwrap();
1031 assert_eq!(result.events.len(), 2);
1032 assert!(result.pagination.has_more);
1033
1034 let query = ListAuditEventsQuery {
1036 action: Some("create".to_string()),
1037 ..Default::default()
1038 };
1039 let result = store.list_events(&query).await.unwrap();
1040 assert_eq!(result.events.len(), 3); }
1042}