Skip to main content

perfgate_server/storage/
sqlite.rs

1//! SQLite storage implementation for persistent baseline storage.
2
3use async_trait::async_trait;
4use rusqlite::{OptionalExtension, params};
5use std::path::Path;
6use std::sync::{Arc, Mutex};
7
8use super::{ArtifactStore, AuditStore, BaselineStore, StorageHealth};
9use crate::error::StoreError;
10use crate::models::{
11    AuditAction, AuditEvent, AuditResourceType, BaselineRecord, BaselineSource, BaselineVersion,
12    ListAuditEventsQuery, ListAuditEventsResponse, ListBaselinesQuery, ListBaselinesResponse,
13    ListVerdictsQuery, ListVerdictsResponse, PaginationInfo, VerdictRecord,
14};
15use perfgate_types::{VerdictCounts, VerdictStatus};
16
17/// SQLite storage backend for baselines.
18#[derive(Debug)]
19pub struct SqliteStore {
20    /// Path to the database file
21    _path: std::path::PathBuf,
22
23    /// Connection pool (simplified: single connection wrapped in Mutex)
24    conn: Arc<Mutex<rusqlite::Connection>>,
25
26    /// Optional artifact store for raw receipts
27    artifacts: Option<Arc<dyn ArtifactStore>>,
28}
29
30impl SqliteStore {
31    /// Opens or creates a SQLite database at the specified path.
32    pub fn new<P: AsRef<Path>>(
33        path: P,
34        artifacts: Option<Arc<dyn ArtifactStore>>,
35    ) -> Result<Self, StoreError> {
36        let path = path.as_ref().to_path_buf();
37
38        if let Some(parent) = path.parent().filter(|p| !p.exists()) {
39            std::fs::create_dir_all(parent)?;
40        }
41
42        let is_memory = path.as_os_str() == ":memory:";
43        let conn = rusqlite::Connection::open(&path)?;
44        Self::configure_pragmas(&conn, is_memory)?;
45
46        let store = Self {
47            _path: path,
48            conn: Arc::new(Mutex::new(conn)),
49            artifacts,
50        };
51
52        store.initialize()?;
53        Ok(store)
54    }
55
56    /// Creates an in-memory SQLite database (for testing).
57    pub fn in_memory() -> Result<Self, StoreError> {
58        let conn = rusqlite::Connection::open_in_memory()?;
59        Self::configure_pragmas(&conn, true)?;
60
61        let store = Self {
62            _path: std::path::PathBuf::from(":memory:"),
63            conn: Arc::new(Mutex::new(conn)),
64            artifacts: None,
65        };
66
67        store.initialize()?;
68        Ok(store)
69    }
70
71    /// Configures SQLite pragmas for performance and concurrent access.
72    ///
73    /// - `journal_mode=WAL`: enables write-ahead logging so readers do not
74    ///   block writers and vice-versa.  Verified via the returned mode string
75    ///   so that silent fallbacks (e.g. read-only filesystem) become hard
76    ///   errors.  Skipped for in-memory databases where WAL is not applicable.
77    /// - `busy_timeout=5000`: waits up to 5 seconds when the database is
78    ///   locked instead of returning SQLITE_BUSY immediately.
79    fn configure_pragmas(conn: &rusqlite::Connection, is_memory: bool) -> Result<(), StoreError> {
80        conn.execute_batch("PRAGMA busy_timeout=5000;")?;
81
82        // GOTCHA: In-memory SQLite databases cannot use WAL mode. Executing
83        // `PRAGMA journal_mode=WAL` on an in-memory DB silently succeeds but
84        // returns "memory" instead of "wal". You MUST check the returned
85        // string — a bare `execute_batch("PRAGMA journal_mode=WAL")` will
86        // appear to work but leave you without WAL's concurrency benefits.
87        if !is_memory {
88            let mode: String = conn.query_row("PRAGMA journal_mode=WAL", [], |row| row.get(0))?;
89            if mode.to_lowercase() != "wal" {
90                return Err(StoreError::Other(format!(
91                    "failed to enable WAL journal mode (got '{mode}')"
92                )));
93            }
94        }
95
96        Ok(())
97    }
98
99    fn initialize(&self) -> Result<(), StoreError> {
100        let conn = self
101            .conn
102            .lock()
103            .map_err(|e| StoreError::LockError(e.to_string()))?;
104
105        conn.execute_batch(
106            r#"
107            CREATE TABLE IF NOT EXISTS baselines (
108                id TEXT PRIMARY KEY,
109                project TEXT NOT NULL,
110                benchmark TEXT NOT NULL,
111                version TEXT NOT NULL,
112                git_ref TEXT,
113                git_sha TEXT,
114                receipt TEXT,
115                artifact_path TEXT,
116                metadata TEXT NOT NULL DEFAULT '{}',
117                tags TEXT NOT NULL DEFAULT '[]',
118                source TEXT NOT NULL DEFAULT 'upload',
119                content_hash TEXT NOT NULL,
120                deleted INTEGER NOT NULL DEFAULT 0,
121                created_at TEXT NOT NULL,
122                updated_at TEXT NOT NULL,
123                UNIQUE(project, benchmark, version)
124            );
125            CREATE INDEX IF NOT EXISTS idx_baselines_project_benchmark ON baselines(project, benchmark);
126            CREATE INDEX IF NOT EXISTS idx_baselines_created_at ON baselines(created_at DESC);
127
128            CREATE TABLE IF NOT EXISTS verdicts (
129                id TEXT PRIMARY KEY,
130                schema_id TEXT NOT NULL,
131                project TEXT NOT NULL,
132                benchmark TEXT NOT NULL,
133                run_id TEXT NOT NULL,
134                status TEXT NOT NULL,
135                counts TEXT NOT NULL,
136                reasons TEXT NOT NULL,
137                git_ref TEXT,
138                git_sha TEXT,
139                created_at TEXT NOT NULL
140            );
141            CREATE INDEX IF NOT EXISTS idx_verdicts_project_benchmark ON verdicts(project, benchmark);
142            CREATE INDEX IF NOT EXISTS idx_verdicts_created_at ON verdicts(created_at DESC);
143
144            CREATE TABLE IF NOT EXISTS audit_events (
145                id TEXT PRIMARY KEY,
146                timestamp TEXT NOT NULL,
147                actor TEXT NOT NULL,
148                action TEXT NOT NULL,
149                resource_type TEXT NOT NULL,
150                resource_id TEXT NOT NULL,
151                project TEXT NOT NULL,
152                metadata TEXT NOT NULL DEFAULT '{}'
153            );
154            CREATE INDEX IF NOT EXISTS idx_audit_events_project ON audit_events(project);
155            CREATE INDEX IF NOT EXISTS idx_audit_events_timestamp ON audit_events(timestamp DESC);
156            CREATE INDEX IF NOT EXISTS idx_audit_events_action ON audit_events(action);
157            "#,
158        )?;
159        Ok(())
160    }
161
162    fn row_to_record_tuple(
163        row: &rusqlite::Row,
164    ) -> Result<(BaselineRecord, Option<String>), rusqlite::Error> {
165        let created_at_str: String = row.get(13)?;
166        let updated_at_str: String = row.get(14)?;
167
168        let receipt_json: Option<String> = row.get(6)?;
169        let receipt = if let Some(json) = receipt_json {
170            serde_json::from_str(&json).unwrap_or_else(|_| Self::placeholder_receipt())
171        } else {
172            Self::placeholder_receipt()
173        };
174
175        let record = BaselineRecord {
176            schema: crate::models::BASELINE_SCHEMA_V1.to_string(),
177            id: row.get(0)?,
178            project: row.get(1)?,
179            benchmark: row.get(2)?,
180            version: row.get(3)?,
181            git_ref: row.get(4)?,
182            git_sha: row.get(5)?,
183            receipt,
184            metadata: serde_json::from_str(&row.get::<_, String>(8)?).unwrap_or_default(),
185            tags: serde_json::from_str(&row.get::<_, String>(9)?).unwrap_or_default(),
186            created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
187                .map(|dt| dt.with_timezone(&chrono::Utc))
188                .unwrap_or_else(|_| chrono::Utc::now()),
189            updated_at: chrono::DateTime::parse_from_rfc3339(&updated_at_str)
190                .map(|dt| dt.with_timezone(&chrono::Utc))
191                .unwrap_or_else(|_| chrono::Utc::now()),
192            content_hash: row.get(11)?,
193            source: match row.get::<_, String>(10)?.as_str() {
194                "promote" => BaselineSource::Promote,
195                "migrate" => BaselineSource::Migrate,
196                "rollback" => BaselineSource::Rollback,
197                _ => BaselineSource::Upload,
198            },
199            deleted: row.get::<_, i64>(12)? != 0,
200        };
201
202        Ok((record, row.get(7)?))
203    }
204
205    fn placeholder_receipt() -> perfgate_types::RunReceipt {
206        serde_json::from_value(serde_json::json!({
207            "schema": "perfgate.run.v1",
208            "tool": {"name": "placeholder", "version": "0"},
209            "run": {
210                "id": "placeholder",
211                "started_at": "1970-01-01T00:00:00Z",
212                "ended_at": "1970-01-01T00:00:00Z",
213                "host": {"os": "unknown", "arch": "unknown"}
214            },
215            "bench": {"name": "placeholder", "command": [], "repeat": 0, "warmup": 0},
216            "samples": [],
217            "stats": {"wall_ms": {"median": 0, "min": 0, "max": 0}}
218        }))
219        .unwrap()
220    }
221
222    async fn store_artifact(&self, record: &BaselineRecord) -> Result<Option<String>, StoreError> {
223        if let Some(store) = &self.artifacts {
224            let path = format!(
225                "{}/{}/{}.json",
226                record.project, record.benchmark, record.version
227            );
228            let data =
229                serde_json::to_vec(&record.receipt).map_err(StoreError::SerializationError)?;
230            store.put(&path, data).await?;
231            Ok(Some(path))
232        } else {
233            Ok(None)
234        }
235    }
236
237    async fn load_artifact(
238        &self,
239        path: Option<String>,
240        mut record: BaselineRecord,
241    ) -> Result<BaselineRecord, StoreError> {
242        if let (Some(store), Some(path)) = (&self.artifacts, path) {
243            let data = store.get(&path).await?;
244            record.receipt =
245                serde_json::from_slice(&data).map_err(StoreError::SerializationError)?;
246        }
247        Ok(record)
248    }
249}
250
251#[async_trait]
252impl BaselineStore for SqliteStore {
253    async fn create(&self, record: &BaselineRecord) -> Result<(), StoreError> {
254        let artifact_path = self.store_artifact(record).await?;
255        let receipt_json = if artifact_path.is_none() {
256            Some(serde_json::to_string(&record.receipt)?)
257        } else {
258            None
259        };
260
261        let conn = self
262            .conn
263            .lock()
264            .map_err(|e| StoreError::LockError(e.to_string()))?;
265        conn.execute(
266            r#"
267            INSERT INTO baselines (
268                id, project, benchmark, version, git_ref, git_sha,
269                receipt, artifact_path, metadata, tags, source, content_hash,
270                deleted, created_at, updated_at
271            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
272            "#,
273            params![
274                record.id,
275                record.project,
276                record.benchmark,
277                record.version,
278                record.git_ref,
279                record.git_sha,
280                receipt_json,
281                artifact_path,
282                serde_json::to_string(&record.metadata)?,
283                serde_json::to_string(&record.tags)?,
284                format!("{:?}", record.source).to_lowercase(),
285                record.content_hash,
286                if record.deleted { 1i64 } else { 0i64 },
287                record.created_at.to_rfc3339(),
288                record.updated_at.to_rfc3339(),
289            ],
290        )
291        .map_err(|e| match &e {
292            rusqlite::Error::SqliteFailure(err, _)
293                if err.code == rusqlite::ErrorCode::ConstraintViolation =>
294            {
295                StoreError::AlreadyExists(format!(
296                    "project={}, benchmark={}, version={}",
297                    record.project, record.benchmark, record.version
298                ))
299            }
300            _ => StoreError::SqliteError(e),
301        })?;
302        Ok(())
303    }
304
305    async fn get(
306        &self,
307        project: &str,
308        benchmark: &str,
309        version: &str,
310    ) -> Result<Option<BaselineRecord>, StoreError> {
311        let res = {
312            let conn = self
313                .conn
314                .lock()
315                .map_err(|e| StoreError::LockError(e.to_string()))?;
316            let mut stmt = conn.prepare(
317                "SELECT * FROM baselines WHERE project = ?1 AND benchmark = ?2 AND version = ?3 AND deleted = 0"
318            )?;
319            stmt.query_row(
320                params![project, benchmark, version],
321                Self::row_to_record_tuple,
322            )
323            .optional()?
324        };
325
326        match res {
327            Some((record, path)) => Ok(Some(self.load_artifact(path, record).await?)),
328            None => Ok(None),
329        }
330    }
331
332    async fn get_latest(
333        &self,
334        project: &str,
335        benchmark: &str,
336    ) -> Result<Option<BaselineRecord>, StoreError> {
337        let res = {
338            let conn = self
339                .conn
340                .lock()
341                .map_err(|e| StoreError::LockError(e.to_string()))?;
342            let mut stmt = conn.prepare(
343                "SELECT * FROM baselines WHERE project = ?1 AND benchmark = ?2 AND deleted = 0 ORDER BY created_at DESC LIMIT 1"
344            )?;
345            stmt.query_row(params![project, benchmark], Self::row_to_record_tuple)
346                .optional()?
347        };
348
349        match res {
350            Some((record, path)) => Ok(Some(self.load_artifact(path, record).await?)),
351            None => Ok(None),
352        }
353    }
354
355    async fn list(
356        &self,
357        project: &str,
358        query: &ListBaselinesQuery,
359    ) -> Result<ListBaselinesResponse, StoreError> {
360        let (records_with_paths, total) = {
361            let conn = self
362                .conn
363                .lock()
364                .map_err(|e| StoreError::LockError(e.to_string()))?;
365            let mut sql =
366                String::from("SELECT * FROM baselines WHERE project = ?1 AND deleted = 0");
367            let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(project.to_string())];
368
369            if let Some(ref b) = query.benchmark {
370                sql.push_str(" AND benchmark = ?");
371                params.push(Box::new(b.clone()));
372            }
373
374            let count_sql = format!("SELECT COUNT(*) FROM ({})", sql);
375            let total: u64 =
376                conn.query_row(&count_sql, rusqlite::params_from_iter(params.iter()), |r| {
377                    r.get(0)
378                })?;
379
380            sql.push_str(" ORDER BY created_at DESC LIMIT ? OFFSET ?");
381            params.push(Box::new(query.limit as i64));
382            params.push(Box::new(query.offset as i64));
383
384            let mut stmt = conn.prepare(&sql)?;
385            let rows = stmt
386                .query_map(
387                    rusqlite::params_from_iter(params.iter()),
388                    Self::row_to_record_tuple,
389                )?
390                .collect::<Result<Vec<_>, _>>()?;
391            (rows, total)
392        };
393
394        let mut baselines = Vec::with_capacity(records_with_paths.len());
395        for (mut record, path) in records_with_paths {
396            if query.include_receipt {
397                record = self.load_artifact(path, record).await?;
398            }
399            baselines.push(record.into());
400        }
401
402        let count = baselines.len() as u64;
403
404        Ok(ListBaselinesResponse {
405            baselines,
406            pagination: PaginationInfo {
407                total,
408                limit: query.limit,
409                offset: query.offset,
410                has_more: (query.offset + count) < total,
411            },
412        })
413    }
414
415    async fn update(&self, record: &BaselineRecord) -> Result<(), StoreError> {
416        let artifact_path = self.store_artifact(record).await?;
417        let receipt_json = if artifact_path.is_none() {
418            Some(serde_json::to_string(&record.receipt)?)
419        } else {
420            None
421        };
422
423        let conn = self
424            .conn
425            .lock()
426            .map_err(|e| StoreError::LockError(e.to_string()))?;
427        conn.execute(
428            "UPDATE baselines SET git_ref=?1, git_sha=?2, receipt=?3, artifact_path=?4, metadata=?5, tags=?6, source=?7, content_hash=?8, updated_at=?9 WHERE project=?10 AND benchmark=?11 AND version=?12",
429            params![
430                record.git_ref, record.git_sha, receipt_json, artifact_path,
431                serde_json::to_string(&record.metadata)?, serde_json::to_string(&record.tags)?,
432                format!("{:?}", record.source).to_lowercase(), record.content_hash,
433                record.updated_at.to_rfc3339(), record.project, record.benchmark, record.version
434            ]
435        )?;
436        Ok(())
437    }
438
439    async fn delete(
440        &self,
441        project: &str,
442        benchmark: &str,
443        version: &str,
444    ) -> Result<bool, StoreError> {
445        let conn = self
446            .conn
447            .lock()
448            .map_err(|e| StoreError::LockError(e.to_string()))?;
449        let n = conn.execute("UPDATE baselines SET deleted = 1, updated_at = ?1 WHERE project = ?2 AND benchmark = ?3 AND version = ?4 AND deleted = 0",
450            params![chrono::Utc::now().to_rfc3339(), project, benchmark, version])?;
451        Ok(n > 0)
452    }
453
454    async fn hard_delete(
455        &self,
456        project: &str,
457        benchmark: &str,
458        version: &str,
459    ) -> Result<bool, StoreError> {
460        let conn = self
461            .conn
462            .lock()
463            .map_err(|e| StoreError::LockError(e.to_string()))?;
464        let n = conn.execute(
465            "DELETE FROM baselines WHERE project = ?1 AND benchmark = ?2 AND version = ?3",
466            params![project, benchmark, version],
467        )?;
468        Ok(n > 0)
469    }
470
471    async fn list_versions(
472        &self,
473        project: &str,
474        benchmark: &str,
475    ) -> Result<Vec<BaselineVersion>, StoreError> {
476        let conn = self
477            .conn
478            .lock()
479            .map_err(|e| StoreError::LockError(e.to_string()))?;
480        let mut stmt = conn.prepare("SELECT version, git_ref, git_sha, source, created_at FROM baselines WHERE project = ?1 AND benchmark = ?2 AND deleted = 0 ORDER BY created_at DESC")?;
481        let mut versions: Vec<BaselineVersion> = stmt
482            .query_map(params![project, benchmark], |row| {
483                let created_at_str: String = row.get(4)?;
484                Ok(BaselineVersion {
485                    version: row.get(0)?,
486                    git_ref: row.get(1)?,
487                    git_sha: row.get(2)?,
488                    created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
489                        .map(|dt| dt.with_timezone(&chrono::Utc))
490                        .unwrap_or_else(|_| chrono::Utc::now()),
491                    created_by: None,
492                    is_current: false,
493                    source: match row.get::<_, String>(3)?.as_str() {
494                        "promote" => BaselineSource::Promote,
495                        "migrate" => BaselineSource::Migrate,
496                        "rollback" => BaselineSource::Rollback,
497                        _ => BaselineSource::Upload,
498                    },
499                })
500            })?
501            .collect::<Result<Vec<_>, _>>()?;
502        if let Some(first) = versions.first_mut() {
503            first.is_current = true;
504        }
505        Ok(versions)
506    }
507
508    async fn health_check(&self) -> Result<StorageHealth, StoreError> {
509        let conn = self
510            .conn
511            .lock()
512            .map_err(|e| StoreError::LockError(e.to_string()))?;
513        match conn.query_row("SELECT 1", [], |_| Ok(())) {
514            Ok(_) => Ok(StorageHealth::Healthy),
515            Err(_) => Ok(StorageHealth::Unhealthy),
516        }
517    }
518
519    fn backend_type(&self) -> &'static str {
520        "sqlite"
521    }
522
523    async fn create_verdict(&self, record: &VerdictRecord) -> Result<(), StoreError> {
524        let conn = self
525            .conn
526            .lock()
527            .map_err(|e| StoreError::LockError(e.to_string()))?;
528
529        let counts_json =
530            serde_json::to_string(&record.counts).map_err(StoreError::SerializationError)?;
531        let reasons_json =
532            serde_json::to_string(&record.reasons).map_err(StoreError::SerializationError)?;
533        let status_str = record.status.as_str();
534        let created_at_str = record.created_at.to_rfc3339();
535
536        conn.execute(
537            r#"
538            INSERT INTO verdicts (
539                id, schema_id, project, benchmark, run_id, status, counts, reasons,
540                git_ref, git_sha, created_at
541            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
542            "#,
543            params![
544                record.id,
545                record.schema,
546                record.project,
547                record.benchmark,
548                record.run_id,
549                status_str,
550                counts_json,
551                reasons_json,
552                record.git_ref,
553                record.git_sha,
554                created_at_str
555            ],
556        )?;
557
558        Ok(())
559    }
560
561    async fn list_verdicts(
562        &self,
563        project: &str,
564        query: &ListVerdictsQuery,
565    ) -> Result<ListVerdictsResponse, StoreError> {
566        let mut sql = "SELECT * FROM verdicts WHERE project = ?".to_string();
567        let mut params_vec: Vec<rusqlite::types::Value> = vec![project.to_string().into()];
568
569        if let Some(bench) = &query.benchmark {
570            sql.push_str(" AND benchmark = ?");
571            params_vec.push(bench.clone().into());
572        }
573
574        if let Some(status) = &query.status {
575            sql.push_str(" AND status = ?");
576            params_vec.push(status.as_str().to_string().into());
577        }
578
579        if let Some(since) = &query.since {
580            sql.push_str(" AND created_at >= ?");
581            params_vec.push(since.to_rfc3339().into());
582        }
583
584        if let Some(until) = &query.until {
585            sql.push_str(" AND created_at <= ?");
586            params_vec.push(until.to_rfc3339().into());
587        }
588
589        sql.push_str(" ORDER BY created_at DESC");
590
591        // Limit and offset
592        sql.push_str(" LIMIT ? OFFSET ?");
593        params_vec.push((query.limit as i64).into());
594        params_vec.push((query.offset as i64).into());
595
596        let conn = self
597            .conn
598            .lock()
599            .map_err(|e| StoreError::LockError(e.to_string()))?;
600
601        let mut stmt = conn
602            .prepare(&sql)
603            .map_err(|e| StoreError::QueryError(e.to_string()))?;
604        let rows = stmt
605            .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
606                Self::row_to_verdict(row)
607            })
608            .map_err(|e| StoreError::QueryError(e.to_string()))?;
609
610        let mut verdicts = Vec::new();
611        for row in rows {
612            verdicts.push(row?);
613        }
614
615        // For total count
616        let count_sql = "SELECT COUNT(*) FROM verdicts WHERE project = ?";
617        let total: i64 = conn.query_row(count_sql, params![project], |row| row.get(0))?;
618
619        Ok(ListVerdictsResponse {
620            verdicts,
621            pagination: PaginationInfo {
622                total: total as u64,
623                offset: query.offset,
624                limit: query.limit,
625                has_more: (query.offset + query.limit as u64) < total as u64,
626            },
627        })
628    }
629}
630
631impl SqliteStore {
632    fn row_to_verdict(row: &rusqlite::Row) -> Result<VerdictRecord, rusqlite::Error> {
633        let status_str: String = row.get(5)?;
634        let status = match status_str.as_str() {
635            "pass" => VerdictStatus::Pass,
636            "warn" => VerdictStatus::Warn,
637            "fail" => VerdictStatus::Fail,
638            "skip" => VerdictStatus::Skip,
639            _ => VerdictStatus::Pass,
640        };
641
642        let counts_json: String = row.get(6)?;
643        let counts = serde_json::from_str(&counts_json).unwrap_or(VerdictCounts {
644            pass: 0,
645            warn: 0,
646            fail: 0,
647            skip: 0,
648        });
649
650        let reasons_json: String = row.get(7)?;
651        let reasons = serde_json::from_str(&reasons_json).unwrap_or_default();
652
653        let created_at_str: String = row.get(10)?;
654        let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
655            .map(|dt| dt.with_timezone(&chrono::Utc))
656            .unwrap_or_else(|_| chrono::Utc::now());
657
658        Ok(VerdictRecord {
659            id: row.get(0)?,
660            schema: row.get(1)?,
661            project: row.get(2)?,
662            benchmark: row.get(3)?,
663            run_id: row.get(4)?,
664            status,
665            counts,
666            reasons,
667            git_ref: row.get(8)?,
668            git_sha: row.get(9)?,
669            created_at,
670        })
671    }
672}
673
674#[async_trait]
675impl AuditStore for SqliteStore {
676    async fn log_event(&self, event: &AuditEvent) -> Result<(), StoreError> {
677        let metadata_json =
678            serde_json::to_string(&event.metadata).map_err(StoreError::SerializationError)?;
679
680        let conn = self
681            .conn
682            .lock()
683            .map_err(|e| StoreError::LockError(e.to_string()))?;
684        conn.execute(
685            r#"
686            INSERT INTO audit_events (
687                id, timestamp, actor, action, resource_type, resource_id, project, metadata
688            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
689            "#,
690            params![
691                event.id,
692                event.timestamp.to_rfc3339(),
693                event.actor,
694                event.action.to_string(),
695                event.resource_type.to_string(),
696                event.resource_id,
697                event.project,
698                metadata_json,
699            ],
700        )?;
701
702        Ok(())
703    }
704
705    async fn list_events(
706        &self,
707        query: &ListAuditEventsQuery,
708    ) -> Result<ListAuditEventsResponse, StoreError> {
709        let mut sql = "SELECT * FROM audit_events WHERE 1=1".to_string();
710        let mut params_vec: Vec<rusqlite::types::Value> = Vec::new();
711
712        if let Some(ref project) = query.project {
713            sql.push_str(" AND project = ?");
714            params_vec.push(project.clone().into());
715        }
716
717        if let Some(ref action) = query.action {
718            sql.push_str(" AND action = ?");
719            params_vec.push(action.clone().into());
720        }
721
722        if let Some(ref resource_type) = query.resource_type {
723            sql.push_str(" AND resource_type = ?");
724            params_vec.push(resource_type.clone().into());
725        }
726
727        if let Some(ref actor) = query.actor {
728            sql.push_str(" AND actor = ?");
729            params_vec.push(actor.clone().into());
730        }
731
732        if let Some(ref since) = query.since {
733            sql.push_str(" AND timestamp >= ?");
734            params_vec.push(since.to_rfc3339().into());
735        }
736
737        if let Some(ref until) = query.until {
738            sql.push_str(" AND timestamp <= ?");
739            params_vec.push(until.to_rfc3339().into());
740        }
741
742        // Count before pagination
743        let count_sql = format!("SELECT COUNT(*) FROM ({})", sql);
744
745        sql.push_str(" ORDER BY timestamp DESC LIMIT ? OFFSET ?");
746        params_vec.push((query.limit as i64).into());
747        params_vec.push((query.offset as i64).into());
748
749        let conn = self
750            .conn
751            .lock()
752            .map_err(|e| StoreError::LockError(e.to_string()))?;
753
754        // Get total (without LIMIT/OFFSET params)
755        let count_params: Vec<rusqlite::types::Value> =
756            params_vec[..params_vec.len().saturating_sub(2)].to_vec();
757        let total: i64 = conn
758            .query_row(
759                &count_sql,
760                rusqlite::params_from_iter(count_params.iter()),
761                |row| row.get(0),
762            )
763            .map_err(|e| StoreError::QueryError(e.to_string()))?;
764
765        let mut stmt = conn
766            .prepare(&sql)
767            .map_err(|e| StoreError::QueryError(e.to_string()))?;
768        let rows = stmt
769            .query_map(rusqlite::params_from_iter(params_vec.iter()), |row| {
770                Self::row_to_audit_event(row)
771            })
772            .map_err(|e| StoreError::QueryError(e.to_string()))?;
773
774        let mut events = Vec::new();
775        for row in rows {
776            events.push(row?);
777        }
778
779        Ok(ListAuditEventsResponse {
780            events,
781            pagination: PaginationInfo {
782                total: total as u64,
783                offset: query.offset,
784                limit: query.limit,
785                has_more: (query.offset + query.limit as u64) < total as u64,
786            },
787        })
788    }
789}
790
791impl SqliteStore {
792    fn row_to_audit_event(row: &rusqlite::Row) -> Result<AuditEvent, rusqlite::Error> {
793        let timestamp_str: String = row.get(1)?;
794        let timestamp = chrono::DateTime::parse_from_rfc3339(&timestamp_str)
795            .map(|dt| dt.with_timezone(&chrono::Utc))
796            .unwrap_or_else(|_| chrono::Utc::now());
797
798        let action_str: String = row.get(3)?;
799        let action = action_str
800            .parse::<AuditAction>()
801            .unwrap_or(AuditAction::Create);
802
803        let resource_type_str: String = row.get(4)?;
804        let resource_type = resource_type_str
805            .parse::<AuditResourceType>()
806            .unwrap_or(AuditResourceType::Baseline);
807
808        let metadata_json: String = row.get(7)?;
809        let metadata: serde_json::Value = serde_json::from_str(&metadata_json)
810            .unwrap_or(serde_json::Value::Object(Default::default()));
811
812        Ok(AuditEvent {
813            id: row.get(0)?,
814            timestamp,
815            actor: row.get(2)?,
816            action,
817            resource_type,
818            resource_id: row.get(5)?,
819            project: row.get(6)?,
820            metadata,
821        })
822    }
823}
824
825#[cfg(test)]
826mod tests {
827    use super::*;
828    use crate::models::{BaselineRecordExt, BaselineSource};
829    use perfgate_types::{BenchMeta, HostInfo, RunMeta, RunReceipt, Stats, ToolInfo, U64Summary};
830    use std::collections::BTreeMap;
831    use tempfile::tempdir;
832
833    fn create_test_receipt(name: &str) -> RunReceipt {
834        RunReceipt {
835            schema: "perfgate.run.v1".to_string(),
836            tool: ToolInfo {
837                name: "perfgate".to_string(),
838                version: "0.3.0".to_string(),
839            },
840            run: RunMeta {
841                id: "test-run-id".to_string(),
842                started_at: "2026-01-01T00:00:00Z".to_string(),
843                ended_at: "2026-01-01T00:01:00Z".to_string(),
844                host: HostInfo {
845                    os: "linux".to_string(),
846                    arch: "x86_64".to_string(),
847                    cpu_count: Some(8),
848                    memory_bytes: Some(16 * 1024 * 1024 * 1024),
849                    hostname_hash: None,
850                },
851            },
852            bench: BenchMeta {
853                name: name.to_string(),
854                cwd: None,
855                command: vec!["./bench.sh".to_string()],
856                repeat: 5,
857                warmup: 1,
858                work_units: None,
859                timeout_ms: None,
860            },
861            samples: vec![],
862            stats: Stats {
863                wall_ms: U64Summary::new(100, 90, 110),
864                cpu_ms: None,
865                page_faults: None,
866                ctx_switches: None,
867                max_rss_kb: None,
868                io_read_bytes: None,
869                io_write_bytes: None,
870                network_packets: None,
871                energy_uj: None,
872                binary_bytes: None,
873                throughput_per_s: None,
874            },
875        }
876    }
877
878    fn create_test_record(project: &str, benchmark: &str, version: &str) -> BaselineRecord {
879        BaselineRecord::new(
880            project.to_string(),
881            benchmark.to_string(),
882            version.to_string(),
883            create_test_receipt(benchmark),
884            Some("refs/heads/main".to_string()),
885            Some("abc123".to_string()),
886            BTreeMap::new(),
887            vec!["test".to_string()],
888            BaselineSource::Upload,
889        )
890    }
891
892    #[tokio::test(flavor = "multi_thread")]
893    async fn test_in_memory_database() {
894        let store = SqliteStore::in_memory().unwrap();
895        let record = create_test_record("my-project", "my-bench", "v1.0.0");
896        store.create(&record).await.unwrap();
897        let retrieved = store.get("my-project", "my-bench", "v1.0.0").await.unwrap();
898        assert!(retrieved.is_some());
899        let retrieved = retrieved.unwrap();
900        assert_eq!(retrieved.project, "my-project");
901    }
902
903    #[tokio::test(flavor = "multi_thread")]
904    async fn test_persistent_database() {
905        let dir = tempdir().unwrap();
906        let db_path = dir.path().join("test.db");
907        {
908            let store = SqliteStore::new(&db_path, None).unwrap();
909            let record = create_test_record("my-project", "my-bench", "v1.0.0");
910            store.create(&record).await.unwrap();
911        }
912        {
913            let store = SqliteStore::new(&db_path, None).unwrap();
914            let retrieved = store.get("my-project", "my-bench", "v1.0.0").await.unwrap();
915            assert!(retrieved.is_some());
916        }
917    }
918
919    #[tokio::test(flavor = "multi_thread")]
920    async fn test_wal_mode_enabled() {
921        let dir = tempdir().unwrap();
922        let db_path = dir.path().join("wal_test.db");
923        let store = SqliteStore::new(&db_path, None).unwrap();
924
925        let conn = store.conn.lock().unwrap();
926        let journal_mode: String = conn
927            .query_row("PRAGMA journal_mode", [], |row| row.get(0))
928            .unwrap();
929        assert_eq!(journal_mode.to_lowercase(), "wal");
930
931        let busy_timeout: i64 = conn
932            .query_row("PRAGMA busy_timeout", [], |row| row.get(0))
933            .unwrap();
934        assert_eq!(busy_timeout, 5000);
935    }
936
937    #[tokio::test(flavor = "multi_thread")]
938    async fn test_audit_log_event_and_list() {
939        use crate::models::{ListAuditEventsQuery, generate_ulid};
940
941        let store = SqliteStore::in_memory().unwrap();
942
943        let event = AuditEvent {
944            id: generate_ulid(),
945            timestamp: chrono::Utc::now(),
946            actor: "key-abc".to_string(),
947            action: AuditAction::Create,
948            resource_type: AuditResourceType::Baseline,
949            resource_id: "my-project/my-bench/v1".to_string(),
950            project: "my-project".to_string(),
951            metadata: serde_json::json!({"benchmark": "my-bench"}),
952        };
953
954        store.log_event(&event).await.unwrap();
955
956        // List all
957        let query = ListAuditEventsQuery::default();
958        let result = store.list_events(&query).await.unwrap();
959        assert_eq!(result.events.len(), 1);
960        assert_eq!(result.events[0].actor, "key-abc");
961        assert_eq!(result.events[0].action, AuditAction::Create);
962        assert_eq!(result.events[0].resource_type, AuditResourceType::Baseline);
963
964        // Filter by project
965        let query = ListAuditEventsQuery {
966            project: Some("my-project".to_string()),
967            ..Default::default()
968        };
969        let result = store.list_events(&query).await.unwrap();
970        assert_eq!(result.events.len(), 1);
971
972        // Filter by wrong project
973        let query = ListAuditEventsQuery {
974            project: Some("other-project".to_string()),
975            ..Default::default()
976        };
977        let result = store.list_events(&query).await.unwrap();
978        assert_eq!(result.events.len(), 0);
979
980        // Filter by action
981        let query = ListAuditEventsQuery {
982            action: Some("create".to_string()),
983            ..Default::default()
984        };
985        let result = store.list_events(&query).await.unwrap();
986        assert_eq!(result.events.len(), 1);
987
988        let query = ListAuditEventsQuery {
989            action: Some("delete".to_string()),
990            ..Default::default()
991        };
992        let result = store.list_events(&query).await.unwrap();
993        assert_eq!(result.events.len(), 0);
994    }
995
996    #[tokio::test(flavor = "multi_thread")]
997    async fn test_audit_log_multiple_events() {
998        use crate::models::{ListAuditEventsQuery, generate_ulid};
999
1000        let store = SqliteStore::in_memory().unwrap();
1001
1002        for i in 0..5 {
1003            let event = AuditEvent {
1004                id: generate_ulid(),
1005                timestamp: chrono::Utc::now(),
1006                actor: format!("key-{}", i),
1007                action: if i % 2 == 0 {
1008                    AuditAction::Create
1009                } else {
1010                    AuditAction::Delete
1011                },
1012                resource_type: AuditResourceType::Baseline,
1013                resource_id: format!("resource-{}", i),
1014                project: "proj".to_string(),
1015                metadata: serde_json::json!({}),
1016            };
1017            store.log_event(&event).await.unwrap();
1018        }
1019
1020        let query = ListAuditEventsQuery::default();
1021        let result = store.list_events(&query).await.unwrap();
1022        assert_eq!(result.events.len(), 5);
1023        assert_eq!(result.pagination.total, 5);
1024
1025        // Pagination
1026        let query = ListAuditEventsQuery {
1027            limit: 2,
1028            ..Default::default()
1029        };
1030        let result = store.list_events(&query).await.unwrap();
1031        assert_eq!(result.events.len(), 2);
1032        assert!(result.pagination.has_more);
1033
1034        // Filter by action
1035        let query = ListAuditEventsQuery {
1036            action: Some("create".to_string()),
1037            ..Default::default()
1038        };
1039        let result = store.list_events(&query).await.unwrap();
1040        assert_eq!(result.events.len(), 3); // indices 0, 2, 4
1041    }
1042}