1use anyhow::{Context, Result};
6use duckdb::Connection;
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9use std::path::PathBuf;
10
11use crate::schema::init_schema;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct SessionRecord {
16 pub session_id: String,
17 pub task: String,
18 pub working_dir: String,
19 pub merkle_root: Option<Vec<u8>>,
20 pub detected_toolchain: Option<String>,
21 pub status: String,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct NodeStateRecord {
27 pub node_id: String,
28 pub session_id: String,
29 pub state: String,
30 pub v_total: f32,
31 pub merkle_hash: Option<Vec<u8>>,
32 pub attempt_count: i32,
33 pub node_class: Option<String>,
35 pub owner_plugin: Option<String>,
36 pub goal: Option<String>,
37 pub parent_id: Option<String>,
38 pub children: Option<String>,
40 pub last_error_type: Option<String>,
41 pub committed_at: Option<String>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct EnergyRecord {
47 pub node_id: String,
48 pub session_id: String,
49 pub v_syn: f32,
50 pub v_str: f32,
51 pub v_log: f32,
52 pub v_boot: f32,
53 pub v_sheaf: f32,
54 pub v_total: f32,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct LlmRequestRecord {
60 pub session_id: String,
61 pub node_id: Option<String>,
62 pub model: String,
63 pub prompt: String,
64 pub response: String,
65 pub tokens_in: i32,
66 pub tokens_out: i32,
67 pub latency_ms: i32,
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct StructuralDigestRecord {
73 pub digest_id: String,
74 pub session_id: String,
75 pub node_id: String,
76 pub source_path: String,
77 pub artifact_kind: String,
78 pub hash: Vec<u8>,
79 pub version: i32,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct ContextProvenanceRecord {
85 pub session_id: String,
86 pub node_id: String,
87 pub context_package_id: String,
88 pub structural_hashes: String,
90 pub summary_hashes: String,
92 pub dependency_hashes: String,
94 pub included_file_count: i32,
95 pub total_bytes: i32,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct EscalationReportRecord {
101 pub session_id: String,
102 pub node_id: String,
103 pub category: String,
105 pub action: String,
107 pub energy_snapshot: String,
109 pub stage_outcomes: String,
111 pub evidence: String,
113 pub affected_node_ids: String,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct RewriteRecordRow {
120 pub session_id: String,
121 pub node_id: String,
122 pub action: String,
124 pub category: String,
126 pub requeued_nodes: String,
128 pub inserted_nodes: String,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct SheafValidationRow {
135 pub session_id: String,
136 pub node_id: String,
137 pub validator_class: String,
138 pub plugin_source: Option<String>,
139 pub passed: bool,
140 pub evidence_summary: String,
141 pub affected_files: String,
143 pub v_sheaf_contribution: f32,
144 pub requeue_targets: String,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct ProvisionalBranchRow {
155 pub branch_id: String,
156 pub session_id: String,
157 pub node_id: String,
158 pub parent_node_id: String,
159 pub state: String,
160 pub parent_seal_hash: Option<Vec<u8>>,
161 pub sandbox_dir: Option<String>,
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct BranchLineageRow {
167 pub lineage_id: String,
168 pub parent_branch_id: String,
169 pub child_branch_id: String,
170 pub depends_on_seal: bool,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct InterfaceSealRow {
176 pub seal_id: String,
177 pub session_id: String,
178 pub node_id: String,
179 pub sealed_path: String,
180 pub artifact_kind: String,
181 pub seal_hash: Vec<u8>,
182 pub version: i32,
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct BranchFlushRow {
188 pub flush_id: String,
189 pub session_id: String,
190 pub parent_node_id: String,
191 pub flushed_branch_ids: String,
193 pub requeue_node_ids: String,
195 pub reason: String,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct TaskGraphEdgeRow {
205 pub session_id: String,
206 pub parent_node_id: String,
207 pub child_node_id: String,
208 pub edge_type: String,
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct ReviewOutcomeRow {
214 pub session_id: String,
215 pub node_id: String,
216 pub outcome: String,
218 pub reviewer_note: Option<String>,
219 pub energy_at_review: Option<f64>,
221 pub degraded: Option<bool>,
223 pub escalation_category: Option<String>,
225}
226
227#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct VerificationResultRow {
230 pub session_id: String,
231 pub node_id: String,
232 pub result_json: String,
234 pub syntax_ok: bool,
236 pub build_ok: bool,
237 pub tests_ok: bool,
238 pub lint_ok: bool,
239 pub diagnostics_count: i32,
240 pub tests_passed: i32,
241 pub tests_failed: i32,
242 pub degraded: bool,
243 pub degraded_reason: Option<String>,
244}
245
246#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct ArtifactBundleRow {
249 pub session_id: String,
250 pub node_id: String,
251 pub bundle_json: String,
253 pub artifact_count: i32,
254 pub command_count: i32,
255 pub touched_files: String,
257}
258
259#[derive(Debug, Clone)]
265pub struct FeatureCharterRow {
266 pub charter_id: String,
267 pub session_id: String,
268 pub scope_description: String,
269 pub max_modules: Option<i32>,
270 pub max_files: Option<i32>,
271 pub max_revisions: Option<i32>,
272 pub language_constraint: Option<String>,
273}
274
275#[derive(Debug, Clone)]
277pub struct PlanRevisionRow {
278 pub revision_id: String,
279 pub session_id: String,
280 pub sequence: i32,
281 pub plan_json: String,
282 pub reason: String,
283 pub supersedes: Option<String>,
284 pub status: String,
285}
286
287#[derive(Debug, Clone)]
289pub struct RepairFootprintRow {
290 pub footprint_id: String,
291 pub session_id: String,
292 pub node_id: String,
293 pub revision_id: String,
294 pub attempt: i32,
295 pub affected_files: String,
296 pub bundle_json: String,
297 pub diagnosis: String,
298 pub resolved: bool,
299}
300
301#[derive(Debug, Clone)]
303pub struct BudgetEnvelopeRow {
304 pub session_id: String,
305 pub max_steps: Option<i32>,
306 pub steps_used: i32,
307 pub max_revisions: Option<i32>,
308 pub revisions_used: i32,
309 pub max_cost_usd: Option<f64>,
310 pub cost_used_usd: f64,
311}
312
313use std::sync::Mutex;
314
315pub struct SessionStore {
317 conn: Mutex<Connection>,
318}
319
320impl SessionStore {
321 pub fn new() -> Result<Self> {
323 let db_path = Self::default_db_path()?;
324 Self::open(&db_path)
325 }
326
327 pub fn open(path: &PathBuf) -> Result<Self> {
329 if let Some(parent) = path.parent() {
331 std::fs::create_dir_all(parent)?;
332 }
333
334 let conn = Connection::open(path).context("Failed to open DuckDB")?;
335 init_schema(&conn)?;
336
337 Ok(Self {
338 conn: Mutex::new(conn),
339 })
340 }
341
342 pub fn open_read_only(path: &std::path::Path) -> Result<Self> {
348 let config = duckdb::Config::default()
349 .access_mode(duckdb::AccessMode::ReadOnly)
350 .context("Failed to configure DuckDB read-only mode")?;
351 let conn = Connection::open_with_flags(path, config)
352 .context("Failed to open DuckDB in read-only mode")?;
353 Ok(Self {
354 conn: Mutex::new(conn),
355 })
356 }
357
358 pub fn default_db_path() -> Result<PathBuf> {
360 perspt_core::paths::database_path().context("Could not determine platform data directory")
361 }
362
363 pub fn create_session(&self, session: &SessionRecord) -> Result<()> {
365 self.conn.lock().unwrap().execute(
366 r#"
367 INSERT INTO sessions (session_id, task, working_dir, merkle_root, detected_toolchain, status)
368 VALUES (?, ?, ?, ?, ?, ?)
369 "#,
370 [
371 &session.session_id,
372 &session.task,
373 &session.working_dir,
374 &session.merkle_root.as_ref().map(hex::encode).unwrap_or_default(),
375 &session.detected_toolchain.clone().unwrap_or_default(),
376 &session.status,
377 ],
378 )?;
379 Ok(())
380 }
381
382 pub fn update_merkle_root(&self, session_id: &str, merkle_root: &[u8]) -> Result<()> {
384 self.conn.lock().unwrap().execute(
385 "UPDATE sessions SET merkle_root = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
386 [hex::encode(merkle_root), session_id.to_string()],
387 )?;
388 Ok(())
389 }
390
391 pub fn record_node_state(&self, record: &NodeStateRecord) -> Result<()> {
393 let v_total = record.v_total.to_string();
394 let merkle_hash = record
395 .merkle_hash
396 .as_ref()
397 .map(hex::encode)
398 .unwrap_or_default();
399 let attempt_count = record.attempt_count.to_string();
400 let node_class = record.node_class.clone().unwrap_or_default();
401 let owner_plugin = record.owner_plugin.clone().unwrap_or_default();
402 let goal = record.goal.clone().unwrap_or_default();
403 let parent_id = record.parent_id.clone().unwrap_or_default();
404 let children = record.children.clone().unwrap_or_default();
405 let last_error_type = record.last_error_type.clone().unwrap_or_default();
406 let committed_at = record.committed_at.clone().unwrap_or_default();
407
408 self.conn.lock().unwrap().execute(
409 r#"
410 INSERT INTO node_states (node_id, session_id, state, v_total, merkle_hash, attempt_count,
411 node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at)
412 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
413 "#,
414 [
415 &record.node_id,
416 &record.session_id,
417 &record.state,
418 &v_total,
419 &merkle_hash,
420 &attempt_count,
421 &node_class,
422 &owner_plugin,
423 &goal,
424 &parent_id,
425 &children,
426 &last_error_type,
427 &committed_at,
428 ],
429 )?;
430 Ok(())
431 }
432
433 pub fn record_energy(&self, record: &EnergyRecord) -> Result<()> {
435 self.conn.lock().unwrap().execute(
436 r#"
437 INSERT INTO energy_history (node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total)
438 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
439 "#,
440 [
441 &record.node_id,
442 &record.session_id,
443 &record.v_syn.to_string(),
444 &record.v_str.to_string(),
445 &record.v_log.to_string(),
446 &record.v_boot.to_string(),
447 &record.v_sheaf.to_string(),
448 &record.v_total.to_string(),
449 ],
450 )?;
451 Ok(())
452 }
453
454 pub fn calculate_hash(content: &[u8]) -> Vec<u8> {
456 let mut hasher = Sha256::new();
457 hasher.update(content);
458 hasher.finalize().to_vec()
459 }
460
461 pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>> {
463 let conn = self.conn.lock().unwrap();
464 let mut stmt = conn.prepare(
465 "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status FROM sessions WHERE session_id = ?"
466 )?;
467
468 let mut rows = stmt.query([session_id])?;
469 if let Some(row) = rows.next()? {
470 let merkle_root: Option<Vec<u8>> = row.get(3).ok();
473
474 Ok(Some(SessionRecord {
475 session_id: row.get(0)?,
476 task: row.get(1)?,
477 working_dir: row.get(2)?,
478 merkle_root,
479 detected_toolchain: row.get(4)?,
480 status: row.get(5)?,
481 }))
482 } else {
483 Ok(None)
484 }
485 }
486
487 pub fn get_session_dir(&self, session_id: &str) -> Result<PathBuf> {
489 let data_dir = dirs::data_local_dir()
490 .context("Could not find local data directory")?
491 .join("perspt")
492 .join("sessions")
493 .join(session_id);
494 Ok(data_dir)
495 }
496
497 pub fn create_session_dir(&self, session_id: &str) -> Result<PathBuf> {
499 let dir = self.get_session_dir(session_id)?;
500 if !dir.exists() {
501 std::fs::create_dir_all(&dir).context("Failed to create session directory")?;
502 }
503 Ok(dir)
504 }
505
506 pub fn get_energy_history(&self, session_id: &str, node_id: &str) -> Result<Vec<EnergyRecord>> {
508 let conn = self.conn.lock().unwrap();
509 let mut stmt = conn.prepare(
510 "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? AND node_id = ? ORDER BY timestamp"
511 )?;
512
513 let mut rows = stmt.query([session_id, node_id])?;
514 let mut records = Vec::new();
515
516 while let Some(row) = rows.next()? {
517 records.push(EnergyRecord {
518 node_id: row.get(0)?,
519 session_id: row.get(1)?,
520 v_syn: row.get::<_, f64>(2)? as f32,
521 v_str: row.get::<_, f64>(3)? as f32,
522 v_log: row.get::<_, f64>(4)? as f32,
523 v_boot: row.get::<_, f64>(5)? as f32,
524 v_sheaf: row.get::<_, f64>(6)? as f32,
525 v_total: row.get::<_, f64>(7)? as f32,
526 });
527 }
528
529 Ok(records)
530 }
531
532 pub fn get_session_energy_history(&self, session_id: &str) -> Result<Vec<EnergyRecord>> {
534 let conn = self.conn.lock().unwrap();
535 let mut stmt = conn.prepare(
536 "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? ORDER BY timestamp"
537 )?;
538
539 let mut rows = stmt.query([session_id])?;
540 let mut records = Vec::new();
541
542 while let Some(row) = rows.next()? {
543 records.push(EnergyRecord {
544 node_id: row.get(0)?,
545 session_id: row.get(1)?,
546 v_syn: row.get::<_, f64>(2)? as f32,
547 v_str: row.get::<_, f64>(3)? as f32,
548 v_log: row.get::<_, f64>(4)? as f32,
549 v_boot: row.get::<_, f64>(5)? as f32,
550 v_sheaf: row.get::<_, f64>(6)? as f32,
551 v_total: row.get::<_, f64>(7)? as f32,
552 });
553 }
554
555 Ok(records)
556 }
557
558 pub fn list_recent_sessions(&self, limit: usize) -> Result<Vec<SessionRecord>> {
560 self.list_sessions_paginated(limit, 0)
561 }
562
563 pub fn list_sessions_paginated(
565 &self,
566 limit: usize,
567 offset: usize,
568 ) -> Result<Vec<SessionRecord>> {
569 let conn = self.conn.lock().unwrap();
570 let mut stmt = conn.prepare(
571 "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status
572 FROM sessions ORDER BY created_at DESC LIMIT ? OFFSET ?",
573 )?;
574
575 let mut rows = stmt.query([limit.to_string(), offset.to_string()])?;
576 let mut records = Vec::new();
577
578 while let Some(row) = rows.next()? {
579 let merkle_root: Option<Vec<u8>> = row.get(3).ok();
580
581 records.push(SessionRecord {
582 session_id: row.get(0)?,
583 task: row.get(1)?,
584 working_dir: row.get(2)?,
585 merkle_root,
586 detected_toolchain: row.get(4)?,
587 status: row.get(5)?,
588 });
589 }
590
591 Ok(records)
592 }
593
594 pub fn count_sessions(&self) -> Result<usize> {
596 let conn = self.conn.lock().unwrap();
597 let mut stmt = conn.prepare("SELECT COUNT(*) FROM sessions")?;
598 let mut rows = stmt.query([])?;
599 if let Some(row) = rows.next()? {
600 let count: i64 = row.get(0)?;
601 Ok(count as usize)
602 } else {
603 Ok(0)
604 }
605 }
606
607 pub fn get_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
609 let conn = self.conn.lock().unwrap();
610 let mut stmt = conn.prepare(
611 "SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
612 node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
613 FROM node_states WHERE session_id = ? ORDER BY created_at",
614 )?;
615
616 let mut rows = stmt.query([session_id])?;
617 let mut records = Vec::new();
618
619 while let Some(row) = rows.next()? {
620 records.push(NodeStateRecord {
621 node_id: row.get(0)?,
622 session_id: row.get(1)?,
623 state: row.get(2)?,
624 v_total: row.get::<_, f64>(3)? as f32,
625 merkle_hash: row
626 .get::<_, Option<String>>(4)?
627 .and_then(|s| hex::decode(s).ok()),
628 attempt_count: row.get(5)?,
629 node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
630 owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
631 goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
632 parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
633 children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
634 last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
635 committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
636 });
637 }
638
639 Ok(records)
640 }
641
642 pub fn update_session_status(&self, session_id: &str, status: &str) -> Result<()> {
644 self.conn.lock().unwrap().execute(
645 "UPDATE sessions SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
646 [status, session_id],
647 )?;
648 Ok(())
649 }
650
651 pub fn record_llm_request(&self, record: &LlmRequestRecord) -> Result<()> {
653 let conn = self.conn.lock().unwrap();
654 conn.execute(
655 r#"
656 INSERT INTO llm_requests (session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms)
657 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
658 "#,
659 [
660 &record.session_id,
661 &record.node_id.clone().unwrap_or_default(),
662 &record.model,
663 &record.prompt,
664 &record.response,
665 &record.tokens_in.to_string(),
666 &record.tokens_out.to_string(),
667 &record.latency_ms.to_string(),
668 ],
669 )?;
670 Ok(())
671 }
672
673 pub fn get_llm_requests(&self, session_id: &str) -> Result<Vec<LlmRequestRecord>> {
675 let conn = self.conn.lock().unwrap();
676 let mut stmt = conn.prepare(
677 "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
678 FROM llm_requests WHERE session_id = ? ORDER BY timestamp",
679 )?;
680
681 let mut rows = stmt.query([session_id])?;
682 let mut records = Vec::new();
683
684 while let Some(row) = rows.next()? {
685 let node_id: Option<String> = row.get(1)?;
686 records.push(LlmRequestRecord {
687 session_id: row.get(0)?,
688 node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
689 None
690 } else {
691 node_id
692 },
693 model: row.get(2)?,
694 prompt: row.get(3)?,
695 response: row.get(4)?,
696 tokens_in: row.get(5)?,
697 tokens_out: row.get(6)?,
698 latency_ms: row.get(7)?,
699 });
700 }
701
702 Ok(records)
703 }
704
705 pub fn count_all_llm_requests(&self) -> Result<i64> {
707 let conn = self.conn.lock().unwrap();
708 let mut stmt = conn.prepare("SELECT COUNT(*) FROM llm_requests")?;
709 let count: i64 = stmt.query_row([], |row| row.get(0))?;
710 Ok(count)
711 }
712
713 pub fn get_global_llm_summary(&self) -> Result<(i64, i64, i64, i64)> {
715 let conn = self.conn.lock().unwrap();
716 let mut stmt = conn.prepare(
717 "SELECT COUNT(*), \
718 COALESCE(SUM(CASE WHEN tokens_in > 0 THEN tokens_in ELSE (LENGTH(prompt) + 3) / 4 END), 0), \
719 COALESCE(SUM(CASE WHEN tokens_out > 0 THEN tokens_out ELSE (LENGTH(response) + 3) / 4 END), 0), \
720 COALESCE(MEDIAN(latency_ms), 0) \
721 FROM llm_requests",
722 )?;
723 let result = stmt.query_row([], |row| {
724 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
725 })?;
726 Ok(result)
727 }
728
729 pub fn get_all_llm_requests(&self, limit: usize) -> Result<Vec<LlmRequestRecord>> {
731 let conn = self.conn.lock().unwrap();
732 let mut stmt = conn.prepare(
733 "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
734 FROM llm_requests ORDER BY timestamp DESC LIMIT ?",
735 )?;
736
737 let mut rows = stmt.query([limit as i64])?;
738 let mut records = Vec::new();
739
740 while let Some(row) = rows.next()? {
741 let node_id: Option<String> = row.get(1)?;
742 records.push(LlmRequestRecord {
743 session_id: row.get(0)?,
744 node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
745 None
746 } else {
747 node_id
748 },
749 model: row.get(2)?,
750 prompt: row.get(3)?,
751 response: row.get(4)?,
752 tokens_in: row.get(5)?,
753 tokens_out: row.get(6)?,
754 latency_ms: row.get(7)?,
755 });
756 }
757
758 Ok(records)
759 }
760
761 pub fn record_structural_digest(&self, record: &StructuralDigestRecord) -> Result<()> {
767 self.conn.lock().unwrap().execute(
768 r#"
769 INSERT INTO structural_digests (digest_id, session_id, node_id, source_path, artifact_kind, hash, version)
770 VALUES (?, ?, ?, ?, ?, ?, ?)
771 "#,
772 [
773 &record.digest_id,
774 &record.session_id,
775 &record.node_id,
776 &record.source_path,
777 &record.artifact_kind,
778 &hex::encode(&record.hash),
779 &record.version.to_string(),
780 ],
781 )?;
782 Ok(())
783 }
784
785 pub fn get_structural_digests(
787 &self,
788 session_id: &str,
789 node_id: &str,
790 ) -> Result<Vec<StructuralDigestRecord>> {
791 let conn = self.conn.lock().unwrap();
792 let mut stmt = conn.prepare(
793 "SELECT digest_id, session_id, node_id, source_path, artifact_kind, hash, version
794 FROM structural_digests WHERE session_id = ? AND node_id = ? ORDER BY created_at",
795 )?;
796
797 let mut rows = stmt.query([session_id, node_id])?;
798 let mut records = Vec::new();
799
800 while let Some(row) = rows.next()? {
801 records.push(StructuralDigestRecord {
802 digest_id: row.get(0)?,
803 session_id: row.get(1)?,
804 node_id: row.get(2)?,
805 source_path: row.get(3)?,
806 artifact_kind: row.get(4)?,
807 hash: row
808 .get::<_, String>(5)
809 .ok()
810 .and_then(|s| hex::decode(s).ok())
811 .unwrap_or_default(),
812 version: row.get(5)?,
813 });
814 }
815
816 Ok(records)
817 }
818
819 pub fn record_context_provenance(&self, record: &ContextProvenanceRecord) -> Result<()> {
821 self.conn.lock().unwrap().execute(
822 r#"
823 INSERT INTO context_provenance (session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes)
824 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
825 "#,
826 [
827 &record.session_id,
828 &record.node_id,
829 &record.context_package_id,
830 &record.structural_hashes,
831 &record.summary_hashes,
832 &record.dependency_hashes,
833 &record.included_file_count.to_string(),
834 &record.total_bytes.to_string(),
835 ],
836 )?;
837 Ok(())
838 }
839
840 pub fn get_context_provenance(
842 &self,
843 session_id: &str,
844 node_id: &str,
845 ) -> Result<Option<ContextProvenanceRecord>> {
846 let conn = self.conn.lock().unwrap();
847 let mut stmt = conn.prepare(
848 "SELECT session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes
849 FROM context_provenance WHERE session_id = ? AND node_id = ? ORDER BY created_at DESC LIMIT 1",
850 )?;
851
852 let mut rows = stmt.query([session_id, node_id])?;
853 if let Some(row) = rows.next()? {
854 Ok(Some(ContextProvenanceRecord {
855 session_id: row.get(0)?,
856 node_id: row.get(1)?,
857 context_package_id: row.get(2)?,
858 structural_hashes: row.get(3)?,
859 summary_hashes: row.get(4)?,
860 dependency_hashes: row.get(5)?,
861 included_file_count: row.get(6)?,
862 total_bytes: row.get(7)?,
863 }))
864 } else {
865 Ok(None)
866 }
867 }
868
869 pub fn record_escalation_report(&self, record: &EscalationReportRecord) -> Result<()> {
875 self.conn.lock().unwrap().execute(
876 r#"
877 INSERT INTO escalation_reports (session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids)
878 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
879 "#,
880 [
881 &record.session_id,
882 &record.node_id,
883 &record.category,
884 &record.action,
885 &record.energy_snapshot,
886 &record.stage_outcomes,
887 &record.evidence,
888 &record.affected_node_ids,
889 ],
890 )?;
891 Ok(())
892 }
893
894 pub fn get_escalation_reports(&self, session_id: &str) -> Result<Vec<EscalationReportRecord>> {
896 let conn = self.conn.lock().unwrap();
897 let mut stmt = conn.prepare(
898 "SELECT session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids
899 FROM escalation_reports WHERE session_id = ? ORDER BY created_at",
900 )?;
901 let mut rows = stmt.query([session_id])?;
902 let mut records = Vec::new();
903 while let Some(row) = rows.next()? {
904 records.push(EscalationReportRecord {
905 session_id: row.get(0)?,
906 node_id: row.get(1)?,
907 category: row.get(2)?,
908 action: row.get(3)?,
909 energy_snapshot: row.get(4)?,
910 stage_outcomes: row.get(5)?,
911 evidence: row.get(6)?,
912 affected_node_ids: row.get(7)?,
913 });
914 }
915 Ok(records)
916 }
917
918 pub fn record_rewrite(&self, record: &RewriteRecordRow) -> Result<()> {
920 self.conn.lock().unwrap().execute(
921 r#"
922 INSERT INTO rewrite_records (session_id, node_id, action, category, requeued_nodes, inserted_nodes)
923 VALUES (?, ?, ?, ?, ?, ?)
924 "#,
925 [
926 &record.session_id,
927 &record.node_id,
928 &record.action,
929 &record.category,
930 &record.requeued_nodes,
931 &record.inserted_nodes,
932 ],
933 )?;
934 Ok(())
935 }
936
937 pub fn get_rewrite_records(&self, session_id: &str) -> Result<Vec<RewriteRecordRow>> {
939 let conn = self.conn.lock().unwrap();
940 let mut stmt = conn.prepare(
941 "SELECT session_id, node_id, action, category, requeued_nodes, inserted_nodes
942 FROM rewrite_records WHERE session_id = ? ORDER BY created_at",
943 )?;
944 let mut rows = stmt.query([session_id])?;
945 let mut records = Vec::new();
946 while let Some(row) = rows.next()? {
947 records.push(RewriteRecordRow {
948 session_id: row.get(0)?,
949 node_id: row.get(1)?,
950 action: row.get(2)?,
951 category: row.get(3)?,
952 requeued_nodes: row.get(4)?,
953 inserted_nodes: row.get(5)?,
954 });
955 }
956 Ok(records)
957 }
958
959 pub fn record_sheaf_validation(&self, record: &SheafValidationRow) -> Result<()> {
961 self.conn.lock().unwrap().execute(
962 r#"
963 INSERT INTO sheaf_validations (session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets)
964 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
965 "#,
966 [
967 &record.session_id,
968 &record.node_id,
969 &record.validator_class,
970 &record.plugin_source.clone().unwrap_or_default(),
971 &record.passed.to_string(),
972 &record.evidence_summary,
973 &record.affected_files,
974 &record.v_sheaf_contribution.to_string(),
975 &record.requeue_targets,
976 ],
977 )?;
978 Ok(())
979 }
980
981 pub fn get_sheaf_validations(
983 &self,
984 session_id: &str,
985 node_id: &str,
986 ) -> Result<Vec<SheafValidationRow>> {
987 let conn = self.conn.lock().unwrap();
988 let mut stmt = conn.prepare(
989 "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
990 FROM sheaf_validations WHERE session_id = ? AND node_id = ? ORDER BY created_at",
991 )?;
992 let mut rows = stmt.query([session_id, node_id])?;
993 let mut records = Vec::new();
994 while let Some(row) = rows.next()? {
995 records.push(SheafValidationRow {
996 session_id: row.get(0)?,
997 node_id: row.get(1)?,
998 validator_class: row.get(2)?,
999 plugin_source: row.get::<_, Option<String>>(3)?,
1000 passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
1001 evidence_summary: row.get(5)?,
1002 affected_files: row.get(6)?,
1003 v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
1004 requeue_targets: row.get(8)?,
1005 });
1006 }
1007 Ok(records)
1008 }
1009
1010 pub fn get_all_sheaf_validations(&self, session_id: &str) -> Result<Vec<SheafValidationRow>> {
1012 let conn = self.conn.lock().unwrap();
1013 let mut stmt = conn.prepare(
1014 "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
1015 FROM sheaf_validations WHERE session_id = ? ORDER BY created_at",
1016 )?;
1017 let mut rows = stmt.query([session_id])?;
1018 let mut records = Vec::new();
1019 while let Some(row) = rows.next()? {
1020 records.push(SheafValidationRow {
1021 session_id: row.get(0)?,
1022 node_id: row.get(1)?,
1023 validator_class: row.get(2)?,
1024 plugin_source: row.get::<_, Option<String>>(3)?,
1025 passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
1026 evidence_summary: row.get(5)?,
1027 affected_files: row.get(6)?,
1028 v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
1029 requeue_targets: row.get(8)?,
1030 });
1031 }
1032 Ok(records)
1033 }
1034
1035 pub fn record_provisional_branch(&self, record: &ProvisionalBranchRow) -> Result<()> {
1041 self.conn.lock().unwrap().execute(
1042 r#"
1043 INSERT INTO provisional_branches (branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir)
1044 VALUES (?, ?, ?, ?, ?, ?, ?)
1045 "#,
1046 [
1047 &record.branch_id,
1048 &record.session_id,
1049 &record.node_id,
1050 &record.parent_node_id,
1051 &record.state,
1052 &record.parent_seal_hash.as_ref().map(hex::encode).unwrap_or_default(),
1053 &record.sandbox_dir.clone().unwrap_or_default(),
1054 ],
1055 )?;
1056 Ok(())
1057 }
1058
1059 pub fn update_branch_state(&self, branch_id: &str, new_state: &str) -> Result<()> {
1061 self.conn.lock().unwrap().execute(
1062 "UPDATE provisional_branches SET state = ?, updated_at = CURRENT_TIMESTAMP WHERE branch_id = ?",
1063 [new_state, branch_id],
1064 )?;
1065 Ok(())
1066 }
1067
1068 pub fn get_provisional_branches(&self, session_id: &str) -> Result<Vec<ProvisionalBranchRow>> {
1070 let conn = self.conn.lock().unwrap();
1071 let mut stmt = conn.prepare(
1072 "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
1073 FROM provisional_branches WHERE session_id = ? ORDER BY created_at",
1074 )?;
1075 let mut rows = stmt.query([session_id])?;
1076 let mut records = Vec::new();
1077 while let Some(row) = rows.next()? {
1078 let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
1080 records.push(ProvisionalBranchRow {
1081 branch_id: row.get(0)?,
1082 session_id: row.get(1)?,
1083 node_id: row.get(2)?,
1084 parent_node_id: row.get(3)?,
1085 state: row.get(4)?,
1086 parent_seal_hash,
1087 sandbox_dir: row.get::<_, Option<String>>(6)?,
1088 });
1089 }
1090 Ok(records)
1091 }
1092
1093 pub fn get_live_branches_for_parent(
1095 &self,
1096 session_id: &str,
1097 parent_node_id: &str,
1098 ) -> Result<Vec<ProvisionalBranchRow>> {
1099 let conn = self.conn.lock().unwrap();
1100 let mut stmt = conn.prepare(
1101 "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
1102 FROM provisional_branches
1103 WHERE session_id = ? AND parent_node_id = ? AND state IN ('active', 'sealed')
1104 ORDER BY created_at",
1105 )?;
1106 let mut rows = stmt.query([session_id, parent_node_id])?;
1107 let mut records = Vec::new();
1108 while let Some(row) = rows.next()? {
1109 let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
1111 records.push(ProvisionalBranchRow {
1112 branch_id: row.get(0)?,
1113 session_id: row.get(1)?,
1114 node_id: row.get(2)?,
1115 parent_node_id: row.get(3)?,
1116 state: row.get(4)?,
1117 parent_seal_hash,
1118 sandbox_dir: row.get::<_, Option<String>>(6)?,
1119 });
1120 }
1121 Ok(records)
1122 }
1123
1124 pub fn flush_branches_for_parent(
1126 &self,
1127 session_id: &str,
1128 parent_node_id: &str,
1129 ) -> Result<Vec<String>> {
1130 let live = self.get_live_branches_for_parent(session_id, parent_node_id)?;
1131 let branch_ids: Vec<String> = live.iter().map(|b| b.branch_id.clone()).collect();
1132 for bid in &branch_ids {
1133 self.update_branch_state(bid, "flushed")?;
1134 }
1135 Ok(branch_ids)
1136 }
1137
1138 pub fn record_branch_lineage(&self, record: &BranchLineageRow) -> Result<()> {
1144 self.conn.lock().unwrap().execute(
1145 r#"
1146 INSERT INTO branch_lineage (lineage_id, parent_branch_id, child_branch_id, depends_on_seal)
1147 VALUES (?, ?, ?, ?)
1148 "#,
1149 [
1150 &record.lineage_id,
1151 &record.parent_branch_id,
1152 &record.child_branch_id,
1153 &record.depends_on_seal.to_string(),
1154 ],
1155 )?;
1156 Ok(())
1157 }
1158
1159 pub fn get_child_branches(&self, parent_branch_id: &str) -> Result<Vec<String>> {
1161 let conn = self.conn.lock().unwrap();
1162 let mut stmt =
1163 conn.prepare("SELECT child_branch_id FROM branch_lineage WHERE parent_branch_id = ?")?;
1164 let mut rows = stmt.query([parent_branch_id])?;
1165 let mut ids = Vec::new();
1166 while let Some(row) = rows.next()? {
1167 ids.push(row.get(0)?);
1168 }
1169 Ok(ids)
1170 }
1171
1172 pub fn record_interface_seal(&self, record: &InterfaceSealRow) -> Result<()> {
1178 self.conn.lock().unwrap().execute(
1179 r#"
1180 INSERT INTO interface_seals (seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version)
1181 VALUES (?, ?, ?, ?, ?, ?, ?)
1182 "#,
1183 [
1184 &record.seal_id,
1185 &record.session_id,
1186 &record.node_id,
1187 &record.sealed_path,
1188 &record.artifact_kind,
1189 &hex::encode(&record.seal_hash),
1190 &record.version.to_string(),
1191 ],
1192 )?;
1193 Ok(())
1194 }
1195
1196 pub fn get_interface_seals(
1198 &self,
1199 session_id: &str,
1200 node_id: &str,
1201 ) -> Result<Vec<InterfaceSealRow>> {
1202 let conn = self.conn.lock().unwrap();
1203 let mut stmt = conn.prepare(
1204 "SELECT seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version
1205 FROM interface_seals WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1206 )?;
1207 let mut rows = stmt.query([session_id, node_id])?;
1208 let mut records = Vec::new();
1209 while let Some(row) = rows.next()? {
1210 records.push(InterfaceSealRow {
1211 seal_id: row.get(0)?,
1212 session_id: row.get(1)?,
1213 node_id: row.get(2)?,
1214 sealed_path: row.get(3)?,
1215 artifact_kind: row.get(4)?,
1216 seal_hash: row
1217 .get::<_, String>(5)
1218 .ok()
1219 .and_then(|h| hex::decode(h).ok())
1220 .unwrap_or_default(),
1221 version: row.get::<_, i32>(6)?,
1222 });
1223 }
1224 Ok(records)
1225 }
1226
1227 pub fn has_interface_seals(&self, session_id: &str, node_id: &str) -> Result<bool> {
1229 let conn = self.conn.lock().unwrap();
1230 let count: i64 = conn.query_row(
1231 "SELECT COUNT(*) FROM interface_seals WHERE session_id = ? AND node_id = ?",
1232 [session_id, node_id],
1233 |row| row.get(0),
1234 )?;
1235 Ok(count > 0)
1236 }
1237
1238 pub fn record_branch_flush(&self, record: &BranchFlushRow) -> Result<()> {
1244 self.conn.lock().unwrap().execute(
1245 r#"
1246 INSERT INTO branch_flushes (flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason)
1247 VALUES (?, ?, ?, ?, ?, ?)
1248 "#,
1249 [
1250 &record.flush_id,
1251 &record.session_id,
1252 &record.parent_node_id,
1253 &record.flushed_branch_ids,
1254 &record.requeue_node_ids,
1255 &record.reason,
1256 ],
1257 )?;
1258 Ok(())
1259 }
1260
1261 pub fn get_branch_flushes(&self, session_id: &str) -> Result<Vec<BranchFlushRow>> {
1263 let conn = self.conn.lock().unwrap();
1264 let mut stmt = conn.prepare(
1265 "SELECT flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason
1266 FROM branch_flushes WHERE session_id = ? ORDER BY created_at",
1267 )?;
1268 let mut rows = stmt.query([session_id])?;
1269 let mut records = Vec::new();
1270 while let Some(row) = rows.next()? {
1271 records.push(BranchFlushRow {
1272 flush_id: row.get(0)?,
1273 session_id: row.get(1)?,
1274 parent_node_id: row.get(2)?,
1275 flushed_branch_ids: row.get(3)?,
1276 requeue_node_ids: row.get(4)?,
1277 reason: row.get(5)?,
1278 });
1279 }
1280 Ok(records)
1281 }
1282
1283 pub fn get_latest_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
1291 let conn = self.conn.lock().unwrap();
1292 let mut stmt = conn.prepare(
1293 "WITH ranked AS ( \
1294 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1295 FROM node_states WHERE session_id = ? \
1296 ) \
1297 SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
1298 node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
1299 FROM ranked WHERE rn = 1 ORDER BY created_at",
1300 )?;
1301
1302 let mut rows = stmt.query([session_id])?;
1303 let mut records = Vec::new();
1304
1305 while let Some(row) = rows.next()? {
1306 records.push(NodeStateRecord {
1307 node_id: row.get(0)?,
1308 session_id: row.get(1)?,
1309 state: row.get(2)?,
1310 v_total: row.get::<_, f64>(3)? as f32,
1311 merkle_hash: row
1312 .get::<_, Option<String>>(4)?
1313 .and_then(|s| hex::decode(s).ok()),
1314 attempt_count: row.get(5)?,
1315 node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1316 owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
1317 goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
1318 parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
1319 children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
1320 last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1321 committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
1322 });
1323 }
1324
1325 Ok(records)
1326 }
1327
1328 pub fn record_task_graph_edge(&self, record: &TaskGraphEdgeRow) -> Result<()> {
1330 self.conn.lock().unwrap().execute(
1331 r#"
1332 INSERT INTO task_graph_edges (session_id, parent_node_id, child_node_id, edge_type)
1333 VALUES (?, ?, ?, ?)
1334 "#,
1335 [
1336 &record.session_id,
1337 &record.parent_node_id,
1338 &record.child_node_id,
1339 &record.edge_type,
1340 ],
1341 )?;
1342 Ok(())
1343 }
1344
1345 pub fn get_task_graph_edges(&self, session_id: &str) -> Result<Vec<TaskGraphEdgeRow>> {
1347 let conn = self.conn.lock().unwrap();
1348 let mut stmt = conn.prepare(
1349 "SELECT session_id, parent_node_id, child_node_id, edge_type \
1350 FROM task_graph_edges WHERE session_id = ? ORDER BY created_at",
1351 )?;
1352 let mut rows = stmt.query([session_id])?;
1353 let mut records = Vec::new();
1354 while let Some(row) = rows.next()? {
1355 records.push(TaskGraphEdgeRow {
1356 session_id: row.get(0)?,
1357 parent_node_id: row.get(1)?,
1358 child_node_id: row.get(2)?,
1359 edge_type: row.get(3)?,
1360 });
1361 }
1362 Ok(records)
1363 }
1364
1365 pub fn get_children_of_node(
1367 &self,
1368 session_id: &str,
1369 parent_node_id: &str,
1370 ) -> Result<Vec<String>> {
1371 let conn = self.conn.lock().unwrap();
1372 let mut stmt = conn.prepare(
1373 "SELECT child_node_id FROM task_graph_edges \
1374 WHERE session_id = ? AND parent_node_id = ? ORDER BY created_at",
1375 )?;
1376 let mut rows = stmt.query([session_id, parent_node_id])?;
1377 let mut ids = Vec::new();
1378 while let Some(row) = rows.next()? {
1379 ids.push(row.get(0)?);
1380 }
1381 Ok(ids)
1382 }
1383
1384 pub fn record_review_outcome(&self, record: &ReviewOutcomeRow) -> Result<()> {
1386 let reviewer_note = record.reviewer_note.clone().unwrap_or_default();
1387 let escalation_category = record.escalation_category.clone().unwrap_or_default();
1388 self.conn.lock().unwrap().execute(
1389 r#"
1390 INSERT INTO review_outcomes (session_id, node_id, outcome, reviewer_note,
1391 energy_at_review, degraded, escalation_category)
1392 VALUES (?, ?, ?, ?, ?, ?, ?)
1393 "#,
1394 duckdb::params![
1395 record.session_id,
1396 record.node_id,
1397 record.outcome,
1398 reviewer_note,
1399 record.energy_at_review.unwrap_or(0.0),
1400 record.degraded.unwrap_or(false),
1401 escalation_category,
1402 ],
1403 )?;
1404 Ok(())
1405 }
1406
1407 pub fn get_review_outcomes(
1409 &self,
1410 session_id: &str,
1411 node_id: &str,
1412 ) -> Result<Vec<ReviewOutcomeRow>> {
1413 let conn = self.conn.lock().unwrap();
1414 let mut stmt = conn.prepare(
1415 "SELECT session_id, node_id, outcome, reviewer_note, \
1416 energy_at_review, degraded, escalation_category \
1417 FROM review_outcomes WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1418 )?;
1419 let mut rows = stmt.query([session_id, node_id])?;
1420 let mut records = Vec::new();
1421 while let Some(row) = rows.next()? {
1422 records.push(ReviewOutcomeRow {
1423 session_id: row.get(0)?,
1424 node_id: row.get(1)?,
1425 outcome: row.get(2)?,
1426 reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1427 energy_at_review: row.get::<_, Option<f64>>(4)?,
1428 degraded: row.get::<_, Option<bool>>(5)?,
1429 escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1430 });
1431 }
1432 Ok(records)
1433 }
1434
1435 pub fn get_latest_review_outcome(
1437 &self,
1438 session_id: &str,
1439 node_id: &str,
1440 ) -> Result<Option<ReviewOutcomeRow>> {
1441 let conn = self.conn.lock().unwrap();
1442 let mut stmt = conn.prepare(
1443 "SELECT session_id, node_id, outcome, reviewer_note, \
1444 energy_at_review, degraded, escalation_category \
1445 FROM review_outcomes WHERE session_id = ? AND node_id = ? \
1446 ORDER BY created_at DESC LIMIT 1",
1447 )?;
1448 let mut rows = stmt.query([session_id, node_id])?;
1449 if let Some(row) = rows.next()? {
1450 Ok(Some(ReviewOutcomeRow {
1451 session_id: row.get(0)?,
1452 node_id: row.get(1)?,
1453 outcome: row.get(2)?,
1454 reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1455 energy_at_review: row.get::<_, Option<f64>>(4)?,
1456 degraded: row.get::<_, Option<bool>>(5)?,
1457 escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1458 }))
1459 } else {
1460 Ok(None)
1461 }
1462 }
1463
1464 pub fn get_all_review_outcomes(&self, session_id: &str) -> Result<Vec<ReviewOutcomeRow>> {
1466 let conn = self.conn.lock().unwrap();
1467 let mut stmt = conn.prepare(
1468 "SELECT session_id, node_id, outcome, reviewer_note, \
1469 energy_at_review, degraded, escalation_category \
1470 FROM review_outcomes WHERE session_id = ? ORDER BY created_at",
1471 )?;
1472 let mut rows = stmt.query([session_id])?;
1473 let mut records = Vec::new();
1474 while let Some(row) = rows.next()? {
1475 records.push(ReviewOutcomeRow {
1476 session_id: row.get(0)?,
1477 node_id: row.get(1)?,
1478 outcome: row.get(2)?,
1479 reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1480 energy_at_review: row.get::<_, Option<f64>>(4)?,
1481 degraded: row.get::<_, Option<bool>>(5)?,
1482 escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1483 });
1484 }
1485 Ok(records)
1486 }
1487
1488 pub fn record_verification_result(&self, record: &VerificationResultRow) -> Result<()> {
1494 let syntax_ok = record.syntax_ok.to_string();
1495 let build_ok = record.build_ok.to_string();
1496 let tests_ok = record.tests_ok.to_string();
1497 let lint_ok = record.lint_ok.to_string();
1498 let diagnostics_count = record.diagnostics_count.to_string();
1499 let tests_passed = record.tests_passed.to_string();
1500 let tests_failed = record.tests_failed.to_string();
1501 let degraded = record.degraded.to_string();
1502 let degraded_reason = record.degraded_reason.clone().unwrap_or_default();
1503
1504 self.conn.lock().unwrap().execute(
1505 r#"
1506 INSERT INTO verification_results (session_id, node_id, result_json,
1507 syntax_ok, build_ok, tests_ok, lint_ok,
1508 diagnostics_count, tests_passed, tests_failed, degraded, degraded_reason)
1509 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1510 "#,
1511 [
1512 &record.session_id,
1513 &record.node_id,
1514 &record.result_json,
1515 &syntax_ok,
1516 &build_ok,
1517 &tests_ok,
1518 &lint_ok,
1519 &diagnostics_count,
1520 &tests_passed,
1521 &tests_failed,
1522 °raded,
1523 °raded_reason,
1524 ],
1525 )?;
1526 Ok(())
1527 }
1528
1529 pub fn get_verification_result(
1531 &self,
1532 session_id: &str,
1533 node_id: &str,
1534 ) -> Result<Option<VerificationResultRow>> {
1535 let conn = self.conn.lock().unwrap();
1536 let mut stmt = conn.prepare(
1537 "SELECT session_id, node_id, result_json, \
1538 CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1539 diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1540 FROM verification_results \
1541 WHERE session_id = ? AND node_id = ? \
1542 ORDER BY created_at DESC LIMIT 1",
1543 )?;
1544 let mut rows = stmt.query([session_id, node_id])?;
1545 if let Some(row) = rows.next()? {
1546 Ok(Some(VerificationResultRow {
1547 session_id: row.get(0)?,
1548 node_id: row.get(1)?,
1549 result_json: row.get(2)?,
1550 syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1551 build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1552 tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1553 lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1554 diagnostics_count: row.get(7)?,
1555 tests_passed: row.get(8)?,
1556 tests_failed: row.get(9)?,
1557 degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1558 degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1559 }))
1560 } else {
1561 Ok(None)
1562 }
1563 }
1564
1565 pub fn get_all_verification_results(
1567 &self,
1568 session_id: &str,
1569 ) -> Result<Vec<VerificationResultRow>> {
1570 let conn = self.conn.lock().unwrap();
1571 let mut stmt = conn.prepare(
1572 "WITH ranked AS ( \
1573 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1574 FROM verification_results WHERE session_id = ? \
1575 ) \
1576 SELECT session_id, node_id, result_json, \
1577 CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1578 diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1579 FROM ranked WHERE rn = 1 ORDER BY created_at",
1580 )?;
1581 let mut rows = stmt.query([session_id])?;
1582 let mut records = Vec::new();
1583 while let Some(row) = rows.next()? {
1584 records.push(VerificationResultRow {
1585 session_id: row.get(0)?,
1586 node_id: row.get(1)?,
1587 result_json: row.get(2)?,
1588 syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1589 build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1590 tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1591 lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1592 diagnostics_count: row.get(7)?,
1593 tests_passed: row.get(8)?,
1594 tests_failed: row.get(9)?,
1595 degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1596 degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1597 });
1598 }
1599 Ok(records)
1600 }
1601
1602 pub fn record_artifact_bundle(&self, record: &ArtifactBundleRow) -> Result<()> {
1604 let artifact_count = record.artifact_count.to_string();
1605 let command_count = record.command_count.to_string();
1606
1607 self.conn.lock().unwrap().execute(
1608 r#"
1609 INSERT INTO artifact_bundles (session_id, node_id, bundle_json,
1610 artifact_count, command_count, touched_files)
1611 VALUES (?, ?, ?, ?, ?, ?)
1612 "#,
1613 [
1614 &record.session_id,
1615 &record.node_id,
1616 &record.bundle_json,
1617 &artifact_count,
1618 &command_count,
1619 &record.touched_files,
1620 ],
1621 )?;
1622 Ok(())
1623 }
1624
1625 pub fn get_artifact_bundle(
1627 &self,
1628 session_id: &str,
1629 node_id: &str,
1630 ) -> Result<Option<ArtifactBundleRow>> {
1631 let conn = self.conn.lock().unwrap();
1632 let mut stmt = conn.prepare(
1633 "SELECT session_id, node_id, bundle_json, artifact_count, command_count, touched_files \
1634 FROM artifact_bundles \
1635 WHERE session_id = ? AND node_id = ? \
1636 ORDER BY created_at DESC LIMIT 1",
1637 )?;
1638 let mut rows = stmt.query([session_id, node_id])?;
1639 if let Some(row) = rows.next()? {
1640 Ok(Some(ArtifactBundleRow {
1641 session_id: row.get(0)?,
1642 node_id: row.get(1)?,
1643 bundle_json: row.get(2)?,
1644 artifact_count: row.get(3)?,
1645 command_count: row.get(4)?,
1646 touched_files: row.get(5)?,
1647 }))
1648 } else {
1649 Ok(None)
1650 }
1651 }
1652}
1653
1654impl SessionStore {
1659 pub fn record_feature_charter(&self, row: &FeatureCharterRow) -> Result<()> {
1661 let conn = self.conn.lock().unwrap();
1662 conn.execute(
1663 "INSERT INTO feature_charters (charter_id, session_id, scope_description, max_modules, max_files, max_revisions, language_constraint) \
1664 VALUES (?, ?, ?, ?, ?, ?, ?)",
1665 duckdb::params![
1666 row.charter_id,
1667 row.session_id,
1668 row.scope_description,
1669 row.max_modules,
1670 row.max_files,
1671 row.max_revisions,
1672 row.language_constraint,
1673 ],
1674 )?;
1675 Ok(())
1676 }
1677
1678 pub fn get_feature_charter(&self, session_id: &str) -> Result<Option<FeatureCharterRow>> {
1680 let conn = self.conn.lock().unwrap();
1681 let mut stmt = conn.prepare(
1682 "SELECT charter_id, session_id, scope_description, max_modules, max_files, max_revisions, language_constraint \
1683 FROM feature_charters WHERE session_id = ? LIMIT 1",
1684 )?;
1685 let mut rows = stmt.query([session_id])?;
1686 if let Some(row) = rows.next()? {
1687 Ok(Some(FeatureCharterRow {
1688 charter_id: row.get(0)?,
1689 session_id: row.get(1)?,
1690 scope_description: row.get(2)?,
1691 max_modules: row.get(3)?,
1692 max_files: row.get(4)?,
1693 max_revisions: row.get(5)?,
1694 language_constraint: row.get(6)?,
1695 }))
1696 } else {
1697 Ok(None)
1698 }
1699 }
1700
1701 pub fn record_plan_revision(&self, row: &PlanRevisionRow) -> Result<()> {
1703 let conn = self.conn.lock().unwrap();
1704 conn.execute(
1705 "INSERT INTO plan_revisions (revision_id, session_id, sequence, plan_json, reason, supersedes, status) \
1706 VALUES (?, ?, ?, ?, ?, ?, ?)",
1707 duckdb::params![
1708 row.revision_id,
1709 row.session_id,
1710 row.sequence,
1711 row.plan_json,
1712 row.reason,
1713 row.supersedes,
1714 row.status,
1715 ],
1716 )?;
1717 Ok(())
1718 }
1719
1720 pub fn get_active_plan_revision(&self, session_id: &str) -> Result<Option<PlanRevisionRow>> {
1722 let conn = self.conn.lock().unwrap();
1723 let mut stmt = conn.prepare(
1724 "SELECT revision_id, session_id, sequence, plan_json, reason, supersedes, status \
1725 FROM plan_revisions WHERE session_id = ? AND status = 'active' \
1726 ORDER BY sequence DESC LIMIT 1",
1727 )?;
1728 let mut rows = stmt.query([session_id])?;
1729 if let Some(row) = rows.next()? {
1730 Ok(Some(PlanRevisionRow {
1731 revision_id: row.get(0)?,
1732 session_id: row.get(1)?,
1733 sequence: row.get(2)?,
1734 plan_json: row.get(3)?,
1735 reason: row.get(4)?,
1736 supersedes: row.get(5)?,
1737 status: row.get(6)?,
1738 }))
1739 } else {
1740 Ok(None)
1741 }
1742 }
1743
1744 pub fn get_plan_revisions(&self, session_id: &str) -> Result<Vec<PlanRevisionRow>> {
1746 let conn = self.conn.lock().unwrap();
1747 let mut stmt = conn.prepare(
1748 "SELECT revision_id, session_id, sequence, plan_json, reason, supersedes, status \
1749 FROM plan_revisions WHERE session_id = ? ORDER BY sequence ASC",
1750 )?;
1751 let mut rows = stmt.query([session_id])?;
1752 let mut results = Vec::new();
1753 while let Some(row) = rows.next()? {
1754 results.push(PlanRevisionRow {
1755 revision_id: row.get(0)?,
1756 session_id: row.get(1)?,
1757 sequence: row.get(2)?,
1758 plan_json: row.get(3)?,
1759 reason: row.get(4)?,
1760 supersedes: row.get(5)?,
1761 status: row.get(6)?,
1762 });
1763 }
1764 Ok(results)
1765 }
1766
1767 pub fn supersede_plan_revision(&self, revision_id: &str) -> Result<()> {
1769 let conn = self.conn.lock().unwrap();
1770 conn.execute(
1771 "UPDATE plan_revisions SET status = 'superseded' WHERE revision_id = ?",
1772 [revision_id],
1773 )?;
1774 Ok(())
1775 }
1776
1777 pub fn record_repair_footprint(&self, row: &RepairFootprintRow) -> Result<()> {
1779 let conn = self.conn.lock().unwrap();
1780 conn.execute(
1781 "INSERT INTO repair_footprints (footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved) \
1782 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
1783 duckdb::params![
1784 row.footprint_id,
1785 row.session_id,
1786 row.node_id,
1787 row.revision_id,
1788 row.attempt,
1789 row.affected_files,
1790 row.bundle_json,
1791 row.diagnosis,
1792 row.resolved,
1793 ],
1794 )?;
1795 Ok(())
1796 }
1797
1798 pub fn get_repair_footprints(
1800 &self,
1801 session_id: &str,
1802 node_id: &str,
1803 ) -> Result<Vec<RepairFootprintRow>> {
1804 let conn = self.conn.lock().unwrap();
1805 let mut stmt = conn.prepare(
1806 "SELECT footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved \
1807 FROM repair_footprints WHERE session_id = ? AND node_id = ? ORDER BY attempt ASC",
1808 )?;
1809 let mut rows = stmt.query([session_id, node_id])?;
1810 let mut results = Vec::new();
1811 while let Some(row) = rows.next()? {
1812 results.push(RepairFootprintRow {
1813 footprint_id: row.get(0)?,
1814 session_id: row.get(1)?,
1815 node_id: row.get(2)?,
1816 revision_id: row.get(3)?,
1817 attempt: row.get(4)?,
1818 affected_files: row.get(5)?,
1819 bundle_json: row.get(6)?,
1820 diagnosis: row.get(7)?,
1821 resolved: row.get(8)?,
1822 });
1823 }
1824 Ok(results)
1825 }
1826
1827 pub fn get_all_repair_footprints(&self, session_id: &str) -> Result<Vec<RepairFootprintRow>> {
1829 let conn = self.conn.lock().unwrap();
1830 let mut stmt = conn.prepare(
1831 "SELECT footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved \
1832 FROM repair_footprints WHERE session_id = ? ORDER BY attempt ASC",
1833 )?;
1834 let mut rows = stmt.query([session_id])?;
1835 let mut results = Vec::new();
1836 while let Some(row) = rows.next()? {
1837 results.push(RepairFootprintRow {
1838 footprint_id: row.get(0)?,
1839 session_id: row.get(1)?,
1840 node_id: row.get(2)?,
1841 revision_id: row.get(3)?,
1842 attempt: row.get(4)?,
1843 affected_files: row.get(5)?,
1844 bundle_json: row.get(6)?,
1845 diagnosis: row.get(7)?,
1846 resolved: row.get(8)?,
1847 });
1848 }
1849 Ok(results)
1850 }
1851
1852 pub fn resolve_repair_footprint(&self, footprint_id: &str) -> Result<()> {
1854 let conn = self.conn.lock().unwrap();
1855 conn.execute(
1856 "UPDATE repair_footprints SET resolved = true WHERE footprint_id = ?",
1857 [footprint_id],
1858 )?;
1859 Ok(())
1860 }
1861
1862 pub fn upsert_budget_envelope(&self, row: &BudgetEnvelopeRow) -> Result<()> {
1864 let conn = self.conn.lock().unwrap();
1865 conn.execute(
1867 "INSERT INTO budget_envelopes (session_id, max_steps, steps_used, max_revisions, revisions_used, max_cost_usd, cost_used_usd) \
1868 VALUES (?, ?, ?, ?, ?, ?, ?) \
1869 ON CONFLICT (session_id) DO UPDATE SET \
1870 max_steps = EXCLUDED.max_steps, steps_used = EXCLUDED.steps_used, \
1871 max_revisions = EXCLUDED.max_revisions, revisions_used = EXCLUDED.revisions_used, \
1872 max_cost_usd = EXCLUDED.max_cost_usd, cost_used_usd = EXCLUDED.cost_used_usd",
1873 duckdb::params![
1874 row.session_id,
1875 row.max_steps,
1876 row.steps_used,
1877 row.max_revisions,
1878 row.revisions_used,
1879 row.max_cost_usd,
1880 row.cost_used_usd,
1881 ],
1882 )?;
1883 Ok(())
1884 }
1885
1886 pub fn get_budget_envelope(&self, session_id: &str) -> Result<Option<BudgetEnvelopeRow>> {
1888 let conn = self.conn.lock().unwrap();
1889 let mut stmt = conn.prepare(
1890 "SELECT session_id, max_steps, steps_used, max_revisions, revisions_used, max_cost_usd, cost_used_usd \
1891 FROM budget_envelopes WHERE session_id = ?",
1892 )?;
1893 let mut rows = stmt.query([session_id])?;
1894 if let Some(row) = rows.next()? {
1895 Ok(Some(BudgetEnvelopeRow {
1896 session_id: row.get(0)?,
1897 max_steps: row.get(1)?,
1898 steps_used: row.get(2)?,
1899 max_revisions: row.get(3)?,
1900 revisions_used: row.get(4)?,
1901 max_cost_usd: row.get(5)?,
1902 cost_used_usd: row.get(6)?,
1903 }))
1904 } else {
1905 Ok(None)
1906 }
1907 }
1908}
1909
1910#[cfg(test)]
1911mod tests {
1912 use super::*;
1913
1914 fn test_store() -> SessionStore {
1916 let temp_dir = std::env::temp_dir();
1917 let db_path = temp_dir.join(format!("perspt_test_{}.db", uuid::Uuid::new_v4()));
1918 SessionStore::open(&db_path).expect("Failed to create test store")
1919 }
1920
1921 fn seed_session(store: &SessionStore, session_id: &str) {
1922 let record = SessionRecord {
1923 session_id: session_id.to_string(),
1924 task: "test task".to_string(),
1925 working_dir: "/tmp/test".to_string(),
1926 merkle_root: None,
1927 detected_toolchain: None,
1928 status: "RUNNING".to_string(),
1929 };
1930 store.create_session(&record).unwrap();
1931 }
1932
1933 #[test]
1934 fn test_node_state_phase8_roundtrip() {
1935 let store = test_store();
1936 let sid = "test-sess-1";
1937 seed_session(&store, sid);
1938
1939 let record = NodeStateRecord {
1940 node_id: "node-1".to_string(),
1941 session_id: sid.to_string(),
1942 state: "Completed".to_string(),
1943 v_total: 0.42,
1944 merkle_hash: Some(vec![0xab; 32]),
1945 attempt_count: 3,
1946 node_class: Some("Interface".to_string()),
1947 owner_plugin: Some("rust".to_string()),
1948 goal: Some("Implement API".to_string()),
1949 parent_id: Some("root".to_string()),
1950 children: Some(r#"["child-a","child-b"]"#.to_string()),
1951 last_error_type: Some("CompilationError".to_string()),
1952 committed_at: Some("2025-01-01T00:00:00Z".to_string()),
1953 };
1954
1955 store.record_node_state(&record).unwrap();
1956
1957 let states = store.get_latest_node_states(sid).unwrap();
1958 assert_eq!(states.len(), 1);
1959 let r = &states[0];
1960 assert_eq!(r.node_id, "node-1");
1961 assert_eq!(r.state, "Completed");
1962 assert_eq!(r.attempt_count, 3);
1963 assert_eq!(r.node_class.as_deref(), Some("Interface"));
1964 assert_eq!(r.owner_plugin.as_deref(), Some("rust"));
1965 assert_eq!(r.goal.as_deref(), Some("Implement API"));
1966 assert_eq!(r.parent_id.as_deref(), Some("root"));
1967 assert!(r.children.is_some());
1968 assert_eq!(r.last_error_type.as_deref(), Some("CompilationError"));
1969 assert_eq!(r.committed_at.as_deref(), Some("2025-01-01T00:00:00Z"));
1970 }
1971
1972 #[test]
1973 fn test_task_graph_edge_roundtrip() {
1974 let store = test_store();
1975 let sid = "test-graph-1";
1976 seed_session(&store, sid);
1977
1978 let edge = TaskGraphEdgeRow {
1979 session_id: sid.to_string(),
1980 parent_node_id: "parent-1".to_string(),
1981 child_node_id: "child-1".to_string(),
1982 edge_type: "depends_on".to_string(),
1983 };
1984 store.record_task_graph_edge(&edge).unwrap();
1985
1986 let edges = store.get_task_graph_edges(sid).unwrap();
1987 assert_eq!(edges.len(), 1);
1988 assert_eq!(edges[0].parent_node_id, "parent-1");
1989 assert_eq!(edges[0].child_node_id, "child-1");
1990 assert_eq!(edges[0].edge_type, "depends_on");
1991 }
1992
1993 #[test]
1994 fn test_verification_result_roundtrip() {
1995 let store = test_store();
1996 let sid = "test-vr-1";
1997 seed_session(&store, sid);
1998
1999 let row = VerificationResultRow {
2000 session_id: sid.to_string(),
2001 node_id: "node-v".to_string(),
2002 result_json: r#"{"syntax_ok":true}"#.to_string(),
2003 syntax_ok: true,
2004 build_ok: true,
2005 tests_ok: false,
2006 lint_ok: true,
2007 diagnostics_count: 2,
2008 tests_passed: 5,
2009 tests_failed: 1,
2010 degraded: false,
2011 degraded_reason: None,
2012 };
2013 store.record_verification_result(&row).unwrap();
2014
2015 let got = store.get_verification_result(sid, "node-v").unwrap();
2016 assert!(got.is_some());
2017 let got = got.unwrap();
2018 assert!(got.syntax_ok);
2019 assert!(got.build_ok);
2020 assert!(!got.tests_ok);
2021 assert_eq!(got.tests_passed, 5);
2022 assert_eq!(got.tests_failed, 1);
2023 assert!(!got.degraded);
2024 }
2025
2026 #[test]
2027 fn test_verification_result_degraded() {
2028 let store = test_store();
2029 let sid = "test-vr-deg";
2030 seed_session(&store, sid);
2031
2032 let row = VerificationResultRow {
2033 session_id: sid.to_string(),
2034 node_id: "node-d".to_string(),
2035 result_json: "{}".to_string(),
2036 syntax_ok: true,
2037 build_ok: false,
2038 tests_ok: false,
2039 lint_ok: false,
2040 diagnostics_count: 0,
2041 tests_passed: 0,
2042 tests_failed: 0,
2043 degraded: true,
2044 degraded_reason: Some("LSP unavailable".to_string()),
2045 };
2046 store.record_verification_result(&row).unwrap();
2047
2048 let got = store
2049 .get_verification_result(sid, "node-d")
2050 .unwrap()
2051 .unwrap();
2052 assert!(got.degraded);
2053 assert_eq!(got.degraded_reason.as_deref(), Some("LSP unavailable"));
2054 }
2055
2056 #[test]
2057 fn test_artifact_bundle_roundtrip() {
2058 let store = test_store();
2059 let sid = "test-ab-1";
2060 seed_session(&store, sid);
2061
2062 let row = ArtifactBundleRow {
2063 session_id: sid.to_string(),
2064 node_id: "node-a".to_string(),
2065 bundle_json: r#"{"artifacts":[],"commands":[]}"#.to_string(),
2066 artifact_count: 3,
2067 command_count: 1,
2068 touched_files: r#"["src/main.rs","src/lib.rs","tests/test.rs"]"#.to_string(),
2069 };
2070 store.record_artifact_bundle(&row).unwrap();
2071
2072 let got = store.get_artifact_bundle(sid, "node-a").unwrap();
2073 assert!(got.is_some());
2074 let got = got.unwrap();
2075 assert_eq!(got.artifact_count, 3);
2076 assert_eq!(got.command_count, 1);
2077 assert!(got.touched_files.contains("main.rs"));
2078 }
2079
2080 #[test]
2081 fn test_latest_node_states_dedup() {
2082 let store = test_store();
2083 let sid = "test-dedup";
2084 seed_session(&store, sid);
2085
2086 let r1 = NodeStateRecord {
2088 node_id: "node-x".to_string(),
2089 session_id: sid.to_string(),
2090 state: "Coding".to_string(),
2091 v_total: 0.5,
2092 merkle_hash: None,
2093 attempt_count: 1,
2094 node_class: None,
2095 owner_plugin: None,
2096 goal: None,
2097 parent_id: None,
2098 children: None,
2099 last_error_type: None,
2100 committed_at: None,
2101 };
2102 store.record_node_state(&r1).unwrap();
2103
2104 let r2 = NodeStateRecord {
2105 node_id: "node-x".to_string(),
2106 session_id: sid.to_string(),
2107 state: "Completed".to_string(),
2108 v_total: 0.3,
2109 merkle_hash: None,
2110 attempt_count: 2,
2111 node_class: Some("Implementation".to_string()),
2112 owner_plugin: None,
2113 goal: Some("Updated goal".to_string()),
2114 parent_id: None,
2115 children: None,
2116 last_error_type: None,
2117 committed_at: Some("2025-01-02T00:00:00Z".to_string()),
2118 };
2119 store.record_node_state(&r2).unwrap();
2120
2121 let latest = store.get_latest_node_states(sid).unwrap();
2123 assert_eq!(latest.len(), 1);
2124 assert_eq!(latest[0].state, "Completed");
2125 assert_eq!(latest[0].attempt_count, 2);
2126 assert_eq!(latest[0].goal.as_deref(), Some("Updated goal"));
2127 }
2128
2129 #[test]
2130 fn test_backward_compat_empty_phase8_fields() {
2131 let store = test_store();
2132 let sid = "test-compat";
2133 seed_session(&store, sid);
2134
2135 let r = NodeStateRecord {
2137 node_id: "old-node".to_string(),
2138 session_id: sid.to_string(),
2139 state: "COMPLETED".to_string(),
2140 v_total: 1.0,
2141 merkle_hash: None,
2142 attempt_count: 1,
2143 node_class: None,
2144 owner_plugin: None,
2145 goal: None,
2146 parent_id: None,
2147 children: None,
2148 last_error_type: None,
2149 committed_at: None,
2150 };
2151 store.record_node_state(&r).unwrap();
2152
2153 let latest = store.get_latest_node_states(sid).unwrap();
2154 assert_eq!(latest.len(), 1);
2155 assert!(latest[0].node_class.is_none());
2156 assert!(latest[0].goal.is_none());
2157 assert!(latest[0].committed_at.is_none());
2158
2159 let vr = store.get_verification_result(sid, "old-node").unwrap();
2161 assert!(vr.is_none());
2162 let ab = store.get_artifact_bundle(sid, "old-node").unwrap();
2163 assert!(ab.is_none());
2164 }
2165
2166 #[test]
2167 fn test_review_outcome_roundtrip() {
2168 let store = test_store();
2169 let sid = "test-review";
2170 seed_session(&store, sid);
2171
2172 let row = ReviewOutcomeRow {
2173 session_id: sid.to_string(),
2174 node_id: "node-r".to_string(),
2175 outcome: "approved".to_string(),
2176 reviewer_note: Some("LGTM".to_string()),
2177 energy_at_review: None,
2178 degraded: None,
2179 escalation_category: None,
2180 };
2181 store.record_review_outcome(&row).unwrap();
2182
2183 let outcomes = store.get_review_outcomes(sid, "node-r").unwrap();
2184 assert_eq!(outcomes.len(), 1);
2185 assert_eq!(outcomes[0].outcome, "approved");
2186 assert_eq!(outcomes[0].reviewer_note.as_deref(), Some("LGTM"));
2187 }
2188
2189 #[test]
2190 fn test_review_outcome_with_audit_fields() {
2191 let store = test_store();
2192 let sid = "test-review-audit";
2193 seed_session(&store, sid);
2194
2195 let row = ReviewOutcomeRow {
2196 session_id: sid.to_string(),
2197 node_id: "node-a".to_string(),
2198 outcome: "rejected".to_string(),
2199 reviewer_note: Some("Needs rework".to_string()),
2200 energy_at_review: Some(0.42),
2201 degraded: Some(true),
2202 escalation_category: Some("complexity".to_string()),
2203 };
2204 store.record_review_outcome(&row).unwrap();
2205
2206 let outcomes = store.get_review_outcomes(sid, "node-a").unwrap();
2207 assert_eq!(outcomes.len(), 1);
2208 assert_eq!(outcomes[0].outcome, "rejected");
2209 assert_eq!(outcomes[0].energy_at_review, Some(0.42));
2210 assert_eq!(outcomes[0].degraded, Some(true));
2211 assert_eq!(
2212 outcomes[0].escalation_category.as_deref(),
2213 Some("complexity")
2214 );
2215 }
2216
2217 #[test]
2218 fn test_get_all_review_outcomes() {
2219 let store = test_store();
2220 let sid = "test-review-all";
2221 seed_session(&store, sid);
2222
2223 for (node, outcome) in &[("n1", "approved"), ("n2", "rejected"), ("n1", "approved")] {
2224 let row = ReviewOutcomeRow {
2225 session_id: sid.to_string(),
2226 node_id: node.to_string(),
2227 outcome: outcome.to_string(),
2228 reviewer_note: None,
2229 energy_at_review: None,
2230 degraded: None,
2231 escalation_category: None,
2232 };
2233 store.record_review_outcome(&row).unwrap();
2234 }
2235
2236 let all = store.get_all_review_outcomes(sid).unwrap();
2237 assert_eq!(all.len(), 3);
2238 }
2239
2240 #[test]
2241 fn test_feature_charter_roundtrip() {
2242 let store = test_store();
2243 let sid = "test-charter";
2244 seed_session(&store, sid);
2245
2246 let row = FeatureCharterRow {
2247 charter_id: "ch-1".to_string(),
2248 session_id: sid.to_string(),
2249 scope_description: "Add authentication module".to_string(),
2250 max_modules: Some(3),
2251 max_files: Some(10),
2252 max_revisions: Some(5),
2253 language_constraint: Some("rust".to_string()),
2254 };
2255 store.record_feature_charter(&row).unwrap();
2256
2257 let got = store.get_feature_charter(sid).unwrap();
2258 assert!(got.is_some());
2259 let got = got.unwrap();
2260 assert_eq!(got.charter_id, "ch-1");
2261 assert_eq!(got.scope_description, "Add authentication module");
2262 assert_eq!(got.max_modules, Some(3));
2263 assert_eq!(got.language_constraint.as_deref(), Some("rust"));
2264 }
2265
2266 #[test]
2267 fn test_feature_charter_returns_none_for_missing() {
2268 let store = test_store();
2269 let sid = "test-charter-miss";
2270 seed_session(&store, sid);
2271
2272 let got = store.get_feature_charter(sid).unwrap();
2273 assert!(got.is_none());
2274 }
2275
2276 #[test]
2277 fn test_plan_revision_roundtrip() {
2278 let store = test_store();
2279 let sid = "test-rev";
2280 seed_session(&store, sid);
2281
2282 let row = PlanRevisionRow {
2283 revision_id: "rev-1".to_string(),
2284 session_id: sid.to_string(),
2285 sequence: 1,
2286 plan_json: r#"{"tasks":[]}"#.to_string(),
2287 reason: "initial plan".to_string(),
2288 supersedes: None,
2289 status: "active".to_string(),
2290 };
2291 store.record_plan_revision(&row).unwrap();
2292
2293 let active = store.get_active_plan_revision(sid).unwrap();
2294 assert!(active.is_some());
2295 let active = active.unwrap();
2296 assert_eq!(active.revision_id, "rev-1");
2297 assert_eq!(active.sequence, 1);
2298 assert_eq!(active.status, "active");
2299 }
2300
2301 #[test]
2302 fn test_plan_revision_supersede() {
2303 let store = test_store();
2304 let sid = "test-rev-sup";
2305 seed_session(&store, sid);
2306
2307 let r1 = PlanRevisionRow {
2308 revision_id: "rev-1".to_string(),
2309 session_id: sid.to_string(),
2310 sequence: 1,
2311 plan_json: "{}".to_string(),
2312 reason: "initial".to_string(),
2313 supersedes: None,
2314 status: "active".to_string(),
2315 };
2316 store.record_plan_revision(&r1).unwrap();
2317
2318 store.supersede_plan_revision("rev-1").unwrap();
2320
2321 let r2 = PlanRevisionRow {
2322 revision_id: "rev-2".to_string(),
2323 session_id: sid.to_string(),
2324 sequence: 2,
2325 plan_json: r#"{"tasks":["a"]}"#.to_string(),
2326 reason: "verifier feedback".to_string(),
2327 supersedes: Some("rev-1".to_string()),
2328 status: "active".to_string(),
2329 };
2330 store.record_plan_revision(&r2).unwrap();
2331
2332 let active = store.get_active_plan_revision(sid).unwrap().unwrap();
2334 assert_eq!(active.revision_id, "rev-2");
2335
2336 let all = store.get_plan_revisions(sid).unwrap();
2338 assert_eq!(all.len(), 2);
2339 assert_eq!(all[0].status, "superseded");
2340 assert_eq!(all[1].status, "active");
2341 }
2342
2343 #[test]
2344 fn test_repair_footprint_roundtrip() {
2345 let store = test_store();
2346 let sid = "test-repair";
2347 seed_session(&store, sid);
2348
2349 let row = RepairFootprintRow {
2350 footprint_id: "fp-1".to_string(),
2351 session_id: sid.to_string(),
2352 node_id: "node-a".to_string(),
2353 revision_id: "rev-1".to_string(),
2354 attempt: 1,
2355 affected_files: r#"["src/main.rs"]"#.to_string(),
2356 bundle_json: "{}".to_string(),
2357 diagnosis: "missing import".to_string(),
2358 resolved: false,
2359 };
2360 store.record_repair_footprint(&row).unwrap();
2361
2362 let footprints = store.get_repair_footprints(sid, "node-a").unwrap();
2363 assert_eq!(footprints.len(), 1);
2364 assert_eq!(footprints[0].footprint_id, "fp-1");
2365 assert_eq!(footprints[0].diagnosis, "missing import");
2366 assert!(!footprints[0].resolved);
2367 }
2368
2369 #[test]
2370 fn test_repair_footprint_resolve() {
2371 let store = test_store();
2372 let sid = "test-repair-res";
2373 seed_session(&store, sid);
2374
2375 let row = RepairFootprintRow {
2376 footprint_id: "fp-2".to_string(),
2377 session_id: sid.to_string(),
2378 node_id: "node-b".to_string(),
2379 revision_id: "rev-1".to_string(),
2380 attempt: 1,
2381 affected_files: "[]".to_string(),
2382 bundle_json: "{}".to_string(),
2383 diagnosis: "type error".to_string(),
2384 resolved: false,
2385 };
2386 store.record_repair_footprint(&row).unwrap();
2387
2388 store.resolve_repair_footprint("fp-2").unwrap();
2389
2390 let footprints = store.get_repair_footprints(sid, "node-b").unwrap();
2391 assert_eq!(footprints.len(), 1);
2392 assert!(footprints[0].resolved);
2393 }
2394
2395 #[test]
2396 fn test_budget_envelope_upsert_and_get() {
2397 let store = test_store();
2398 let sid = "test-budget";
2399 seed_session(&store, sid);
2400
2401 let row = BudgetEnvelopeRow {
2402 session_id: sid.to_string(),
2403 max_steps: Some(100),
2404 steps_used: 5,
2405 max_revisions: Some(10),
2406 revisions_used: 1,
2407 max_cost_usd: Some(5.0),
2408 cost_used_usd: 0.25,
2409 };
2410 store.upsert_budget_envelope(&row).unwrap();
2411
2412 let got = store.get_budget_envelope(sid).unwrap();
2413 assert!(got.is_some());
2414 let got = got.unwrap();
2415 assert_eq!(got.max_steps, Some(100));
2416 assert_eq!(got.steps_used, 5);
2417 assert_eq!(got.cost_used_usd, 0.25);
2418 }
2419
2420 #[test]
2421 fn test_budget_envelope_upsert_updates() {
2422 let store = test_store();
2423 let sid = "test-budget-up";
2424 seed_session(&store, sid);
2425
2426 let row1 = BudgetEnvelopeRow {
2427 session_id: sid.to_string(),
2428 max_steps: Some(100),
2429 steps_used: 0,
2430 max_revisions: None,
2431 revisions_used: 0,
2432 max_cost_usd: None,
2433 cost_used_usd: 0.0,
2434 };
2435 store.upsert_budget_envelope(&row1).unwrap();
2436
2437 let row2 = BudgetEnvelopeRow {
2439 session_id: sid.to_string(),
2440 max_steps: Some(100),
2441 steps_used: 42,
2442 max_revisions: Some(5),
2443 revisions_used: 3,
2444 max_cost_usd: Some(10.0),
2445 cost_used_usd: 4.5,
2446 };
2447 store.upsert_budget_envelope(&row2).unwrap();
2448
2449 let got = store.get_budget_envelope(sid).unwrap().unwrap();
2450 assert_eq!(got.steps_used, 42);
2451 assert_eq!(got.revisions_used, 3);
2452 assert_eq!(got.cost_used_usd, 4.5);
2453 }
2454
2455 #[test]
2456 fn test_budget_envelope_missing_returns_none() {
2457 let store = test_store();
2458 let sid = "test-budget-miss";
2459 seed_session(&store, sid);
2460
2461 let got = store.get_budget_envelope(sid).unwrap();
2462 assert!(got.is_none());
2463 }
2464
2465 #[test]
2466 fn test_read_only_store_queries_work() {
2467 let temp_dir = std::env::temp_dir();
2468 let db_path = temp_dir.join(format!("perspt_ro_test_{}.db", uuid::Uuid::new_v4()));
2469
2470 {
2472 let store = SessionStore::open(&db_path).unwrap();
2473 seed_session(&store, "ro-test");
2474 }
2475
2476 let ro = SessionStore::open_read_only(&db_path).unwrap();
2478 let sessions = ro.list_recent_sessions(10).unwrap();
2479 assert_eq!(sessions.len(), 1);
2480 assert_eq!(sessions[0].session_id, "ro-test");
2481 }
2482
2483 #[test]
2484 fn test_read_only_store_rejects_writes() {
2485 let temp_dir = std::env::temp_dir();
2486 let db_path = temp_dir.join(format!("perspt_ro_wr_{}.db", uuid::Uuid::new_v4()));
2487
2488 {
2490 let _store = SessionStore::open(&db_path).unwrap();
2491 }
2492
2493 let ro = SessionStore::open_read_only(&db_path).unwrap();
2495 let record = SessionRecord {
2496 session_id: "should-fail".to_string(),
2497 task: "test".to_string(),
2498 working_dir: "/tmp".to_string(),
2499 merkle_root: None,
2500 detected_toolchain: None,
2501 status: "RUNNING".to_string(),
2502 };
2503 assert!(ro.create_session(&record).is_err());
2504 }
2505}