assay_core/storage/
store.rs

1use crate::model::{AttemptRow, EvalConfig, LlmResponse, TestResultRow, TestStatus};
2use crate::trace::schema::{EpisodeEnd, EpisodeStart, StepEntry, ToolCallEntry, TraceEvent};
3use anyhow::Context;
4use rusqlite::{params, Connection};
5use std::path::Path;
6use std::sync::{Arc, Mutex};
7
8#[derive(Clone)]
9pub struct Store {
10    pub conn: Arc<Mutex<Connection>>,
11}
12
/// Summary counters about the database, as produced by `Store::stats_best_effort`.
///
/// Every field is optional: a value is `None` when the query that would have
/// produced it failed (or, for the last-run fields, returned no row).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreStats {
    /// Number of rows in the `runs` table.
    pub runs: Option<u64>,
    /// Number of rows in the `results` table.
    pub results: Option<u64>,
    /// Id of the run with the highest id.
    pub last_run_id: Option<i64>,
    /// `started_at` value of that same run, exactly as stored.
    pub last_run_at: Option<String>,
    /// SQLite `PRAGMA user_version`, rendered as a decimal string.
    pub version: Option<String>,
}
20
21impl Store {
22    pub fn open(path: &Path) -> anyhow::Result<Self> {
23        let conn = Connection::open(path).context("failed to open sqlite db")?;
24        conn.execute("PRAGMA foreign_keys = ON", [])?;
25        Ok(Self {
26            conn: Arc::new(Mutex::new(conn)),
27        })
28    }
29
30    pub fn memory() -> anyhow::Result<Self> {
31        // SQLite in-memory DB
32        let conn = Connection::open_in_memory().context("failed to open in-memory sqlite db")?;
33        Ok(Self {
34            conn: Arc::new(Mutex::new(conn)),
35        })
36    }
37
38    pub fn init_schema(&self) -> anyhow::Result<()> {
39        let conn = self.conn.lock().unwrap();
40        conn.execute_batch(crate::storage::schema::DDL)?;
41
42        // v0.3.0 Migrations
43        migrate_v030(&conn)?;
44
45        // Ensure attempts table exists (covered by DDL if creating fresh, but good to be explicit if DDL didn't run on existing DB)
46        // DDL handles IF NOT EXISTS for attempts.
47
48        // Index on fingerprint for speed (CREATE INDEX IF NOT EXISTS is valid sqlite)
49        let _ = conn.execute(
50            "CREATE INDEX IF NOT EXISTS idx_results_fingerprint ON results(fingerprint)",
51            [],
52        );
53
54        Ok(())
55    }
56
57    pub fn fetch_recent_results(
58        &self,
59        suite: &str,
60        limit: u32,
61    ) -> anyhow::Result<Vec<crate::model::TestResultRow>> {
62        let conn = self.conn.lock().unwrap();
63        let mut stmt = conn.prepare(
64            "SELECT
65                r.test_id, r.outcome, r.duration_ms, r.score, r.attempts_json,
66                r.fingerprint, r.skip_reason
67             FROM results r
68             JOIN runs ON r.run_id = runs.id
69             WHERE runs.suite = ?1
70             ORDER BY r.id DESC
71             LIMIT ?2",
72        )?;
73
74        let rows = stmt.query_map(rusqlite::params![suite, limit], |row| {
75            let attempts_str: Option<String> = row.get(4)?;
76
77            // Rehydrate "details" from last attempt (important for calibration)
78            let attempts: Option<Vec<crate::model::AttemptRow>> = match attempts_str {
79                Some(s) if !s.trim().is_empty() => serde_json::from_str(&s).ok(),
80                _ => None,
81            };
82
83            let (message, details) = attempts
84                .as_ref()
85                .and_then(|v| v.last())
86                .map(|a| (a.message.clone(), a.details.clone()))
87                .unwrap_or_else(|| (String::new(), serde_json::json!({})));
88
89            let cached = false;
90
91            Ok(crate::model::TestResultRow {
92                test_id: row.get(0)?,
93                status: crate::model::TestStatus::parse(&row.get::<_, String>(1)?),
94                message,
95                duration_ms: row.get(2)?,
96                details,
97                score: row.get(3)?,
98                cached,
99                fingerprint: row.get(5)?,
100                skip_reason: row.get(6)?,
101                attempts,
102                error_policy_applied: None,
103            })
104        })?;
105
106        let mut results = Vec::new();
107        for r in rows {
108            results.push(r?);
109        }
110        Ok(results)
111    }
112
113    pub fn fetch_results_for_last_n_runs(
114        &self,
115        suite: &str,
116        n: u32,
117    ) -> anyhow::Result<Vec<crate::model::TestResultRow>> {
118        let conn = self.conn.lock().unwrap();
119        let mut stmt = conn.prepare(
120            "SELECT
121                r.test_id, r.outcome, r.duration_ms, r.score, r.attempts_json,
122                r.fingerprint, r.skip_reason
123             FROM results r
124             JOIN runs ON r.run_id = runs.id
125             WHERE runs.id IN (
126                 SELECT id FROM runs WHERE suite = ?1 ORDER BY id DESC LIMIT ?2
127             )
128             ORDER BY r.id DESC",
129        )?;
130
131        let rows = stmt.query_map(rusqlite::params![suite, n], |row| {
132            let attempts_str: Option<String> = row.get(4)?;
133
134            let (message, details) =
135                if let Some(s) = attempts_str.as_ref().filter(|s| !s.trim().is_empty()) {
136                    if let Ok(attempts) = serde_json::from_str::<Vec<crate::model::AttemptRow>>(s) {
137                        attempts
138                            .last()
139                            .map(|a| (a.message.clone(), a.details.clone()))
140                            .unwrap_or_else(|| (String::new(), serde_json::json!({})))
141                    } else {
142                        (String::new(), serde_json::json!({}))
143                    }
144                } else {
145                    (String::new(), serde_json::json!({}))
146                };
147
148            let attempts: Option<Vec<crate::model::AttemptRow>> =
149                attempts_str.and_then(|s| serde_json::from_str(&s).ok());
150
151            Ok(crate::model::TestResultRow {
152                test_id: row.get(0)?,
153                status: crate::model::TestStatus::parse(&row.get::<_, String>(1)?),
154                message,
155                duration_ms: row.get(2)?,
156                details,
157                score: row.get(3)?,
158                cached: false,
159                fingerprint: row.get(5)?,
160                skip_reason: row.get(6)?,
161                attempts,
162                error_policy_applied: None,
163            })
164        })?;
165
166        let mut results = Vec::new();
167        for r in rows {
168            results.push(r?);
169        }
170        Ok(results)
171    }
172
173    pub fn get_latest_run_id(&self, suite: &str) -> anyhow::Result<Option<i64>> {
174        let conn = self.conn.lock().unwrap();
175        let mut stmt =
176            conn.prepare("SELECT id FROM runs WHERE suite = ?1 ORDER BY id DESC LIMIT 1")?;
177        let mut rows = stmt.query(params![suite])?;
178        if let Some(row) = rows.next()? {
179            Ok(Some(row.get(0)?))
180        } else {
181            Ok(None)
182        }
183    }
184
185    pub fn fetch_results_for_run(
186        &self,
187        run_id: i64,
188    ) -> anyhow::Result<Vec<crate::model::TestResultRow>> {
189        let conn = self.conn.lock().unwrap();
190        let mut stmt = conn.prepare(
191            "SELECT
192                r.test_id, r.outcome, r.duration_ms, r.score, r.attempts_json,
193                r.fingerprint, r.skip_reason
194             FROM results r
195             WHERE r.run_id = ?1
196             ORDER BY r.test_id ASC",
197        )?;
198
199        let rows = stmt.query_map(params![run_id], |row| {
200            let attempts_str: Option<String> = row.get(4)?;
201
202            let (message, details) =
203                if let Some(s) = attempts_str.as_ref().filter(|s| !s.trim().is_empty()) {
204                    if let Ok(attempts) = serde_json::from_str::<Vec<crate::model::AttemptRow>>(s) {
205                        attempts
206                            .last()
207                            .map(|a| (a.message.clone(), a.details.clone()))
208                            .unwrap_or_else(|| (String::new(), serde_json::json!({})))
209                    } else {
210                        (String::new(), serde_json::json!({}))
211                    }
212                } else {
213                    (String::new(), serde_json::json!({}))
214                };
215
216            let attempts: Option<Vec<crate::model::AttemptRow>> =
217                attempts_str.and_then(|s| serde_json::from_str(&s).ok());
218
219            Ok(crate::model::TestResultRow {
220                test_id: row.get(0)?,
221                status: crate::model::TestStatus::parse(&row.get::<_, String>(1)?),
222                message,
223                duration_ms: row.get(2)?,
224                details,
225                score: row.get(3)?,
226                cached: false,
227                fingerprint: row.get(5)?,
228                skip_reason: row.get(6)?,
229                attempts,
230                error_policy_applied: None,
231            })
232        })?;
233
234        let mut results = Vec::new();
235        for r in rows {
236            results.push(r?);
237        }
238        Ok(results)
239    }
240
241    pub fn get_last_passing_by_fingerprint(
242        &self,
243        fingerprint: &str,
244    ) -> anyhow::Result<Option<TestResultRow>> {
245        let conn = self.conn.lock().unwrap();
246        // We want the most recent passing result for this fingerprint.
247        // run_id DESC ensures recency.
248        let mut stmt = conn.prepare(
249            "SELECT r.test_id, r.outcome, r.score, r.duration_ms, r.output_json, r.skip_reason, run.id, run.started_at
250             FROM results r
251             JOIN runs run ON r.run_id = run.id
252             WHERE r.fingerprint = ?1 AND r.outcome = 'pass'
253             ORDER BY r.id DESC LIMIT 1"
254        )?;
255
256        let mut rows = stmt.query(params![fingerprint])?;
257        if let Some(row) = rows.next()? {
258            let outcome: String = row.get(1)?;
259            let status = match outcome.as_str() {
260                "pass" => TestStatus::Pass,
261                _ => TestStatus::Pass,
262            };
263
264            let skip_reason: Option<String> = row.get(5)?;
265            let run_id: i64 = row.get(6)?;
266            let started_at: String = row.get(7)?;
267
268            let details = serde_json::json!({
269                "skip": {
270                    "reason": skip_reason.clone().unwrap_or_else(|| "fingerprint_match".into()),
271                    "fingerprint": fingerprint,
272                    "previous_run_id": run_id,
273                    "previous_at": started_at,
274                    "origin_run_id": run_id,
275                    "previous_score": row.get::<_, Option<f64>>(2)?
276                }
277            });
278
279            Ok(Some(TestResultRow {
280                test_id: row.get(0)?,
281                status,
282                message: skip_reason.unwrap_or_else(|| "fingerprint_match".to_string()),
283                score: row.get(2)?,
284                duration_ms: row.get(3)?,
285                cached: true,
286                details,
287                fingerprint: Some(fingerprint.to_string()),
288                skip_reason: None,
289                attempts: None,
290                error_policy_applied: None,
291            }))
292        } else {
293            Ok(None)
294        }
295    }
296
297    pub fn insert_run(&self, suite: &str) -> anyhow::Result<i64> {
298        let conn = self.conn.lock().unwrap();
299        let started_at = chrono::Utc::now().to_rfc3339();
300        conn.execute(
301            "INSERT INTO runs(suite, started_at, status) VALUES (?1, ?2, ?3)",
302            params![suite, started_at, "running"],
303        )?;
304        Ok(conn.last_insert_rowid())
305    }
306
307    pub fn create_run(&self, cfg: &EvalConfig) -> anyhow::Result<i64> {
308        let started_at = now_rfc3339ish();
309        let conn = self.conn.lock().unwrap();
310        conn.execute(
311            "INSERT INTO runs(suite, started_at, status, config_json) VALUES (?1, ?2, ?3, ?4)",
312            params![
313                cfg.suite,
314                started_at,
315                "running",
316                serde_json::to_string(cfg)?
317            ],
318        )?;
319        Ok(conn.last_insert_rowid())
320    }
321
322    pub fn finalize_run(&self, run_id: i64, status: &str) -> anyhow::Result<()> {
323        let conn = self.conn.lock().unwrap();
324        conn.execute(
325            "UPDATE runs SET status=?1 WHERE id=?2",
326            params![status, run_id],
327        )?;
328        Ok(())
329    }
330
331    pub fn insert_result_embedded(
332        &self,
333        run_id: i64,
334        row: &TestResultRow,
335        attempts: &[AttemptRow],
336        output: &LlmResponse,
337    ) -> anyhow::Result<()> {
338        let conn = self.conn.lock().unwrap();
339
340        // 1. Insert into results
341        conn.execute(
342            "INSERT INTO results(run_id, test_id, outcome, score, duration_ms, attempts_json, output_json, fingerprint, skip_reason)
343             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
344            params![
345                run_id,
346                row.test_id,
347                status_to_outcome(&row.status),
348                row.score,
349                row.duration_ms.map(|v| v as i64),
350                serde_json::to_string(attempts)?,
351                serde_json::to_string(output)?,
352                row.fingerprint,
353                row.skip_reason
354            ],
355        )?;
356
357        let result_id = conn.last_insert_rowid();
358
359        // 2. Insert individual attempts
360        let mut stmt = conn.prepare(
361            "INSERT INTO attempts(result_id, attempt_number, outcome, score, duration_ms, output_json, error_message)
362             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"
363        )?;
364
365        for attempt in attempts {
366            stmt.execute(params![
367                result_id,
368                attempt.attempt_no as i64,
369                status_to_outcome(&attempt.status),
370                0.0, // Score not tracked per attempt yet
371                attempt.duration_ms.map(|v| v as i64),
372                serde_json::to_string(&attempt.details)?,
373                Option::<String>::None
374            ])?;
375        }
376
377        Ok(())
378    }
379
380    // ... existing ...
381
382    // quarantine
383    pub fn quarantine_get_reason(
384        &self,
385        suite: &str,
386        test_id: &str,
387    ) -> anyhow::Result<Option<String>> {
388        let conn = self.conn.lock().unwrap();
389        let mut stmt =
390            conn.prepare("SELECT reason FROM quarantine WHERE suite=?1 AND test_id=?2")?;
391        let mut rows = stmt.query(params![suite, test_id])?;
392        if let Some(row) = rows.next()? {
393            Ok(Some(row.get::<_, Option<String>>(0)?.unwrap_or_default()))
394        } else {
395            Ok(None)
396        }
397    }
398
399    pub fn quarantine_add(&self, suite: &str, test_id: &str, reason: &str) -> anyhow::Result<()> {
400        let conn = self.conn.lock().unwrap();
401        conn.execute(
402            "INSERT INTO quarantine(suite, test_id, reason, added_at)
403             VALUES (?1, ?2, ?3, ?4)
404             ON CONFLICT(suite, test_id) DO UPDATE SET reason=excluded.reason, added_at=excluded.added_at",
405            params![suite, test_id, reason, now_rfc3339ish()],
406        )?;
407        Ok(())
408    }
409
410    pub fn quarantine_remove(&self, suite: &str, test_id: &str) -> anyhow::Result<()> {
411        let conn = self.conn.lock().unwrap();
412        conn.execute(
413            "DELETE FROM quarantine WHERE suite=?1 AND test_id=?2",
414            params![suite, test_id],
415        )?;
416        Ok(())
417    }
418
419    // cache
420    pub fn cache_get(&self, key: &str) -> anyhow::Result<Option<LlmResponse>> {
421        let conn = self.conn.lock().unwrap();
422        let mut stmt = conn.prepare("SELECT response_json FROM cache WHERE key=?1")?;
423        let mut rows = stmt.query(params![key])?;
424        if let Some(row) = rows.next()? {
425            let s: String = row.get(0)?;
426            let mut resp: LlmResponse = serde_json::from_str(&s)?;
427            resp.cached = true;
428            Ok(Some(resp))
429        } else {
430            Ok(None)
431        }
432    }
433
434    pub fn cache_put(&self, key: &str, resp: &LlmResponse) -> anyhow::Result<()> {
435        let conn = self.conn.lock().unwrap();
436        let created_at = now_rfc3339ish();
437        let mut to_store = resp.clone();
438        to_store.cached = false;
439        conn.execute(
440            "INSERT INTO cache(key, response_json, created_at) VALUES (?1, ?2, ?3)
441             ON CONFLICT(key) DO UPDATE SET response_json=excluded.response_json, created_at=excluded.created_at",
442            params![key, serde_json::to_string(&to_store)?, created_at],
443        )?;
444        Ok(())
445    }
446
447    // embeddings
448    pub fn get_embedding(&self, key: &str) -> anyhow::Result<Option<(String, Vec<f32>)>> {
449        let conn = self.conn.lock().unwrap();
450        let mut stmt = conn.prepare("SELECT model, vec FROM embeddings WHERE key = ?1 LIMIT 1")?;
451        let mut rows = stmt.query(params![key])?;
452
453        if let Some(row) = rows.next()? {
454            let model: String = row.get(0)?;
455            let blob: Vec<u8> = row.get(1)?;
456            let vec = crate::embeddings::util::decode_vec_f32(&blob)?;
457            Ok(Some((model, vec)))
458        } else {
459            Ok(None)
460        }
461    }
462
463    pub fn put_embedding(&self, key: &str, model: &str, vec: &[f32]) -> anyhow::Result<()> {
464        let conn = self.conn.lock().unwrap();
465        let blob = crate::embeddings::util::encode_vec_f32(vec);
466        let dims = vec.len() as i64;
467        let created_at = now_rfc3339ish();
468
469        conn.execute(
470            "INSERT OR REPLACE INTO embeddings (key, model, dims, vec, created_at)
471             VALUES (?1, ?2, ?3, ?4, ?5)",
472            params![key, model, dims, blob, created_at],
473        )?;
474        Ok(())
475    }
476    pub fn stats_best_effort(&self) -> anyhow::Result<StoreStats> {
477        let conn = self.conn.lock().unwrap();
478
479        let runs: Option<u64> = conn
480            .query_row("SELECT COUNT(*) FROM runs", [], |r| {
481                r.get::<_, i64>(0).map(|x| x as u64)
482            })
483            .ok();
484        let results: Option<u64> = conn
485            .query_row("SELECT COUNT(*) FROM results", [], |r| {
486                r.get::<_, i64>(0).map(|x| x as u64)
487            })
488            .ok();
489
490        let last: Option<(i64, String)> = conn
491            .query_row(
492                "SELECT id, started_at FROM runs ORDER BY id DESC LIMIT 1",
493                [],
494                |r| Ok((r.get(0)?, r.get(1)?)),
495            )
496            .ok();
497
498        let (last_id, last_started) = if let Some((id, s)) = last {
499            (Some(id), Some(s))
500        } else {
501            (None, None)
502        };
503
504        let v_str: Option<String> = conn
505            .query_row("PRAGMA user_version", [], |r| r.get(0))
506            .ok()
507            .map(|v: i64| v.to_string());
508
509        Ok(StoreStats {
510            runs,
511            results,
512            last_run_id: last_id,
513            last_run_at: last_started,
514            version: v_str,
515        })
516    }
517
518    // --- Assertions Support ---
519
520    pub fn get_episode_graph(
521        &self,
522        run_id: i64,
523        test_id: &str,
524    ) -> anyhow::Result<crate::agent_assertions::EpisodeGraph> {
525        let conn = self.conn.lock().unwrap();
526
527        // 1. Find Episode
528        let mut stmt = conn.prepare("SELECT id FROM episodes WHERE run_id = ? AND test_id = ?")?;
529        let mut rows = stmt.query(params![run_id, test_id])?;
530
531        let mut episode_ids = Vec::new();
532        while let Some(row) = rows.next()? {
533            episode_ids.push(row.get::<_, String>(0)?);
534        }
535
536        if episode_ids.is_empty() {
537            anyhow::bail!(
538                "E_TRACE_EPISODE_MISSING: No episode found for run_id={} test_id={}",
539                run_id,
540                test_id
541            );
542        }
543        if episode_ids.len() > 1 {
544            anyhow::bail!(
545                "E_TRACE_EPISODE_AMBIGUOUS: Multiple episodes ({}) found for run_id={} test_id={}",
546                episode_ids.len(),
547                run_id,
548                test_id
549            );
550        }
551        let episode_id = episode_ids[0].clone();
552
553        // 2. Fetch Steps
554        let mut stmt_steps = conn.prepare("SELECT id, episode_id, idx, kind, name, content FROM steps WHERE episode_id = ? ORDER BY idx ASC")?;
555        let step_rows = stmt_steps
556            .query_map(params![episode_id], |row| {
557                Ok(crate::storage::rows::StepRow {
558                    id: row.get(0)?,
559                    episode_id: row.get(1)?,
560                    idx: row.get(2)?,
561                    kind: row.get(3)?,
562                    name: row.get(4)?,
563                    content: row.get(5)?,
564                })
565            })?
566            .collect::<Result<Vec<_>, _>>()?;
567
568        // 3. Fetch ToolCalls (Joined for ordering)
569        let mut stmt_tools = conn.prepare(
570            "SELECT tc.id, tc.step_id, tc.episode_id, tc.tool_name, tc.call_index, tc.args, tc.result
571             FROM tool_calls tc
572             JOIN steps s ON tc.step_id = s.id
573             WHERE tc.episode_id = ?
574             ORDER BY s.idx ASC, tc.call_index ASC"
575        )?;
576        let tool_rows = stmt_tools
577            .query_map(params![episode_id], |row| {
578                Ok(crate::storage::rows::ToolCallRow {
579                    id: row.get(0)?,
580                    step_id: row.get(1)?,
581                    episode_id: row.get(2)?,
582                    tool_name: row.get(3)?,
583                    call_index: row.get(4)?,
584                    args: row.get(5)?,
585                    result: row.get(6)?,
586                })
587            })?
588            .collect::<Result<Vec<_>, _>>()?;
589
590        Ok(crate::agent_assertions::EpisodeGraph {
591            episode_id,
592            steps: step_rows,
593            tool_calls: tool_rows,
594        })
595    }
596
597    // --- Trace V2 Storage ---
598
599    pub fn insert_event(
600        &self,
601        event: &TraceEvent,
602        run_id: Option<i64>,
603        test_id: Option<&str>,
604    ) -> anyhow::Result<()> {
605        let mut conn = self.conn.lock().unwrap();
606        let tx = conn.transaction()?;
607        match event {
608            TraceEvent::EpisodeStart(e) => Self::insert_episode(&tx, e, run_id, test_id)?,
609            TraceEvent::Step(e) => Self::insert_step(&tx, e)?,
610            TraceEvent::ToolCall(e) => Self::insert_tool_call(&tx, e)?,
611            TraceEvent::EpisodeEnd(e) => Self::update_episode_end(&tx, e)?,
612        }
613        tx.commit()?;
614        Ok(())
615    }
616
617    pub fn insert_batch(
618        &self,
619        events: &[TraceEvent],
620        run_id: Option<i64>,
621        test_id: Option<&str>,
622    ) -> anyhow::Result<()> {
623        let mut conn = self.conn.lock().unwrap();
624        let tx = conn.transaction()?;
625        for event in events {
626            match event {
627                TraceEvent::EpisodeStart(e) => Self::insert_episode(&tx, e, run_id, test_id)?,
628                TraceEvent::Step(e) => Self::insert_step(&tx, e)?,
629                TraceEvent::ToolCall(e) => Self::insert_tool_call(&tx, e)?,
630                TraceEvent::EpisodeEnd(e) => Self::update_episode_end(&tx, e)?,
631            }
632        }
633        tx.commit()?;
634        Ok(())
635    }
636
637    fn insert_episode(
638        tx: &rusqlite::Transaction,
639        e: &EpisodeStart,
640        run_id: Option<i64>,
641        test_id: Option<&str>,
642    ) -> anyhow::Result<()> {
643        let prompt_val = e.input.get("prompt").unwrap_or(&serde_json::Value::Null);
644        let prompt_str = if let Some(s) = prompt_val.as_str() {
645            s.to_string()
646        } else {
647            serde_json::to_string(prompt_val).unwrap_or_default()
648        };
649        let meta = serde_json::to_string(&e.meta).unwrap_or_default();
650
651        // PR-406: Support test_id in meta (from MCP import) to override episode_id default
652        let meta_test_id = e.meta.get("test_id").and_then(|v| v.as_str());
653        let effective_test_id = test_id.or(meta_test_id).or(Some(&e.episode_id));
654
655        // Idempotent: OR REPLACE to update meta/prompt if re-ingesting? Or OR IGNORE?
656        // User said: "INSERT OR IGNORE op episode_id" for idempotency of IDs.
657        tx.execute(
658            "INSERT INTO episodes (id, run_id, test_id, timestamp, prompt, meta_json) VALUES (?, ?, ?, ?, ?, ?)
659             ON CONFLICT(id) DO UPDATE SET
660                run_id=COALESCE(excluded.run_id, episodes.run_id),
661                test_id=COALESCE(excluded.test_id, episodes.test_id),
662                timestamp=excluded.timestamp,
663                prompt=excluded.prompt,
664                meta_json=excluded.meta_json",
665            (
666                &e.episode_id,
667                run_id,
668                effective_test_id,
669                e.timestamp,
670                prompt_str,
671                meta,
672            ),
673        ).context("insert episode")?;
674        Ok(())
675    }
676
677    fn insert_step(tx: &rusqlite::Transaction, e: &StepEntry) -> anyhow::Result<()> {
678        let meta = serde_json::to_string(&e.meta).unwrap_or_default();
679        let trunc = serde_json::to_string(&e.truncations).unwrap_or_default();
680
681        // Idempotency: UNIQUE(episode_id, idx)
682        tx.execute(
683            "INSERT INTO steps (id, episode_id, idx, kind, name, content, content_sha256, truncations_json, meta_json)
684             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
685             ON CONFLICT(id) DO UPDATE SET content=excluded.content, meta_json=excluded.meta_json",
686            (
687                &e.step_id,
688                &e.episode_id,
689                e.idx,
690                &e.kind,
691                e.name.as_deref(),
692                e.content.as_deref(),
693                e.content_sha256.as_deref(),
694                trunc,
695                meta
696            ),
697        ).context("insert step")?;
698        Ok(())
699    }
700
701    fn insert_tool_call(tx: &rusqlite::Transaction, e: &ToolCallEntry) -> anyhow::Result<()> {
702        let args = serde_json::to_string(&e.args).unwrap_or_default();
703        let result = e
704            .result
705            .as_ref()
706            .map(|r| serde_json::to_string(r).unwrap_or_default());
707        let trunc = serde_json::to_string(&e.truncations).unwrap_or_default();
708
709        let call_idx = e.call_index.unwrap_or(0); // Default 0
710
711        tx.execute(
712            "INSERT INTO tool_calls (step_id, episode_id, tool_name, call_index, args, args_sha256, result, result_sha256, error, truncations_json)
713             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
714             ON CONFLICT(step_id, call_index) DO NOTHING",
715            (
716                &e.step_id,
717                &e.episode_id,
718                &e.tool_name,
719                call_idx,
720                args,
721                e.args_sha256.as_deref(),
722                result,
723                e.result_sha256.as_deref(),
724                e.error.as_deref(),
725                trunc
726            ),
727        ).context("insert tool call")?;
728        Ok(())
729    }
730
731    pub fn count_rows(&self, table: &str) -> anyhow::Result<i64> {
732        let conn = self.conn.lock().unwrap();
733        // Validation to prevent SQL injection (simple allowlist)
734        if !["episodes", "steps", "tool_calls", "runs", "results"].contains(&table) {
735            anyhow::bail!("Invalid table name for count_rows: {}", table);
736        }
737        let sql = format!("SELECT COUNT(*) FROM {}", table);
738        let n: i64 = conn.query_row(&sql, [], |r| r.get(0))?;
739        Ok(n)
740    }
741
742    fn update_episode_end(tx: &rusqlite::Transaction, e: &EpisodeEnd) -> anyhow::Result<()> {
743        tx.execute(
744            "UPDATE episodes SET outcome = ? WHERE id = ?",
745            (e.outcome.as_deref(), &e.episode_id),
746        )
747        .context("update episode outcome")?;
748        Ok(())
749    }
750}
751
/// Returns the current wall-clock time as `unix:<seconds since the Unix epoch>`.
///
/// Despite the name, the output is not an RFC 3339 string. If the system
/// clock reports a time before the epoch, the result is `unix:0` rather than
/// a panic.
fn now_rfc3339ish() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format!("unix:{}", secs)
}
760
761fn status_to_outcome(s: &TestStatus) -> &'static str {
762    match s {
763        TestStatus::Pass => "pass",
764        TestStatus::Fail => "fail",
765        TestStatus::Flaky => "flaky",
766        TestStatus::Warn => "warn",
767        TestStatus::Error => "error",
768        TestStatus::Skipped => "skipped",
769        TestStatus::Unstable => "unstable",
770        TestStatus::AllowedOnError => "allowed_on_error",
771    }
772}
773
774fn migrate_v030(conn: &Connection) -> anyhow::Result<()> {
775    let cols = get_columns(conn, "results")?;
776    add_column_if_missing(conn, &cols, "results", "fingerprint", "TEXT")?;
777    add_column_if_missing(conn, &cols, "results", "skip_reason", "TEXT")?;
778    add_column_if_missing(conn, &cols, "results", "attempts_json", "TEXT")?;
779    Ok(())
780}
781
782fn get_columns(
783    conn: &Connection,
784    table: &str,
785) -> anyhow::Result<std::collections::HashSet<String>> {
786    let mut stmt = conn.prepare(&format!("PRAGMA table_info({})", table))?;
787    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
788    let mut out = std::collections::HashSet::new();
789    for r in rows {
790        out.insert(r?);
791    }
792    Ok(out)
793}
794
795impl Store {
796    pub fn get_latest_episode_graph_by_test_id(
797        &self,
798        test_id: &str,
799    ) -> anyhow::Result<crate::agent_assertions::EpisodeGraph> {
800        let conn = self.conn.lock().unwrap();
801
802        // 1. Find latest episode for this test_id
803        let mut stmt = conn.prepare(
804            "SELECT id FROM episodes
805             WHERE test_id = ?1
806             ORDER BY timestamp DESC
807             LIMIT 1",
808        )?;
809
810        let episode_id: String = stmt.query_row(params![test_id], |row| row.get(0))
811            .map_err(|e| anyhow::anyhow!("E_TRACE_EPISODE_MISSING: No episode found for test_id={} (fallback check) : {}", test_id, e))?;
812
813        // 2. Load steps (V2 schema)
814        let mut stmt = conn.prepare(
815            "SELECT id, episode_id, idx, kind, name, content
816             FROM steps
817             WHERE episode_id = ?1
818             ORDER BY idx ASC",
819        )?;
820
821        let steps_iter = stmt.query_map(params![episode_id], |row| {
822            Ok(crate::storage::rows::StepRow {
823                id: row.get(0)?,
824                episode_id: row.get(1)?,
825                idx: row.get(2)?,
826                kind: row.get(3)?,
827                name: row.get(4)?,
828                content: row.get(5)?,
829            })
830        })?;
831
832        let mut steps = Vec::new();
833        for step in steps_iter {
834            steps.push(step?);
835        }
836
837        // 3. Load tool calls (Joined for ordering)
838        let mut stmt_tools = conn.prepare(
839            "SELECT tc.id, tc.step_id, tc.episode_id, tc.tool_name, tc.call_index, tc.args, tc.result
840             FROM tool_calls tc
841             JOIN steps s ON tc.step_id = s.id
842             WHERE tc.episode_id = ?
843             ORDER BY s.idx ASC, tc.call_index ASC"
844        )?;
845
846        let tc_iter = stmt_tools.query_map(params![episode_id], |row| {
847            Ok(crate::storage::rows::ToolCallRow {
848                id: row.get(0)?,
849                step_id: row.get(1)?,
850                episode_id: row.get(2)?,
851                tool_name: row.get(3)?,
852                call_index: row.get(4)?,
853                args: row.get(5)?,
854                result: row.get(6)?,
855            })
856        })?;
857
858        let mut tool_calls = Vec::new();
859        for tc in tc_iter {
860            tool_calls.push(tc?);
861        }
862
863        Ok(crate::agent_assertions::EpisodeGraph {
864            episode_id,
865            steps,
866            tool_calls,
867        })
868    }
869}
870
871// Keep helper functions separate
872fn add_column_if_missing(
873    conn: &Connection,
874    cols: &std::collections::HashSet<String>,
875    table: &str,
876    col: &str,
877    ty: &str,
878) -> anyhow::Result<()> {
879    if !cols.contains(col) {
880        let sql = format!("ALTER TABLE {} ADD COLUMN {} {}", table, col, ty);
881        conn.execute(&sql, [])?;
882    }
883    Ok(())
884}