1use anyhow::{Context, Result};
6use duckdb::Connection;
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9use std::path::PathBuf;
10
11use crate::schema::init_schema;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct SessionRecord {
16 pub session_id: String,
17 pub task: String,
18 pub working_dir: String,
19 pub merkle_root: Option<Vec<u8>>,
20 pub detected_toolchain: Option<String>,
21 pub status: String,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct NodeStateRecord {
27 pub node_id: String,
28 pub session_id: String,
29 pub state: String,
30 pub v_total: f32,
31 pub merkle_hash: Option<Vec<u8>>,
32 pub attempt_count: i32,
33 pub node_class: Option<String>,
35 pub owner_plugin: Option<String>,
36 pub goal: Option<String>,
37 pub parent_id: Option<String>,
38 pub children: Option<String>,
40 pub last_error_type: Option<String>,
41 pub committed_at: Option<String>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct EnergyRecord {
47 pub node_id: String,
48 pub session_id: String,
49 pub v_syn: f32,
50 pub v_str: f32,
51 pub v_log: f32,
52 pub v_boot: f32,
53 pub v_sheaf: f32,
54 pub v_total: f32,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct LlmRequestRecord {
60 pub session_id: String,
61 pub node_id: Option<String>,
62 pub model: String,
63 pub prompt: String,
64 pub response: String,
65 pub tokens_in: i32,
66 pub tokens_out: i32,
67 pub latency_ms: i32,
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct StructuralDigestRecord {
73 pub digest_id: String,
74 pub session_id: String,
75 pub node_id: String,
76 pub source_path: String,
77 pub artifact_kind: String,
78 pub hash: Vec<u8>,
79 pub version: i32,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct ContextProvenanceRecord {
85 pub session_id: String,
86 pub node_id: String,
87 pub context_package_id: String,
88 pub structural_hashes: String,
90 pub summary_hashes: String,
92 pub dependency_hashes: String,
94 pub included_file_count: i32,
95 pub total_bytes: i32,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct EscalationReportRecord {
101 pub session_id: String,
102 pub node_id: String,
103 pub category: String,
105 pub action: String,
107 pub energy_snapshot: String,
109 pub stage_outcomes: String,
111 pub evidence: String,
113 pub affected_node_ids: String,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct RewriteRecordRow {
120 pub session_id: String,
121 pub node_id: String,
122 pub action: String,
124 pub category: String,
126 pub requeued_nodes: String,
128 pub inserted_nodes: String,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct SheafValidationRow {
135 pub session_id: String,
136 pub node_id: String,
137 pub validator_class: String,
138 pub plugin_source: Option<String>,
139 pub passed: bool,
140 pub evidence_summary: String,
141 pub affected_files: String,
143 pub v_sheaf_contribution: f32,
144 pub requeue_targets: String,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct ProvisionalBranchRow {
155 pub branch_id: String,
156 pub session_id: String,
157 pub node_id: String,
158 pub parent_node_id: String,
159 pub state: String,
160 pub parent_seal_hash: Option<Vec<u8>>,
161 pub sandbox_dir: Option<String>,
162}
163
164#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct BranchLineageRow {
167 pub lineage_id: String,
168 pub parent_branch_id: String,
169 pub child_branch_id: String,
170 pub depends_on_seal: bool,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct InterfaceSealRow {
176 pub seal_id: String,
177 pub session_id: String,
178 pub node_id: String,
179 pub sealed_path: String,
180 pub artifact_kind: String,
181 pub seal_hash: Vec<u8>,
182 pub version: i32,
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct BranchFlushRow {
188 pub flush_id: String,
189 pub session_id: String,
190 pub parent_node_id: String,
191 pub flushed_branch_ids: String,
193 pub requeue_node_ids: String,
195 pub reason: String,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct TaskGraphEdgeRow {
205 pub session_id: String,
206 pub parent_node_id: String,
207 pub child_node_id: String,
208 pub edge_type: String,
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct ReviewOutcomeRow {
214 pub session_id: String,
215 pub node_id: String,
216 pub outcome: String,
218 pub reviewer_note: Option<String>,
219 pub energy_at_review: Option<f64>,
221 pub degraded: Option<bool>,
223 pub escalation_category: Option<String>,
225}
226
227#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct VerificationResultRow {
230 pub session_id: String,
231 pub node_id: String,
232 pub result_json: String,
234 pub syntax_ok: bool,
236 pub build_ok: bool,
237 pub tests_ok: bool,
238 pub lint_ok: bool,
239 pub diagnostics_count: i32,
240 pub tests_passed: i32,
241 pub tests_failed: i32,
242 pub degraded: bool,
243 pub degraded_reason: Option<String>,
244}
245
246#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct ArtifactBundleRow {
249 pub session_id: String,
250 pub node_id: String,
251 pub bundle_json: String,
253 pub artifact_count: i32,
254 pub command_count: i32,
255 pub touched_files: String,
257}
258
259use std::sync::Mutex;
260
261pub struct SessionStore {
263 conn: Mutex<Connection>,
264}
265
266impl SessionStore {
267 pub fn new() -> Result<Self> {
269 let db_path = Self::default_db_path()?;
270 Self::open(&db_path)
271 }
272
273 pub fn open(path: &PathBuf) -> Result<Self> {
275 if let Some(parent) = path.parent() {
277 std::fs::create_dir_all(parent)?;
278 }
279
280 let conn = Connection::open(path).context("Failed to open DuckDB")?;
281 init_schema(&conn)?;
282
283 Ok(Self {
284 conn: Mutex::new(conn),
285 })
286 }
287
288 pub fn default_db_path() -> Result<PathBuf> {
290 let data_dir = dirs::data_local_dir()
291 .context("Could not find local data directory")?
292 .join("perspt");
293 Ok(data_dir.join("perspt.db"))
294 }
295
296 pub fn create_session(&self, session: &SessionRecord) -> Result<()> {
298 self.conn.lock().unwrap().execute(
299 r#"
300 INSERT INTO sessions (session_id, task, working_dir, merkle_root, detected_toolchain, status)
301 VALUES (?, ?, ?, ?, ?, ?)
302 "#,
303 [
304 &session.session_id,
305 &session.task,
306 &session.working_dir,
307 &session.merkle_root.as_ref().map(hex::encode).unwrap_or_default(),
308 &session.detected_toolchain.clone().unwrap_or_default(),
309 &session.status,
310 ],
311 )?;
312 Ok(())
313 }
314
315 pub fn update_merkle_root(&self, session_id: &str, merkle_root: &[u8]) -> Result<()> {
317 self.conn.lock().unwrap().execute(
318 "UPDATE sessions SET merkle_root = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
319 [hex::encode(merkle_root), session_id.to_string()],
320 )?;
321 Ok(())
322 }
323
324 pub fn record_node_state(&self, record: &NodeStateRecord) -> Result<()> {
326 let v_total = record.v_total.to_string();
327 let merkle_hash = record
328 .merkle_hash
329 .as_ref()
330 .map(hex::encode)
331 .unwrap_or_default();
332 let attempt_count = record.attempt_count.to_string();
333 let node_class = record.node_class.clone().unwrap_or_default();
334 let owner_plugin = record.owner_plugin.clone().unwrap_or_default();
335 let goal = record.goal.clone().unwrap_or_default();
336 let parent_id = record.parent_id.clone().unwrap_or_default();
337 let children = record.children.clone().unwrap_or_default();
338 let last_error_type = record.last_error_type.clone().unwrap_or_default();
339 let committed_at = record.committed_at.clone().unwrap_or_default();
340
341 self.conn.lock().unwrap().execute(
342 r#"
343 INSERT INTO node_states (node_id, session_id, state, v_total, merkle_hash, attempt_count,
344 node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at)
345 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
346 "#,
347 [
348 &record.node_id,
349 &record.session_id,
350 &record.state,
351 &v_total,
352 &merkle_hash,
353 &attempt_count,
354 &node_class,
355 &owner_plugin,
356 &goal,
357 &parent_id,
358 &children,
359 &last_error_type,
360 &committed_at,
361 ],
362 )?;
363 Ok(())
364 }
365
366 pub fn record_energy(&self, record: &EnergyRecord) -> Result<()> {
368 self.conn.lock().unwrap().execute(
369 r#"
370 INSERT INTO energy_history (node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total)
371 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
372 "#,
373 [
374 &record.node_id,
375 &record.session_id,
376 &record.v_syn.to_string(),
377 &record.v_str.to_string(),
378 &record.v_log.to_string(),
379 &record.v_boot.to_string(),
380 &record.v_sheaf.to_string(),
381 &record.v_total.to_string(),
382 ],
383 )?;
384 Ok(())
385 }
386
387 pub fn calculate_hash(content: &[u8]) -> Vec<u8> {
389 let mut hasher = Sha256::new();
390 hasher.update(content);
391 hasher.finalize().to_vec()
392 }
393
394 pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>> {
396 let conn = self.conn.lock().unwrap();
397 let mut stmt = conn.prepare(
398 "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status FROM sessions WHERE session_id = ?"
399 )?;
400
401 let mut rows = stmt.query([session_id])?;
402 if let Some(row) = rows.next()? {
403 let merkle_root: Option<Vec<u8>> = row.get(3).ok();
406
407 Ok(Some(SessionRecord {
408 session_id: row.get(0)?,
409 task: row.get(1)?,
410 working_dir: row.get(2)?,
411 merkle_root,
412 detected_toolchain: row.get(4)?,
413 status: row.get(5)?,
414 }))
415 } else {
416 Ok(None)
417 }
418 }
419
420 pub fn get_session_dir(&self, session_id: &str) -> Result<PathBuf> {
422 let data_dir = dirs::data_local_dir()
423 .context("Could not find local data directory")?
424 .join("perspt")
425 .join("sessions")
426 .join(session_id);
427 Ok(data_dir)
428 }
429
430 pub fn create_session_dir(&self, session_id: &str) -> Result<PathBuf> {
432 let dir = self.get_session_dir(session_id)?;
433 if !dir.exists() {
434 std::fs::create_dir_all(&dir).context("Failed to create session directory")?;
435 }
436 Ok(dir)
437 }
438
439 pub fn get_energy_history(&self, session_id: &str, node_id: &str) -> Result<Vec<EnergyRecord>> {
441 let conn = self.conn.lock().unwrap();
442 let mut stmt = conn.prepare(
443 "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? AND node_id = ? ORDER BY timestamp"
444 )?;
445
446 let mut rows = stmt.query([session_id, node_id])?;
447 let mut records = Vec::new();
448
449 while let Some(row) = rows.next()? {
450 records.push(EnergyRecord {
451 node_id: row.get(0)?,
452 session_id: row.get(1)?,
453 v_syn: row.get::<_, f64>(2)? as f32,
454 v_str: row.get::<_, f64>(3)? as f32,
455 v_log: row.get::<_, f64>(4)? as f32,
456 v_boot: row.get::<_, f64>(5)? as f32,
457 v_sheaf: row.get::<_, f64>(6)? as f32,
458 v_total: row.get::<_, f64>(7)? as f32,
459 });
460 }
461
462 Ok(records)
463 }
464
465 pub fn list_recent_sessions(&self, limit: usize) -> Result<Vec<SessionRecord>> {
467 let conn = self.conn.lock().unwrap();
468 let mut stmt = conn.prepare(
469 "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status
470 FROM sessions ORDER BY created_at DESC LIMIT ?",
471 )?;
472
473 let mut rows = stmt.query([limit.to_string()])?;
474 let mut records = Vec::new();
475
476 while let Some(row) = rows.next()? {
477 let merkle_root: Option<Vec<u8>> = row.get(3).ok();
479
480 records.push(SessionRecord {
481 session_id: row.get(0)?,
482 task: row.get(1)?,
483 working_dir: row.get(2)?,
484 merkle_root,
485 detected_toolchain: row.get(4)?,
486 status: row.get(5)?,
487 });
488 }
489
490 Ok(records)
491 }
492
493 pub fn get_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
495 let conn = self.conn.lock().unwrap();
496 let mut stmt = conn.prepare(
497 "SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
498 node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
499 FROM node_states WHERE session_id = ? ORDER BY created_at",
500 )?;
501
502 let mut rows = stmt.query([session_id])?;
503 let mut records = Vec::new();
504
505 while let Some(row) = rows.next()? {
506 records.push(NodeStateRecord {
507 node_id: row.get(0)?,
508 session_id: row.get(1)?,
509 state: row.get(2)?,
510 v_total: row.get::<_, f64>(3)? as f32,
511 merkle_hash: row
512 .get::<_, Option<String>>(4)?
513 .and_then(|s| hex::decode(s).ok()),
514 attempt_count: row.get(5)?,
515 node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
516 owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
517 goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
518 parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
519 children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
520 last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
521 committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
522 });
523 }
524
525 Ok(records)
526 }
527
528 pub fn update_session_status(&self, session_id: &str, status: &str) -> Result<()> {
530 self.conn.lock().unwrap().execute(
531 "UPDATE sessions SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
532 [status, session_id],
533 )?;
534 Ok(())
535 }
536
537 pub fn record_llm_request(&self, record: &LlmRequestRecord) -> Result<()> {
539 let conn = self.conn.lock().unwrap();
540 conn.execute(
541 r#"
542 INSERT INTO llm_requests (session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms)
543 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
544 "#,
545 [
546 &record.session_id,
547 &record.node_id.clone().unwrap_or_default(),
548 &record.model,
549 &record.prompt,
550 &record.response,
551 &record.tokens_in.to_string(),
552 &record.tokens_out.to_string(),
553 &record.latency_ms.to_string(),
554 ],
555 )?;
556 Ok(())
557 }
558
559 pub fn get_llm_requests(&self, session_id: &str) -> Result<Vec<LlmRequestRecord>> {
561 let conn = self.conn.lock().unwrap();
562 let mut stmt = conn.prepare(
563 "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
564 FROM llm_requests WHERE session_id = ? ORDER BY timestamp",
565 )?;
566
567 let mut rows = stmt.query([session_id])?;
568 let mut records = Vec::new();
569
570 while let Some(row) = rows.next()? {
571 let node_id: Option<String> = row.get(1)?;
572 records.push(LlmRequestRecord {
573 session_id: row.get(0)?,
574 node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
575 None
576 } else {
577 node_id
578 },
579 model: row.get(2)?,
580 prompt: row.get(3)?,
581 response: row.get(4)?,
582 tokens_in: row.get(5)?,
583 tokens_out: row.get(6)?,
584 latency_ms: row.get(7)?,
585 });
586 }
587
588 Ok(records)
589 }
590
591 pub fn count_all_llm_requests(&self) -> Result<i64> {
593 let conn = self.conn.lock().unwrap();
594 let mut stmt = conn.prepare("SELECT COUNT(*) FROM llm_requests")?;
595 let count: i64 = stmt.query_row([], |row| row.get(0))?;
596 Ok(count)
597 }
598
599 pub fn get_all_llm_requests(&self, limit: usize) -> Result<Vec<LlmRequestRecord>> {
601 let conn = self.conn.lock().unwrap();
602 let mut stmt = conn.prepare(
603 "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
604 FROM llm_requests ORDER BY timestamp DESC LIMIT ?",
605 )?;
606
607 let mut rows = stmt.query([limit as i64])?;
608 let mut records = Vec::new();
609
610 while let Some(row) = rows.next()? {
611 let node_id: Option<String> = row.get(1)?;
612 records.push(LlmRequestRecord {
613 session_id: row.get(0)?,
614 node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
615 None
616 } else {
617 node_id
618 },
619 model: row.get(2)?,
620 prompt: row.get(3)?,
621 response: row.get(4)?,
622 tokens_in: row.get(5)?,
623 tokens_out: row.get(6)?,
624 latency_ms: row.get(7)?,
625 });
626 }
627
628 Ok(records)
629 }
630
631 pub fn record_structural_digest(&self, record: &StructuralDigestRecord) -> Result<()> {
637 self.conn.lock().unwrap().execute(
638 r#"
639 INSERT INTO structural_digests (digest_id, session_id, node_id, source_path, artifact_kind, hash, version)
640 VALUES (?, ?, ?, ?, ?, ?, ?)
641 "#,
642 [
643 &record.digest_id,
644 &record.session_id,
645 &record.node_id,
646 &record.source_path,
647 &record.artifact_kind,
648 &hex::encode(&record.hash),
649 &record.version.to_string(),
650 ],
651 )?;
652 Ok(())
653 }
654
655 pub fn get_structural_digests(
657 &self,
658 session_id: &str,
659 node_id: &str,
660 ) -> Result<Vec<StructuralDigestRecord>> {
661 let conn = self.conn.lock().unwrap();
662 let mut stmt = conn.prepare(
663 "SELECT digest_id, session_id, node_id, source_path, artifact_kind, hash, version
664 FROM structural_digests WHERE session_id = ? AND node_id = ? ORDER BY created_at",
665 )?;
666
667 let mut rows = stmt.query([session_id, node_id])?;
668 let mut records = Vec::new();
669
670 while let Some(row) = rows.next()? {
671 records.push(StructuralDigestRecord {
672 digest_id: row.get(0)?,
673 session_id: row.get(1)?,
674 node_id: row.get(2)?,
675 source_path: row.get(3)?,
676 artifact_kind: row.get(4)?,
677 hash: row
678 .get::<_, String>(5)
679 .ok()
680 .and_then(|s| hex::decode(s).ok())
681 .unwrap_or_default(),
682 version: row.get(5)?,
683 });
684 }
685
686 Ok(records)
687 }
688
689 pub fn record_context_provenance(&self, record: &ContextProvenanceRecord) -> Result<()> {
691 self.conn.lock().unwrap().execute(
692 r#"
693 INSERT INTO context_provenance (session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes)
694 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
695 "#,
696 [
697 &record.session_id,
698 &record.node_id,
699 &record.context_package_id,
700 &record.structural_hashes,
701 &record.summary_hashes,
702 &record.dependency_hashes,
703 &record.included_file_count.to_string(),
704 &record.total_bytes.to_string(),
705 ],
706 )?;
707 Ok(())
708 }
709
710 pub fn get_context_provenance(
712 &self,
713 session_id: &str,
714 node_id: &str,
715 ) -> Result<Option<ContextProvenanceRecord>> {
716 let conn = self.conn.lock().unwrap();
717 let mut stmt = conn.prepare(
718 "SELECT session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes
719 FROM context_provenance WHERE session_id = ? AND node_id = ? ORDER BY created_at DESC LIMIT 1",
720 )?;
721
722 let mut rows = stmt.query([session_id, node_id])?;
723 if let Some(row) = rows.next()? {
724 Ok(Some(ContextProvenanceRecord {
725 session_id: row.get(0)?,
726 node_id: row.get(1)?,
727 context_package_id: row.get(2)?,
728 structural_hashes: row.get(3)?,
729 summary_hashes: row.get(4)?,
730 dependency_hashes: row.get(5)?,
731 included_file_count: row.get(6)?,
732 total_bytes: row.get(7)?,
733 }))
734 } else {
735 Ok(None)
736 }
737 }
738
739 pub fn record_escalation_report(&self, record: &EscalationReportRecord) -> Result<()> {
745 self.conn.lock().unwrap().execute(
746 r#"
747 INSERT INTO escalation_reports (session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids)
748 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
749 "#,
750 [
751 &record.session_id,
752 &record.node_id,
753 &record.category,
754 &record.action,
755 &record.energy_snapshot,
756 &record.stage_outcomes,
757 &record.evidence,
758 &record.affected_node_ids,
759 ],
760 )?;
761 Ok(())
762 }
763
764 pub fn get_escalation_reports(&self, session_id: &str) -> Result<Vec<EscalationReportRecord>> {
766 let conn = self.conn.lock().unwrap();
767 let mut stmt = conn.prepare(
768 "SELECT session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids
769 FROM escalation_reports WHERE session_id = ? ORDER BY created_at",
770 )?;
771 let mut rows = stmt.query([session_id])?;
772 let mut records = Vec::new();
773 while let Some(row) = rows.next()? {
774 records.push(EscalationReportRecord {
775 session_id: row.get(0)?,
776 node_id: row.get(1)?,
777 category: row.get(2)?,
778 action: row.get(3)?,
779 energy_snapshot: row.get(4)?,
780 stage_outcomes: row.get(5)?,
781 evidence: row.get(6)?,
782 affected_node_ids: row.get(7)?,
783 });
784 }
785 Ok(records)
786 }
787
788 pub fn record_rewrite(&self, record: &RewriteRecordRow) -> Result<()> {
790 self.conn.lock().unwrap().execute(
791 r#"
792 INSERT INTO rewrite_records (session_id, node_id, action, category, requeued_nodes, inserted_nodes)
793 VALUES (?, ?, ?, ?, ?, ?)
794 "#,
795 [
796 &record.session_id,
797 &record.node_id,
798 &record.action,
799 &record.category,
800 &record.requeued_nodes,
801 &record.inserted_nodes,
802 ],
803 )?;
804 Ok(())
805 }
806
807 pub fn get_rewrite_records(&self, session_id: &str) -> Result<Vec<RewriteRecordRow>> {
809 let conn = self.conn.lock().unwrap();
810 let mut stmt = conn.prepare(
811 "SELECT session_id, node_id, action, category, requeued_nodes, inserted_nodes
812 FROM rewrite_records WHERE session_id = ? ORDER BY created_at",
813 )?;
814 let mut rows = stmt.query([session_id])?;
815 let mut records = Vec::new();
816 while let Some(row) = rows.next()? {
817 records.push(RewriteRecordRow {
818 session_id: row.get(0)?,
819 node_id: row.get(1)?,
820 action: row.get(2)?,
821 category: row.get(3)?,
822 requeued_nodes: row.get(4)?,
823 inserted_nodes: row.get(5)?,
824 });
825 }
826 Ok(records)
827 }
828
829 pub fn record_sheaf_validation(&self, record: &SheafValidationRow) -> Result<()> {
831 self.conn.lock().unwrap().execute(
832 r#"
833 INSERT INTO sheaf_validations (session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets)
834 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
835 "#,
836 [
837 &record.session_id,
838 &record.node_id,
839 &record.validator_class,
840 &record.plugin_source.clone().unwrap_or_default(),
841 &record.passed.to_string(),
842 &record.evidence_summary,
843 &record.affected_files,
844 &record.v_sheaf_contribution.to_string(),
845 &record.requeue_targets,
846 ],
847 )?;
848 Ok(())
849 }
850
851 pub fn get_sheaf_validations(
853 &self,
854 session_id: &str,
855 node_id: &str,
856 ) -> Result<Vec<SheafValidationRow>> {
857 let conn = self.conn.lock().unwrap();
858 let mut stmt = conn.prepare(
859 "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
860 FROM sheaf_validations WHERE session_id = ? AND node_id = ? ORDER BY created_at",
861 )?;
862 let mut rows = stmt.query([session_id, node_id])?;
863 let mut records = Vec::new();
864 while let Some(row) = rows.next()? {
865 records.push(SheafValidationRow {
866 session_id: row.get(0)?,
867 node_id: row.get(1)?,
868 validator_class: row.get(2)?,
869 plugin_source: row.get::<_, Option<String>>(3)?,
870 passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
871 evidence_summary: row.get(5)?,
872 affected_files: row.get(6)?,
873 v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
874 requeue_targets: row.get(8)?,
875 });
876 }
877 Ok(records)
878 }
879
880 pub fn record_provisional_branch(&self, record: &ProvisionalBranchRow) -> Result<()> {
886 self.conn.lock().unwrap().execute(
887 r#"
888 INSERT INTO provisional_branches (branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir)
889 VALUES (?, ?, ?, ?, ?, ?, ?)
890 "#,
891 [
892 &record.branch_id,
893 &record.session_id,
894 &record.node_id,
895 &record.parent_node_id,
896 &record.state,
897 &record.parent_seal_hash.as_ref().map(hex::encode).unwrap_or_default(),
898 &record.sandbox_dir.clone().unwrap_or_default(),
899 ],
900 )?;
901 Ok(())
902 }
903
904 pub fn update_branch_state(&self, branch_id: &str, new_state: &str) -> Result<()> {
906 self.conn.lock().unwrap().execute(
907 "UPDATE provisional_branches SET state = ?, updated_at = CURRENT_TIMESTAMP WHERE branch_id = ?",
908 [new_state, branch_id],
909 )?;
910 Ok(())
911 }
912
913 pub fn get_provisional_branches(&self, session_id: &str) -> Result<Vec<ProvisionalBranchRow>> {
915 let conn = self.conn.lock().unwrap();
916 let mut stmt = conn.prepare(
917 "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
918 FROM provisional_branches WHERE session_id = ? ORDER BY created_at",
919 )?;
920 let mut rows = stmt.query([session_id])?;
921 let mut records = Vec::new();
922 while let Some(row) = rows.next()? {
923 let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
925 records.push(ProvisionalBranchRow {
926 branch_id: row.get(0)?,
927 session_id: row.get(1)?,
928 node_id: row.get(2)?,
929 parent_node_id: row.get(3)?,
930 state: row.get(4)?,
931 parent_seal_hash,
932 sandbox_dir: row.get::<_, Option<String>>(6)?,
933 });
934 }
935 Ok(records)
936 }
937
938 pub fn get_live_branches_for_parent(
940 &self,
941 session_id: &str,
942 parent_node_id: &str,
943 ) -> Result<Vec<ProvisionalBranchRow>> {
944 let conn = self.conn.lock().unwrap();
945 let mut stmt = conn.prepare(
946 "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
947 FROM provisional_branches
948 WHERE session_id = ? AND parent_node_id = ? AND state IN ('active', 'sealed')
949 ORDER BY created_at",
950 )?;
951 let mut rows = stmt.query([session_id, parent_node_id])?;
952 let mut records = Vec::new();
953 while let Some(row) = rows.next()? {
954 let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
956 records.push(ProvisionalBranchRow {
957 branch_id: row.get(0)?,
958 session_id: row.get(1)?,
959 node_id: row.get(2)?,
960 parent_node_id: row.get(3)?,
961 state: row.get(4)?,
962 parent_seal_hash,
963 sandbox_dir: row.get::<_, Option<String>>(6)?,
964 });
965 }
966 Ok(records)
967 }
968
969 pub fn flush_branches_for_parent(
971 &self,
972 session_id: &str,
973 parent_node_id: &str,
974 ) -> Result<Vec<String>> {
975 let live = self.get_live_branches_for_parent(session_id, parent_node_id)?;
976 let branch_ids: Vec<String> = live.iter().map(|b| b.branch_id.clone()).collect();
977 for bid in &branch_ids {
978 self.update_branch_state(bid, "flushed")?;
979 }
980 Ok(branch_ids)
981 }
982
983 pub fn record_branch_lineage(&self, record: &BranchLineageRow) -> Result<()> {
989 self.conn.lock().unwrap().execute(
990 r#"
991 INSERT INTO branch_lineage (lineage_id, parent_branch_id, child_branch_id, depends_on_seal)
992 VALUES (?, ?, ?, ?)
993 "#,
994 [
995 &record.lineage_id,
996 &record.parent_branch_id,
997 &record.child_branch_id,
998 &record.depends_on_seal.to_string(),
999 ],
1000 )?;
1001 Ok(())
1002 }
1003
1004 pub fn get_child_branches(&self, parent_branch_id: &str) -> Result<Vec<String>> {
1006 let conn = self.conn.lock().unwrap();
1007 let mut stmt =
1008 conn.prepare("SELECT child_branch_id FROM branch_lineage WHERE parent_branch_id = ?")?;
1009 let mut rows = stmt.query([parent_branch_id])?;
1010 let mut ids = Vec::new();
1011 while let Some(row) = rows.next()? {
1012 ids.push(row.get(0)?);
1013 }
1014 Ok(ids)
1015 }
1016
1017 pub fn record_interface_seal(&self, record: &InterfaceSealRow) -> Result<()> {
1023 self.conn.lock().unwrap().execute(
1024 r#"
1025 INSERT INTO interface_seals (seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version)
1026 VALUES (?, ?, ?, ?, ?, ?, ?)
1027 "#,
1028 [
1029 &record.seal_id,
1030 &record.session_id,
1031 &record.node_id,
1032 &record.sealed_path,
1033 &record.artifact_kind,
1034 &hex::encode(&record.seal_hash),
1035 &record.version.to_string(),
1036 ],
1037 )?;
1038 Ok(())
1039 }
1040
1041 pub fn get_interface_seals(
1043 &self,
1044 session_id: &str,
1045 node_id: &str,
1046 ) -> Result<Vec<InterfaceSealRow>> {
1047 let conn = self.conn.lock().unwrap();
1048 let mut stmt = conn.prepare(
1049 "SELECT seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version
1050 FROM interface_seals WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1051 )?;
1052 let mut rows = stmt.query([session_id, node_id])?;
1053 let mut records = Vec::new();
1054 while let Some(row) = rows.next()? {
1055 records.push(InterfaceSealRow {
1056 seal_id: row.get(0)?,
1057 session_id: row.get(1)?,
1058 node_id: row.get(2)?,
1059 sealed_path: row.get(3)?,
1060 artifact_kind: row.get(4)?,
1061 seal_hash: row
1062 .get::<_, String>(5)
1063 .ok()
1064 .and_then(|h| hex::decode(h).ok())
1065 .unwrap_or_default(),
1066 version: row.get::<_, i32>(6)?,
1067 });
1068 }
1069 Ok(records)
1070 }
1071
1072 pub fn has_interface_seals(&self, session_id: &str, node_id: &str) -> Result<bool> {
1074 let conn = self.conn.lock().unwrap();
1075 let count: i64 = conn.query_row(
1076 "SELECT COUNT(*) FROM interface_seals WHERE session_id = ? AND node_id = ?",
1077 [session_id, node_id],
1078 |row| row.get(0),
1079 )?;
1080 Ok(count > 0)
1081 }
1082
1083 pub fn record_branch_flush(&self, record: &BranchFlushRow) -> Result<()> {
1089 self.conn.lock().unwrap().execute(
1090 r#"
1091 INSERT INTO branch_flushes (flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason)
1092 VALUES (?, ?, ?, ?, ?, ?)
1093 "#,
1094 [
1095 &record.flush_id,
1096 &record.session_id,
1097 &record.parent_node_id,
1098 &record.flushed_branch_ids,
1099 &record.requeue_node_ids,
1100 &record.reason,
1101 ],
1102 )?;
1103 Ok(())
1104 }
1105
1106 pub fn get_branch_flushes(&self, session_id: &str) -> Result<Vec<BranchFlushRow>> {
1108 let conn = self.conn.lock().unwrap();
1109 let mut stmt = conn.prepare(
1110 "SELECT flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason
1111 FROM branch_flushes WHERE session_id = ? ORDER BY created_at",
1112 )?;
1113 let mut rows = stmt.query([session_id])?;
1114 let mut records = Vec::new();
1115 while let Some(row) = rows.next()? {
1116 records.push(BranchFlushRow {
1117 flush_id: row.get(0)?,
1118 session_id: row.get(1)?,
1119 parent_node_id: row.get(2)?,
1120 flushed_branch_ids: row.get(3)?,
1121 requeue_node_ids: row.get(4)?,
1122 reason: row.get(5)?,
1123 });
1124 }
1125 Ok(records)
1126 }
1127
1128 pub fn get_latest_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
1136 let conn = self.conn.lock().unwrap();
1137 let mut stmt = conn.prepare(
1138 "WITH ranked AS ( \
1139 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1140 FROM node_states WHERE session_id = ? \
1141 ) \
1142 SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
1143 node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
1144 FROM ranked WHERE rn = 1 ORDER BY created_at",
1145 )?;
1146
1147 let mut rows = stmt.query([session_id])?;
1148 let mut records = Vec::new();
1149
1150 while let Some(row) = rows.next()? {
1151 records.push(NodeStateRecord {
1152 node_id: row.get(0)?,
1153 session_id: row.get(1)?,
1154 state: row.get(2)?,
1155 v_total: row.get::<_, f64>(3)? as f32,
1156 merkle_hash: row
1157 .get::<_, Option<String>>(4)?
1158 .and_then(|s| hex::decode(s).ok()),
1159 attempt_count: row.get(5)?,
1160 node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1161 owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
1162 goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
1163 parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
1164 children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
1165 last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1166 committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
1167 });
1168 }
1169
1170 Ok(records)
1171 }
1172
1173 pub fn record_task_graph_edge(&self, record: &TaskGraphEdgeRow) -> Result<()> {
1175 self.conn.lock().unwrap().execute(
1176 r#"
1177 INSERT INTO task_graph_edges (session_id, parent_node_id, child_node_id, edge_type)
1178 VALUES (?, ?, ?, ?)
1179 "#,
1180 [
1181 &record.session_id,
1182 &record.parent_node_id,
1183 &record.child_node_id,
1184 &record.edge_type,
1185 ],
1186 )?;
1187 Ok(())
1188 }
1189
1190 pub fn get_task_graph_edges(&self, session_id: &str) -> Result<Vec<TaskGraphEdgeRow>> {
1192 let conn = self.conn.lock().unwrap();
1193 let mut stmt = conn.prepare(
1194 "SELECT session_id, parent_node_id, child_node_id, edge_type \
1195 FROM task_graph_edges WHERE session_id = ? ORDER BY created_at",
1196 )?;
1197 let mut rows = stmt.query([session_id])?;
1198 let mut records = Vec::new();
1199 while let Some(row) = rows.next()? {
1200 records.push(TaskGraphEdgeRow {
1201 session_id: row.get(0)?,
1202 parent_node_id: row.get(1)?,
1203 child_node_id: row.get(2)?,
1204 edge_type: row.get(3)?,
1205 });
1206 }
1207 Ok(records)
1208 }
1209
1210 pub fn get_children_of_node(
1212 &self,
1213 session_id: &str,
1214 parent_node_id: &str,
1215 ) -> Result<Vec<String>> {
1216 let conn = self.conn.lock().unwrap();
1217 let mut stmt = conn.prepare(
1218 "SELECT child_node_id FROM task_graph_edges \
1219 WHERE session_id = ? AND parent_node_id = ? ORDER BY created_at",
1220 )?;
1221 let mut rows = stmt.query([session_id, parent_node_id])?;
1222 let mut ids = Vec::new();
1223 while let Some(row) = rows.next()? {
1224 ids.push(row.get(0)?);
1225 }
1226 Ok(ids)
1227 }
1228
1229 pub fn record_review_outcome(&self, record: &ReviewOutcomeRow) -> Result<()> {
1231 let reviewer_note = record.reviewer_note.clone().unwrap_or_default();
1232 let escalation_category = record.escalation_category.clone().unwrap_or_default();
1233 self.conn.lock().unwrap().execute(
1234 r#"
1235 INSERT INTO review_outcomes (session_id, node_id, outcome, reviewer_note,
1236 energy_at_review, degraded, escalation_category)
1237 VALUES (?, ?, ?, ?, ?, ?, ?)
1238 "#,
1239 duckdb::params![
1240 record.session_id,
1241 record.node_id,
1242 record.outcome,
1243 reviewer_note,
1244 record.energy_at_review.unwrap_or(0.0),
1245 record.degraded.unwrap_or(false),
1246 escalation_category,
1247 ],
1248 )?;
1249 Ok(())
1250 }
1251
1252 pub fn get_review_outcomes(
1254 &self,
1255 session_id: &str,
1256 node_id: &str,
1257 ) -> Result<Vec<ReviewOutcomeRow>> {
1258 let conn = self.conn.lock().unwrap();
1259 let mut stmt = conn.prepare(
1260 "SELECT session_id, node_id, outcome, reviewer_note, \
1261 energy_at_review, degraded, escalation_category \
1262 FROM review_outcomes WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1263 )?;
1264 let mut rows = stmt.query([session_id, node_id])?;
1265 let mut records = Vec::new();
1266 while let Some(row) = rows.next()? {
1267 records.push(ReviewOutcomeRow {
1268 session_id: row.get(0)?,
1269 node_id: row.get(1)?,
1270 outcome: row.get(2)?,
1271 reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1272 energy_at_review: row.get::<_, Option<f64>>(4)?,
1273 degraded: row.get::<_, Option<bool>>(5)?,
1274 escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1275 });
1276 }
1277 Ok(records)
1278 }
1279
1280 pub fn get_latest_review_outcome(
1282 &self,
1283 session_id: &str,
1284 node_id: &str,
1285 ) -> Result<Option<ReviewOutcomeRow>> {
1286 let conn = self.conn.lock().unwrap();
1287 let mut stmt = conn.prepare(
1288 "SELECT session_id, node_id, outcome, reviewer_note, \
1289 energy_at_review, degraded, escalation_category \
1290 FROM review_outcomes WHERE session_id = ? AND node_id = ? \
1291 ORDER BY created_at DESC LIMIT 1",
1292 )?;
1293 let mut rows = stmt.query([session_id, node_id])?;
1294 if let Some(row) = rows.next()? {
1295 Ok(Some(ReviewOutcomeRow {
1296 session_id: row.get(0)?,
1297 node_id: row.get(1)?,
1298 outcome: row.get(2)?,
1299 reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1300 energy_at_review: row.get::<_, Option<f64>>(4)?,
1301 degraded: row.get::<_, Option<bool>>(5)?,
1302 escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1303 }))
1304 } else {
1305 Ok(None)
1306 }
1307 }
1308
1309 pub fn get_all_review_outcomes(&self, session_id: &str) -> Result<Vec<ReviewOutcomeRow>> {
1311 let conn = self.conn.lock().unwrap();
1312 let mut stmt = conn.prepare(
1313 "SELECT session_id, node_id, outcome, reviewer_note, \
1314 energy_at_review, degraded, escalation_category \
1315 FROM review_outcomes WHERE session_id = ? ORDER BY created_at",
1316 )?;
1317 let mut rows = stmt.query([session_id])?;
1318 let mut records = Vec::new();
1319 while let Some(row) = rows.next()? {
1320 records.push(ReviewOutcomeRow {
1321 session_id: row.get(0)?,
1322 node_id: row.get(1)?,
1323 outcome: row.get(2)?,
1324 reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1325 energy_at_review: row.get::<_, Option<f64>>(4)?,
1326 degraded: row.get::<_, Option<bool>>(5)?,
1327 escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1328 });
1329 }
1330 Ok(records)
1331 }
1332
1333 pub fn record_verification_result(&self, record: &VerificationResultRow) -> Result<()> {
1339 let syntax_ok = record.syntax_ok.to_string();
1340 let build_ok = record.build_ok.to_string();
1341 let tests_ok = record.tests_ok.to_string();
1342 let lint_ok = record.lint_ok.to_string();
1343 let diagnostics_count = record.diagnostics_count.to_string();
1344 let tests_passed = record.tests_passed.to_string();
1345 let tests_failed = record.tests_failed.to_string();
1346 let degraded = record.degraded.to_string();
1347 let degraded_reason = record.degraded_reason.clone().unwrap_or_default();
1348
1349 self.conn.lock().unwrap().execute(
1350 r#"
1351 INSERT INTO verification_results (session_id, node_id, result_json,
1352 syntax_ok, build_ok, tests_ok, lint_ok,
1353 diagnostics_count, tests_passed, tests_failed, degraded, degraded_reason)
1354 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1355 "#,
1356 [
1357 &record.session_id,
1358 &record.node_id,
1359 &record.result_json,
1360 &syntax_ok,
1361 &build_ok,
1362 &tests_ok,
1363 &lint_ok,
1364 &diagnostics_count,
1365 &tests_passed,
1366 &tests_failed,
1367 °raded,
1368 °raded_reason,
1369 ],
1370 )?;
1371 Ok(())
1372 }
1373
1374 pub fn get_verification_result(
1376 &self,
1377 session_id: &str,
1378 node_id: &str,
1379 ) -> Result<Option<VerificationResultRow>> {
1380 let conn = self.conn.lock().unwrap();
1381 let mut stmt = conn.prepare(
1382 "SELECT session_id, node_id, result_json, \
1383 CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1384 diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1385 FROM verification_results \
1386 WHERE session_id = ? AND node_id = ? \
1387 ORDER BY created_at DESC LIMIT 1",
1388 )?;
1389 let mut rows = stmt.query([session_id, node_id])?;
1390 if let Some(row) = rows.next()? {
1391 Ok(Some(VerificationResultRow {
1392 session_id: row.get(0)?,
1393 node_id: row.get(1)?,
1394 result_json: row.get(2)?,
1395 syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1396 build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1397 tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1398 lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1399 diagnostics_count: row.get(7)?,
1400 tests_passed: row.get(8)?,
1401 tests_failed: row.get(9)?,
1402 degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1403 degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1404 }))
1405 } else {
1406 Ok(None)
1407 }
1408 }
1409
1410 pub fn get_all_verification_results(
1412 &self,
1413 session_id: &str,
1414 ) -> Result<Vec<VerificationResultRow>> {
1415 let conn = self.conn.lock().unwrap();
1416 let mut stmt = conn.prepare(
1417 "WITH ranked AS ( \
1418 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1419 FROM verification_results WHERE session_id = ? \
1420 ) \
1421 SELECT session_id, node_id, result_json, \
1422 CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1423 diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1424 FROM ranked WHERE rn = 1 ORDER BY created_at",
1425 )?;
1426 let mut rows = stmt.query([session_id])?;
1427 let mut records = Vec::new();
1428 while let Some(row) = rows.next()? {
1429 records.push(VerificationResultRow {
1430 session_id: row.get(0)?,
1431 node_id: row.get(1)?,
1432 result_json: row.get(2)?,
1433 syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1434 build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1435 tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1436 lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1437 diagnostics_count: row.get(7)?,
1438 tests_passed: row.get(8)?,
1439 tests_failed: row.get(9)?,
1440 degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1441 degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1442 });
1443 }
1444 Ok(records)
1445 }
1446
1447 pub fn record_artifact_bundle(&self, record: &ArtifactBundleRow) -> Result<()> {
1449 let artifact_count = record.artifact_count.to_string();
1450 let command_count = record.command_count.to_string();
1451
1452 self.conn.lock().unwrap().execute(
1453 r#"
1454 INSERT INTO artifact_bundles (session_id, node_id, bundle_json,
1455 artifact_count, command_count, touched_files)
1456 VALUES (?, ?, ?, ?, ?, ?)
1457 "#,
1458 [
1459 &record.session_id,
1460 &record.node_id,
1461 &record.bundle_json,
1462 &artifact_count,
1463 &command_count,
1464 &record.touched_files,
1465 ],
1466 )?;
1467 Ok(())
1468 }
1469
1470 pub fn get_artifact_bundle(
1472 &self,
1473 session_id: &str,
1474 node_id: &str,
1475 ) -> Result<Option<ArtifactBundleRow>> {
1476 let conn = self.conn.lock().unwrap();
1477 let mut stmt = conn.prepare(
1478 "SELECT session_id, node_id, bundle_json, artifact_count, command_count, touched_files \
1479 FROM artifact_bundles \
1480 WHERE session_id = ? AND node_id = ? \
1481 ORDER BY created_at DESC LIMIT 1",
1482 )?;
1483 let mut rows = stmt.query([session_id, node_id])?;
1484 if let Some(row) = rows.next()? {
1485 Ok(Some(ArtifactBundleRow {
1486 session_id: row.get(0)?,
1487 node_id: row.get(1)?,
1488 bundle_json: row.get(2)?,
1489 artifact_count: row.get(3)?,
1490 command_count: row.get(4)?,
1491 touched_files: row.get(5)?,
1492 }))
1493 } else {
1494 Ok(None)
1495 }
1496 }
1497}
1498
1499#[cfg(test)]
1500mod tests {
1501 use super::*;
1502
1503 fn test_store() -> SessionStore {
1505 let temp_dir = std::env::temp_dir();
1506 let db_path = temp_dir.join(format!("perspt_test_{}.db", uuid::Uuid::new_v4()));
1507 SessionStore::open(&db_path).expect("Failed to create test store")
1508 }
1509
1510 fn seed_session(store: &SessionStore, session_id: &str) {
1511 let record = SessionRecord {
1512 session_id: session_id.to_string(),
1513 task: "test task".to_string(),
1514 working_dir: "/tmp/test".to_string(),
1515 merkle_root: None,
1516 detected_toolchain: None,
1517 status: "RUNNING".to_string(),
1518 };
1519 store.create_session(&record).unwrap();
1520 }
1521
1522 #[test]
1523 fn test_node_state_phase8_roundtrip() {
1524 let store = test_store();
1525 let sid = "test-sess-1";
1526 seed_session(&store, sid);
1527
1528 let record = NodeStateRecord {
1529 node_id: "node-1".to_string(),
1530 session_id: sid.to_string(),
1531 state: "Completed".to_string(),
1532 v_total: 0.42,
1533 merkle_hash: Some(vec![0xab; 32]),
1534 attempt_count: 3,
1535 node_class: Some("Interface".to_string()),
1536 owner_plugin: Some("rust".to_string()),
1537 goal: Some("Implement API".to_string()),
1538 parent_id: Some("root".to_string()),
1539 children: Some(r#"["child-a","child-b"]"#.to_string()),
1540 last_error_type: Some("CompilationError".to_string()),
1541 committed_at: Some("2025-01-01T00:00:00Z".to_string()),
1542 };
1543
1544 store.record_node_state(&record).unwrap();
1545
1546 let states = store.get_latest_node_states(sid).unwrap();
1547 assert_eq!(states.len(), 1);
1548 let r = &states[0];
1549 assert_eq!(r.node_id, "node-1");
1550 assert_eq!(r.state, "Completed");
1551 assert_eq!(r.attempt_count, 3);
1552 assert_eq!(r.node_class.as_deref(), Some("Interface"));
1553 assert_eq!(r.owner_plugin.as_deref(), Some("rust"));
1554 assert_eq!(r.goal.as_deref(), Some("Implement API"));
1555 assert_eq!(r.parent_id.as_deref(), Some("root"));
1556 assert!(r.children.is_some());
1557 assert_eq!(r.last_error_type.as_deref(), Some("CompilationError"));
1558 assert_eq!(r.committed_at.as_deref(), Some("2025-01-01T00:00:00Z"));
1559 }
1560
1561 #[test]
1562 fn test_task_graph_edge_roundtrip() {
1563 let store = test_store();
1564 let sid = "test-graph-1";
1565 seed_session(&store, sid);
1566
1567 let edge = TaskGraphEdgeRow {
1568 session_id: sid.to_string(),
1569 parent_node_id: "parent-1".to_string(),
1570 child_node_id: "child-1".to_string(),
1571 edge_type: "depends_on".to_string(),
1572 };
1573 store.record_task_graph_edge(&edge).unwrap();
1574
1575 let edges = store.get_task_graph_edges(sid).unwrap();
1576 assert_eq!(edges.len(), 1);
1577 assert_eq!(edges[0].parent_node_id, "parent-1");
1578 assert_eq!(edges[0].child_node_id, "child-1");
1579 assert_eq!(edges[0].edge_type, "depends_on");
1580 }
1581
1582 #[test]
1583 fn test_verification_result_roundtrip() {
1584 let store = test_store();
1585 let sid = "test-vr-1";
1586 seed_session(&store, sid);
1587
1588 let row = VerificationResultRow {
1589 session_id: sid.to_string(),
1590 node_id: "node-v".to_string(),
1591 result_json: r#"{"syntax_ok":true}"#.to_string(),
1592 syntax_ok: true,
1593 build_ok: true,
1594 tests_ok: false,
1595 lint_ok: true,
1596 diagnostics_count: 2,
1597 tests_passed: 5,
1598 tests_failed: 1,
1599 degraded: false,
1600 degraded_reason: None,
1601 };
1602 store.record_verification_result(&row).unwrap();
1603
1604 let got = store.get_verification_result(sid, "node-v").unwrap();
1605 assert!(got.is_some());
1606 let got = got.unwrap();
1607 assert!(got.syntax_ok);
1608 assert!(got.build_ok);
1609 assert!(!got.tests_ok);
1610 assert_eq!(got.tests_passed, 5);
1611 assert_eq!(got.tests_failed, 1);
1612 assert!(!got.degraded);
1613 }
1614
1615 #[test]
1616 fn test_verification_result_degraded() {
1617 let store = test_store();
1618 let sid = "test-vr-deg";
1619 seed_session(&store, sid);
1620
1621 let row = VerificationResultRow {
1622 session_id: sid.to_string(),
1623 node_id: "node-d".to_string(),
1624 result_json: "{}".to_string(),
1625 syntax_ok: true,
1626 build_ok: false,
1627 tests_ok: false,
1628 lint_ok: false,
1629 diagnostics_count: 0,
1630 tests_passed: 0,
1631 tests_failed: 0,
1632 degraded: true,
1633 degraded_reason: Some("LSP unavailable".to_string()),
1634 };
1635 store.record_verification_result(&row).unwrap();
1636
1637 let got = store
1638 .get_verification_result(sid, "node-d")
1639 .unwrap()
1640 .unwrap();
1641 assert!(got.degraded);
1642 assert_eq!(got.degraded_reason.as_deref(), Some("LSP unavailable"));
1643 }
1644
1645 #[test]
1646 fn test_artifact_bundle_roundtrip() {
1647 let store = test_store();
1648 let sid = "test-ab-1";
1649 seed_session(&store, sid);
1650
1651 let row = ArtifactBundleRow {
1652 session_id: sid.to_string(),
1653 node_id: "node-a".to_string(),
1654 bundle_json: r#"{"artifacts":[],"commands":[]}"#.to_string(),
1655 artifact_count: 3,
1656 command_count: 1,
1657 touched_files: r#"["src/main.rs","src/lib.rs","tests/test.rs"]"#.to_string(),
1658 };
1659 store.record_artifact_bundle(&row).unwrap();
1660
1661 let got = store.get_artifact_bundle(sid, "node-a").unwrap();
1662 assert!(got.is_some());
1663 let got = got.unwrap();
1664 assert_eq!(got.artifact_count, 3);
1665 assert_eq!(got.command_count, 1);
1666 assert!(got.touched_files.contains("main.rs"));
1667 }
1668
1669 #[test]
1670 fn test_latest_node_states_dedup() {
1671 let store = test_store();
1672 let sid = "test-dedup";
1673 seed_session(&store, sid);
1674
1675 let r1 = NodeStateRecord {
1677 node_id: "node-x".to_string(),
1678 session_id: sid.to_string(),
1679 state: "Coding".to_string(),
1680 v_total: 0.5,
1681 merkle_hash: None,
1682 attempt_count: 1,
1683 node_class: None,
1684 owner_plugin: None,
1685 goal: None,
1686 parent_id: None,
1687 children: None,
1688 last_error_type: None,
1689 committed_at: None,
1690 };
1691 store.record_node_state(&r1).unwrap();
1692
1693 let r2 = NodeStateRecord {
1694 node_id: "node-x".to_string(),
1695 session_id: sid.to_string(),
1696 state: "Completed".to_string(),
1697 v_total: 0.3,
1698 merkle_hash: None,
1699 attempt_count: 2,
1700 node_class: Some("Implementation".to_string()),
1701 owner_plugin: None,
1702 goal: Some("Updated goal".to_string()),
1703 parent_id: None,
1704 children: None,
1705 last_error_type: None,
1706 committed_at: Some("2025-01-02T00:00:00Z".to_string()),
1707 };
1708 store.record_node_state(&r2).unwrap();
1709
1710 let latest = store.get_latest_node_states(sid).unwrap();
1712 assert_eq!(latest.len(), 1);
1713 assert_eq!(latest[0].state, "Completed");
1714 assert_eq!(latest[0].attempt_count, 2);
1715 assert_eq!(latest[0].goal.as_deref(), Some("Updated goal"));
1716 }
1717
1718 #[test]
1719 fn test_backward_compat_empty_phase8_fields() {
1720 let store = test_store();
1721 let sid = "test-compat";
1722 seed_session(&store, sid);
1723
1724 let r = NodeStateRecord {
1726 node_id: "old-node".to_string(),
1727 session_id: sid.to_string(),
1728 state: "COMPLETED".to_string(),
1729 v_total: 1.0,
1730 merkle_hash: None,
1731 attempt_count: 1,
1732 node_class: None,
1733 owner_plugin: None,
1734 goal: None,
1735 parent_id: None,
1736 children: None,
1737 last_error_type: None,
1738 committed_at: None,
1739 };
1740 store.record_node_state(&r).unwrap();
1741
1742 let latest = store.get_latest_node_states(sid).unwrap();
1743 assert_eq!(latest.len(), 1);
1744 assert!(latest[0].node_class.is_none());
1745 assert!(latest[0].goal.is_none());
1746 assert!(latest[0].committed_at.is_none());
1747
1748 let vr = store.get_verification_result(sid, "old-node").unwrap();
1750 assert!(vr.is_none());
1751 let ab = store.get_artifact_bundle(sid, "old-node").unwrap();
1752 assert!(ab.is_none());
1753 }
1754
1755 #[test]
1756 fn test_review_outcome_roundtrip() {
1757 let store = test_store();
1758 let sid = "test-review";
1759 seed_session(&store, sid);
1760
1761 let row = ReviewOutcomeRow {
1762 session_id: sid.to_string(),
1763 node_id: "node-r".to_string(),
1764 outcome: "approved".to_string(),
1765 reviewer_note: Some("LGTM".to_string()),
1766 energy_at_review: None,
1767 degraded: None,
1768 escalation_category: None,
1769 };
1770 store.record_review_outcome(&row).unwrap();
1771
1772 let outcomes = store.get_review_outcomes(sid, "node-r").unwrap();
1773 assert_eq!(outcomes.len(), 1);
1774 assert_eq!(outcomes[0].outcome, "approved");
1775 assert_eq!(outcomes[0].reviewer_note.as_deref(), Some("LGTM"));
1776 }
1777
1778 #[test]
1779 fn test_review_outcome_with_audit_fields() {
1780 let store = test_store();
1781 let sid = "test-review-audit";
1782 seed_session(&store, sid);
1783
1784 let row = ReviewOutcomeRow {
1785 session_id: sid.to_string(),
1786 node_id: "node-a".to_string(),
1787 outcome: "rejected".to_string(),
1788 reviewer_note: Some("Needs rework".to_string()),
1789 energy_at_review: Some(0.42),
1790 degraded: Some(true),
1791 escalation_category: Some("complexity".to_string()),
1792 };
1793 store.record_review_outcome(&row).unwrap();
1794
1795 let outcomes = store.get_review_outcomes(sid, "node-a").unwrap();
1796 assert_eq!(outcomes.len(), 1);
1797 assert_eq!(outcomes[0].outcome, "rejected");
1798 assert_eq!(outcomes[0].energy_at_review, Some(0.42));
1799 assert_eq!(outcomes[0].degraded, Some(true));
1800 assert_eq!(
1801 outcomes[0].escalation_category.as_deref(),
1802 Some("complexity")
1803 );
1804 }
1805
1806 #[test]
1807 fn test_get_all_review_outcomes() {
1808 let store = test_store();
1809 let sid = "test-review-all";
1810 seed_session(&store, sid);
1811
1812 for (node, outcome) in &[("n1", "approved"), ("n2", "rejected"), ("n1", "approved")] {
1813 let row = ReviewOutcomeRow {
1814 session_id: sid.to_string(),
1815 node_id: node.to_string(),
1816 outcome: outcome.to_string(),
1817 reviewer_note: None,
1818 energy_at_review: None,
1819 degraded: None,
1820 escalation_category: None,
1821 };
1822 store.record_review_outcome(&row).unwrap();
1823 }
1824
1825 let all = store.get_all_review_outcomes(sid).unwrap();
1826 assert_eq!(all.len(), 3);
1827 }
1828}