Skip to main content

perspt_store/
store.rs

1//! Session Store Implementation
2//!
3//! Provides CRUD operations for SRBN sessions, node states, and energy history.
4
5use anyhow::{Context, Result};
6use duckdb::Connection;
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9use std::path::PathBuf;
10
11use crate::schema::init_schema;
12
13/// Record for a session
///
/// Carries exactly the columns `SessionStore::create_session` inserts into the
/// `sessions` table.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct SessionRecord {
16    pub session_id: String,
17    pub task: String,
18    pub working_dir: String,
    /// Bound hex-encoded by `create_session`; `None` is bound as an empty string.
19    pub merkle_root: Option<Vec<u8>>,
    /// `None` is bound as an empty string by `create_session`.
20    pub detected_toolchain: Option<String>,
21    pub status: String,
22}
23
24/// Record for node state
///
/// Carries the columns `SessionStore::record_node_state` inserts into the
/// `node_states` table. Every `Option<String>` field is bound as an empty
/// string when it is `None`.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct NodeStateRecord {
27    pub node_id: String,
28    pub session_id: String,
29    pub state: String,
    /// Bound via `to_string()` on write; read back as `f64` and narrowed to `f32`.
30    pub v_total: f32,
    /// Bound hex-encoded on write; `None` is bound as an empty string.
31    pub merkle_hash: Option<Vec<u8>>,
32    pub attempt_count: i32,
33    // PSP-5 Phase 8: Richer node snapshot for resume reconstruction
34    pub node_class: Option<String>,
35    pub owner_plugin: Option<String>,
36    pub goal: Option<String>,
37    pub parent_id: Option<String>,
38    /// JSON-serialized `Vec<String>`
39    pub children: Option<String>,
40    pub last_error_type: Option<String>,
    /// Kept as text; presumably a timestamp string — TODO confirm the format.
41    pub committed_at: Option<String>,
42}
43
44/// Record for energy history
///
/// Carries the columns `SessionStore::record_energy` inserts into the
/// `energy_history` table. Each `f32` is bound via `to_string()` on write and
/// read back as `f64` narrowed to `f32` by `get_energy_history`.
// NOTE(review): the `v_*` names presumably stand for the syntactic, structural,
// logical, bootstrap and sheaf energy components — confirm against the SRBN spec.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct EnergyRecord {
47    pub node_id: String,
48    pub session_id: String,
49    pub v_syn: f32,
50    pub v_str: f32,
51    pub v_log: f32,
52    pub v_boot: f32,
53    pub v_sheaf: f32,
54    pub v_total: f32,
55}
56
57/// Record for LLM request/response logging
///
/// Carries the columns `SessionStore::record_llm_request` inserts into the
/// `llm_requests` table.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct LlmRequestRecord {
60    pub session_id: String,
    /// `None` is bound as an empty string on write; the readers map an empty or
    /// NULL value back to `None`.
61    pub node_id: Option<String>,
62    pub model: String,
63    pub prompt: String,
64    pub response: String,
65    pub tokens_in: i32,
66    pub tokens_out: i32,
    /// Presumably milliseconds, going by the name — TODO confirm with the caller.
67    pub latency_ms: i32,
68}
69
70/// PSP-5 Phase 3: Record for structural digest persistence
///
/// Carries the columns `SessionStore::record_structural_digest` inserts into
/// the `structural_digests` table.
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct StructuralDigestRecord {
73    pub digest_id: String,
74    pub session_id: String,
75    pub node_id: String,
76    pub source_path: String,
77    pub artifact_kind: String,
    /// Raw digest bytes; bound hex-encoded on write.
78    pub hash: Vec<u8>,
    /// Bound via `to_string()` on write.
79    pub version: i32,
80}
81
82/// PSP-5 Phase 3: Record for context provenance persistence
///
/// Carries the columns `SessionStore::record_context_provenance` inserts into
/// the `context_provenance` table. The three `*_hashes` fields are stored as
/// the caller-supplied JSON text without further processing.
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct ContextProvenanceRecord {
85    pub session_id: String,
86    pub node_id: String,
87    pub context_package_id: String,
88    /// JSON-serialized structural digest hashes
89    pub structural_hashes: String,
90    /// JSON-serialized summary hashes
91    pub summary_hashes: String,
92    /// JSON-serialized dependency commit hashes
93    pub dependency_hashes: String,
    /// Bound via `to_string()` on write.
94    pub included_file_count: i32,
    /// Bound via `to_string()` on write.
95    pub total_bytes: i32,
96}
97
98/// PSP-5 Phase 5: Record for escalation report persistence
///
/// Carries the columns `SessionStore::record_escalation_report` inserts into
/// the `escalation_reports` table. All fields are bound as text exactly as
/// supplied; the store does not parse the embedded JSON.
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct EscalationReportRecord {
101    pub session_id: String,
102    pub node_id: String,
103    /// Serialized EscalationCategory
104    pub category: String,
105    /// JSON-serialized RewriteAction
106    pub action: String,
107    /// JSON-serialized EnergyComponents
108    pub energy_snapshot: String,
109    /// JSON-serialized `Vec<StageOutcome>`
110    pub stage_outcomes: String,
111    /// Human-readable evidence
112    pub evidence: String,
113    /// JSON-serialized `Vec<String>`
114    pub affected_node_ids: String,
115}
116
117/// PSP-5 Phase 5: Record for local graph rewrite persistence
///
/// Carries the columns `SessionStore::record_rewrite` inserts into the
/// `rewrite_records` table. All fields are bound as text exactly as supplied.
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct RewriteRecordRow {
120    pub session_id: String,
121    pub node_id: String,
122    /// JSON-serialized RewriteAction
123    pub action: String,
124    /// Serialized EscalationCategory
125    pub category: String,
126    /// JSON-serialized `Vec<String>`
127    pub requeued_nodes: String,
128    /// JSON-serialized `Vec<String>`
129    pub inserted_nodes: String,
130}
131
132/// PSP-5 Phase 5: Record for sheaf validation result persistence
///
/// Carries the columns `SessionStore::record_sheaf_validation` inserts into
/// the `sheaf_validations` table.
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct SheafValidationRow {
135    pub session_id: String,
136    pub node_id: String,
137    pub validator_class: String,
    /// `None` is bound as an empty string on write.
138    pub plugin_source: Option<String>,
    /// Bound via `bool::to_string()` on write, i.e. as `"true"` / `"false"`.
139    pub passed: bool,
140    pub evidence_summary: String,
141    /// JSON-serialized `Vec<String>`
142    pub affected_files: String,
    /// Bound via `to_string()` on write.
143    pub v_sheaf_contribution: f32,
144    /// JSON-serialized `Vec<String>`
145    pub requeue_targets: String,
146}
147
148// =============================================================================
149// PSP-5 Phase 6: Provisional Branch, Interface Seal, Branch Flush Records
150// =============================================================================
151
152/// PSP-5 Phase 6: Record for provisional branch persistence
// NOTE(review): the store methods that read and write this row are not in the
// visible part of the file; the field notes below are based on names and types only.
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct ProvisionalBranchRow {
155    pub branch_id: String,
156    pub session_id: String,
157    pub node_id: String,
158    pub parent_node_id: String,
    /// Free-form text; the allowed values are not defined in this file.
159    pub state: String,
    /// Optional raw hash bytes; presumably the parent's interface seal — TODO confirm.
160    pub parent_seal_hash: Option<Vec<u8>>,
    /// Optional directory path kept as text.
161    pub sandbox_dir: Option<String>,
162}
163
164/// PSP-5 Phase 6: Record for branch lineage persistence
///
/// Links a parent branch id to a child branch id.
// NOTE(review): the store methods that read and write this row are not in the
// visible part of the file.
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct BranchLineageRow {
167    pub lineage_id: String,
168    pub parent_branch_id: String,
169    pub child_branch_id: String,
    /// Presumably true when the child relies on a seal from the parent — TODO confirm.
170    pub depends_on_seal: bool,
171}
172
173/// PSP-5 Phase 6: Record for interface seal persistence
///
/// Has the same field layout as `StructuralDigestRecord`, with
/// `seal_id`/`sealed_path`/`seal_hash` in place of `digest_id`/`source_path`/`hash`.
// NOTE(review): the store methods that read and write this row are not in the
// visible part of the file.
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct InterfaceSealRow {
176    pub seal_id: String,
177    pub session_id: String,
178    pub node_id: String,
179    pub sealed_path: String,
180    pub artifact_kind: String,
    /// Raw hash bytes of the seal.
181    pub seal_hash: Vec<u8>,
182    pub version: i32,
183}
184
185/// PSP-5 Phase 6: Record for branch flush decision persistence
// NOTE(review): the store methods that read and write this row are not in the
// visible part of the file.
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct BranchFlushRow {
188    pub flush_id: String,
189    pub session_id: String,
190    pub parent_node_id: String,
191    /// JSON-serialized `Vec<String>`
192    pub flushed_branch_ids: String,
193    /// JSON-serialized `Vec<String>`
194    pub requeue_node_ids: String,
    /// Free-form text explaining the flush.
195    pub reason: String,
196}
197
198// =============================================================================
199// PSP-5 Phase 8: Task Graph and Review Outcome Records
200// =============================================================================
201
202/// PSP-5 Phase 8: Record for task graph edges (DAG reconstruction on resume)
///
/// One directed edge from `parent_node_id` to `child_node_id` within a session.
// NOTE(review): the store methods that read and write this row are not in the
// visible part of the file.
203#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct TaskGraphEdgeRow {
205    pub session_id: String,
206    pub parent_node_id: String,
207    pub child_node_id: String,
    /// Free-form text; the allowed edge kinds are not defined in this file.
208    pub edge_type: String,
209}
210
211/// PSP-5 Phase 8: Record for review outcome persistence
// NOTE(review): the store methods that read and write this row are not in the
// visible part of the file.
212#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct ReviewOutcomeRow {
214    pub session_id: String,
215    pub node_id: String,
216    /// One of: "approved", "rejected", "edit_requested", "correction_requested", "skipped"
217    pub outcome: String,
    /// Optional free-text note from the reviewer.
218    pub reviewer_note: Option<String>,
219    /// Energy at time of review decision
    // Note: `f64` here, whereas the other energy fields in this file are `f32`.
220    pub energy_at_review: Option<f64>,
221    /// Whether verification was degraded when decision was made
222    pub degraded: Option<bool>,
223    /// Escalation category if the node had been classified
224    pub escalation_category: Option<String>,
225}
226
227/// PSP-5 Phase 8: Record for verification result snapshot persistence
///
/// Holds the full result as JSON plus flattened summary columns.
// NOTE(review): the store methods that read and write this row are not in the
// visible part of the file; whether the summary fields are derived from
// `result_json` or supplied independently cannot be told from here.
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct VerificationResultRow {
230    pub session_id: String,
231    pub node_id: String,
232    /// JSON-serialized VerificationResult (full data for resume reconstruction)
233    pub result_json: String,
234    // Query-friendly summary fields
235    pub syntax_ok: bool,
236    pub build_ok: bool,
237    pub tests_ok: bool,
238    pub lint_ok: bool,
239    pub diagnostics_count: i32,
240    pub tests_passed: i32,
241    pub tests_failed: i32,
242    pub degraded: bool,
    /// Optional text; presumably set only when `degraded` is true — TODO confirm.
243    pub degraded_reason: Option<String>,
244}
245
246/// PSP-5 Phase 8: Record for artifact bundle snapshot persistence
///
/// Holds the full bundle as JSON plus flattened summary columns.
// NOTE(review): the store methods that read and write this row are not in the
// visible part of the file.
247#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct ArtifactBundleRow {
249    pub session_id: String,
250    pub node_id: String,
251    /// JSON-serialized ArtifactBundle (full data for resume reconstruction)
252    pub bundle_json: String,
253    pub artifact_count: i32,
254    pub command_count: i32,
255    /// JSON-serialized `Vec<String>` of touched file paths
256    pub touched_files: String,
257}
258
259use std::sync::Mutex;
260
261/// Session store for SRBN persistence
///
/// Owns a single DuckDB [`Connection`] behind a [`Mutex`]. Each method locks the
/// mutex before touching the database and calls `unwrap()` on the lock result,
/// so a poisoned mutex makes every later call panic.
262pub struct SessionStore {
    /// The one shared database connection; access is serialized by the mutex.
263    conn: Mutex<Connection>,
264}
265
266impl SessionStore {
267    /// Create a new session store with default path
268    pub fn new() -> Result<Self> {
269        let db_path = Self::default_db_path()?;
270        Self::open(&db_path)
271    }
272
273    /// Open a session store at the given path
274    pub fn open(path: &PathBuf) -> Result<Self> {
275        // Ensure parent directory exists
276        if let Some(parent) = path.parent() {
277            std::fs::create_dir_all(parent)?;
278        }
279
280        let conn = Connection::open(path).context("Failed to open DuckDB")?;
281        init_schema(&conn)?;
282
283        Ok(Self {
284            conn: Mutex::new(conn),
285        })
286    }
287
288    /// Get the default database path (~/.local/share/perspt/perspt.db or similar)
289    pub fn default_db_path() -> Result<PathBuf> {
290        let data_dir = dirs::data_local_dir()
291            .context("Could not find local data directory")?
292            .join("perspt");
293        Ok(data_dir.join("perspt.db"))
294    }
295
296    /// Create a new session
297    pub fn create_session(&self, session: &SessionRecord) -> Result<()> {
298        self.conn.lock().unwrap().execute(
299            r#"
300            INSERT INTO sessions (session_id, task, working_dir, merkle_root, detected_toolchain, status)
301            VALUES (?, ?, ?, ?, ?, ?)
302            "#,
303            [
304                &session.session_id,
305                &session.task,
306                &session.working_dir,
307                &session.merkle_root.as_ref().map(hex::encode).unwrap_or_default(),
308                &session.detected_toolchain.clone().unwrap_or_default(),
309                &session.status,
310            ],
311        )?;
312        Ok(())
313    }
314
315    /// Update session merkle root
316    pub fn update_merkle_root(&self, session_id: &str, merkle_root: &[u8]) -> Result<()> {
317        self.conn.lock().unwrap().execute(
318            "UPDATE sessions SET merkle_root = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
319            [hex::encode(merkle_root), session_id.to_string()],
320        )?;
321        Ok(())
322    }
323
324    /// Record node state
325    pub fn record_node_state(&self, record: &NodeStateRecord) -> Result<()> {
326        let v_total = record.v_total.to_string();
327        let merkle_hash = record
328            .merkle_hash
329            .as_ref()
330            .map(hex::encode)
331            .unwrap_or_default();
332        let attempt_count = record.attempt_count.to_string();
333        let node_class = record.node_class.clone().unwrap_or_default();
334        let owner_plugin = record.owner_plugin.clone().unwrap_or_default();
335        let goal = record.goal.clone().unwrap_or_default();
336        let parent_id = record.parent_id.clone().unwrap_or_default();
337        let children = record.children.clone().unwrap_or_default();
338        let last_error_type = record.last_error_type.clone().unwrap_or_default();
339        let committed_at = record.committed_at.clone().unwrap_or_default();
340
341        self.conn.lock().unwrap().execute(
342            r#"
343            INSERT INTO node_states (node_id, session_id, state, v_total, merkle_hash, attempt_count,
344                                     node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at)
345            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
346            "#,
347            [
348                &record.node_id,
349                &record.session_id,
350                &record.state,
351                &v_total,
352                &merkle_hash,
353                &attempt_count,
354                &node_class,
355                &owner_plugin,
356                &goal,
357                &parent_id,
358                &children,
359                &last_error_type,
360                &committed_at,
361            ],
362        )?;
363        Ok(())
364    }
365
366    /// Record energy measurement
367    pub fn record_energy(&self, record: &EnergyRecord) -> Result<()> {
368        self.conn.lock().unwrap().execute(
369            r#"
370            INSERT INTO energy_history (node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total)
371            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
372            "#,
373            [
374                &record.node_id,
375                &record.session_id,
376                &record.v_syn.to_string(),
377                &record.v_str.to_string(),
378                &record.v_log.to_string(),
379                &record.v_boot.to_string(),
380                &record.v_sheaf.to_string(),
381                &record.v_total.to_string(),
382            ],
383        )?;
384        Ok(())
385    }
386
387    /// Calculate Merkle hash for content
388    pub fn calculate_hash(content: &[u8]) -> Vec<u8> {
389        let mut hasher = Sha256::new();
390        hasher.update(content);
391        hasher.finalize().to_vec()
392    }
393
394    /// Get session by ID
395    pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>> {
396        let conn = self.conn.lock().unwrap();
397        let mut stmt = conn.prepare(
398            "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status FROM sessions WHERE session_id = ?"
399        )?;
400
401        let mut rows = stmt.query([session_id])?;
402        if let Some(row) = rows.next()? {
403            // merkle_root is stored as BLOB; read directly as Option<Vec<u8>>
404            // to match list_recent_sessions and avoid type mismatch on Blob columns.
405            let merkle_root: Option<Vec<u8>> = row.get(3).ok();
406
407            Ok(Some(SessionRecord {
408                session_id: row.get(0)?,
409                task: row.get(1)?,
410                working_dir: row.get(2)?,
411                merkle_root,
412                detected_toolchain: row.get(4)?,
413                status: row.get(5)?,
414            }))
415        } else {
416            Ok(None)
417        }
418    }
419
420    /// Get the directory for session artifacts (`~/.local/share/perspt/sessions/<id>`)
421    pub fn get_session_dir(&self, session_id: &str) -> Result<PathBuf> {
422        let data_dir = dirs::data_local_dir()
423            .context("Could not find local data directory")?
424            .join("perspt")
425            .join("sessions")
426            .join(session_id);
427        Ok(data_dir)
428    }
429
430    /// Ensure a session directory exists and return the path
431    pub fn create_session_dir(&self, session_id: &str) -> Result<PathBuf> {
432        let dir = self.get_session_dir(session_id)?;
433        if !dir.exists() {
434            std::fs::create_dir_all(&dir).context("Failed to create session directory")?;
435        }
436        Ok(dir)
437    }
438
439    /// Get energy history for a node (query)
440    pub fn get_energy_history(&self, session_id: &str, node_id: &str) -> Result<Vec<EnergyRecord>> {
441        let conn = self.conn.lock().unwrap();
442        let mut stmt = conn.prepare(
443            "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? AND node_id = ? ORDER BY timestamp"
444        )?;
445
446        let mut rows = stmt.query([session_id, node_id])?;
447        let mut records = Vec::new();
448
449        while let Some(row) = rows.next()? {
450            records.push(EnergyRecord {
451                node_id: row.get(0)?,
452                session_id: row.get(1)?,
453                v_syn: row.get::<_, f64>(2)? as f32,
454                v_str: row.get::<_, f64>(3)? as f32,
455                v_log: row.get::<_, f64>(4)? as f32,
456                v_boot: row.get::<_, f64>(5)? as f32,
457                v_sheaf: row.get::<_, f64>(6)? as f32,
458                v_total: row.get::<_, f64>(7)? as f32,
459            });
460        }
461
462        Ok(records)
463    }
464
465    /// List recent sessions (newest first)
466    pub fn list_recent_sessions(&self, limit: usize) -> Result<Vec<SessionRecord>> {
467        let conn = self.conn.lock().unwrap();
468        let mut stmt = conn.prepare(
469            "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status
470             FROM sessions ORDER BY created_at DESC LIMIT ?",
471        )?;
472
473        let mut rows = stmt.query([limit.to_string()])?;
474        let mut records = Vec::new();
475
476        while let Some(row) = rows.next()? {
477            // merkle_root is stored as BLOB, read it directly as Option<Vec<u8>>
478            let merkle_root: Option<Vec<u8>> = row.get(3).ok();
479
480            records.push(SessionRecord {
481                session_id: row.get(0)?,
482                task: row.get(1)?,
483                working_dir: row.get(2)?,
484                merkle_root,
485                detected_toolchain: row.get(4)?,
486                status: row.get(5)?,
487            });
488        }
489
490        Ok(records)
491    }
492
493    /// Get all node states for a session
494    pub fn get_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
495        let conn = self.conn.lock().unwrap();
496        let mut stmt = conn.prepare(
497            "SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
498                    node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
499             FROM node_states WHERE session_id = ? ORDER BY created_at",
500        )?;
501
502        let mut rows = stmt.query([session_id])?;
503        let mut records = Vec::new();
504
505        while let Some(row) = rows.next()? {
506            records.push(NodeStateRecord {
507                node_id: row.get(0)?,
508                session_id: row.get(1)?,
509                state: row.get(2)?,
510                v_total: row.get::<_, f64>(3)? as f32,
511                merkle_hash: row
512                    .get::<_, Option<String>>(4)?
513                    .and_then(|s| hex::decode(s).ok()),
514                attempt_count: row.get(5)?,
515                node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
516                owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
517                goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
518                parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
519                children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
520                last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
521                committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
522            });
523        }
524
525        Ok(records)
526    }
527
528    /// Update session status
529    pub fn update_session_status(&self, session_id: &str, status: &str) -> Result<()> {
530        self.conn.lock().unwrap().execute(
531            "UPDATE sessions SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
532            [status, session_id],
533        )?;
534        Ok(())
535    }
536
537    /// Record an LLM request/response
538    pub fn record_llm_request(&self, record: &LlmRequestRecord) -> Result<()> {
539        let conn = self.conn.lock().unwrap();
540        conn.execute(
541            r#"
542            INSERT INTO llm_requests (session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms)
543            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
544            "#,
545            [
546                &record.session_id,
547                &record.node_id.clone().unwrap_or_default(),
548                &record.model,
549                &record.prompt,
550                &record.response,
551                &record.tokens_in.to_string(),
552                &record.tokens_out.to_string(),
553                &record.latency_ms.to_string(),
554            ],
555        )?;
556        Ok(())
557    }
558
559    /// Get LLM requests for a session
560    pub fn get_llm_requests(&self, session_id: &str) -> Result<Vec<LlmRequestRecord>> {
561        let conn = self.conn.lock().unwrap();
562        let mut stmt = conn.prepare(
563            "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
564             FROM llm_requests WHERE session_id = ? ORDER BY timestamp",
565        )?;
566
567        let mut rows = stmt.query([session_id])?;
568        let mut records = Vec::new();
569
570        while let Some(row) = rows.next()? {
571            let node_id: Option<String> = row.get(1)?;
572            records.push(LlmRequestRecord {
573                session_id: row.get(0)?,
574                node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
575                    None
576                } else {
577                    node_id
578                },
579                model: row.get(2)?,
580                prompt: row.get(3)?,
581                response: row.get(4)?,
582                tokens_in: row.get(5)?,
583                tokens_out: row.get(6)?,
584                latency_ms: row.get(7)?,
585            });
586        }
587
588        Ok(records)
589    }
590
591    /// Count all LLM requests in the database (for debugging)
592    pub fn count_all_llm_requests(&self) -> Result<i64> {
593        let conn = self.conn.lock().unwrap();
594        let mut stmt = conn.prepare("SELECT COUNT(*) FROM llm_requests")?;
595        let count: i64 = stmt.query_row([], |row| row.get(0))?;
596        Ok(count)
597    }
598
599    /// Get all LLM requests (for debugging)
600    pub fn get_all_llm_requests(&self, limit: usize) -> Result<Vec<LlmRequestRecord>> {
601        let conn = self.conn.lock().unwrap();
602        let mut stmt = conn.prepare(
603            "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
604             FROM llm_requests ORDER BY timestamp DESC LIMIT ?",
605        )?;
606
607        let mut rows = stmt.query([limit as i64])?;
608        let mut records = Vec::new();
609
610        while let Some(row) = rows.next()? {
611            let node_id: Option<String> = row.get(1)?;
612            records.push(LlmRequestRecord {
613                session_id: row.get(0)?,
614                node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
615                    None
616                } else {
617                    node_id
618                },
619                model: row.get(2)?,
620                prompt: row.get(3)?,
621                response: row.get(4)?,
622                tokens_in: row.get(5)?,
623                tokens_out: row.get(6)?,
624                latency_ms: row.get(7)?,
625            });
626        }
627
628        Ok(records)
629    }
630
631    // =========================================================================
632    // PSP-5 Phase 3: Structural Digest & Context Provenance Persistence
633    // =========================================================================
634
635    /// Record a structural digest
636    pub fn record_structural_digest(&self, record: &StructuralDigestRecord) -> Result<()> {
637        self.conn.lock().unwrap().execute(
638            r#"
639            INSERT INTO structural_digests (digest_id, session_id, node_id, source_path, artifact_kind, hash, version)
640            VALUES (?, ?, ?, ?, ?, ?, ?)
641            "#,
642            [
643                &record.digest_id,
644                &record.session_id,
645                &record.node_id,
646                &record.source_path,
647                &record.artifact_kind,
648                &hex::encode(&record.hash),
649                &record.version.to_string(),
650            ],
651        )?;
652        Ok(())
653    }
654
655    /// Get structural digests for a session and node
656    pub fn get_structural_digests(
657        &self,
658        session_id: &str,
659        node_id: &str,
660    ) -> Result<Vec<StructuralDigestRecord>> {
661        let conn = self.conn.lock().unwrap();
662        let mut stmt = conn.prepare(
663            "SELECT digest_id, session_id, node_id, source_path, artifact_kind, hash, version
664             FROM structural_digests WHERE session_id = ? AND node_id = ? ORDER BY created_at",
665        )?;
666
667        let mut rows = stmt.query([session_id, node_id])?;
668        let mut records = Vec::new();
669
670        while let Some(row) = rows.next()? {
671            records.push(StructuralDigestRecord {
672                digest_id: row.get(0)?,
673                session_id: row.get(1)?,
674                node_id: row.get(2)?,
675                source_path: row.get(3)?,
676                artifact_kind: row.get(4)?,
677                hash: row
678                    .get::<_, String>(5)
679                    .ok()
680                    .and_then(|s| hex::decode(s).ok())
681                    .unwrap_or_default(),
682                version: row.get(5)?,
683            });
684        }
685
686        Ok(records)
687    }
688
689    /// Record context provenance for a node
690    pub fn record_context_provenance(&self, record: &ContextProvenanceRecord) -> Result<()> {
691        self.conn.lock().unwrap().execute(
692            r#"
693            INSERT INTO context_provenance (session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes)
694            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
695            "#,
696            [
697                &record.session_id,
698                &record.node_id,
699                &record.context_package_id,
700                &record.structural_hashes,
701                &record.summary_hashes,
702                &record.dependency_hashes,
703                &record.included_file_count.to_string(),
704                &record.total_bytes.to_string(),
705            ],
706        )?;
707        Ok(())
708    }
709
710    /// Get context provenance for a session and node
711    pub fn get_context_provenance(
712        &self,
713        session_id: &str,
714        node_id: &str,
715    ) -> Result<Option<ContextProvenanceRecord>> {
716        let conn = self.conn.lock().unwrap();
717        let mut stmt = conn.prepare(
718            "SELECT session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes
719             FROM context_provenance WHERE session_id = ? AND node_id = ? ORDER BY created_at DESC LIMIT 1",
720        )?;
721
722        let mut rows = stmt.query([session_id, node_id])?;
723        if let Some(row) = rows.next()? {
724            Ok(Some(ContextProvenanceRecord {
725                session_id: row.get(0)?,
726                node_id: row.get(1)?,
727                context_package_id: row.get(2)?,
728                structural_hashes: row.get(3)?,
729                summary_hashes: row.get(4)?,
730                dependency_hashes: row.get(5)?,
731                included_file_count: row.get(6)?,
732                total_bytes: row.get(7)?,
733            }))
734        } else {
735            Ok(None)
736        }
737    }
738
739    // =========================================================================
740    // PSP-5 Phase 5: Escalation, Rewrite, and Sheaf Validation Persistence
741    // =========================================================================
742
743    /// Record an escalation report
744    pub fn record_escalation_report(&self, record: &EscalationReportRecord) -> Result<()> {
745        self.conn.lock().unwrap().execute(
746            r#"
747            INSERT INTO escalation_reports (session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids)
748            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
749            "#,
750            [
751                &record.session_id,
752                &record.node_id,
753                &record.category,
754                &record.action,
755                &record.energy_snapshot,
756                &record.stage_outcomes,
757                &record.evidence,
758                &record.affected_node_ids,
759            ],
760        )?;
761        Ok(())
762    }
763
764    /// Get escalation reports for a session
765    pub fn get_escalation_reports(&self, session_id: &str) -> Result<Vec<EscalationReportRecord>> {
766        let conn = self.conn.lock().unwrap();
767        let mut stmt = conn.prepare(
768            "SELECT session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids
769             FROM escalation_reports WHERE session_id = ? ORDER BY created_at",
770        )?;
771        let mut rows = stmt.query([session_id])?;
772        let mut records = Vec::new();
773        while let Some(row) = rows.next()? {
774            records.push(EscalationReportRecord {
775                session_id: row.get(0)?,
776                node_id: row.get(1)?,
777                category: row.get(2)?,
778                action: row.get(3)?,
779                energy_snapshot: row.get(4)?,
780                stage_outcomes: row.get(5)?,
781                evidence: row.get(6)?,
782                affected_node_ids: row.get(7)?,
783            });
784        }
785        Ok(records)
786    }
787
788    /// Record a local graph rewrite
789    pub fn record_rewrite(&self, record: &RewriteRecordRow) -> Result<()> {
790        self.conn.lock().unwrap().execute(
791            r#"
792            INSERT INTO rewrite_records (session_id, node_id, action, category, requeued_nodes, inserted_nodes)
793            VALUES (?, ?, ?, ?, ?, ?)
794            "#,
795            [
796                &record.session_id,
797                &record.node_id,
798                &record.action,
799                &record.category,
800                &record.requeued_nodes,
801                &record.inserted_nodes,
802            ],
803        )?;
804        Ok(())
805    }
806
807    /// Get rewrite records for a session
808    pub fn get_rewrite_records(&self, session_id: &str) -> Result<Vec<RewriteRecordRow>> {
809        let conn = self.conn.lock().unwrap();
810        let mut stmt = conn.prepare(
811            "SELECT session_id, node_id, action, category, requeued_nodes, inserted_nodes
812             FROM rewrite_records WHERE session_id = ? ORDER BY created_at",
813        )?;
814        let mut rows = stmt.query([session_id])?;
815        let mut records = Vec::new();
816        while let Some(row) = rows.next()? {
817            records.push(RewriteRecordRow {
818                session_id: row.get(0)?,
819                node_id: row.get(1)?,
820                action: row.get(2)?,
821                category: row.get(3)?,
822                requeued_nodes: row.get(4)?,
823                inserted_nodes: row.get(5)?,
824            });
825        }
826        Ok(records)
827    }
828
829    /// Record a sheaf validation result
830    pub fn record_sheaf_validation(&self, record: &SheafValidationRow) -> Result<()> {
831        self.conn.lock().unwrap().execute(
832            r#"
833            INSERT INTO sheaf_validations (session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets)
834            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
835            "#,
836            [
837                &record.session_id,
838                &record.node_id,
839                &record.validator_class,
840                &record.plugin_source.clone().unwrap_or_default(),
841                &record.passed.to_string(),
842                &record.evidence_summary,
843                &record.affected_files,
844                &record.v_sheaf_contribution.to_string(),
845                &record.requeue_targets,
846            ],
847        )?;
848        Ok(())
849    }
850
851    /// Get sheaf validation results for a session and node
852    pub fn get_sheaf_validations(
853        &self,
854        session_id: &str,
855        node_id: &str,
856    ) -> Result<Vec<SheafValidationRow>> {
857        let conn = self.conn.lock().unwrap();
858        let mut stmt = conn.prepare(
859            "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
860             FROM sheaf_validations WHERE session_id = ? AND node_id = ? ORDER BY created_at",
861        )?;
862        let mut rows = stmt.query([session_id, node_id])?;
863        let mut records = Vec::new();
864        while let Some(row) = rows.next()? {
865            records.push(SheafValidationRow {
866                session_id: row.get(0)?,
867                node_id: row.get(1)?,
868                validator_class: row.get(2)?,
869                plugin_source: row.get::<_, Option<String>>(3)?,
870                passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
871                evidence_summary: row.get(5)?,
872                affected_files: row.get(6)?,
873                v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
874                requeue_targets: row.get(8)?,
875            });
876        }
877        Ok(records)
878    }
879
880    // =========================================================================
881    // PSP-5 Phase 6: Provisional Branch CRUD
882    // =========================================================================
883
884    /// Record a new provisional branch
885    pub fn record_provisional_branch(&self, record: &ProvisionalBranchRow) -> Result<()> {
886        self.conn.lock().unwrap().execute(
887            r#"
888            INSERT INTO provisional_branches (branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir)
889            VALUES (?, ?, ?, ?, ?, ?, ?)
890            "#,
891            [
892                &record.branch_id,
893                &record.session_id,
894                &record.node_id,
895                &record.parent_node_id,
896                &record.state,
897                &record.parent_seal_hash.as_ref().map(hex::encode).unwrap_or_default(),
898                &record.sandbox_dir.clone().unwrap_or_default(),
899            ],
900        )?;
901        Ok(())
902    }
903
904    /// Update a provisional branch state
905    pub fn update_branch_state(&self, branch_id: &str, new_state: &str) -> Result<()> {
906        self.conn.lock().unwrap().execute(
907            "UPDATE provisional_branches SET state = ?, updated_at = CURRENT_TIMESTAMP WHERE branch_id = ?",
908            [new_state, branch_id],
909        )?;
910        Ok(())
911    }
912
913    /// Get all provisional branches for a session
914    pub fn get_provisional_branches(&self, session_id: &str) -> Result<Vec<ProvisionalBranchRow>> {
915        let conn = self.conn.lock().unwrap();
916        let mut stmt = conn.prepare(
917            "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
918             FROM provisional_branches WHERE session_id = ? ORDER BY created_at",
919        )?;
920        let mut rows = stmt.query([session_id])?;
921        let mut records = Vec::new();
922        while let Some(row) = rows.next()? {
923            // parent_seal_hash is BLOB; read directly as Option<Vec<u8>>
924            let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
925            records.push(ProvisionalBranchRow {
926                branch_id: row.get(0)?,
927                session_id: row.get(1)?,
928                node_id: row.get(2)?,
929                parent_node_id: row.get(3)?,
930                state: row.get(4)?,
931                parent_seal_hash,
932                sandbox_dir: row.get::<_, Option<String>>(6)?,
933            });
934        }
935        Ok(records)
936    }
937
938    /// Get live (active/sealed) provisional branches depending on a parent node
939    pub fn get_live_branches_for_parent(
940        &self,
941        session_id: &str,
942        parent_node_id: &str,
943    ) -> Result<Vec<ProvisionalBranchRow>> {
944        let conn = self.conn.lock().unwrap();
945        let mut stmt = conn.prepare(
946            "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
947             FROM provisional_branches
948             WHERE session_id = ? AND parent_node_id = ? AND state IN ('active', 'sealed')
949             ORDER BY created_at",
950        )?;
951        let mut rows = stmt.query([session_id, parent_node_id])?;
952        let mut records = Vec::new();
953        while let Some(row) = rows.next()? {
954            // parent_seal_hash is BLOB; read directly as Option<Vec<u8>>
955            let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
956            records.push(ProvisionalBranchRow {
957                branch_id: row.get(0)?,
958                session_id: row.get(1)?,
959                node_id: row.get(2)?,
960                parent_node_id: row.get(3)?,
961                state: row.get(4)?,
962                parent_seal_hash,
963                sandbox_dir: row.get::<_, Option<String>>(6)?,
964            });
965        }
966        Ok(records)
967    }
968
969    /// Mark all live branches for a parent as flushed
970    pub fn flush_branches_for_parent(
971        &self,
972        session_id: &str,
973        parent_node_id: &str,
974    ) -> Result<Vec<String>> {
975        let live = self.get_live_branches_for_parent(session_id, parent_node_id)?;
976        let branch_ids: Vec<String> = live.iter().map(|b| b.branch_id.clone()).collect();
977        for bid in &branch_ids {
978            self.update_branch_state(bid, "flushed")?;
979        }
980        Ok(branch_ids)
981    }
982
983    // =========================================================================
984    // PSP-5 Phase 6: Branch Lineage CRUD
985    // =========================================================================
986
987    /// Record a branch lineage edge
988    pub fn record_branch_lineage(&self, record: &BranchLineageRow) -> Result<()> {
989        self.conn.lock().unwrap().execute(
990            r#"
991            INSERT INTO branch_lineage (lineage_id, parent_branch_id, child_branch_id, depends_on_seal)
992            VALUES (?, ?, ?, ?)
993            "#,
994            [
995                &record.lineage_id,
996                &record.parent_branch_id,
997                &record.child_branch_id,
998                &record.depends_on_seal.to_string(),
999            ],
1000        )?;
1001        Ok(())
1002    }
1003
1004    /// Get child branch IDs for a parent branch
1005    pub fn get_child_branches(&self, parent_branch_id: &str) -> Result<Vec<String>> {
1006        let conn = self.conn.lock().unwrap();
1007        let mut stmt =
1008            conn.prepare("SELECT child_branch_id FROM branch_lineage WHERE parent_branch_id = ?")?;
1009        let mut rows = stmt.query([parent_branch_id])?;
1010        let mut ids = Vec::new();
1011        while let Some(row) = rows.next()? {
1012            ids.push(row.get(0)?);
1013        }
1014        Ok(ids)
1015    }
1016
1017    // =========================================================================
1018    // PSP-5 Phase 6: Interface Seal CRUD
1019    // =========================================================================
1020
1021    /// Record an interface seal
1022    pub fn record_interface_seal(&self, record: &InterfaceSealRow) -> Result<()> {
1023        self.conn.lock().unwrap().execute(
1024            r#"
1025            INSERT INTO interface_seals (seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version)
1026            VALUES (?, ?, ?, ?, ?, ?, ?)
1027            "#,
1028            [
1029                &record.seal_id,
1030                &record.session_id,
1031                &record.node_id,
1032                &record.sealed_path,
1033                &record.artifact_kind,
1034                &hex::encode(&record.seal_hash),
1035                &record.version.to_string(),
1036            ],
1037        )?;
1038        Ok(())
1039    }
1040
1041    /// Get all interface seals for a node
1042    pub fn get_interface_seals(
1043        &self,
1044        session_id: &str,
1045        node_id: &str,
1046    ) -> Result<Vec<InterfaceSealRow>> {
1047        let conn = self.conn.lock().unwrap();
1048        let mut stmt = conn.prepare(
1049            "SELECT seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version
1050             FROM interface_seals WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1051        )?;
1052        let mut rows = stmt.query([session_id, node_id])?;
1053        let mut records = Vec::new();
1054        while let Some(row) = rows.next()? {
1055            records.push(InterfaceSealRow {
1056                seal_id: row.get(0)?,
1057                session_id: row.get(1)?,
1058                node_id: row.get(2)?,
1059                sealed_path: row.get(3)?,
1060                artifact_kind: row.get(4)?,
1061                seal_hash: row
1062                    .get::<_, String>(5)
1063                    .ok()
1064                    .and_then(|h| hex::decode(h).ok())
1065                    .unwrap_or_default(),
1066                version: row.get::<_, i32>(6)?,
1067            });
1068        }
1069        Ok(records)
1070    }
1071
1072    /// Check whether a node has any interface seals
1073    pub fn has_interface_seals(&self, session_id: &str, node_id: &str) -> Result<bool> {
1074        let conn = self.conn.lock().unwrap();
1075        let count: i64 = conn.query_row(
1076            "SELECT COUNT(*) FROM interface_seals WHERE session_id = ? AND node_id = ?",
1077            [session_id, node_id],
1078            |row| row.get(0),
1079        )?;
1080        Ok(count > 0)
1081    }
1082
1083    // =========================================================================
1084    // PSP-5 Phase 6: Branch Flush CRUD
1085    // =========================================================================
1086
1087    /// Record a branch flush decision
1088    pub fn record_branch_flush(&self, record: &BranchFlushRow) -> Result<()> {
1089        self.conn.lock().unwrap().execute(
1090            r#"
1091            INSERT INTO branch_flushes (flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason)
1092            VALUES (?, ?, ?, ?, ?, ?)
1093            "#,
1094            [
1095                &record.flush_id,
1096                &record.session_id,
1097                &record.parent_node_id,
1098                &record.flushed_branch_ids,
1099                &record.requeue_node_ids,
1100                &record.reason,
1101            ],
1102        )?;
1103        Ok(())
1104    }
1105
1106    /// Get all branch flush records for a session
1107    pub fn get_branch_flushes(&self, session_id: &str) -> Result<Vec<BranchFlushRow>> {
1108        let conn = self.conn.lock().unwrap();
1109        let mut stmt = conn.prepare(
1110            "SELECT flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason
1111             FROM branch_flushes WHERE session_id = ? ORDER BY created_at",
1112        )?;
1113        let mut rows = stmt.query([session_id])?;
1114        let mut records = Vec::new();
1115        while let Some(row) = rows.next()? {
1116            records.push(BranchFlushRow {
1117                flush_id: row.get(0)?,
1118                session_id: row.get(1)?,
1119                parent_node_id: row.get(2)?,
1120                flushed_branch_ids: row.get(3)?,
1121                requeue_node_ids: row.get(4)?,
1122                reason: row.get(5)?,
1123            });
1124        }
1125        Ok(records)
1126    }
1127
1128    // =========================================================================
1129    // PSP-5 Phase 8: Node Snapshot, Task Graph, and Review Outcome Persistence
1130    // =========================================================================
1131
1132    /// Get the latest node state snapshot per node for a session (for resume reconstruction).
1133    ///
1134    /// Returns at most one record per node_id, picking the most recently created row.
1135    pub fn get_latest_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
1136        let conn = self.conn.lock().unwrap();
1137        let mut stmt = conn.prepare(
1138            "WITH ranked AS ( \
1139                 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1140                 FROM node_states WHERE session_id = ? \
1141             ) \
1142             SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
1143                    node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
1144             FROM ranked WHERE rn = 1 ORDER BY created_at",
1145        )?;
1146
1147        let mut rows = stmt.query([session_id])?;
1148        let mut records = Vec::new();
1149
1150        while let Some(row) = rows.next()? {
1151            records.push(NodeStateRecord {
1152                node_id: row.get(0)?,
1153                session_id: row.get(1)?,
1154                state: row.get(2)?,
1155                v_total: row.get::<_, f64>(3)? as f32,
1156                merkle_hash: row
1157                    .get::<_, Option<String>>(4)?
1158                    .and_then(|s| hex::decode(s).ok()),
1159                attempt_count: row.get(5)?,
1160                node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1161                owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
1162                goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
1163                parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
1164                children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
1165                last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1166                committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
1167            });
1168        }
1169
1170        Ok(records)
1171    }
1172
1173    /// Record a task graph edge (parent→child dependency)
1174    pub fn record_task_graph_edge(&self, record: &TaskGraphEdgeRow) -> Result<()> {
1175        self.conn.lock().unwrap().execute(
1176            r#"
1177            INSERT INTO task_graph_edges (session_id, parent_node_id, child_node_id, edge_type)
1178            VALUES (?, ?, ?, ?)
1179            "#,
1180            [
1181                &record.session_id,
1182                &record.parent_node_id,
1183                &record.child_node_id,
1184                &record.edge_type,
1185            ],
1186        )?;
1187        Ok(())
1188    }
1189
1190    /// Get all task graph edges for a session
1191    pub fn get_task_graph_edges(&self, session_id: &str) -> Result<Vec<TaskGraphEdgeRow>> {
1192        let conn = self.conn.lock().unwrap();
1193        let mut stmt = conn.prepare(
1194            "SELECT session_id, parent_node_id, child_node_id, edge_type \
1195             FROM task_graph_edges WHERE session_id = ? ORDER BY created_at",
1196        )?;
1197        let mut rows = stmt.query([session_id])?;
1198        let mut records = Vec::new();
1199        while let Some(row) = rows.next()? {
1200            records.push(TaskGraphEdgeRow {
1201                session_id: row.get(0)?,
1202                parent_node_id: row.get(1)?,
1203                child_node_id: row.get(2)?,
1204                edge_type: row.get(3)?,
1205            });
1206        }
1207        Ok(records)
1208    }
1209
1210    /// Get child node IDs for a parent in a session's task graph
1211    pub fn get_children_of_node(
1212        &self,
1213        session_id: &str,
1214        parent_node_id: &str,
1215    ) -> Result<Vec<String>> {
1216        let conn = self.conn.lock().unwrap();
1217        let mut stmt = conn.prepare(
1218            "SELECT child_node_id FROM task_graph_edges \
1219             WHERE session_id = ? AND parent_node_id = ? ORDER BY created_at",
1220        )?;
1221        let mut rows = stmt.query([session_id, parent_node_id])?;
1222        let mut ids = Vec::new();
1223        while let Some(row) = rows.next()? {
1224            ids.push(row.get(0)?);
1225        }
1226        Ok(ids)
1227    }
1228
1229    /// Record a review outcome (approval, rejection, edit request)
1230    pub fn record_review_outcome(&self, record: &ReviewOutcomeRow) -> Result<()> {
1231        let reviewer_note = record.reviewer_note.clone().unwrap_or_default();
1232        let escalation_category = record.escalation_category.clone().unwrap_or_default();
1233        self.conn.lock().unwrap().execute(
1234            r#"
1235            INSERT INTO review_outcomes (session_id, node_id, outcome, reviewer_note,
1236                                         energy_at_review, degraded, escalation_category)
1237            VALUES (?, ?, ?, ?, ?, ?, ?)
1238            "#,
1239            duckdb::params![
1240                record.session_id,
1241                record.node_id,
1242                record.outcome,
1243                reviewer_note,
1244                record.energy_at_review.unwrap_or(0.0),
1245                record.degraded.unwrap_or(false),
1246                escalation_category,
1247            ],
1248        )?;
1249        Ok(())
1250    }
1251
1252    /// Get all review outcomes for a node
1253    pub fn get_review_outcomes(
1254        &self,
1255        session_id: &str,
1256        node_id: &str,
1257    ) -> Result<Vec<ReviewOutcomeRow>> {
1258        let conn = self.conn.lock().unwrap();
1259        let mut stmt = conn.prepare(
1260            "SELECT session_id, node_id, outcome, reviewer_note, \
1261             energy_at_review, degraded, escalation_category \
1262             FROM review_outcomes WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1263        )?;
1264        let mut rows = stmt.query([session_id, node_id])?;
1265        let mut records = Vec::new();
1266        while let Some(row) = rows.next()? {
1267            records.push(ReviewOutcomeRow {
1268                session_id: row.get(0)?,
1269                node_id: row.get(1)?,
1270                outcome: row.get(2)?,
1271                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1272                energy_at_review: row.get::<_, Option<f64>>(4)?,
1273                degraded: row.get::<_, Option<bool>>(5)?,
1274                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1275            });
1276        }
1277        Ok(records)
1278    }
1279
1280    /// Get the most recent review outcome for a node
1281    pub fn get_latest_review_outcome(
1282        &self,
1283        session_id: &str,
1284        node_id: &str,
1285    ) -> Result<Option<ReviewOutcomeRow>> {
1286        let conn = self.conn.lock().unwrap();
1287        let mut stmt = conn.prepare(
1288            "SELECT session_id, node_id, outcome, reviewer_note, \
1289             energy_at_review, degraded, escalation_category \
1290             FROM review_outcomes WHERE session_id = ? AND node_id = ? \
1291             ORDER BY created_at DESC LIMIT 1",
1292        )?;
1293        let mut rows = stmt.query([session_id, node_id])?;
1294        if let Some(row) = rows.next()? {
1295            Ok(Some(ReviewOutcomeRow {
1296                session_id: row.get(0)?,
1297                node_id: row.get(1)?,
1298                outcome: row.get(2)?,
1299                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1300                energy_at_review: row.get::<_, Option<f64>>(4)?,
1301                degraded: row.get::<_, Option<bool>>(5)?,
1302                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1303            }))
1304        } else {
1305            Ok(None)
1306        }
1307    }
1308
1309    /// Get all review outcomes for a session (across all nodes).
1310    pub fn get_all_review_outcomes(&self, session_id: &str) -> Result<Vec<ReviewOutcomeRow>> {
1311        let conn = self.conn.lock().unwrap();
1312        let mut stmt = conn.prepare(
1313            "SELECT session_id, node_id, outcome, reviewer_note, \
1314             energy_at_review, degraded, escalation_category \
1315             FROM review_outcomes WHERE session_id = ? ORDER BY created_at",
1316        )?;
1317        let mut rows = stmt.query([session_id])?;
1318        let mut records = Vec::new();
1319        while let Some(row) = rows.next()? {
1320            records.push(ReviewOutcomeRow {
1321                session_id: row.get(0)?,
1322                node_id: row.get(1)?,
1323                outcome: row.get(2)?,
1324                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1325                energy_at_review: row.get::<_, Option<f64>>(4)?,
1326                degraded: row.get::<_, Option<bool>>(5)?,
1327                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1328            });
1329        }
1330        Ok(records)
1331    }
1332
1333    // =========================================================================
1334    // PSP-5 Phase 8: Verification Result and Artifact Bundle Persistence
1335    // =========================================================================
1336
1337    /// Record a verification result snapshot for a node
1338    pub fn record_verification_result(&self, record: &VerificationResultRow) -> Result<()> {
1339        let syntax_ok = record.syntax_ok.to_string();
1340        let build_ok = record.build_ok.to_string();
1341        let tests_ok = record.tests_ok.to_string();
1342        let lint_ok = record.lint_ok.to_string();
1343        let diagnostics_count = record.diagnostics_count.to_string();
1344        let tests_passed = record.tests_passed.to_string();
1345        let tests_failed = record.tests_failed.to_string();
1346        let degraded = record.degraded.to_string();
1347        let degraded_reason = record.degraded_reason.clone().unwrap_or_default();
1348
1349        self.conn.lock().unwrap().execute(
1350            r#"
1351            INSERT INTO verification_results (session_id, node_id, result_json,
1352                syntax_ok, build_ok, tests_ok, lint_ok,
1353                diagnostics_count, tests_passed, tests_failed, degraded, degraded_reason)
1354            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1355            "#,
1356            [
1357                &record.session_id,
1358                &record.node_id,
1359                &record.result_json,
1360                &syntax_ok,
1361                &build_ok,
1362                &tests_ok,
1363                &lint_ok,
1364                &diagnostics_count,
1365                &tests_passed,
1366                &tests_failed,
1367                &degraded,
1368                &degraded_reason,
1369            ],
1370        )?;
1371        Ok(())
1372    }
1373
1374    /// Get the latest verification result for a node
1375    pub fn get_verification_result(
1376        &self,
1377        session_id: &str,
1378        node_id: &str,
1379    ) -> Result<Option<VerificationResultRow>> {
1380        let conn = self.conn.lock().unwrap();
1381        let mut stmt = conn.prepare(
1382            "SELECT session_id, node_id, result_json, \
1383                    CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1384                    diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1385             FROM verification_results \
1386             WHERE session_id = ? AND node_id = ? \
1387             ORDER BY created_at DESC LIMIT 1",
1388        )?;
1389        let mut rows = stmt.query([session_id, node_id])?;
1390        if let Some(row) = rows.next()? {
1391            Ok(Some(VerificationResultRow {
1392                session_id: row.get(0)?,
1393                node_id: row.get(1)?,
1394                result_json: row.get(2)?,
1395                syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1396                build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1397                tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1398                lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1399                diagnostics_count: row.get(7)?,
1400                tests_passed: row.get(8)?,
1401                tests_failed: row.get(9)?,
1402                degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1403                degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1404            }))
1405        } else {
1406            Ok(None)
1407        }
1408    }
1409
1410    /// Get all verification results for a session (for status display)
1411    pub fn get_all_verification_results(
1412        &self,
1413        session_id: &str,
1414    ) -> Result<Vec<VerificationResultRow>> {
1415        let conn = self.conn.lock().unwrap();
1416        let mut stmt = conn.prepare(
1417            "WITH ranked AS ( \
1418                 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1419                 FROM verification_results WHERE session_id = ? \
1420             ) \
1421             SELECT session_id, node_id, result_json, \
1422                    CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1423                    diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1424             FROM ranked WHERE rn = 1 ORDER BY created_at",
1425        )?;
1426        let mut rows = stmt.query([session_id])?;
1427        let mut records = Vec::new();
1428        while let Some(row) = rows.next()? {
1429            records.push(VerificationResultRow {
1430                session_id: row.get(0)?,
1431                node_id: row.get(1)?,
1432                result_json: row.get(2)?,
1433                syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1434                build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1435                tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1436                lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1437                diagnostics_count: row.get(7)?,
1438                tests_passed: row.get(8)?,
1439                tests_failed: row.get(9)?,
1440                degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1441                degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1442            });
1443        }
1444        Ok(records)
1445    }
1446
1447    /// Record an artifact bundle snapshot for a node
1448    pub fn record_artifact_bundle(&self, record: &ArtifactBundleRow) -> Result<()> {
1449        let artifact_count = record.artifact_count.to_string();
1450        let command_count = record.command_count.to_string();
1451
1452        self.conn.lock().unwrap().execute(
1453            r#"
1454            INSERT INTO artifact_bundles (session_id, node_id, bundle_json,
1455                artifact_count, command_count, touched_files)
1456            VALUES (?, ?, ?, ?, ?, ?)
1457            "#,
1458            [
1459                &record.session_id,
1460                &record.node_id,
1461                &record.bundle_json,
1462                &artifact_count,
1463                &command_count,
1464                &record.touched_files,
1465            ],
1466        )?;
1467        Ok(())
1468    }
1469
1470    /// Get the latest artifact bundle for a node
1471    pub fn get_artifact_bundle(
1472        &self,
1473        session_id: &str,
1474        node_id: &str,
1475    ) -> Result<Option<ArtifactBundleRow>> {
1476        let conn = self.conn.lock().unwrap();
1477        let mut stmt = conn.prepare(
1478            "SELECT session_id, node_id, bundle_json, artifact_count, command_count, touched_files \
1479             FROM artifact_bundles \
1480             WHERE session_id = ? AND node_id = ? \
1481             ORDER BY created_at DESC LIMIT 1",
1482        )?;
1483        let mut rows = stmt.query([session_id, node_id])?;
1484        if let Some(row) = rows.next()? {
1485            Ok(Some(ArtifactBundleRow {
1486                session_id: row.get(0)?,
1487                node_id: row.get(1)?,
1488                bundle_json: row.get(2)?,
1489                artifact_count: row.get(3)?,
1490                command_count: row.get(4)?,
1491                touched_files: row.get(5)?,
1492            }))
1493        } else {
1494            Ok(None)
1495        }
1496    }
1497}
1498
1499#[cfg(test)]
1500mod tests {
1501    use super::*;
1502
1503    /// Create an in-memory store for testing
1504    fn test_store() -> SessionStore {
1505        let temp_dir = std::env::temp_dir();
1506        let db_path = temp_dir.join(format!("perspt_test_{}.db", uuid::Uuid::new_v4()));
1507        SessionStore::open(&db_path).expect("Failed to create test store")
1508    }
1509
1510    fn seed_session(store: &SessionStore, session_id: &str) {
1511        let record = SessionRecord {
1512            session_id: session_id.to_string(),
1513            task: "test task".to_string(),
1514            working_dir: "/tmp/test".to_string(),
1515            merkle_root: None,
1516            detected_toolchain: None,
1517            status: "RUNNING".to_string(),
1518        };
1519        store.create_session(&record).unwrap();
1520    }
1521
/// A node snapshot with every PSP-5 Phase 8 field populated is recorded and
/// then read back through `get_latest_node_states`; each Phase 8 field is
/// compared against the value that was written.
#[test]
fn test_node_state_phase8_roundtrip() {
    let store = test_store();
    let sid = "test-sess-1";
    seed_session(&store, sid);

    // Held in a binding so the value read back can be compared exactly,
    // rather than only checked for presence.
    let children_json = r#"["child-a","child-b"]"#;

    let record = NodeStateRecord {
        node_id: "node-1".to_string(),
        session_id: sid.to_string(),
        state: "Completed".to_string(),
        v_total: 0.42,
        merkle_hash: Some(vec![0xab; 32]),
        attempt_count: 3,
        node_class: Some("Interface".to_string()),
        owner_plugin: Some("rust".to_string()),
        goal: Some("Implement API".to_string()),
        parent_id: Some("root".to_string()),
        children: Some(children_json.to_string()),
        last_error_type: Some("CompilationError".to_string()),
        committed_at: Some("2025-01-01T00:00:00Z".to_string()),
    };

    store.record_node_state(&record).unwrap();

    let states = store.get_latest_node_states(sid).unwrap();
    assert_eq!(states.len(), 1);
    let r = &states[0];
    assert_eq!(r.node_id, "node-1");
    assert_eq!(r.state, "Completed");
    assert_eq!(r.attempt_count, 3);
    assert_eq!(r.node_class.as_deref(), Some("Interface"));
    assert_eq!(r.owner_plugin.as_deref(), Some("rust"));
    assert_eq!(r.goal.as_deref(), Some("Implement API"));
    assert_eq!(r.parent_id.as_deref(), Some("root"));
    // A roundtrip must preserve the payload itself, not merely its presence.
    assert_eq!(r.children.as_deref(), Some(children_json));
    assert_eq!(r.last_error_type.as_deref(), Some("CompilationError"));
    assert_eq!(r.committed_at.as_deref(), Some("2025-01-01T00:00:00Z"));
}
1560
/// One recorded task-graph edge is returned by `get_task_graph_edges` with
/// its parent, child and edge type intact.
#[test]
fn test_task_graph_edge_roundtrip() {
    let sid = "test-graph-1";
    let store = test_store();
    seed_session(&store, sid);

    store
        .record_task_graph_edge(&TaskGraphEdgeRow {
            session_id: sid.to_owned(),
            parent_node_id: "parent-1".to_owned(),
            child_node_id: "child-1".to_owned(),
            edge_type: "depends_on".to_owned(),
        })
        .unwrap();

    let stored = store.get_task_graph_edges(sid).unwrap();
    assert_eq!(stored.len(), 1);

    let first = &stored[0];
    assert_eq!(first.parent_node_id, "parent-1");
    assert_eq!(first.child_node_id, "child-1");
    assert_eq!(first.edge_type, "depends_on");
}
1581
/// A non-degraded verification result is recorded and looked up by
/// session and node; the pass/fail flags and test counts are compared.
#[test]
fn test_verification_result_roundtrip() {
    let sid = "test-vr-1";
    let node = "node-v";
    let store = test_store();
    seed_session(&store, sid);

    let written = VerificationResultRow {
        session_id: String::from(sid),
        node_id: String::from(node),
        result_json: String::from(r#"{"syntax_ok":true}"#),
        syntax_ok: true,
        build_ok: true,
        tests_ok: false,
        lint_ok: true,
        diagnostics_count: 2,
        tests_passed: 5,
        tests_failed: 1,
        degraded: false,
        degraded_reason: None,
    };
    store.record_verification_result(&written).unwrap();

    let lookup = store.get_verification_result(sid, node).unwrap();
    assert!(lookup.is_some());

    let fetched = lookup.unwrap();
    assert!(fetched.syntax_ok);
    assert!(fetched.build_ok);
    assert!(!fetched.tests_ok);
    assert_eq!(fetched.tests_passed, 5);
    assert_eq!(fetched.tests_failed, 1);
    assert!(!fetched.degraded);
}
1614
/// A verification result written with `degraded: true` comes back with the
/// flag set and the same reason text.
#[test]
fn test_verification_result_degraded() {
    let sid = "test-vr-deg";
    let node = "node-d";
    let store = test_store();
    seed_session(&store, sid);

    let written = VerificationResultRow {
        session_id: String::from(sid),
        node_id: String::from(node),
        result_json: String::from("{}"),
        syntax_ok: true,
        build_ok: false,
        tests_ok: false,
        lint_ok: false,
        diagnostics_count: 0,
        tests_passed: 0,
        tests_failed: 0,
        degraded: true,
        degraded_reason: Some(String::from("LSP unavailable")),
    };
    store.record_verification_result(&written).unwrap();

    // First unwrap: the query succeeded; second unwrap: a row was found.
    let lookup = store.get_verification_result(sid, node).unwrap();
    let fetched = lookup.unwrap();
    assert!(fetched.degraded);
    assert_eq!(fetched.degraded_reason.as_deref(), Some("LSP unavailable"));
}
1644
/// An artifact bundle is recorded and fetched by session and node; the
/// counts and the touched-files text are checked on the returned row.
#[test]
fn test_artifact_bundle_roundtrip() {
    let sid = "test-ab-1";
    let node = "node-a";
    let store = test_store();
    seed_session(&store, sid);

    let written = ArtifactBundleRow {
        session_id: String::from(sid),
        node_id: String::from(node),
        bundle_json: String::from(r#"{"artifacts":[],"commands":[]}"#),
        artifact_count: 3,
        command_count: 1,
        touched_files: String::from(r#"["src/main.rs","src/lib.rs","tests/test.rs"]"#),
    };
    store.record_artifact_bundle(&written).unwrap();

    let lookup = store.get_artifact_bundle(sid, node).unwrap();
    assert!(lookup.is_some());

    let fetched = lookup.unwrap();
    assert_eq!(fetched.artifact_count, 3);
    assert_eq!(fetched.command_count, 1);
    assert!(fetched.touched_files.contains("main.rs"));
}
1668
/// Two snapshots are recorded for the same node; `get_latest_node_states`
/// is expected to return a single entry carrying the second snapshot's values.
#[test]
fn test_latest_node_states_dedup() {
    let sid = "test-dedup";
    let store = test_store();
    seed_session(&store, sid);

    // First snapshot: node is still being worked on, no Phase 8 data.
    let first = NodeStateRecord {
        node_id: String::from("node-x"),
        session_id: String::from(sid),
        state: String::from("Coding"),
        v_total: 0.5,
        merkle_hash: None,
        attempt_count: 1,
        node_class: None,
        owner_plugin: None,
        goal: None,
        parent_id: None,
        children: None,
        last_error_type: None,
        committed_at: None,
    };
    store.record_node_state(&first).unwrap();

    // Second snapshot for the same node: only the listed fields differ,
    // everything else is carried over from the first record.
    let second = NodeStateRecord {
        state: String::from("Completed"),
        v_total: 0.3,
        attempt_count: 2,
        node_class: Some(String::from("Implementation")),
        goal: Some(String::from("Updated goal")),
        committed_at: Some(String::from("2025-01-02T00:00:00Z")),
        ..first
    };
    store.record_node_state(&second).unwrap();

    // Only the most recent entry for the node should come back.
    let latest = store.get_latest_node_states(sid).unwrap();
    assert_eq!(latest.len(), 1);

    let newest = &latest[0];
    assert_eq!(newest.state, "Completed");
    assert_eq!(newest.attempt_count, 2);
    assert_eq!(newest.goal.as_deref(), Some("Updated goal"));
}
1717
/// A node recorded with every Phase 8 field set to `None` (as a pre-Phase-8
/// session would have) reads back with those fields still `None`, and the
/// verification/artifact lookups for that node find nothing.
#[test]
fn test_backward_compat_empty_phase8_fields() {
    let sid = "test-compat";
    let node = "old-node";
    let store = test_store();
    seed_session(&store, sid);

    let legacy = NodeStateRecord {
        node_id: String::from(node),
        session_id: String::from(sid),
        state: String::from("COMPLETED"),
        v_total: 1.0,
        merkle_hash: None,
        attempt_count: 1,
        // Phase 8 fields deliberately left empty.
        node_class: None,
        owner_plugin: None,
        goal: None,
        parent_id: None,
        children: None,
        last_error_type: None,
        committed_at: None,
    };
    store.record_node_state(&legacy).unwrap();

    let snapshot = store.get_latest_node_states(sid).unwrap();
    assert_eq!(snapshot.len(), 1);

    let restored = &snapshot[0];
    assert!(restored.node_class.is_none());
    assert!(restored.goal.is_none());
    assert!(restored.committed_at.is_none());

    // Nothing was recorded for this node in either table, so both lookups
    // must succeed and yield `None`.
    let verification = store.get_verification_result(sid, node).unwrap();
    assert!(verification.is_none());
    let bundle = store.get_artifact_bundle(sid, node).unwrap();
    assert!(bundle.is_none());
}
1754
/// A review outcome with only the outcome and reviewer note set is recorded
/// and returned by `get_review_outcomes` for the same session and node.
#[test]
fn test_review_outcome_roundtrip() {
    let sid = "test-review";
    let node = "node-r";
    let store = test_store();
    seed_session(&store, sid);

    store
        .record_review_outcome(&ReviewOutcomeRow {
            session_id: sid.to_owned(),
            node_id: node.to_owned(),
            outcome: "approved".to_owned(),
            reviewer_note: Some("LGTM".to_owned()),
            energy_at_review: None,
            degraded: None,
            escalation_category: None,
        })
        .unwrap();

    let found = store.get_review_outcomes(sid, node).unwrap();
    assert_eq!(found.len(), 1);

    let review = &found[0];
    assert_eq!(review.outcome, "approved");
    assert_eq!(review.reviewer_note.as_deref(), Some("LGTM"));
}
1777
/// A review outcome with the audit fields populated (energy, degraded flag,
/// escalation category) reads back with those same values.
#[test]
fn test_review_outcome_with_audit_fields() {
    let sid = "test-review-audit";
    let node = "node-a";
    let store = test_store();
    seed_session(&store, sid);

    store
        .record_review_outcome(&ReviewOutcomeRow {
            session_id: sid.to_owned(),
            node_id: node.to_owned(),
            outcome: "rejected".to_owned(),
            reviewer_note: Some("Needs rework".to_owned()),
            energy_at_review: Some(0.42),
            degraded: Some(true),
            escalation_category: Some("complexity".to_owned()),
        })
        .unwrap();

    let found = store.get_review_outcomes(sid, node).unwrap();
    assert_eq!(found.len(), 1);

    let review = &found[0];
    assert_eq!(review.outcome, "rejected");
    assert_eq!(review.energy_at_review, Some(0.42));
    assert_eq!(review.degraded, Some(true));
    assert_eq!(review.escalation_category.as_deref(), Some("complexity"));
}
1805
/// Three review outcomes are recorded across two nodes (one node reviewed
/// twice); `get_all_review_outcomes` must return all three for the session.
#[test]
fn test_get_all_review_outcomes() {
    let sid = "test-review-all";
    let store = test_store();
    seed_session(&store, sid);

    // (node id, outcome) pairs; "n1" appears twice on purpose.
    let reviews = [("n1", "approved"), ("n2", "rejected"), ("n1", "approved")];
    for (node_id, verdict) in reviews {
        store
            .record_review_outcome(&ReviewOutcomeRow {
                session_id: sid.to_owned(),
                node_id: node_id.to_owned(),
                outcome: verdict.to_owned(),
                reviewer_note: None,
                energy_at_review: None,
                degraded: None,
                escalation_category: None,
            })
            .unwrap();
    }

    let everything = store.get_all_review_outcomes(sid).unwrap();
    assert_eq!(everything.len(), 3);
}
1828}