1use crate::model::{AttemptRow, EvalConfig, LlmResponse, TestResultRow, TestStatus};
2use crate::trace::schema::{EpisodeEnd, EpisodeStart, StepEntry, ToolCallEntry, TraceEvent};
3use anyhow::Context;
4use rusqlite::{params, Connection};
5use std::path::Path;
6use std::sync::{Arc, Mutex};
7
8#[derive(Clone)]
9pub struct Store {
10 pub conn: Arc<Mutex<Connection>>,
11}
12
/// Best-effort summary of the store's contents, as produced by `Store::stats_best_effort`.
///
/// Every field is optional: a field is `None` when the query that would fill it failed
/// (or, for the `last_run_*` fields, when there are no runs yet).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreStats {
    /// Number of rows in `runs`.
    pub runs: Option<u64>,
    /// Number of rows in `results`.
    pub results: Option<u64>,
    /// Id of the run with the highest id.
    pub last_run_id: Option<i64>,
    /// `started_at` of that same run, as stored.
    pub last_run_at: Option<String>,
    /// SQLite `PRAGMA user_version`, rendered as a decimal string.
    pub version: Option<String>,
}
20
21impl Store {
22 pub fn open(path: &Path) -> anyhow::Result<Self> {
23 let conn = Connection::open(path).context("failed to open sqlite db")?;
24 conn.execute("PRAGMA foreign_keys = ON", [])?;
25 Ok(Self {
26 conn: Arc::new(Mutex::new(conn)),
27 })
28 }
29
30 pub fn memory() -> anyhow::Result<Self> {
31 let conn = Connection::open_in_memory().context("failed to open in-memory sqlite db")?;
33 Ok(Self {
34 conn: Arc::new(Mutex::new(conn)),
35 })
36 }
37
38 pub fn init_schema(&self) -> anyhow::Result<()> {
39 let conn = self.conn.lock().unwrap();
40 conn.execute_batch(crate::storage::schema::DDL)?;
41
42 migrate_v030(&conn)?;
44
45 let _ = conn.execute(
50 "CREATE INDEX IF NOT EXISTS idx_results_fingerprint ON results(fingerprint)",
51 [],
52 );
53
54 Ok(())
55 }
56
57 pub fn fetch_recent_results(
58 &self,
59 suite: &str,
60 limit: u32,
61 ) -> anyhow::Result<Vec<crate::model::TestResultRow>> {
62 let conn = self.conn.lock().unwrap();
63 let mut stmt = conn.prepare(
64 "SELECT
65 r.test_id, r.outcome, r.duration_ms, r.score, r.attempts_json,
66 r.fingerprint, r.skip_reason
67 FROM results r
68 JOIN runs ON r.run_id = runs.id
69 WHERE runs.suite = ?1
70 ORDER BY r.id DESC
71 LIMIT ?2",
72 )?;
73
74 let rows = stmt.query_map(rusqlite::params![suite, limit], |row| {
75 let attempts_str: Option<String> = row.get(4)?;
76
77 let attempts: Option<Vec<crate::model::AttemptRow>> = match attempts_str {
79 Some(s) if !s.trim().is_empty() => serde_json::from_str(&s).ok(),
80 _ => None,
81 };
82
83 let (message, details) = attempts
84 .as_ref()
85 .and_then(|v| v.last())
86 .map(|a| (a.message.clone(), a.details.clone()))
87 .unwrap_or_else(|| (String::new(), serde_json::json!({})));
88
89 let cached = false;
90
91 Ok(crate::model::TestResultRow {
92 test_id: row.get(0)?,
93 status: crate::model::TestStatus::parse(&row.get::<_, String>(1)?),
94 message,
95 duration_ms: row.get(2)?,
96 details,
97 score: row.get(3)?,
98 cached,
99 fingerprint: row.get(5)?,
100 skip_reason: row.get(6)?,
101 attempts,
102 error_policy_applied: None,
103 })
104 })?;
105
106 let mut results = Vec::new();
107 for r in rows {
108 results.push(r?);
109 }
110 Ok(results)
111 }
112
113 pub fn fetch_results_for_last_n_runs(
114 &self,
115 suite: &str,
116 n: u32,
117 ) -> anyhow::Result<Vec<crate::model::TestResultRow>> {
118 let conn = self.conn.lock().unwrap();
119 let mut stmt = conn.prepare(
120 "SELECT
121 r.test_id, r.outcome, r.duration_ms, r.score, r.attempts_json,
122 r.fingerprint, r.skip_reason
123 FROM results r
124 JOIN runs ON r.run_id = runs.id
125 WHERE runs.id IN (
126 SELECT id FROM runs WHERE suite = ?1 ORDER BY id DESC LIMIT ?2
127 )
128 ORDER BY r.id DESC",
129 )?;
130
131 let rows = stmt.query_map(rusqlite::params![suite, n], |row| {
132 let attempts_str: Option<String> = row.get(4)?;
133
134 let (message, details) =
135 if let Some(s) = attempts_str.as_ref().filter(|s| !s.trim().is_empty()) {
136 if let Ok(attempts) = serde_json::from_str::<Vec<crate::model::AttemptRow>>(s) {
137 attempts
138 .last()
139 .map(|a| (a.message.clone(), a.details.clone()))
140 .unwrap_or_else(|| (String::new(), serde_json::json!({})))
141 } else {
142 (String::new(), serde_json::json!({}))
143 }
144 } else {
145 (String::new(), serde_json::json!({}))
146 };
147
148 let attempts: Option<Vec<crate::model::AttemptRow>> =
149 attempts_str.and_then(|s| serde_json::from_str(&s).ok());
150
151 Ok(crate::model::TestResultRow {
152 test_id: row.get(0)?,
153 status: crate::model::TestStatus::parse(&row.get::<_, String>(1)?),
154 message,
155 duration_ms: row.get(2)?,
156 details,
157 score: row.get(3)?,
158 cached: false,
159 fingerprint: row.get(5)?,
160 skip_reason: row.get(6)?,
161 attempts,
162 error_policy_applied: None,
163 })
164 })?;
165
166 let mut results = Vec::new();
167 for r in rows {
168 results.push(r?);
169 }
170 Ok(results)
171 }
172
173 pub fn get_latest_run_id(&self, suite: &str) -> anyhow::Result<Option<i64>> {
174 let conn = self.conn.lock().unwrap();
175 let mut stmt =
176 conn.prepare("SELECT id FROM runs WHERE suite = ?1 ORDER BY id DESC LIMIT 1")?;
177 let mut rows = stmt.query(params![suite])?;
178 if let Some(row) = rows.next()? {
179 Ok(Some(row.get(0)?))
180 } else {
181 Ok(None)
182 }
183 }
184
185 pub fn fetch_results_for_run(
186 &self,
187 run_id: i64,
188 ) -> anyhow::Result<Vec<crate::model::TestResultRow>> {
189 let conn = self.conn.lock().unwrap();
190 let mut stmt = conn.prepare(
191 "SELECT
192 r.test_id, r.outcome, r.duration_ms, r.score, r.attempts_json,
193 r.fingerprint, r.skip_reason
194 FROM results r
195 WHERE r.run_id = ?1
196 ORDER BY r.test_id ASC",
197 )?;
198
199 let rows = stmt.query_map(params![run_id], |row| {
200 let attempts_str: Option<String> = row.get(4)?;
201
202 let (message, details) =
203 if let Some(s) = attempts_str.as_ref().filter(|s| !s.trim().is_empty()) {
204 if let Ok(attempts) = serde_json::from_str::<Vec<crate::model::AttemptRow>>(s) {
205 attempts
206 .last()
207 .map(|a| (a.message.clone(), a.details.clone()))
208 .unwrap_or_else(|| (String::new(), serde_json::json!({})))
209 } else {
210 (String::new(), serde_json::json!({}))
211 }
212 } else {
213 (String::new(), serde_json::json!({}))
214 };
215
216 let attempts: Option<Vec<crate::model::AttemptRow>> =
217 attempts_str.and_then(|s| serde_json::from_str(&s).ok());
218
219 Ok(crate::model::TestResultRow {
220 test_id: row.get(0)?,
221 status: crate::model::TestStatus::parse(&row.get::<_, String>(1)?),
222 message,
223 duration_ms: row.get(2)?,
224 details,
225 score: row.get(3)?,
226 cached: false,
227 fingerprint: row.get(5)?,
228 skip_reason: row.get(6)?,
229 attempts,
230 error_policy_applied: None,
231 })
232 })?;
233
234 let mut results = Vec::new();
235 for r in rows {
236 results.push(r?);
237 }
238 Ok(results)
239 }
240
241 pub fn get_last_passing_by_fingerprint(
242 &self,
243 fingerprint: &str,
244 ) -> anyhow::Result<Option<TestResultRow>> {
245 let conn = self.conn.lock().unwrap();
246 let mut stmt = conn.prepare(
249 "SELECT r.test_id, r.outcome, r.score, r.duration_ms, r.output_json, r.skip_reason, run.id, run.started_at
250 FROM results r
251 JOIN runs run ON r.run_id = run.id
252 WHERE r.fingerprint = ?1 AND r.outcome = 'pass'
253 ORDER BY r.id DESC LIMIT 1"
254 )?;
255
256 let mut rows = stmt.query(params![fingerprint])?;
257 if let Some(row) = rows.next()? {
258 let outcome: String = row.get(1)?;
259 let status = match outcome.as_str() {
260 "pass" => TestStatus::Pass,
261 _ => TestStatus::Pass,
262 };
263
264 let skip_reason: Option<String> = row.get(5)?;
265 let run_id: i64 = row.get(6)?;
266 let started_at: String = row.get(7)?;
267
268 let details = serde_json::json!({
269 "skip": {
270 "reason": skip_reason.clone().unwrap_or_else(|| "fingerprint_match".into()),
271 "fingerprint": fingerprint,
272 "previous_run_id": run_id,
273 "previous_at": started_at,
274 "origin_run_id": run_id,
275 "previous_score": row.get::<_, Option<f64>>(2)?
276 }
277 });
278
279 Ok(Some(TestResultRow {
280 test_id: row.get(0)?,
281 status,
282 message: skip_reason.unwrap_or_else(|| "fingerprint_match".to_string()),
283 score: row.get(2)?,
284 duration_ms: row.get(3)?,
285 cached: true,
286 details,
287 fingerprint: Some(fingerprint.to_string()),
288 skip_reason: None,
289 attempts: None,
290 error_policy_applied: None,
291 }))
292 } else {
293 Ok(None)
294 }
295 }
296
297 pub fn insert_run(&self, suite: &str) -> anyhow::Result<i64> {
298 let conn = self.conn.lock().unwrap();
299 let started_at = chrono::Utc::now().to_rfc3339();
300 conn.execute(
301 "INSERT INTO runs(suite, started_at, status) VALUES (?1, ?2, ?3)",
302 params![suite, started_at, "running"],
303 )?;
304 Ok(conn.last_insert_rowid())
305 }
306
307 pub fn create_run(&self, cfg: &EvalConfig) -> anyhow::Result<i64> {
308 let started_at = now_rfc3339ish();
309 let conn = self.conn.lock().unwrap();
310 conn.execute(
311 "INSERT INTO runs(suite, started_at, status, config_json) VALUES (?1, ?2, ?3, ?4)",
312 params![
313 cfg.suite,
314 started_at,
315 "running",
316 serde_json::to_string(cfg)?
317 ],
318 )?;
319 Ok(conn.last_insert_rowid())
320 }
321
322 pub fn finalize_run(&self, run_id: i64, status: &str) -> anyhow::Result<()> {
323 let conn = self.conn.lock().unwrap();
324 conn.execute(
325 "UPDATE runs SET status=?1 WHERE id=?2",
326 params![status, run_id],
327 )?;
328 Ok(())
329 }
330
331 pub fn insert_result_embedded(
332 &self,
333 run_id: i64,
334 row: &TestResultRow,
335 attempts: &[AttemptRow],
336 output: &LlmResponse,
337 ) -> anyhow::Result<()> {
338 let conn = self.conn.lock().unwrap();
339
340 conn.execute(
342 "INSERT INTO results(run_id, test_id, outcome, score, duration_ms, attempts_json, output_json, fingerprint, skip_reason)
343 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
344 params![
345 run_id,
346 row.test_id,
347 status_to_outcome(&row.status),
348 row.score,
349 row.duration_ms.map(|v| v as i64),
350 serde_json::to_string(attempts)?,
351 serde_json::to_string(output)?,
352 row.fingerprint,
353 row.skip_reason
354 ],
355 )?;
356
357 let result_id = conn.last_insert_rowid();
358
359 let mut stmt = conn.prepare(
361 "INSERT INTO attempts(result_id, attempt_number, outcome, score, duration_ms, output_json, error_message)
362 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"
363 )?;
364
365 for attempt in attempts {
366 stmt.execute(params![
367 result_id,
368 attempt.attempt_no as i64,
369 status_to_outcome(&attempt.status),
370 0.0, attempt.duration_ms.map(|v| v as i64),
372 serde_json::to_string(&attempt.details)?,
373 Option::<String>::None
374 ])?;
375 }
376
377 Ok(())
378 }
379
380 pub fn quarantine_get_reason(
384 &self,
385 suite: &str,
386 test_id: &str,
387 ) -> anyhow::Result<Option<String>> {
388 let conn = self.conn.lock().unwrap();
389 let mut stmt =
390 conn.prepare("SELECT reason FROM quarantine WHERE suite=?1 AND test_id=?2")?;
391 let mut rows = stmt.query(params![suite, test_id])?;
392 if let Some(row) = rows.next()? {
393 Ok(Some(row.get::<_, Option<String>>(0)?.unwrap_or_default()))
394 } else {
395 Ok(None)
396 }
397 }
398
399 pub fn quarantine_add(&self, suite: &str, test_id: &str, reason: &str) -> anyhow::Result<()> {
400 let conn = self.conn.lock().unwrap();
401 conn.execute(
402 "INSERT INTO quarantine(suite, test_id, reason, added_at)
403 VALUES (?1, ?2, ?3, ?4)
404 ON CONFLICT(suite, test_id) DO UPDATE SET reason=excluded.reason, added_at=excluded.added_at",
405 params![suite, test_id, reason, now_rfc3339ish()],
406 )?;
407 Ok(())
408 }
409
410 pub fn quarantine_remove(&self, suite: &str, test_id: &str) -> anyhow::Result<()> {
411 let conn = self.conn.lock().unwrap();
412 conn.execute(
413 "DELETE FROM quarantine WHERE suite=?1 AND test_id=?2",
414 params![suite, test_id],
415 )?;
416 Ok(())
417 }
418
419 pub fn cache_get(&self, key: &str) -> anyhow::Result<Option<LlmResponse>> {
421 let conn = self.conn.lock().unwrap();
422 let mut stmt = conn.prepare("SELECT response_json FROM cache WHERE key=?1")?;
423 let mut rows = stmt.query(params![key])?;
424 if let Some(row) = rows.next()? {
425 let s: String = row.get(0)?;
426 let mut resp: LlmResponse = serde_json::from_str(&s)?;
427 resp.cached = true;
428 Ok(Some(resp))
429 } else {
430 Ok(None)
431 }
432 }
433
434 pub fn cache_put(&self, key: &str, resp: &LlmResponse) -> anyhow::Result<()> {
435 let conn = self.conn.lock().unwrap();
436 let created_at = now_rfc3339ish();
437 let mut to_store = resp.clone();
438 to_store.cached = false;
439 conn.execute(
440 "INSERT INTO cache(key, response_json, created_at) VALUES (?1, ?2, ?3)
441 ON CONFLICT(key) DO UPDATE SET response_json=excluded.response_json, created_at=excluded.created_at",
442 params![key, serde_json::to_string(&to_store)?, created_at],
443 )?;
444 Ok(())
445 }
446
447 pub fn get_embedding(&self, key: &str) -> anyhow::Result<Option<(String, Vec<f32>)>> {
449 let conn = self.conn.lock().unwrap();
450 let mut stmt = conn.prepare("SELECT model, vec FROM embeddings WHERE key = ?1 LIMIT 1")?;
451 let mut rows = stmt.query(params![key])?;
452
453 if let Some(row) = rows.next()? {
454 let model: String = row.get(0)?;
455 let blob: Vec<u8> = row.get(1)?;
456 let vec = crate::embeddings::util::decode_vec_f32(&blob)?;
457 Ok(Some((model, vec)))
458 } else {
459 Ok(None)
460 }
461 }
462
463 pub fn put_embedding(&self, key: &str, model: &str, vec: &[f32]) -> anyhow::Result<()> {
464 let conn = self.conn.lock().unwrap();
465 let blob = crate::embeddings::util::encode_vec_f32(vec);
466 let dims = vec.len() as i64;
467 let created_at = now_rfc3339ish();
468
469 conn.execute(
470 "INSERT OR REPLACE INTO embeddings (key, model, dims, vec, created_at)
471 VALUES (?1, ?2, ?3, ?4, ?5)",
472 params![key, model, dims, blob, created_at],
473 )?;
474 Ok(())
475 }
476 pub fn stats_best_effort(&self) -> anyhow::Result<StoreStats> {
477 let conn = self.conn.lock().unwrap();
478
479 let runs: Option<u64> = conn
480 .query_row("SELECT COUNT(*) FROM runs", [], |r| {
481 r.get::<_, i64>(0).map(|x| x as u64)
482 })
483 .ok();
484 let results: Option<u64> = conn
485 .query_row("SELECT COUNT(*) FROM results", [], |r| {
486 r.get::<_, i64>(0).map(|x| x as u64)
487 })
488 .ok();
489
490 let last: Option<(i64, String)> = conn
491 .query_row(
492 "SELECT id, started_at FROM runs ORDER BY id DESC LIMIT 1",
493 [],
494 |r| Ok((r.get(0)?, r.get(1)?)),
495 )
496 .ok();
497
498 let (last_id, last_started) = if let Some((id, s)) = last {
499 (Some(id), Some(s))
500 } else {
501 (None, None)
502 };
503
504 let v_str: Option<String> = conn
505 .query_row("PRAGMA user_version", [], |r| r.get(0))
506 .ok()
507 .map(|v: i64| v.to_string());
508
509 Ok(StoreStats {
510 runs,
511 results,
512 last_run_id: last_id,
513 last_run_at: last_started,
514 version: v_str,
515 })
516 }
517
518 pub fn get_episode_graph(
521 &self,
522 run_id: i64,
523 test_id: &str,
524 ) -> anyhow::Result<crate::agent_assertions::EpisodeGraph> {
525 let conn = self.conn.lock().unwrap();
526
527 let mut stmt = conn.prepare("SELECT id FROM episodes WHERE run_id = ? AND test_id = ?")?;
529 let mut rows = stmt.query(params![run_id, test_id])?;
530
531 let mut episode_ids = Vec::new();
532 while let Some(row) = rows.next()? {
533 episode_ids.push(row.get::<_, String>(0)?);
534 }
535
536 if episode_ids.is_empty() {
537 anyhow::bail!(
538 "E_TRACE_EPISODE_MISSING: No episode found for run_id={} test_id={}",
539 run_id,
540 test_id
541 );
542 }
543 if episode_ids.len() > 1 {
544 anyhow::bail!(
545 "E_TRACE_EPISODE_AMBIGUOUS: Multiple episodes ({}) found for run_id={} test_id={}",
546 episode_ids.len(),
547 run_id,
548 test_id
549 );
550 }
551 let episode_id = episode_ids[0].clone();
552
553 let mut stmt_steps = conn.prepare("SELECT id, episode_id, idx, kind, name, content FROM steps WHERE episode_id = ? ORDER BY idx ASC")?;
555 let step_rows = stmt_steps
556 .query_map(params![episode_id], |row| {
557 Ok(crate::storage::rows::StepRow {
558 id: row.get(0)?,
559 episode_id: row.get(1)?,
560 idx: row.get(2)?,
561 kind: row.get(3)?,
562 name: row.get(4)?,
563 content: row.get(5)?,
564 })
565 })?
566 .collect::<Result<Vec<_>, _>>()?;
567
568 let mut stmt_tools = conn.prepare(
570 "SELECT tc.id, tc.step_id, tc.episode_id, tc.tool_name, tc.call_index, tc.args, tc.result
571 FROM tool_calls tc
572 JOIN steps s ON tc.step_id = s.id
573 WHERE tc.episode_id = ?
574 ORDER BY s.idx ASC, tc.call_index ASC"
575 )?;
576 let tool_rows = stmt_tools
577 .query_map(params![episode_id], |row| {
578 Ok(crate::storage::rows::ToolCallRow {
579 id: row.get(0)?,
580 step_id: row.get(1)?,
581 episode_id: row.get(2)?,
582 tool_name: row.get(3)?,
583 call_index: row.get(4)?,
584 args: row.get(5)?,
585 result: row.get(6)?,
586 })
587 })?
588 .collect::<Result<Vec<_>, _>>()?;
589
590 Ok(crate::agent_assertions::EpisodeGraph {
591 episode_id,
592 steps: step_rows,
593 tool_calls: tool_rows,
594 })
595 }
596
597 pub fn insert_event(
600 &self,
601 event: &TraceEvent,
602 run_id: Option<i64>,
603 test_id: Option<&str>,
604 ) -> anyhow::Result<()> {
605 let mut conn = self.conn.lock().unwrap();
606 let tx = conn.transaction()?;
607 match event {
608 TraceEvent::EpisodeStart(e) => Self::insert_episode(&tx, e, run_id, test_id)?,
609 TraceEvent::Step(e) => Self::insert_step(&tx, e)?,
610 TraceEvent::ToolCall(e) => Self::insert_tool_call(&tx, e)?,
611 TraceEvent::EpisodeEnd(e) => Self::update_episode_end(&tx, e)?,
612 }
613 tx.commit()?;
614 Ok(())
615 }
616
617 pub fn insert_batch(
618 &self,
619 events: &[TraceEvent],
620 run_id: Option<i64>,
621 test_id: Option<&str>,
622 ) -> anyhow::Result<()> {
623 let mut conn = self.conn.lock().unwrap();
624 let tx = conn.transaction()?;
625 for event in events {
626 match event {
627 TraceEvent::EpisodeStart(e) => Self::insert_episode(&tx, e, run_id, test_id)?,
628 TraceEvent::Step(e) => Self::insert_step(&tx, e)?,
629 TraceEvent::ToolCall(e) => Self::insert_tool_call(&tx, e)?,
630 TraceEvent::EpisodeEnd(e) => Self::update_episode_end(&tx, e)?,
631 }
632 }
633 tx.commit()?;
634 Ok(())
635 }
636
637 fn insert_episode(
638 tx: &rusqlite::Transaction<'_>,
639 e: &EpisodeStart,
640 run_id: Option<i64>,
641 test_id: Option<&str>,
642 ) -> anyhow::Result<()> {
643 let prompt_val = e.input.get("prompt").unwrap_or(&serde_json::Value::Null);
644 let prompt_str = if let Some(s) = prompt_val.as_str() {
645 s.to_string()
646 } else {
647 serde_json::to_string(prompt_val).unwrap_or_default()
648 };
649 let meta = serde_json::to_string(&e.meta).unwrap_or_default();
650
651 let meta_test_id = e.meta.get("test_id").and_then(|v| v.as_str());
653 let effective_test_id = test_id.or(meta_test_id).or(Some(&e.episode_id));
654
655 tx.execute(
658 "INSERT INTO episodes (id, run_id, test_id, timestamp, prompt, meta_json) VALUES (?, ?, ?, ?, ?, ?)
659 ON CONFLICT(id) DO UPDATE SET
660 run_id=COALESCE(excluded.run_id, episodes.run_id),
661 test_id=COALESCE(excluded.test_id, episodes.test_id),
662 timestamp=excluded.timestamp,
663 prompt=excluded.prompt,
664 meta_json=excluded.meta_json",
665 (
666 &e.episode_id,
667 run_id,
668 effective_test_id,
669 e.timestamp,
670 prompt_str,
671 meta,
672 ),
673 ).context("insert episode")?;
674 Ok(())
675 }
676
677 fn insert_step(tx: &rusqlite::Transaction<'_>, e: &StepEntry) -> anyhow::Result<()> {
678 let meta = serde_json::to_string(&e.meta).unwrap_or_default();
679 let trunc = serde_json::to_string(&e.truncations).unwrap_or_default();
680
681 tx.execute(
683 "INSERT INTO steps (id, episode_id, idx, kind, name, content, content_sha256, truncations_json, meta_json)
684 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
685 ON CONFLICT(id) DO UPDATE SET content=excluded.content, meta_json=excluded.meta_json",
686 (
687 &e.step_id,
688 &e.episode_id,
689 e.idx,
690 &e.kind,
691 e.name.as_deref(),
692 e.content.as_deref(),
693 e.content_sha256.as_deref(),
694 trunc,
695 meta
696 ),
697 ).context("insert step")?;
698 Ok(())
699 }
700
701 fn insert_tool_call(tx: &rusqlite::Transaction<'_>, e: &ToolCallEntry) -> anyhow::Result<()> {
702 let args = serde_json::to_string(&e.args).unwrap_or_default();
703 let result = e
704 .result
705 .as_ref()
706 .map(|r| serde_json::to_string(r).unwrap_or_default());
707 let trunc = serde_json::to_string(&e.truncations).unwrap_or_default();
708
709 let call_idx = e.call_index.unwrap_or(0); tx.execute(
712 "INSERT INTO tool_calls (step_id, episode_id, tool_name, call_index, args, args_sha256, result, result_sha256, error, truncations_json)
713 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
714 ON CONFLICT(step_id, call_index) DO NOTHING",
715 (
716 &e.step_id,
717 &e.episode_id,
718 &e.tool_name,
719 call_idx,
720 args,
721 e.args_sha256.as_deref(),
722 result,
723 e.result_sha256.as_deref(),
724 e.error.as_deref(),
725 trunc
726 ),
727 ).context("insert tool call")?;
728 Ok(())
729 }
730
731 pub fn count_rows(&self, table: &str) -> anyhow::Result<i64> {
732 let conn = self.conn.lock().unwrap();
733 if !["episodes", "steps", "tool_calls", "runs", "results"].contains(&table) {
735 anyhow::bail!("Invalid table name for count_rows: {}", table);
736 }
737 let sql = format!("SELECT COUNT(*) FROM {}", table);
738 let n: i64 = conn.query_row(&sql, [], |r| r.get(0))?;
739 Ok(n)
740 }
741
742 fn update_episode_end(tx: &rusqlite::Transaction<'_>, e: &EpisodeEnd) -> anyhow::Result<()> {
743 tx.execute(
744 "UPDATE episodes SET outcome = ? WHERE id = ?",
745 (e.outcome.as_deref(), &e.episode_id),
746 )
747 .context("update episode outcome")?;
748 Ok(())
749 }
750}
751
/// Returns the current wall-clock time as `"unix:<whole seconds since the Unix epoch>"`.
///
/// Despite the name, this is not RFC 3339. It is the format of every timestamp this
/// module writes through this helper. If the system clock reports a time before the
/// epoch, `"unix:0"` is returned instead of panicking.
fn now_rfc3339ish() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);
    format!("unix:{}", secs)
}
760
761fn status_to_outcome(s: &TestStatus) -> &'static str {
762 match s {
763 TestStatus::Pass => "pass",
764 TestStatus::Fail => "fail",
765 TestStatus::Flaky => "flaky",
766 TestStatus::Warn => "warn",
767 TestStatus::Error => "error",
768 TestStatus::Skipped => "skipped",
769 TestStatus::Unstable => "unstable",
770 TestStatus::AllowedOnError => "allowed_on_error",
771 }
772}
773
774fn migrate_v030(conn: &Connection) -> anyhow::Result<()> {
775 let cols = get_columns(conn, "results")?;
776 add_column_if_missing(conn, &cols, "results", "fingerprint", "TEXT")?;
777 add_column_if_missing(conn, &cols, "results", "skip_reason", "TEXT")?;
778 add_column_if_missing(conn, &cols, "results", "attempts_json", "TEXT")?;
779 Ok(())
780}
781
782fn get_columns(
783 conn: &Connection,
784 table: &str,
785) -> anyhow::Result<std::collections::HashSet<String>> {
786 let mut stmt = conn.prepare(&format!("PRAGMA table_info({})", table))?;
787 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
788 let mut out = std::collections::HashSet::new();
789 for r in rows {
790 out.insert(r?);
791 }
792 Ok(out)
793}
794
795impl Store {
796 pub fn get_latest_episode_graph_by_test_id(
797 &self,
798 test_id: &str,
799 ) -> anyhow::Result<crate::agent_assertions::EpisodeGraph> {
800 let conn = self.conn.lock().unwrap();
801
802 let mut stmt = conn.prepare(
804 "SELECT id FROM episodes
805 WHERE test_id = ?1
806 ORDER BY timestamp DESC
807 LIMIT 1",
808 )?;
809
810 let episode_id: String = stmt.query_row(params![test_id], |row| row.get(0))
811 .map_err(|e| anyhow::anyhow!("E_TRACE_EPISODE_MISSING: No episode found for test_id={} (fallback check) : {}", test_id, e))?;
812
813 let mut stmt = conn.prepare(
815 "SELECT id, episode_id, idx, kind, name, content
816 FROM steps
817 WHERE episode_id = ?1
818 ORDER BY idx ASC",
819 )?;
820
821 let steps_iter = stmt.query_map(params![episode_id], |row| {
822 Ok(crate::storage::rows::StepRow {
823 id: row.get(0)?,
824 episode_id: row.get(1)?,
825 idx: row.get(2)?,
826 kind: row.get(3)?,
827 name: row.get(4)?,
828 content: row.get(5)?,
829 })
830 })?;
831
832 let mut steps = Vec::new();
833 for step in steps_iter {
834 steps.push(step?);
835 }
836
837 let mut stmt_tools = conn.prepare(
839 "SELECT tc.id, tc.step_id, tc.episode_id, tc.tool_name, tc.call_index, tc.args, tc.result
840 FROM tool_calls tc
841 JOIN steps s ON tc.step_id = s.id
842 WHERE tc.episode_id = ?
843 ORDER BY s.idx ASC, tc.call_index ASC"
844 )?;
845
846 let tc_iter = stmt_tools.query_map(params![episode_id], |row| {
847 Ok(crate::storage::rows::ToolCallRow {
848 id: row.get(0)?,
849 step_id: row.get(1)?,
850 episode_id: row.get(2)?,
851 tool_name: row.get(3)?,
852 call_index: row.get(4)?,
853 args: row.get(5)?,
854 result: row.get(6)?,
855 })
856 })?;
857
858 let mut tool_calls = Vec::new();
859 for tc in tc_iter {
860 tool_calls.push(tc?);
861 }
862
863 Ok(crate::agent_assertions::EpisodeGraph {
864 episode_id,
865 steps,
866 tool_calls,
867 })
868 }
869}
870
871fn add_column_if_missing(
873 conn: &Connection,
874 cols: &std::collections::HashSet<String>,
875 table: &str,
876 col: &str,
877 ty: &str,
878) -> anyhow::Result<()> {
879 if !cols.contains(col) {
880 let sql = format!("ALTER TABLE {} ADD COLUMN {} {}", table, col, ty);
881 conn.execute(&sql, [])?;
882 }
883 Ok(())
884}