Skip to main content

perspt_store/
store.rs

1//! Session Store Implementation
2//!
3//! Provides CRUD operations for SRBN sessions, node states, and energy history.
4
5use anyhow::{Context, Result};
6use duckdb::Connection;
7use serde::{Deserialize, Serialize};
8use std::path::PathBuf;
9
10use crate::schema::init_schema;
11
/// Record for a session
///
/// Plain data carrier for one row of the `sessions` table, as written by
/// `create_session` and read back by `get_session` /
/// `list_sessions_paginated`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionRecord {
    /// Key used to look the session up (`WHERE session_id = ?`).
    pub session_id: String,
    /// Task text stored with the session.
    pub task: String,
    /// Working directory, kept as a plain string rather than a path type.
    pub working_dir: String,
    /// Merkle root bytes; `create_session` binds them hex-encoded and binds
    /// an empty string when this is `None`.
    pub merkle_root: Option<Vec<u8>>,
    /// Detected toolchain; `create_session` binds an empty string when `None`.
    pub detected_toolchain: Option<String>,
    /// Status label; the set of valid values is not defined in this file.
    pub status: String,
}
22
/// Record for node state
///
/// One row of the `node_states` table. `record_node_state` binds every
/// optional text field as an empty string when it is `None`, and
/// `get_node_states` maps empty strings back to `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStateRecord {
    /// Identifier of the node this snapshot describes.
    pub node_id: String,
    /// Session the node belongs to; `get_node_states` filters on it.
    pub session_id: String,
    /// State label; the set of valid values is not defined in this file.
    pub state: String,
    /// Total energy value; read back from the database as `f64` and narrowed.
    pub v_total: f32,
    /// Hash bytes; bound hex-encoded on write and hex-decoded on read.
    pub merkle_hash: Option<Vec<u8>>,
    /// Attempt counter (presumably retries for this node; confirm with callers).
    pub attempt_count: i32,
    // PSP-5 Phase 8: Richer node snapshot for resume reconstruction
    /// Node class label, if recorded.
    pub node_class: Option<String>,
    /// Name of the owning plugin, if recorded.
    pub owner_plugin: Option<String>,
    /// Goal text for the node, if recorded.
    pub goal: Option<String>,
    /// Identifier of the parent node, if any.
    pub parent_id: Option<String>,
    /// JSON-serialized `Vec<String>`
    pub children: Option<String>,
    /// Label of the last error type seen, if any.
    pub last_error_type: Option<String>,
    /// Commit timestamp kept as text; the format is not defined in this file.
    pub committed_at: Option<String>,
}
42
/// Record for energy history
///
/// One row of the `energy_history` table. `record_energy` binds each value
/// via `to_string()`; the getters read each column as `f64` and narrow it
/// to `f32`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnergyRecord {
    /// Node the measurement belongs to.
    pub node_id: String,
    /// Session the measurement belongs to.
    pub session_id: String,
    // NOTE(review): the component names below suggest syntax / structure /
    // logic / bootstrap / sheaf terms; their definitions live outside this
    // file, so confirm before relying on that reading.
    /// `v_syn` energy component.
    pub v_syn: f32,
    /// `v_str` energy component.
    pub v_str: f32,
    /// `v_log` energy component.
    pub v_log: f32,
    /// `v_boot` energy component.
    pub v_boot: f32,
    /// `v_sheaf` energy component.
    pub v_sheaf: f32,
    /// Total energy value stored alongside the components.
    pub v_total: f32,
}
55
/// Record for LLM request/response logging
///
/// One row of the `llm_requests` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmRequestRecord {
    /// Session that issued the request.
    pub session_id: String,
    /// Node that issued the request; bound as an empty string when `None`,
    /// and `get_llm_requests` maps an empty or NULL value back to `None`.
    pub node_id: Option<String>,
    /// Model identifier string.
    pub model: String,
    /// Full prompt text.
    pub prompt: String,
    /// Full response text.
    pub response: String,
    /// Input token count; `get_global_llm_summary` treats values `<= 0` as
    /// missing and estimates from the prompt length instead.
    pub tokens_in: i32,
    /// Output token count; same `<= 0` fallback, based on the response length.
    pub tokens_out: i32,
    /// Request latency in milliseconds.
    pub latency_ms: i32,
}
68
/// PSP-5 Phase 3: Record for structural digest persistence
///
/// One row of the `structural_digests` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StructuralDigestRecord {
    /// Identifier of this digest row.
    pub digest_id: String,
    /// Session the digest belongs to.
    pub session_id: String,
    /// Node the digest belongs to.
    pub node_id: String,
    /// Path of the digested source, kept as a plain string.
    pub source_path: String,
    /// Artifact kind label; valid values are not defined in this file.
    pub artifact_kind: String,
    /// Digest bytes; `record_structural_digest` binds them hex-encoded.
    pub hash: Vec<u8>,
    /// Version number stored with the digest.
    pub version: i32,
}
80
/// PSP-5 Phase 3: Record for context provenance persistence
///
/// One row of the `context_provenance` table. The three `*_hashes` fields are
/// stored and returned as opaque JSON text; this file never parses them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextProvenanceRecord {
    /// Session the context package was built for.
    pub session_id: String,
    /// Node the context package was built for.
    pub node_id: String,
    /// Identifier of the context package.
    pub context_package_id: String,
    /// JSON-serialized structural digest hashes
    pub structural_hashes: String,
    /// JSON-serialized summary hashes
    pub summary_hashes: String,
    /// JSON-serialized dependency commit hashes
    pub dependency_hashes: String,
    /// Number of files included in the package.
    pub included_file_count: i32,
    /// Total size in bytes (an `i32`, so capped at `i32::MAX`).
    pub total_bytes: i32,
}
96
/// PSP-5 Phase 5: Record for escalation report persistence
///
/// One row of the `escalation_reports` table. Every field is text and is
/// bound and read back verbatim; the serialized payloads are not parsed here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EscalationReportRecord {
    /// Session the report belongs to.
    pub session_id: String,
    /// Node the report was raised for.
    pub node_id: String,
    /// Serialized EscalationCategory
    pub category: String,
    /// JSON-serialized RewriteAction
    pub action: String,
    /// JSON-serialized EnergyComponents
    pub energy_snapshot: String,
    /// JSON-serialized `Vec<StageOutcome>`
    pub stage_outcomes: String,
    /// Human-readable evidence
    pub evidence: String,
    /// JSON-serialized `Vec<String>`
    pub affected_node_ids: String,
}
115
/// PSP-5 Phase 5: Record for local graph rewrite persistence
///
/// One row of the `rewrite_records` table (see `record_rewrite`). All
/// payload fields are pre-serialized text supplied by the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RewriteRecordRow {
    /// Session the rewrite happened in.
    pub session_id: String,
    /// Node the rewrite was applied to.
    pub node_id: String,
    /// JSON-serialized RewriteAction
    pub action: String,
    /// Serialized EscalationCategory
    pub category: String,
    /// JSON-serialized `Vec<String>`
    pub requeued_nodes: String,
    /// JSON-serialized `Vec<String>`
    pub inserted_nodes: String,
}
130
/// PSP-5 Phase 5: Record for sheaf validation result persistence
///
/// Plain data row; the code that stores and loads it is outside the visible
/// part of this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SheafValidationRow {
    /// Session the validation ran in.
    pub session_id: String,
    /// Node that was validated.
    pub node_id: String,
    /// Validator class label; valid values are not defined in this file.
    pub validator_class: String,
    /// Plugin that supplied the validator, if any.
    pub plugin_source: Option<String>,
    /// Whether the validation passed.
    pub passed: bool,
    /// Short textual summary of the evidence.
    pub evidence_summary: String,
    /// JSON-serialized `Vec<String>`
    pub affected_files: String,
    /// This validator's contribution to the `v_sheaf` energy term.
    pub v_sheaf_contribution: f32,
    /// JSON-serialized `Vec<String>`
    pub requeue_targets: String,
}
146
147// =============================================================================
148// PSP-5 Phase 6: Provisional Branch, Interface Seal, Branch Flush Records
149// =============================================================================
150
/// PSP-5 Phase 6: Record for provisional branch persistence
///
/// Plain data row; the code that stores and loads it is outside the visible
/// part of this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvisionalBranchRow {
    /// Identifier of the provisional branch.
    pub branch_id: String,
    /// Session the branch belongs to.
    pub session_id: String,
    /// Node the branch was created for.
    pub node_id: String,
    /// Parent node of `node_id`.
    pub parent_node_id: String,
    /// Branch state label; valid values are not defined in this file.
    pub state: String,
    /// Hash of the parent's seal, if one exists (presumably an interface
    /// seal hash; confirm against the writer).
    pub parent_seal_hash: Option<Vec<u8>>,
    /// Sandbox directory for the branch, if any, as a plain string.
    pub sandbox_dir: Option<String>,
}
162
/// PSP-5 Phase 6: Record for branch lineage persistence
///
/// Links a parent branch to a child branch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BranchLineageRow {
    /// Identifier of this lineage row.
    pub lineage_id: String,
    /// Branch on the parent side of the link.
    pub parent_branch_id: String,
    /// Branch on the child side of the link.
    pub child_branch_id: String,
    /// Whether the child depends on a seal from the parent.
    pub depends_on_seal: bool,
}
171
/// PSP-5 Phase 6: Record for interface seal persistence
///
/// Has the same shape as `StructuralDigestRecord`, with seal-specific names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterfaceSealRow {
    /// Identifier of the seal.
    pub seal_id: String,
    /// Session the seal belongs to.
    pub session_id: String,
    /// Node that produced the seal.
    pub node_id: String,
    /// Path of the sealed artifact, kept as a plain string.
    pub sealed_path: String,
    /// Artifact kind label; valid values are not defined in this file.
    pub artifact_kind: String,
    /// Seal hash bytes.
    pub seal_hash: Vec<u8>,
    /// Version number stored with the seal.
    pub version: i32,
}
183
/// PSP-5 Phase 6: Record for branch flush decision persistence
///
/// Plain data row; list-valued fields are carried as pre-serialized JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BranchFlushRow {
    /// Identifier of the flush decision.
    pub flush_id: String,
    /// Session the flush happened in.
    pub session_id: String,
    /// Parent node the flush is associated with.
    pub parent_node_id: String,
    /// JSON-serialized `Vec<String>`
    pub flushed_branch_ids: String,
    /// JSON-serialized `Vec<String>`
    pub requeue_node_ids: String,
    /// Free-text reason for the flush.
    pub reason: String,
}
196
197// =============================================================================
198// PSP-5 Phase 8: Task Graph and Review Outcome Records
199// =============================================================================
200
/// PSP-5 Phase 8: Record for task graph edges (DAG reconstruction on resume)
///
/// One directed edge from `parent_node_id` to `child_node_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskGraphEdgeRow {
    /// Session the edge belongs to.
    pub session_id: String,
    /// Node at the parent end of the edge.
    pub parent_node_id: String,
    /// Node at the child end of the edge.
    pub child_node_id: String,
    /// Edge type label; valid values are not defined in this file.
    pub edge_type: String,
}
209
/// PSP-5 Phase 8: Record for review outcome persistence
///
/// Captures one review decision together with optional context about the
/// node at the time the decision was made.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReviewOutcomeRow {
    /// Session the review belongs to.
    pub session_id: String,
    /// Node that was reviewed.
    pub node_id: String,
    /// One of: "approved", "rejected", "edit_requested", "correction_requested", "skipped"
    pub outcome: String,
    /// Optional free-text note from the reviewer.
    pub reviewer_note: Option<String>,
    /// Energy at time of review decision
    pub energy_at_review: Option<f64>,
    /// Whether verification was degraded when decision was made
    pub degraded: Option<bool>,
    /// Escalation category if the node had been classified
    pub escalation_category: Option<String>,
}
225
/// PSP-5 Phase 8: Record for verification result snapshot persistence
///
/// Pairs the full serialized result with flat summary columns so rows can be
/// filtered without parsing `result_json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerificationResultRow {
    /// Session the verification ran in.
    pub session_id: String,
    /// Node that was verified.
    pub node_id: String,
    /// JSON-serialized VerificationResult (full data for resume reconstruction)
    pub result_json: String,
    // Query-friendly summary fields
    /// Whether the syntax stage passed.
    pub syntax_ok: bool,
    /// Whether the build stage passed.
    pub build_ok: bool,
    /// Whether the test stage passed.
    pub tests_ok: bool,
    /// Whether the lint stage passed.
    pub lint_ok: bool,
    /// Number of diagnostics reported.
    pub diagnostics_count: i32,
    /// Number of tests that passed.
    pub tests_passed: i32,
    /// Number of tests that failed.
    pub tests_failed: i32,
    /// Whether verification ran in a degraded mode.
    pub degraded: bool,
    /// Explanation of the degradation, if any.
    pub degraded_reason: Option<String>,
}
244
/// PSP-5 Phase 8: Record for artifact bundle snapshot persistence
///
/// Pairs the full serialized bundle with summary counts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArtifactBundleRow {
    /// Session the bundle belongs to.
    pub session_id: String,
    /// Node that produced the bundle.
    pub node_id: String,
    /// JSON-serialized ArtifactBundle (full data for resume reconstruction)
    pub bundle_json: String,
    /// Number of artifacts in the bundle.
    pub artifact_count: i32,
    /// Number of commands in the bundle.
    pub command_count: i32,
    /// JSON-serialized `Vec<String>` of touched file paths
    pub touched_files: String,
}
257
// =========================================================================
// Plan Revision, Feature Charter, Repair Footprint, and Budget Envelope Row Types
// =========================================================================
261
/// Row type for feature_charters table
///
/// Unlike the record types earlier in this file, this row derives only
/// `Debug` and `Clone` (no serde traits).
#[derive(Debug, Clone)]
pub struct FeatureCharterRow {
    /// Identifier of the charter.
    pub charter_id: String,
    /// Session the charter belongs to.
    pub session_id: String,
    /// Free-text description of the charter's scope.
    pub scope_description: String,
    /// Optional module limit; `None` presumably means unlimited (confirm
    /// with the consumer).
    pub max_modules: Option<i32>,
    /// Optional file limit; same `None` caveat as `max_modules`.
    pub max_files: Option<i32>,
    /// Optional revision limit; same `None` caveat as `max_modules`.
    pub max_revisions: Option<i32>,
    /// Optional language restriction, as text.
    pub language_constraint: Option<String>,
}
273
/// Row type for plan_revisions table
///
/// Derives only `Debug` and `Clone` (no serde traits).
#[derive(Debug, Clone)]
pub struct PlanRevisionRow {
    /// Identifier of this revision.
    pub revision_id: String,
    /// Session the revision belongs to.
    pub session_id: String,
    /// Sequence number of the revision (presumably ordered within a session;
    /// confirm with the writer).
    pub sequence: i32,
    /// Serialized plan payload (JSON, going by the field name).
    pub plan_json: String,
    /// Free-text reason for the revision.
    pub reason: String,
    /// Identifier of the revision this one replaces, if any.
    pub supersedes: Option<String>,
    /// Status label; valid values are not defined in this file.
    pub status: String,
}
285
/// Row type for repair_footprints table
///
/// Derives only `Debug` and `Clone` (no serde traits).
#[derive(Debug, Clone)]
pub struct RepairFootprintRow {
    /// Identifier of this footprint.
    pub footprint_id: String,
    /// Session the repair happened in.
    pub session_id: String,
    /// Node that was repaired.
    pub node_id: String,
    /// Plan revision the repair is associated with.
    pub revision_id: String,
    /// Attempt number of the repair.
    pub attempt: i32,
    /// Affected files as text (presumably a JSON list like the other
    /// `affected_files` fields in this file; confirm with the writer).
    pub affected_files: String,
    /// Serialized bundle payload (JSON, going by the field name).
    pub bundle_json: String,
    /// Free-text diagnosis.
    pub diagnosis: String,
    /// Whether the repair resolved the problem.
    pub resolved: bool,
}
299
/// Row type for budget_envelopes table
///
/// Pairs each optional limit with a running usage counter. Derives only
/// `Debug` and `Clone` (no serde traits).
#[derive(Debug, Clone)]
pub struct BudgetEnvelopeRow {
    /// Session the budget applies to.
    pub session_id: String,
    /// Optional step limit; `None` presumably means unlimited (confirm with
    /// the consumer).
    pub max_steps: Option<i32>,
    /// Steps consumed so far.
    pub steps_used: i32,
    /// Optional revision limit; same `None` caveat as `max_steps`.
    pub max_revisions: Option<i32>,
    /// Revisions consumed so far.
    pub revisions_used: i32,
    /// Optional cost limit in US dollars; same `None` caveat as `max_steps`.
    pub max_cost_usd: Option<f64>,
    /// Cost consumed so far, in US dollars.
    pub cost_used_usd: f64,
}
311
312use std::sync::Mutex;
313
/// Session store for SRBN persistence
///
/// Owns a single DuckDB connection. Every method takes `&self` and locks the
/// mutex for the duration of its database work.
pub struct SessionStore {
    /// The DuckDB connection, guarded so it can be used through `&self`.
    conn: Mutex<Connection>,
}
318
319impl SessionStore {
320    /// Create a new session store with default path
321    pub fn new() -> Result<Self> {
322        let db_path = Self::default_db_path()?;
323        Self::open(&db_path)
324    }
325
326    /// Open a session store at the given path
327    pub fn open(path: &PathBuf) -> Result<Self> {
328        // Ensure parent directory exists
329        if let Some(parent) = path.parent() {
330            std::fs::create_dir_all(parent)?;
331        }
332
333        let conn = Connection::open(path).context("Failed to open DuckDB")?;
334        init_schema(&conn)?;
335
336        Ok(Self {
337            conn: Mutex::new(conn),
338        })
339    }
340
341    /// Open a session store in read-only mode for concurrent dashboard reads.
342    ///
343    /// Uses `AccessMode::ReadOnly` so the dashboard can read alongside the
344    /// agent's write lock. Does **not** call `init_schema()` (a write op).
345    /// The database file must already exist.
346    pub fn open_read_only(path: &std::path::Path) -> Result<Self> {
347        let config = duckdb::Config::default()
348            .access_mode(duckdb::AccessMode::ReadOnly)
349            .context("Failed to configure DuckDB read-only mode")?;
350        let conn = Connection::open_with_flags(path, config)
351            .context("Failed to open DuckDB in read-only mode")?;
352        Ok(Self {
353            conn: Mutex::new(conn),
354        })
355    }
356
357    /// Get the default database path (~/.local/share/perspt/perspt.db or similar)
358    pub fn default_db_path() -> Result<PathBuf> {
359        perspt_core::paths::database_path().context("Could not determine platform data directory")
360    }
361
362    /// Create a new session
363    pub fn create_session(&self, session: &SessionRecord) -> Result<()> {
364        self.conn.lock().unwrap().execute(
365            r#"
366            INSERT INTO sessions (session_id, task, working_dir, merkle_root, detected_toolchain, status)
367            VALUES (?, ?, ?, ?, ?, ?)
368            "#,
369            [
370                &session.session_id,
371                &session.task,
372                &session.working_dir,
373                &session.merkle_root.as_ref().map(hex::encode).unwrap_or_default(),
374                &session.detected_toolchain.clone().unwrap_or_default(),
375                &session.status,
376            ],
377        )?;
378        Ok(())
379    }
380
381    /// Update session merkle root
382    pub fn update_merkle_root(&self, session_id: &str, merkle_root: &[u8]) -> Result<()> {
383        self.conn.lock().unwrap().execute(
384            "UPDATE sessions SET merkle_root = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
385            [hex::encode(merkle_root), session_id.to_string()],
386        )?;
387        Ok(())
388    }
389
390    /// Record node state
391    pub fn record_node_state(&self, record: &NodeStateRecord) -> Result<()> {
392        let v_total = record.v_total.to_string();
393        let merkle_hash = record
394            .merkle_hash
395            .as_ref()
396            .map(hex::encode)
397            .unwrap_or_default();
398        let attempt_count = record.attempt_count.to_string();
399        let node_class = record.node_class.clone().unwrap_or_default();
400        let owner_plugin = record.owner_plugin.clone().unwrap_or_default();
401        let goal = record.goal.clone().unwrap_or_default();
402        let parent_id = record.parent_id.clone().unwrap_or_default();
403        let children = record.children.clone().unwrap_or_default();
404        let last_error_type = record.last_error_type.clone().unwrap_or_default();
405        let committed_at = record.committed_at.clone().unwrap_or_default();
406
407        self.conn.lock().unwrap().execute(
408            r#"
409            INSERT INTO node_states (node_id, session_id, state, v_total, merkle_hash, attempt_count,
410                                     node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at)
411            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
412            "#,
413            [
414                &record.node_id,
415                &record.session_id,
416                &record.state,
417                &v_total,
418                &merkle_hash,
419                &attempt_count,
420                &node_class,
421                &owner_plugin,
422                &goal,
423                &parent_id,
424                &children,
425                &last_error_type,
426                &committed_at,
427            ],
428        )?;
429        Ok(())
430    }
431
432    /// Record energy measurement
433    pub fn record_energy(&self, record: &EnergyRecord) -> Result<()> {
434        self.conn.lock().unwrap().execute(
435            r#"
436            INSERT INTO energy_history (node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total)
437            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
438            "#,
439            [
440                &record.node_id,
441                &record.session_id,
442                &record.v_syn.to_string(),
443                &record.v_str.to_string(),
444                &record.v_log.to_string(),
445                &record.v_boot.to_string(),
446                &record.v_sheaf.to_string(),
447                &record.v_total.to_string(),
448            ],
449        )?;
450        Ok(())
451    }
452
453    /// Get session by ID
454    pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>> {
455        let conn = self.conn.lock().unwrap();
456        let mut stmt = conn.prepare(
457            "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status FROM sessions WHERE session_id = ?"
458        )?;
459
460        let mut rows = stmt.query([session_id])?;
461        if let Some(row) = rows.next()? {
462            // merkle_root is stored as BLOB; read directly as Option<Vec<u8>>
463            // to match list_recent_sessions and avoid type mismatch on Blob columns.
464            let merkle_root: Option<Vec<u8>> = row.get(3).ok();
465
466            Ok(Some(SessionRecord {
467                session_id: row.get(0)?,
468                task: row.get(1)?,
469                working_dir: row.get(2)?,
470                merkle_root,
471                detected_toolchain: row.get(4)?,
472                status: row.get(5)?,
473            }))
474        } else {
475            Ok(None)
476        }
477    }
478
479    /// Get the directory for session artifacts (`~/.local/share/perspt/sessions/<id>`)
480    pub fn get_session_dir(&self, session_id: &str) -> Result<PathBuf> {
481        let data_dir = dirs::data_local_dir()
482            .context("Could not find local data directory")?
483            .join("perspt")
484            .join("sessions")
485            .join(session_id);
486        Ok(data_dir)
487    }
488
489    /// Ensure a session directory exists and return the path
490    pub fn create_session_dir(&self, session_id: &str) -> Result<PathBuf> {
491        let dir = self.get_session_dir(session_id)?;
492        if !dir.exists() {
493            std::fs::create_dir_all(&dir).context("Failed to create session directory")?;
494        }
495        Ok(dir)
496    }
497
498    /// Get energy history for a node (query)
499    pub fn get_energy_history(&self, session_id: &str, node_id: &str) -> Result<Vec<EnergyRecord>> {
500        let conn = self.conn.lock().unwrap();
501        let mut stmt = conn.prepare(
502            "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? AND node_id = ? ORDER BY timestamp"
503        )?;
504
505        let mut rows = stmt.query([session_id, node_id])?;
506        let mut records = Vec::new();
507
508        while let Some(row) = rows.next()? {
509            records.push(EnergyRecord {
510                node_id: row.get(0)?,
511                session_id: row.get(1)?,
512                v_syn: row.get::<_, f64>(2)? as f32,
513                v_str: row.get::<_, f64>(3)? as f32,
514                v_log: row.get::<_, f64>(4)? as f32,
515                v_boot: row.get::<_, f64>(5)? as f32,
516                v_sheaf: row.get::<_, f64>(6)? as f32,
517                v_total: row.get::<_, f64>(7)? as f32,
518            });
519        }
520
521        Ok(records)
522    }
523
524    /// Get all energy history for a session (all nodes)
525    pub fn get_session_energy_history(&self, session_id: &str) -> Result<Vec<EnergyRecord>> {
526        let conn = self.conn.lock().unwrap();
527        let mut stmt = conn.prepare(
528            "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? ORDER BY timestamp"
529        )?;
530
531        let mut rows = stmt.query([session_id])?;
532        let mut records = Vec::new();
533
534        while let Some(row) = rows.next()? {
535            records.push(EnergyRecord {
536                node_id: row.get(0)?,
537                session_id: row.get(1)?,
538                v_syn: row.get::<_, f64>(2)? as f32,
539                v_str: row.get::<_, f64>(3)? as f32,
540                v_log: row.get::<_, f64>(4)? as f32,
541                v_boot: row.get::<_, f64>(5)? as f32,
542                v_sheaf: row.get::<_, f64>(6)? as f32,
543                v_total: row.get::<_, f64>(7)? as f32,
544            });
545        }
546
547        Ok(records)
548    }
549
550    /// List recent sessions (newest first)
551    pub fn list_recent_sessions(&self, limit: usize) -> Result<Vec<SessionRecord>> {
552        self.list_sessions_paginated(limit, 0)
553    }
554
555    /// List sessions with pagination (most recent first).
556    pub fn list_sessions_paginated(
557        &self,
558        limit: usize,
559        offset: usize,
560    ) -> Result<Vec<SessionRecord>> {
561        let conn = self.conn.lock().unwrap();
562        let mut stmt = conn.prepare(
563            "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status
564             FROM sessions ORDER BY created_at DESC LIMIT ? OFFSET ?",
565        )?;
566
567        let mut rows = stmt.query([limit.to_string(), offset.to_string()])?;
568        let mut records = Vec::new();
569
570        while let Some(row) = rows.next()? {
571            let merkle_root: Option<Vec<u8>> = row.get(3).ok();
572
573            records.push(SessionRecord {
574                session_id: row.get(0)?,
575                task: row.get(1)?,
576                working_dir: row.get(2)?,
577                merkle_root,
578                detected_toolchain: row.get(4)?,
579                status: row.get(5)?,
580            });
581        }
582
583        Ok(records)
584    }
585
586    /// Count total number of sessions.
587    pub fn count_sessions(&self) -> Result<usize> {
588        let conn = self.conn.lock().unwrap();
589        let mut stmt = conn.prepare("SELECT COUNT(*) FROM sessions")?;
590        let mut rows = stmt.query([])?;
591        if let Some(row) = rows.next()? {
592            let count: i64 = row.get(0)?;
593            Ok(count as usize)
594        } else {
595            Ok(0)
596        }
597    }
598
599    /// Get all node states for a session
600    pub fn get_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
601        let conn = self.conn.lock().unwrap();
602        let mut stmt = conn.prepare(
603            "SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
604                    node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
605             FROM node_states WHERE session_id = ? ORDER BY created_at",
606        )?;
607
608        let mut rows = stmt.query([session_id])?;
609        let mut records = Vec::new();
610
611        while let Some(row) = rows.next()? {
612            records.push(NodeStateRecord {
613                node_id: row.get(0)?,
614                session_id: row.get(1)?,
615                state: row.get(2)?,
616                v_total: row.get::<_, f64>(3)? as f32,
617                merkle_hash: row
618                    .get::<_, Option<String>>(4)?
619                    .and_then(|s| hex::decode(s).ok()),
620                attempt_count: row.get(5)?,
621                node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
622                owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
623                goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
624                parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
625                children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
626                last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
627                committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
628            });
629        }
630
631        Ok(records)
632    }
633
634    /// Update session status
635    pub fn update_session_status(&self, session_id: &str, status: &str) -> Result<()> {
636        self.conn.lock().unwrap().execute(
637            "UPDATE sessions SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
638            [status, session_id],
639        )?;
640        Ok(())
641    }
642
643    /// Record an LLM request/response
644    pub fn record_llm_request(&self, record: &LlmRequestRecord) -> Result<()> {
645        let conn = self.conn.lock().unwrap();
646        conn.execute(
647            r#"
648            INSERT INTO llm_requests (session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms)
649            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
650            "#,
651            [
652                &record.session_id,
653                &record.node_id.clone().unwrap_or_default(),
654                &record.model,
655                &record.prompt,
656                &record.response,
657                &record.tokens_in.to_string(),
658                &record.tokens_out.to_string(),
659                &record.latency_ms.to_string(),
660            ],
661        )?;
662        Ok(())
663    }
664
665    /// Get LLM requests for a session
666    pub fn get_llm_requests(&self, session_id: &str) -> Result<Vec<LlmRequestRecord>> {
667        let conn = self.conn.lock().unwrap();
668        let mut stmt = conn.prepare(
669            "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
670             FROM llm_requests WHERE session_id = ? ORDER BY timestamp",
671        )?;
672
673        let mut rows = stmt.query([session_id])?;
674        let mut records = Vec::new();
675
676        while let Some(row) = rows.next()? {
677            let node_id: Option<String> = row.get(1)?;
678            records.push(LlmRequestRecord {
679                session_id: row.get(0)?,
680                node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
681                    None
682                } else {
683                    node_id
684                },
685                model: row.get(2)?,
686                prompt: row.get(3)?,
687                response: row.get(4)?,
688                tokens_in: row.get(5)?,
689                tokens_out: row.get(6)?,
690                latency_ms: row.get(7)?,
691            });
692        }
693
694        Ok(records)
695    }
696
697    /// Aggregate LLM statistics across all sessions: (count, sum_tokens_in, sum_tokens_out, sum_latency_ms)
698    pub fn get_global_llm_summary(&self) -> Result<(i64, i64, i64, i64)> {
699        let conn = self.conn.lock().unwrap();
700        let mut stmt = conn.prepare(
701            "SELECT COUNT(*), \
702             COALESCE(SUM(CASE WHEN tokens_in > 0 THEN tokens_in ELSE (LENGTH(prompt) + 3) / 4 END), 0), \
703             COALESCE(SUM(CASE WHEN tokens_out > 0 THEN tokens_out ELSE (LENGTH(response) + 3) / 4 END), 0), \
704             COALESCE(MEDIAN(latency_ms), 0) \
705             FROM llm_requests",
706        )?;
707        let result = stmt.query_row([], |row| {
708            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
709        })?;
710        Ok(result)
711    }
712
713    // =========================================================================
714    // PSP-5 Phase 3: Structural Digest & Context Provenance Persistence
715    // =========================================================================
716
717    /// Record a structural digest
718    pub fn record_structural_digest(&self, record: &StructuralDigestRecord) -> Result<()> {
719        self.conn.lock().unwrap().execute(
720            r#"
721            INSERT INTO structural_digests (digest_id, session_id, node_id, source_path, artifact_kind, hash, version)
722            VALUES (?, ?, ?, ?, ?, ?, ?)
723            "#,
724            [
725                &record.digest_id,
726                &record.session_id,
727                &record.node_id,
728                &record.source_path,
729                &record.artifact_kind,
730                &hex::encode(&record.hash),
731                &record.version.to_string(),
732            ],
733        )?;
734        Ok(())
735    }
736
737    /// Get structural digests for a session and node
738    pub fn get_structural_digests(
739        &self,
740        session_id: &str,
741        node_id: &str,
742    ) -> Result<Vec<StructuralDigestRecord>> {
743        let conn = self.conn.lock().unwrap();
744        let mut stmt = conn.prepare(
745            "SELECT digest_id, session_id, node_id, source_path, artifact_kind, hash, version
746             FROM structural_digests WHERE session_id = ? AND node_id = ? ORDER BY created_at",
747        )?;
748
749        let mut rows = stmt.query([session_id, node_id])?;
750        let mut records = Vec::new();
751
752        while let Some(row) = rows.next()? {
753            records.push(StructuralDigestRecord {
754                digest_id: row.get(0)?,
755                session_id: row.get(1)?,
756                node_id: row.get(2)?,
757                source_path: row.get(3)?,
758                artifact_kind: row.get(4)?,
759                hash: row
760                    .get::<_, String>(5)
761                    .ok()
762                    .and_then(|s| hex::decode(s).ok())
763                    .unwrap_or_default(),
764                version: row.get(5)?,
765            });
766        }
767
768        Ok(records)
769    }
770
771    /// Record context provenance for a node
772    pub fn record_context_provenance(&self, record: &ContextProvenanceRecord) -> Result<()> {
773        self.conn.lock().unwrap().execute(
774            r#"
775            INSERT INTO context_provenance (session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes)
776            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
777            "#,
778            [
779                &record.session_id,
780                &record.node_id,
781                &record.context_package_id,
782                &record.structural_hashes,
783                &record.summary_hashes,
784                &record.dependency_hashes,
785                &record.included_file_count.to_string(),
786                &record.total_bytes.to_string(),
787            ],
788        )?;
789        Ok(())
790    }
791
792    /// Get context provenance for a session and node
793    pub fn get_context_provenance(
794        &self,
795        session_id: &str,
796        node_id: &str,
797    ) -> Result<Option<ContextProvenanceRecord>> {
798        let conn = self.conn.lock().unwrap();
799        let mut stmt = conn.prepare(
800            "SELECT session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes
801             FROM context_provenance WHERE session_id = ? AND node_id = ? ORDER BY created_at DESC LIMIT 1",
802        )?;
803
804        let mut rows = stmt.query([session_id, node_id])?;
805        if let Some(row) = rows.next()? {
806            Ok(Some(ContextProvenanceRecord {
807                session_id: row.get(0)?,
808                node_id: row.get(1)?,
809                context_package_id: row.get(2)?,
810                structural_hashes: row.get(3)?,
811                summary_hashes: row.get(4)?,
812                dependency_hashes: row.get(5)?,
813                included_file_count: row.get(6)?,
814                total_bytes: row.get(7)?,
815            }))
816        } else {
817            Ok(None)
818        }
819    }
820
821    // =========================================================================
822    // PSP-5 Phase 5: Escalation, Rewrite, and Sheaf Validation Persistence
823    // =========================================================================
824
825    /// Record an escalation report
826    pub fn record_escalation_report(&self, record: &EscalationReportRecord) -> Result<()> {
827        self.conn.lock().unwrap().execute(
828            r#"
829            INSERT INTO escalation_reports (session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids)
830            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
831            "#,
832            [
833                &record.session_id,
834                &record.node_id,
835                &record.category,
836                &record.action,
837                &record.energy_snapshot,
838                &record.stage_outcomes,
839                &record.evidence,
840                &record.affected_node_ids,
841            ],
842        )?;
843        Ok(())
844    }
845
846    /// Get escalation reports for a session
847    pub fn get_escalation_reports(&self, session_id: &str) -> Result<Vec<EscalationReportRecord>> {
848        let conn = self.conn.lock().unwrap();
849        let mut stmt = conn.prepare(
850            "SELECT session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids
851             FROM escalation_reports WHERE session_id = ? ORDER BY created_at",
852        )?;
853        let mut rows = stmt.query([session_id])?;
854        let mut records = Vec::new();
855        while let Some(row) = rows.next()? {
856            records.push(EscalationReportRecord {
857                session_id: row.get(0)?,
858                node_id: row.get(1)?,
859                category: row.get(2)?,
860                action: row.get(3)?,
861                energy_snapshot: row.get(4)?,
862                stage_outcomes: row.get(5)?,
863                evidence: row.get(6)?,
864                affected_node_ids: row.get(7)?,
865            });
866        }
867        Ok(records)
868    }
869
870    /// Record a local graph rewrite
871    pub fn record_rewrite(&self, record: &RewriteRecordRow) -> Result<()> {
872        self.conn.lock().unwrap().execute(
873            r#"
874            INSERT INTO rewrite_records (session_id, node_id, action, category, requeued_nodes, inserted_nodes)
875            VALUES (?, ?, ?, ?, ?, ?)
876            "#,
877            [
878                &record.session_id,
879                &record.node_id,
880                &record.action,
881                &record.category,
882                &record.requeued_nodes,
883                &record.inserted_nodes,
884            ],
885        )?;
886        Ok(())
887    }
888
889    /// Get rewrite records for a session
890    pub fn get_rewrite_records(&self, session_id: &str) -> Result<Vec<RewriteRecordRow>> {
891        let conn = self.conn.lock().unwrap();
892        let mut stmt = conn.prepare(
893            "SELECT session_id, node_id, action, category, requeued_nodes, inserted_nodes
894             FROM rewrite_records WHERE session_id = ? ORDER BY created_at",
895        )?;
896        let mut rows = stmt.query([session_id])?;
897        let mut records = Vec::new();
898        while let Some(row) = rows.next()? {
899            records.push(RewriteRecordRow {
900                session_id: row.get(0)?,
901                node_id: row.get(1)?,
902                action: row.get(2)?,
903                category: row.get(3)?,
904                requeued_nodes: row.get(4)?,
905                inserted_nodes: row.get(5)?,
906            });
907        }
908        Ok(records)
909    }
910
911    /// Record a sheaf validation result
912    pub fn record_sheaf_validation(&self, record: &SheafValidationRow) -> Result<()> {
913        self.conn.lock().unwrap().execute(
914            r#"
915            INSERT INTO sheaf_validations (session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets)
916            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
917            "#,
918            [
919                &record.session_id,
920                &record.node_id,
921                &record.validator_class,
922                &record.plugin_source.clone().unwrap_or_default(),
923                &record.passed.to_string(),
924                &record.evidence_summary,
925                &record.affected_files,
926                &record.v_sheaf_contribution.to_string(),
927                &record.requeue_targets,
928            ],
929        )?;
930        Ok(())
931    }
932
933    /// Get sheaf validation results for a session and node
934    pub fn get_sheaf_validations(
935        &self,
936        session_id: &str,
937        node_id: &str,
938    ) -> Result<Vec<SheafValidationRow>> {
939        let conn = self.conn.lock().unwrap();
940        let mut stmt = conn.prepare(
941            "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
942             FROM sheaf_validations WHERE session_id = ? AND node_id = ? ORDER BY created_at",
943        )?;
944        let mut rows = stmt.query([session_id, node_id])?;
945        let mut records = Vec::new();
946        while let Some(row) = rows.next()? {
947            records.push(SheafValidationRow {
948                session_id: row.get(0)?,
949                node_id: row.get(1)?,
950                validator_class: row.get(2)?,
951                plugin_source: row.get::<_, Option<String>>(3)?,
952                passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
953                evidence_summary: row.get(5)?,
954                affected_files: row.get(6)?,
955                v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
956                requeue_targets: row.get(8)?,
957            });
958        }
959        Ok(records)
960    }
961
962    /// Get all sheaf validations for a session (all nodes).
963    pub fn get_all_sheaf_validations(&self, session_id: &str) -> Result<Vec<SheafValidationRow>> {
964        let conn = self.conn.lock().unwrap();
965        let mut stmt = conn.prepare(
966            "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
967             FROM sheaf_validations WHERE session_id = ? ORDER BY created_at",
968        )?;
969        let mut rows = stmt.query([session_id])?;
970        let mut records = Vec::new();
971        while let Some(row) = rows.next()? {
972            records.push(SheafValidationRow {
973                session_id: row.get(0)?,
974                node_id: row.get(1)?,
975                validator_class: row.get(2)?,
976                plugin_source: row.get::<_, Option<String>>(3)?,
977                passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
978                evidence_summary: row.get(5)?,
979                affected_files: row.get(6)?,
980                v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
981                requeue_targets: row.get(8)?,
982            });
983        }
984        Ok(records)
985    }
986
987    // =========================================================================
988    // PSP-5 Phase 6: Provisional Branch CRUD
989    // =========================================================================
990
991    /// Record a new provisional branch
992    pub fn record_provisional_branch(&self, record: &ProvisionalBranchRow) -> Result<()> {
993        self.conn.lock().unwrap().execute(
994            r#"
995            INSERT INTO provisional_branches (branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir)
996            VALUES (?, ?, ?, ?, ?, ?, ?)
997            "#,
998            [
999                &record.branch_id,
1000                &record.session_id,
1001                &record.node_id,
1002                &record.parent_node_id,
1003                &record.state,
1004                &record.parent_seal_hash.as_ref().map(hex::encode).unwrap_or_default(),
1005                &record.sandbox_dir.clone().unwrap_or_default(),
1006            ],
1007        )?;
1008        Ok(())
1009    }
1010
1011    /// Update a provisional branch state
1012    pub fn update_branch_state(&self, branch_id: &str, new_state: &str) -> Result<()> {
1013        self.conn.lock().unwrap().execute(
1014            "UPDATE provisional_branches SET state = ?, updated_at = CURRENT_TIMESTAMP WHERE branch_id = ?",
1015            [new_state, branch_id],
1016        )?;
1017        Ok(())
1018    }
1019
1020    /// Get all provisional branches for a session
1021    pub fn get_provisional_branches(&self, session_id: &str) -> Result<Vec<ProvisionalBranchRow>> {
1022        let conn = self.conn.lock().unwrap();
1023        let mut stmt = conn.prepare(
1024            "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
1025             FROM provisional_branches WHERE session_id = ? ORDER BY created_at",
1026        )?;
1027        let mut rows = stmt.query([session_id])?;
1028        let mut records = Vec::new();
1029        while let Some(row) = rows.next()? {
1030            // parent_seal_hash is BLOB; read directly as Option<Vec<u8>>
1031            let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
1032            records.push(ProvisionalBranchRow {
1033                branch_id: row.get(0)?,
1034                session_id: row.get(1)?,
1035                node_id: row.get(2)?,
1036                parent_node_id: row.get(3)?,
1037                state: row.get(4)?,
1038                parent_seal_hash,
1039                sandbox_dir: row.get::<_, Option<String>>(6)?,
1040            });
1041        }
1042        Ok(records)
1043    }
1044
1045    /// Get live (active/sealed) provisional branches depending on a parent node
1046    pub fn get_live_branches_for_parent(
1047        &self,
1048        session_id: &str,
1049        parent_node_id: &str,
1050    ) -> Result<Vec<ProvisionalBranchRow>> {
1051        let conn = self.conn.lock().unwrap();
1052        let mut stmt = conn.prepare(
1053            "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
1054             FROM provisional_branches
1055             WHERE session_id = ? AND parent_node_id = ? AND state IN ('active', 'sealed')
1056             ORDER BY created_at",
1057        )?;
1058        let mut rows = stmt.query([session_id, parent_node_id])?;
1059        let mut records = Vec::new();
1060        while let Some(row) = rows.next()? {
1061            // parent_seal_hash is BLOB; read directly as Option<Vec<u8>>
1062            let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
1063            records.push(ProvisionalBranchRow {
1064                branch_id: row.get(0)?,
1065                session_id: row.get(1)?,
1066                node_id: row.get(2)?,
1067                parent_node_id: row.get(3)?,
1068                state: row.get(4)?,
1069                parent_seal_hash,
1070                sandbox_dir: row.get::<_, Option<String>>(6)?,
1071            });
1072        }
1073        Ok(records)
1074    }
1075
1076    /// Mark all live branches for a parent as flushed
1077    pub fn flush_branches_for_parent(
1078        &self,
1079        session_id: &str,
1080        parent_node_id: &str,
1081    ) -> Result<Vec<String>> {
1082        let live = self.get_live_branches_for_parent(session_id, parent_node_id)?;
1083        let branch_ids: Vec<String> = live.iter().map(|b| b.branch_id.clone()).collect();
1084        for bid in &branch_ids {
1085            self.update_branch_state(bid, "flushed")?;
1086        }
1087        Ok(branch_ids)
1088    }
1089
1090    // =========================================================================
1091    // PSP-5 Phase 6: Branch Lineage CRUD
1092    // =========================================================================
1093
1094    /// Record a branch lineage edge
1095    pub fn record_branch_lineage(&self, record: &BranchLineageRow) -> Result<()> {
1096        self.conn.lock().unwrap().execute(
1097            r#"
1098            INSERT INTO branch_lineage (lineage_id, parent_branch_id, child_branch_id, depends_on_seal)
1099            VALUES (?, ?, ?, ?)
1100            "#,
1101            [
1102                &record.lineage_id,
1103                &record.parent_branch_id,
1104                &record.child_branch_id,
1105                &record.depends_on_seal.to_string(),
1106            ],
1107        )?;
1108        Ok(())
1109    }
1110
1111    /// Get child branch IDs for a parent branch
1112    pub fn get_child_branches(&self, parent_branch_id: &str) -> Result<Vec<String>> {
1113        let conn = self.conn.lock().unwrap();
1114        let mut stmt =
1115            conn.prepare("SELECT child_branch_id FROM branch_lineage WHERE parent_branch_id = ?")?;
1116        let mut rows = stmt.query([parent_branch_id])?;
1117        let mut ids = Vec::new();
1118        while let Some(row) = rows.next()? {
1119            ids.push(row.get(0)?);
1120        }
1121        Ok(ids)
1122    }
1123
1124    // =========================================================================
1125    // PSP-5 Phase 6: Interface Seal CRUD
1126    // =========================================================================
1127
1128    /// Record an interface seal
1129    pub fn record_interface_seal(&self, record: &InterfaceSealRow) -> Result<()> {
1130        self.conn.lock().unwrap().execute(
1131            r#"
1132            INSERT INTO interface_seals (seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version)
1133            VALUES (?, ?, ?, ?, ?, ?, ?)
1134            "#,
1135            [
1136                &record.seal_id,
1137                &record.session_id,
1138                &record.node_id,
1139                &record.sealed_path,
1140                &record.artifact_kind,
1141                &hex::encode(&record.seal_hash),
1142                &record.version.to_string(),
1143            ],
1144        )?;
1145        Ok(())
1146    }
1147
1148    /// Get all interface seals for a node
1149    pub fn get_interface_seals(
1150        &self,
1151        session_id: &str,
1152        node_id: &str,
1153    ) -> Result<Vec<InterfaceSealRow>> {
1154        let conn = self.conn.lock().unwrap();
1155        let mut stmt = conn.prepare(
1156            "SELECT seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version
1157             FROM interface_seals WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1158        )?;
1159        let mut rows = stmt.query([session_id, node_id])?;
1160        let mut records = Vec::new();
1161        while let Some(row) = rows.next()? {
1162            records.push(InterfaceSealRow {
1163                seal_id: row.get(0)?,
1164                session_id: row.get(1)?,
1165                node_id: row.get(2)?,
1166                sealed_path: row.get(3)?,
1167                artifact_kind: row.get(4)?,
1168                seal_hash: row
1169                    .get::<_, String>(5)
1170                    .ok()
1171                    .and_then(|h| hex::decode(h).ok())
1172                    .unwrap_or_default(),
1173                version: row.get::<_, i32>(6)?,
1174            });
1175        }
1176        Ok(records)
1177    }
1178
1179    /// Check whether a node has any interface seals
1180    pub fn has_interface_seals(&self, session_id: &str, node_id: &str) -> Result<bool> {
1181        let conn = self.conn.lock().unwrap();
1182        let count: i64 = conn.query_row(
1183            "SELECT COUNT(*) FROM interface_seals WHERE session_id = ? AND node_id = ?",
1184            [session_id, node_id],
1185            |row| row.get(0),
1186        )?;
1187        Ok(count > 0)
1188    }
1189
1190    // =========================================================================
1191    // PSP-5 Phase 6: Branch Flush CRUD
1192    // =========================================================================
1193
1194    /// Record a branch flush decision
1195    pub fn record_branch_flush(&self, record: &BranchFlushRow) -> Result<()> {
1196        self.conn.lock().unwrap().execute(
1197            r#"
1198            INSERT INTO branch_flushes (flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason)
1199            VALUES (?, ?, ?, ?, ?, ?)
1200            "#,
1201            [
1202                &record.flush_id,
1203                &record.session_id,
1204                &record.parent_node_id,
1205                &record.flushed_branch_ids,
1206                &record.requeue_node_ids,
1207                &record.reason,
1208            ],
1209        )?;
1210        Ok(())
1211    }
1212
1213    /// Get all branch flush records for a session
1214    pub fn get_branch_flushes(&self, session_id: &str) -> Result<Vec<BranchFlushRow>> {
1215        let conn = self.conn.lock().unwrap();
1216        let mut stmt = conn.prepare(
1217            "SELECT flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason
1218             FROM branch_flushes WHERE session_id = ? ORDER BY created_at",
1219        )?;
1220        let mut rows = stmt.query([session_id])?;
1221        let mut records = Vec::new();
1222        while let Some(row) = rows.next()? {
1223            records.push(BranchFlushRow {
1224                flush_id: row.get(0)?,
1225                session_id: row.get(1)?,
1226                parent_node_id: row.get(2)?,
1227                flushed_branch_ids: row.get(3)?,
1228                requeue_node_ids: row.get(4)?,
1229                reason: row.get(5)?,
1230            });
1231        }
1232        Ok(records)
1233    }
1234
1235    // =========================================================================
1236    // PSP-5 Phase 8: Node Snapshot, Task Graph, and Review Outcome Persistence
1237    // =========================================================================
1238
1239    /// Get the latest node state snapshot per node for a session (for resume reconstruction).
1240    ///
1241    /// Returns at most one record per node_id, picking the most recently created row.
1242    pub fn get_latest_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
1243        let conn = self.conn.lock().unwrap();
1244        let mut stmt = conn.prepare(
1245            "WITH ranked AS ( \
1246                 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1247                 FROM node_states WHERE session_id = ? \
1248             ) \
1249             SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
1250                    node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
1251             FROM ranked WHERE rn = 1 ORDER BY created_at",
1252        )?;
1253
1254        let mut rows = stmt.query([session_id])?;
1255        let mut records = Vec::new();
1256
1257        while let Some(row) = rows.next()? {
1258            records.push(NodeStateRecord {
1259                node_id: row.get(0)?,
1260                session_id: row.get(1)?,
1261                state: row.get(2)?,
1262                v_total: row.get::<_, f64>(3)? as f32,
1263                merkle_hash: row
1264                    .get::<_, Option<String>>(4)?
1265                    .and_then(|s| hex::decode(s).ok()),
1266                attempt_count: row.get(5)?,
1267                node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1268                owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
1269                goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
1270                parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
1271                children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
1272                last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1273                committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
1274            });
1275        }
1276
1277        Ok(records)
1278    }
1279
1280    /// Record a task graph edge (parent→child dependency)
1281    pub fn record_task_graph_edge(&self, record: &TaskGraphEdgeRow) -> Result<()> {
1282        self.conn.lock().unwrap().execute(
1283            r#"
1284            INSERT INTO task_graph_edges (session_id, parent_node_id, child_node_id, edge_type)
1285            VALUES (?, ?, ?, ?)
1286            "#,
1287            [
1288                &record.session_id,
1289                &record.parent_node_id,
1290                &record.child_node_id,
1291                &record.edge_type,
1292            ],
1293        )?;
1294        Ok(())
1295    }
1296
1297    /// Get all task graph edges for a session
1298    pub fn get_task_graph_edges(&self, session_id: &str) -> Result<Vec<TaskGraphEdgeRow>> {
1299        let conn = self.conn.lock().unwrap();
1300        let mut stmt = conn.prepare(
1301            "SELECT session_id, parent_node_id, child_node_id, edge_type \
1302             FROM task_graph_edges WHERE session_id = ? ORDER BY created_at",
1303        )?;
1304        let mut rows = stmt.query([session_id])?;
1305        let mut records = Vec::new();
1306        while let Some(row) = rows.next()? {
1307            records.push(TaskGraphEdgeRow {
1308                session_id: row.get(0)?,
1309                parent_node_id: row.get(1)?,
1310                child_node_id: row.get(2)?,
1311                edge_type: row.get(3)?,
1312            });
1313        }
1314        Ok(records)
1315    }
1316
1317    /// Record a review outcome (approval, rejection, edit request)
1318    pub fn record_review_outcome(&self, record: &ReviewOutcomeRow) -> Result<()> {
1319        let reviewer_note = record.reviewer_note.clone().unwrap_or_default();
1320        let escalation_category = record.escalation_category.clone().unwrap_or_default();
1321        self.conn.lock().unwrap().execute(
1322            r#"
1323            INSERT INTO review_outcomes (session_id, node_id, outcome, reviewer_note,
1324                                         energy_at_review, degraded, escalation_category)
1325            VALUES (?, ?, ?, ?, ?, ?, ?)
1326            "#,
1327            duckdb::params![
1328                record.session_id,
1329                record.node_id,
1330                record.outcome,
1331                reviewer_note,
1332                record.energy_at_review.unwrap_or(0.0),
1333                record.degraded.unwrap_or(false),
1334                escalation_category,
1335            ],
1336        )?;
1337        Ok(())
1338    }
1339
1340    /// Get all review outcomes for a node
1341    pub fn get_review_outcomes(
1342        &self,
1343        session_id: &str,
1344        node_id: &str,
1345    ) -> Result<Vec<ReviewOutcomeRow>> {
1346        let conn = self.conn.lock().unwrap();
1347        let mut stmt = conn.prepare(
1348            "SELECT session_id, node_id, outcome, reviewer_note, \
1349             energy_at_review, degraded, escalation_category \
1350             FROM review_outcomes WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1351        )?;
1352        let mut rows = stmt.query([session_id, node_id])?;
1353        let mut records = Vec::new();
1354        while let Some(row) = rows.next()? {
1355            records.push(ReviewOutcomeRow {
1356                session_id: row.get(0)?,
1357                node_id: row.get(1)?,
1358                outcome: row.get(2)?,
1359                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1360                energy_at_review: row.get::<_, Option<f64>>(4)?,
1361                degraded: row.get::<_, Option<bool>>(5)?,
1362                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1363            });
1364        }
1365        Ok(records)
1366    }
1367
1368    /// Get the most recent review outcome for a node
1369    pub fn get_latest_review_outcome(
1370        &self,
1371        session_id: &str,
1372        node_id: &str,
1373    ) -> Result<Option<ReviewOutcomeRow>> {
1374        let conn = self.conn.lock().unwrap();
1375        let mut stmt = conn.prepare(
1376            "SELECT session_id, node_id, outcome, reviewer_note, \
1377             energy_at_review, degraded, escalation_category \
1378             FROM review_outcomes WHERE session_id = ? AND node_id = ? \
1379             ORDER BY created_at DESC LIMIT 1",
1380        )?;
1381        let mut rows = stmt.query([session_id, node_id])?;
1382        if let Some(row) = rows.next()? {
1383            Ok(Some(ReviewOutcomeRow {
1384                session_id: row.get(0)?,
1385                node_id: row.get(1)?,
1386                outcome: row.get(2)?,
1387                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1388                energy_at_review: row.get::<_, Option<f64>>(4)?,
1389                degraded: row.get::<_, Option<bool>>(5)?,
1390                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1391            }))
1392        } else {
1393            Ok(None)
1394        }
1395    }
1396
1397    /// Get all review outcomes for a session (across all nodes).
1398    pub fn get_all_review_outcomes(&self, session_id: &str) -> Result<Vec<ReviewOutcomeRow>> {
1399        let conn = self.conn.lock().unwrap();
1400        let mut stmt = conn.prepare(
1401            "SELECT session_id, node_id, outcome, reviewer_note, \
1402             energy_at_review, degraded, escalation_category \
1403             FROM review_outcomes WHERE session_id = ? ORDER BY created_at",
1404        )?;
1405        let mut rows = stmt.query([session_id])?;
1406        let mut records = Vec::new();
1407        while let Some(row) = rows.next()? {
1408            records.push(ReviewOutcomeRow {
1409                session_id: row.get(0)?,
1410                node_id: row.get(1)?,
1411                outcome: row.get(2)?,
1412                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1413                energy_at_review: row.get::<_, Option<f64>>(4)?,
1414                degraded: row.get::<_, Option<bool>>(5)?,
1415                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1416            });
1417        }
1418        Ok(records)
1419    }
1420
1421    // =========================================================================
1422    // PSP-5 Phase 8: Verification Result and Artifact Bundle Persistence
1423    // =========================================================================
1424
1425    /// Record a verification result snapshot for a node
1426    pub fn record_verification_result(&self, record: &VerificationResultRow) -> Result<()> {
1427        let syntax_ok = record.syntax_ok.to_string();
1428        let build_ok = record.build_ok.to_string();
1429        let tests_ok = record.tests_ok.to_string();
1430        let lint_ok = record.lint_ok.to_string();
1431        let diagnostics_count = record.diagnostics_count.to_string();
1432        let tests_passed = record.tests_passed.to_string();
1433        let tests_failed = record.tests_failed.to_string();
1434        let degraded = record.degraded.to_string();
1435        let degraded_reason = record.degraded_reason.clone().unwrap_or_default();
1436
1437        self.conn.lock().unwrap().execute(
1438            r#"
1439            INSERT INTO verification_results (session_id, node_id, result_json,
1440                syntax_ok, build_ok, tests_ok, lint_ok,
1441                diagnostics_count, tests_passed, tests_failed, degraded, degraded_reason)
1442            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1443            "#,
1444            [
1445                &record.session_id,
1446                &record.node_id,
1447                &record.result_json,
1448                &syntax_ok,
1449                &build_ok,
1450                &tests_ok,
1451                &lint_ok,
1452                &diagnostics_count,
1453                &tests_passed,
1454                &tests_failed,
1455                &degraded,
1456                &degraded_reason,
1457            ],
1458        )?;
1459        Ok(())
1460    }
1461
1462    /// Get the latest verification result for a node
1463    pub fn get_verification_result(
1464        &self,
1465        session_id: &str,
1466        node_id: &str,
1467    ) -> Result<Option<VerificationResultRow>> {
1468        let conn = self.conn.lock().unwrap();
1469        let mut stmt = conn.prepare(
1470            "SELECT session_id, node_id, result_json, \
1471                    CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1472                    diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1473             FROM verification_results \
1474             WHERE session_id = ? AND node_id = ? \
1475             ORDER BY created_at DESC LIMIT 1",
1476        )?;
1477        let mut rows = stmt.query([session_id, node_id])?;
1478        if let Some(row) = rows.next()? {
1479            Ok(Some(VerificationResultRow {
1480                session_id: row.get(0)?,
1481                node_id: row.get(1)?,
1482                result_json: row.get(2)?,
1483                syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1484                build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1485                tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1486                lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1487                diagnostics_count: row.get(7)?,
1488                tests_passed: row.get(8)?,
1489                tests_failed: row.get(9)?,
1490                degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1491                degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1492            }))
1493        } else {
1494            Ok(None)
1495        }
1496    }
1497
1498    /// Get all verification results for a session (for status display)
1499    pub fn get_all_verification_results(
1500        &self,
1501        session_id: &str,
1502    ) -> Result<Vec<VerificationResultRow>> {
1503        let conn = self.conn.lock().unwrap();
1504        let mut stmt = conn.prepare(
1505            "WITH ranked AS ( \
1506                 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1507                 FROM verification_results WHERE session_id = ? \
1508             ) \
1509             SELECT session_id, node_id, result_json, \
1510                    CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1511                    diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1512             FROM ranked WHERE rn = 1 ORDER BY created_at",
1513        )?;
1514        let mut rows = stmt.query([session_id])?;
1515        let mut records = Vec::new();
1516        while let Some(row) = rows.next()? {
1517            records.push(VerificationResultRow {
1518                session_id: row.get(0)?,
1519                node_id: row.get(1)?,
1520                result_json: row.get(2)?,
1521                syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1522                build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1523                tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1524                lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1525                diagnostics_count: row.get(7)?,
1526                tests_passed: row.get(8)?,
1527                tests_failed: row.get(9)?,
1528                degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1529                degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1530            });
1531        }
1532        Ok(records)
1533    }
1534
1535    /// Record an artifact bundle snapshot for a node
1536    pub fn record_artifact_bundle(&self, record: &ArtifactBundleRow) -> Result<()> {
1537        let artifact_count = record.artifact_count.to_string();
1538        let command_count = record.command_count.to_string();
1539
1540        self.conn.lock().unwrap().execute(
1541            r#"
1542            INSERT INTO artifact_bundles (session_id, node_id, bundle_json,
1543                artifact_count, command_count, touched_files)
1544            VALUES (?, ?, ?, ?, ?, ?)
1545            "#,
1546            [
1547                &record.session_id,
1548                &record.node_id,
1549                &record.bundle_json,
1550                &artifact_count,
1551                &command_count,
1552                &record.touched_files,
1553            ],
1554        )?;
1555        Ok(())
1556    }
1557
1558    /// Get the latest artifact bundle for a node
1559    pub fn get_artifact_bundle(
1560        &self,
1561        session_id: &str,
1562        node_id: &str,
1563    ) -> Result<Option<ArtifactBundleRow>> {
1564        let conn = self.conn.lock().unwrap();
1565        let mut stmt = conn.prepare(
1566            "SELECT session_id, node_id, bundle_json, artifact_count, command_count, touched_files \
1567             FROM artifact_bundles \
1568             WHERE session_id = ? AND node_id = ? \
1569             ORDER BY created_at DESC LIMIT 1",
1570        )?;
1571        let mut rows = stmt.query([session_id, node_id])?;
1572        if let Some(row) = rows.next()? {
1573            Ok(Some(ArtifactBundleRow {
1574                session_id: row.get(0)?,
1575                node_id: row.get(1)?,
1576                bundle_json: row.get(2)?,
1577                artifact_count: row.get(3)?,
1578                command_count: row.get(4)?,
1579                touched_files: row.get(5)?,
1580            }))
1581        } else {
1582            Ok(None)
1583        }
1584    }
1585}
1586
1587// =========================================================================
// Plan Revision, Feature Charter, Repair Footprint, and Budget Envelope Methods
1589// =========================================================================
1590
1591impl SessionStore {
1592    /// Record a feature charter for a session.
1593    pub fn record_feature_charter(&self, row: &FeatureCharterRow) -> Result<()> {
1594        let conn = self.conn.lock().unwrap();
1595        conn.execute(
1596            "INSERT INTO feature_charters (charter_id, session_id, scope_description, max_modules, max_files, max_revisions, language_constraint) \
1597             VALUES (?, ?, ?, ?, ?, ?, ?)",
1598            duckdb::params![
1599                row.charter_id,
1600                row.session_id,
1601                row.scope_description,
1602                row.max_modules,
1603                row.max_files,
1604                row.max_revisions,
1605                row.language_constraint,
1606            ],
1607        )?;
1608        Ok(())
1609    }
1610
1611    /// Get the feature charter for a session.
1612    pub fn get_feature_charter(&self, session_id: &str) -> Result<Option<FeatureCharterRow>> {
1613        let conn = self.conn.lock().unwrap();
1614        let mut stmt = conn.prepare(
1615            "SELECT charter_id, session_id, scope_description, max_modules, max_files, max_revisions, language_constraint \
1616             FROM feature_charters WHERE session_id = ? LIMIT 1",
1617        )?;
1618        let mut rows = stmt.query([session_id])?;
1619        if let Some(row) = rows.next()? {
1620            Ok(Some(FeatureCharterRow {
1621                charter_id: row.get(0)?,
1622                session_id: row.get(1)?,
1623                scope_description: row.get(2)?,
1624                max_modules: row.get(3)?,
1625                max_files: row.get(4)?,
1626                max_revisions: row.get(5)?,
1627                language_constraint: row.get(6)?,
1628            }))
1629        } else {
1630            Ok(None)
1631        }
1632    }
1633
1634    /// Record a plan revision.
1635    pub fn record_plan_revision(&self, row: &PlanRevisionRow) -> Result<()> {
1636        let conn = self.conn.lock().unwrap();
1637        conn.execute(
1638            "INSERT INTO plan_revisions (revision_id, session_id, sequence, plan_json, reason, supersedes, status) \
1639             VALUES (?, ?, ?, ?, ?, ?, ?)",
1640            duckdb::params![
1641                row.revision_id,
1642                row.session_id,
1643                row.sequence,
1644                row.plan_json,
1645                row.reason,
1646                row.supersedes,
1647                row.status,
1648            ],
1649        )?;
1650        Ok(())
1651    }
1652
1653    /// Get the active plan revision for a session.
1654    pub fn get_active_plan_revision(&self, session_id: &str) -> Result<Option<PlanRevisionRow>> {
1655        let conn = self.conn.lock().unwrap();
1656        let mut stmt = conn.prepare(
1657            "SELECT revision_id, session_id, sequence, plan_json, reason, supersedes, status \
1658             FROM plan_revisions WHERE session_id = ? AND status = 'active' \
1659             ORDER BY sequence DESC LIMIT 1",
1660        )?;
1661        let mut rows = stmt.query([session_id])?;
1662        if let Some(row) = rows.next()? {
1663            Ok(Some(PlanRevisionRow {
1664                revision_id: row.get(0)?,
1665                session_id: row.get(1)?,
1666                sequence: row.get(2)?,
1667                plan_json: row.get(3)?,
1668                reason: row.get(4)?,
1669                supersedes: row.get(5)?,
1670                status: row.get(6)?,
1671            }))
1672        } else {
1673            Ok(None)
1674        }
1675    }
1676
1677    /// Get all plan revisions for a session, ordered by sequence.
1678    pub fn get_plan_revisions(&self, session_id: &str) -> Result<Vec<PlanRevisionRow>> {
1679        let conn = self.conn.lock().unwrap();
1680        let mut stmt = conn.prepare(
1681            "SELECT revision_id, session_id, sequence, plan_json, reason, supersedes, status \
1682             FROM plan_revisions WHERE session_id = ? ORDER BY sequence ASC",
1683        )?;
1684        let mut rows = stmt.query([session_id])?;
1685        let mut results = Vec::new();
1686        while let Some(row) = rows.next()? {
1687            results.push(PlanRevisionRow {
1688                revision_id: row.get(0)?,
1689                session_id: row.get(1)?,
1690                sequence: row.get(2)?,
1691                plan_json: row.get(3)?,
1692                reason: row.get(4)?,
1693                supersedes: row.get(5)?,
1694                status: row.get(6)?,
1695            });
1696        }
1697        Ok(results)
1698    }
1699
1700    /// Supersede a plan revision (set status to 'superseded').
1701    pub fn supersede_plan_revision(&self, revision_id: &str) -> Result<()> {
1702        let conn = self.conn.lock().unwrap();
1703        conn.execute(
1704            "UPDATE plan_revisions SET status = 'superseded' WHERE revision_id = ?",
1705            [revision_id],
1706        )?;
1707        Ok(())
1708    }
1709
1710    /// Record a repair footprint.
1711    pub fn record_repair_footprint(&self, row: &RepairFootprintRow) -> Result<()> {
1712        let conn = self.conn.lock().unwrap();
1713        conn.execute(
1714            "INSERT INTO repair_footprints (footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved) \
1715             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
1716            duckdb::params![
1717                row.footprint_id,
1718                row.session_id,
1719                row.node_id,
1720                row.revision_id,
1721                row.attempt,
1722                row.affected_files,
1723                row.bundle_json,
1724                row.diagnosis,
1725                row.resolved,
1726            ],
1727        )?;
1728        Ok(())
1729    }
1730
1731    /// Get repair footprints for a node, ordered by attempt.
1732    pub fn get_repair_footprints(
1733        &self,
1734        session_id: &str,
1735        node_id: &str,
1736    ) -> Result<Vec<RepairFootprintRow>> {
1737        let conn = self.conn.lock().unwrap();
1738        let mut stmt = conn.prepare(
1739            "SELECT footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved \
1740             FROM repair_footprints WHERE session_id = ? AND node_id = ? ORDER BY attempt ASC",
1741        )?;
1742        let mut rows = stmt.query([session_id, node_id])?;
1743        let mut results = Vec::new();
1744        while let Some(row) = rows.next()? {
1745            results.push(RepairFootprintRow {
1746                footprint_id: row.get(0)?,
1747                session_id: row.get(1)?,
1748                node_id: row.get(2)?,
1749                revision_id: row.get(3)?,
1750                attempt: row.get(4)?,
1751                affected_files: row.get(5)?,
1752                bundle_json: row.get(6)?,
1753                diagnosis: row.get(7)?,
1754                resolved: row.get(8)?,
1755            });
1756        }
1757        Ok(results)
1758    }
1759
1760    /// Get all repair footprints for a session (all nodes).
1761    pub fn get_all_repair_footprints(&self, session_id: &str) -> Result<Vec<RepairFootprintRow>> {
1762        let conn = self.conn.lock().unwrap();
1763        let mut stmt = conn.prepare(
1764            "SELECT footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved \
1765             FROM repair_footprints WHERE session_id = ? ORDER BY attempt ASC",
1766        )?;
1767        let mut rows = stmt.query([session_id])?;
1768        let mut results = Vec::new();
1769        while let Some(row) = rows.next()? {
1770            results.push(RepairFootprintRow {
1771                footprint_id: row.get(0)?,
1772                session_id: row.get(1)?,
1773                node_id: row.get(2)?,
1774                revision_id: row.get(3)?,
1775                attempt: row.get(4)?,
1776                affected_files: row.get(5)?,
1777                bundle_json: row.get(6)?,
1778                diagnosis: row.get(7)?,
1779                resolved: row.get(8)?,
1780            });
1781        }
1782        Ok(results)
1783    }
1784
1785    /// Mark a repair footprint as resolved.
1786    pub fn resolve_repair_footprint(&self, footprint_id: &str) -> Result<()> {
1787        let conn = self.conn.lock().unwrap();
1788        conn.execute(
1789            "UPDATE repair_footprints SET resolved = true WHERE footprint_id = ?",
1790            [footprint_id],
1791        )?;
1792        Ok(())
1793    }
1794
1795    /// Record or update a budget envelope for a session.
1796    pub fn upsert_budget_envelope(&self, row: &BudgetEnvelopeRow) -> Result<()> {
1797        let conn = self.conn.lock().unwrap();
1798        // Try insert first, update on conflict
1799        conn.execute(
1800            "INSERT INTO budget_envelopes (session_id, max_steps, steps_used, max_revisions, revisions_used, max_cost_usd, cost_used_usd) \
1801             VALUES (?, ?, ?, ?, ?, ?, ?) \
1802             ON CONFLICT (session_id) DO UPDATE SET \
1803             max_steps = EXCLUDED.max_steps, steps_used = EXCLUDED.steps_used, \
1804             max_revisions = EXCLUDED.max_revisions, revisions_used = EXCLUDED.revisions_used, \
1805             max_cost_usd = EXCLUDED.max_cost_usd, cost_used_usd = EXCLUDED.cost_used_usd",
1806            duckdb::params![
1807                row.session_id,
1808                row.max_steps,
1809                row.steps_used,
1810                row.max_revisions,
1811                row.revisions_used,
1812                row.max_cost_usd,
1813                row.cost_used_usd,
1814            ],
1815        )?;
1816        Ok(())
1817    }
1818
1819    /// Get the budget envelope for a session.
1820    pub fn get_budget_envelope(&self, session_id: &str) -> Result<Option<BudgetEnvelopeRow>> {
1821        let conn = self.conn.lock().unwrap();
1822        let mut stmt = conn.prepare(
1823            "SELECT session_id, max_steps, steps_used, max_revisions, revisions_used, max_cost_usd, cost_used_usd \
1824             FROM budget_envelopes WHERE session_id = ?",
1825        )?;
1826        let mut rows = stmt.query([session_id])?;
1827        if let Some(row) = rows.next()? {
1828            Ok(Some(BudgetEnvelopeRow {
1829                session_id: row.get(0)?,
1830                max_steps: row.get(1)?,
1831                steps_used: row.get(2)?,
1832                max_revisions: row.get(3)?,
1833                revisions_used: row.get(4)?,
1834                max_cost_usd: row.get(5)?,
1835                cost_used_usd: row.get(6)?,
1836            }))
1837        } else {
1838            Ok(None)
1839        }
1840    }
1841}
1842
1843#[cfg(test)]
1844mod tests {
1845    use super::*;
1846
1847    /// Create an in-memory store for testing
1848    fn test_store() -> SessionStore {
1849        let temp_dir = std::env::temp_dir();
1850        let db_path = temp_dir.join(format!("perspt_test_{}.db", uuid::Uuid::new_v4()));
1851        SessionStore::open(&db_path).expect("Failed to create test store")
1852    }
1853
1854    fn seed_session(store: &SessionStore, session_id: &str) {
1855        let record = SessionRecord {
1856            session_id: session_id.to_string(),
1857            task: "test task".to_string(),
1858            working_dir: "/tmp/test".to_string(),
1859            merkle_root: None,
1860            detected_toolchain: None,
1861            status: "RUNNING".to_string(),
1862        };
1863        store.create_session(&record).unwrap();
1864    }
1865
1866    #[test]
1867    fn test_node_state_phase8_roundtrip() {
1868        let store = test_store();
1869        let sid = "test-sess-1";
1870        seed_session(&store, sid);
1871
1872        let record = NodeStateRecord {
1873            node_id: "node-1".to_string(),
1874            session_id: sid.to_string(),
1875            state: "Completed".to_string(),
1876            v_total: 0.42,
1877            merkle_hash: Some(vec![0xab; 32]),
1878            attempt_count: 3,
1879            node_class: Some("Interface".to_string()),
1880            owner_plugin: Some("rust".to_string()),
1881            goal: Some("Implement API".to_string()),
1882            parent_id: Some("root".to_string()),
1883            children: Some(r#"["child-a","child-b"]"#.to_string()),
1884            last_error_type: Some("CompilationError".to_string()),
1885            committed_at: Some("2025-01-01T00:00:00Z".to_string()),
1886        };
1887
1888        store.record_node_state(&record).unwrap();
1889
1890        let states = store.get_latest_node_states(sid).unwrap();
1891        assert_eq!(states.len(), 1);
1892        let r = &states[0];
1893        assert_eq!(r.node_id, "node-1");
1894        assert_eq!(r.state, "Completed");
1895        assert_eq!(r.attempt_count, 3);
1896        assert_eq!(r.node_class.as_deref(), Some("Interface"));
1897        assert_eq!(r.owner_plugin.as_deref(), Some("rust"));
1898        assert_eq!(r.goal.as_deref(), Some("Implement API"));
1899        assert_eq!(r.parent_id.as_deref(), Some("root"));
1900        assert!(r.children.is_some());
1901        assert_eq!(r.last_error_type.as_deref(), Some("CompilationError"));
1902        assert_eq!(r.committed_at.as_deref(), Some("2025-01-01T00:00:00Z"));
1903    }
1904
1905    #[test]
1906    fn test_task_graph_edge_roundtrip() {
1907        let store = test_store();
1908        let sid = "test-graph-1";
1909        seed_session(&store, sid);
1910
1911        let edge = TaskGraphEdgeRow {
1912            session_id: sid.to_string(),
1913            parent_node_id: "parent-1".to_string(),
1914            child_node_id: "child-1".to_string(),
1915            edge_type: "depends_on".to_string(),
1916        };
1917        store.record_task_graph_edge(&edge).unwrap();
1918
1919        let edges = store.get_task_graph_edges(sid).unwrap();
1920        assert_eq!(edges.len(), 1);
1921        assert_eq!(edges[0].parent_node_id, "parent-1");
1922        assert_eq!(edges[0].child_node_id, "child-1");
1923        assert_eq!(edges[0].edge_type, "depends_on");
1924    }
1925
1926    #[test]
1927    fn test_verification_result_roundtrip() {
1928        let store = test_store();
1929        let sid = "test-vr-1";
1930        seed_session(&store, sid);
1931
1932        let row = VerificationResultRow {
1933            session_id: sid.to_string(),
1934            node_id: "node-v".to_string(),
1935            result_json: r#"{"syntax_ok":true}"#.to_string(),
1936            syntax_ok: true,
1937            build_ok: true,
1938            tests_ok: false,
1939            lint_ok: true,
1940            diagnostics_count: 2,
1941            tests_passed: 5,
1942            tests_failed: 1,
1943            degraded: false,
1944            degraded_reason: None,
1945        };
1946        store.record_verification_result(&row).unwrap();
1947
1948        let got = store.get_verification_result(sid, "node-v").unwrap();
1949        assert!(got.is_some());
1950        let got = got.unwrap();
1951        assert!(got.syntax_ok);
1952        assert!(got.build_ok);
1953        assert!(!got.tests_ok);
1954        assert_eq!(got.tests_passed, 5);
1955        assert_eq!(got.tests_failed, 1);
1956        assert!(!got.degraded);
1957    }
1958
1959    #[test]
1960    fn test_verification_result_degraded() {
1961        let store = test_store();
1962        let sid = "test-vr-deg";
1963        seed_session(&store, sid);
1964
1965        let row = VerificationResultRow {
1966            session_id: sid.to_string(),
1967            node_id: "node-d".to_string(),
1968            result_json: "{}".to_string(),
1969            syntax_ok: true,
1970            build_ok: false,
1971            tests_ok: false,
1972            lint_ok: false,
1973            diagnostics_count: 0,
1974            tests_passed: 0,
1975            tests_failed: 0,
1976            degraded: true,
1977            degraded_reason: Some("LSP unavailable".to_string()),
1978        };
1979        store.record_verification_result(&row).unwrap();
1980
1981        let got = store
1982            .get_verification_result(sid, "node-d")
1983            .unwrap()
1984            .unwrap();
1985        assert!(got.degraded);
1986        assert_eq!(got.degraded_reason.as_deref(), Some("LSP unavailable"));
1987    }
1988
1989    #[test]
1990    fn test_artifact_bundle_roundtrip() {
1991        let store = test_store();
1992        let sid = "test-ab-1";
1993        seed_session(&store, sid);
1994
1995        let row = ArtifactBundleRow {
1996            session_id: sid.to_string(),
1997            node_id: "node-a".to_string(),
1998            bundle_json: r#"{"artifacts":[],"commands":[]}"#.to_string(),
1999            artifact_count: 3,
2000            command_count: 1,
2001            touched_files: r#"["src/main.rs","src/lib.rs","tests/test.rs"]"#.to_string(),
2002        };
2003        store.record_artifact_bundle(&row).unwrap();
2004
2005        let got = store.get_artifact_bundle(sid, "node-a").unwrap();
2006        assert!(got.is_some());
2007        let got = got.unwrap();
2008        assert_eq!(got.artifact_count, 3);
2009        assert_eq!(got.command_count, 1);
2010        assert!(got.touched_files.contains("main.rs"));
2011    }
2012
2013    #[test]
2014    fn test_latest_node_states_dedup() {
2015        let store = test_store();
2016        let sid = "test-dedup";
2017        seed_session(&store, sid);
2018
2019        // Insert two states for the same node
2020        let r1 = NodeStateRecord {
2021            node_id: "node-x".to_string(),
2022            session_id: sid.to_string(),
2023            state: "Coding".to_string(),
2024            v_total: 0.5,
2025            merkle_hash: None,
2026            attempt_count: 1,
2027            node_class: None,
2028            owner_plugin: None,
2029            goal: None,
2030            parent_id: None,
2031            children: None,
2032            last_error_type: None,
2033            committed_at: None,
2034        };
2035        store.record_node_state(&r1).unwrap();
2036
2037        let r2 = NodeStateRecord {
2038            node_id: "node-x".to_string(),
2039            session_id: sid.to_string(),
2040            state: "Completed".to_string(),
2041            v_total: 0.3,
2042            merkle_hash: None,
2043            attempt_count: 2,
2044            node_class: Some("Implementation".to_string()),
2045            owner_plugin: None,
2046            goal: Some("Updated goal".to_string()),
2047            parent_id: None,
2048            children: None,
2049            last_error_type: None,
2050            committed_at: Some("2025-01-02T00:00:00Z".to_string()),
2051        };
2052        store.record_node_state(&r2).unwrap();
2053
2054        // get_latest should return only the last entry
2055        let latest = store.get_latest_node_states(sid).unwrap();
2056        assert_eq!(latest.len(), 1);
2057        assert_eq!(latest[0].state, "Completed");
2058        assert_eq!(latest[0].attempt_count, 2);
2059        assert_eq!(latest[0].goal.as_deref(), Some("Updated goal"));
2060    }
2061
2062    #[test]
2063    fn test_backward_compat_empty_phase8_fields() {
2064        let store = test_store();
2065        let sid = "test-compat";
2066        seed_session(&store, sid);
2067
2068        // Insert a node with all Phase 8 fields as None (pre-Phase-8 session)
2069        let r = NodeStateRecord {
2070            node_id: "old-node".to_string(),
2071            session_id: sid.to_string(),
2072            state: "COMPLETED".to_string(),
2073            v_total: 1.0,
2074            merkle_hash: None,
2075            attempt_count: 1,
2076            node_class: None,
2077            owner_plugin: None,
2078            goal: None,
2079            parent_id: None,
2080            children: None,
2081            last_error_type: None,
2082            committed_at: None,
2083        };
2084        store.record_node_state(&r).unwrap();
2085
2086        let latest = store.get_latest_node_states(sid).unwrap();
2087        assert_eq!(latest.len(), 1);
2088        assert!(latest[0].node_class.is_none());
2089        assert!(latest[0].goal.is_none());
2090        assert!(latest[0].committed_at.is_none());
2091
2092        // Verification and artifact lookups should return None
2093        let vr = store.get_verification_result(sid, "old-node").unwrap();
2094        assert!(vr.is_none());
2095        let ab = store.get_artifact_bundle(sid, "old-node").unwrap();
2096        assert!(ab.is_none());
2097    }
2098
2099    #[test]
2100    fn test_review_outcome_roundtrip() {
2101        let store = test_store();
2102        let sid = "test-review";
2103        seed_session(&store, sid);
2104
2105        let row = ReviewOutcomeRow {
2106            session_id: sid.to_string(),
2107            node_id: "node-r".to_string(),
2108            outcome: "approved".to_string(),
2109            reviewer_note: Some("LGTM".to_string()),
2110            energy_at_review: None,
2111            degraded: None,
2112            escalation_category: None,
2113        };
2114        store.record_review_outcome(&row).unwrap();
2115
2116        let outcomes = store.get_review_outcomes(sid, "node-r").unwrap();
2117        assert_eq!(outcomes.len(), 1);
2118        assert_eq!(outcomes[0].outcome, "approved");
2119        assert_eq!(outcomes[0].reviewer_note.as_deref(), Some("LGTM"));
2120    }
2121
2122    #[test]
2123    fn test_review_outcome_with_audit_fields() {
2124        let store = test_store();
2125        let sid = "test-review-audit";
2126        seed_session(&store, sid);
2127
2128        let row = ReviewOutcomeRow {
2129            session_id: sid.to_string(),
2130            node_id: "node-a".to_string(),
2131            outcome: "rejected".to_string(),
2132            reviewer_note: Some("Needs rework".to_string()),
2133            energy_at_review: Some(0.42),
2134            degraded: Some(true),
2135            escalation_category: Some("complexity".to_string()),
2136        };
2137        store.record_review_outcome(&row).unwrap();
2138
2139        let outcomes = store.get_review_outcomes(sid, "node-a").unwrap();
2140        assert_eq!(outcomes.len(), 1);
2141        assert_eq!(outcomes[0].outcome, "rejected");
2142        assert_eq!(outcomes[0].energy_at_review, Some(0.42));
2143        assert_eq!(outcomes[0].degraded, Some(true));
2144        assert_eq!(
2145            outcomes[0].escalation_category.as_deref(),
2146            Some("complexity")
2147        );
2148    }
2149
2150    #[test]
2151    fn test_get_all_review_outcomes() {
2152        let store = test_store();
2153        let sid = "test-review-all";
2154        seed_session(&store, sid);
2155
2156        for (node, outcome) in &[("n1", "approved"), ("n2", "rejected"), ("n1", "approved")] {
2157            let row = ReviewOutcomeRow {
2158                session_id: sid.to_string(),
2159                node_id: node.to_string(),
2160                outcome: outcome.to_string(),
2161                reviewer_note: None,
2162                energy_at_review: None,
2163                degraded: None,
2164                escalation_category: None,
2165            };
2166            store.record_review_outcome(&row).unwrap();
2167        }
2168
2169        let all = store.get_all_review_outcomes(sid).unwrap();
2170        assert_eq!(all.len(), 3);
2171    }
2172
2173    #[test]
2174    fn test_feature_charter_roundtrip() {
2175        let store = test_store();
2176        let sid = "test-charter";
2177        seed_session(&store, sid);
2178
2179        let row = FeatureCharterRow {
2180            charter_id: "ch-1".to_string(),
2181            session_id: sid.to_string(),
2182            scope_description: "Add authentication module".to_string(),
2183            max_modules: Some(3),
2184            max_files: Some(10),
2185            max_revisions: Some(5),
2186            language_constraint: Some("rust".to_string()),
2187        };
2188        store.record_feature_charter(&row).unwrap();
2189
2190        let got = store.get_feature_charter(sid).unwrap();
2191        assert!(got.is_some());
2192        let got = got.unwrap();
2193        assert_eq!(got.charter_id, "ch-1");
2194        assert_eq!(got.scope_description, "Add authentication module");
2195        assert_eq!(got.max_modules, Some(3));
2196        assert_eq!(got.language_constraint.as_deref(), Some("rust"));
2197    }
2198
2199    #[test]
2200    fn test_feature_charter_returns_none_for_missing() {
2201        let store = test_store();
2202        let sid = "test-charter-miss";
2203        seed_session(&store, sid);
2204
2205        let got = store.get_feature_charter(sid).unwrap();
2206        assert!(got.is_none());
2207    }
2208
2209    #[test]
2210    fn test_plan_revision_roundtrip() {
2211        let store = test_store();
2212        let sid = "test-rev";
2213        seed_session(&store, sid);
2214
2215        let row = PlanRevisionRow {
2216            revision_id: "rev-1".to_string(),
2217            session_id: sid.to_string(),
2218            sequence: 1,
2219            plan_json: r#"{"tasks":[]}"#.to_string(),
2220            reason: "initial plan".to_string(),
2221            supersedes: None,
2222            status: "active".to_string(),
2223        };
2224        store.record_plan_revision(&row).unwrap();
2225
2226        let active = store.get_active_plan_revision(sid).unwrap();
2227        assert!(active.is_some());
2228        let active = active.unwrap();
2229        assert_eq!(active.revision_id, "rev-1");
2230        assert_eq!(active.sequence, 1);
2231        assert_eq!(active.status, "active");
2232    }
2233
2234    #[test]
2235    fn test_plan_revision_supersede() {
2236        let store = test_store();
2237        let sid = "test-rev-sup";
2238        seed_session(&store, sid);
2239
2240        let r1 = PlanRevisionRow {
2241            revision_id: "rev-1".to_string(),
2242            session_id: sid.to_string(),
2243            sequence: 1,
2244            plan_json: "{}".to_string(),
2245            reason: "initial".to_string(),
2246            supersedes: None,
2247            status: "active".to_string(),
2248        };
2249        store.record_plan_revision(&r1).unwrap();
2250
2251        // Supersede rev-1
2252        store.supersede_plan_revision("rev-1").unwrap();
2253
2254        let r2 = PlanRevisionRow {
2255            revision_id: "rev-2".to_string(),
2256            session_id: sid.to_string(),
2257            sequence: 2,
2258            plan_json: r#"{"tasks":["a"]}"#.to_string(),
2259            reason: "verifier feedback".to_string(),
2260            supersedes: Some("rev-1".to_string()),
2261            status: "active".to_string(),
2262        };
2263        store.record_plan_revision(&r2).unwrap();
2264
2265        // Only rev-2 should be active
2266        let active = store.get_active_plan_revision(sid).unwrap().unwrap();
2267        assert_eq!(active.revision_id, "rev-2");
2268
2269        // All revisions returned in order
2270        let all = store.get_plan_revisions(sid).unwrap();
2271        assert_eq!(all.len(), 2);
2272        assert_eq!(all[0].status, "superseded");
2273        assert_eq!(all[1].status, "active");
2274    }
2275
2276    #[test]
2277    fn test_repair_footprint_roundtrip() {
2278        let store = test_store();
2279        let sid = "test-repair";
2280        seed_session(&store, sid);
2281
2282        let row = RepairFootprintRow {
2283            footprint_id: "fp-1".to_string(),
2284            session_id: sid.to_string(),
2285            node_id: "node-a".to_string(),
2286            revision_id: "rev-1".to_string(),
2287            attempt: 1,
2288            affected_files: r#"["src/main.rs"]"#.to_string(),
2289            bundle_json: "{}".to_string(),
2290            diagnosis: "missing import".to_string(),
2291            resolved: false,
2292        };
2293        store.record_repair_footprint(&row).unwrap();
2294
2295        let footprints = store.get_repair_footprints(sid, "node-a").unwrap();
2296        assert_eq!(footprints.len(), 1);
2297        assert_eq!(footprints[0].footprint_id, "fp-1");
2298        assert_eq!(footprints[0].diagnosis, "missing import");
2299        assert!(!footprints[0].resolved);
2300    }
2301
2302    #[test]
2303    fn test_repair_footprint_resolve() {
2304        let store = test_store();
2305        let sid = "test-repair-res";
2306        seed_session(&store, sid);
2307
2308        let row = RepairFootprintRow {
2309            footprint_id: "fp-2".to_string(),
2310            session_id: sid.to_string(),
2311            node_id: "node-b".to_string(),
2312            revision_id: "rev-1".to_string(),
2313            attempt: 1,
2314            affected_files: "[]".to_string(),
2315            bundle_json: "{}".to_string(),
2316            diagnosis: "type error".to_string(),
2317            resolved: false,
2318        };
2319        store.record_repair_footprint(&row).unwrap();
2320
2321        store.resolve_repair_footprint("fp-2").unwrap();
2322
2323        let footprints = store.get_repair_footprints(sid, "node-b").unwrap();
2324        assert_eq!(footprints.len(), 1);
2325        assert!(footprints[0].resolved);
2326    }
2327
2328    #[test]
2329    fn test_budget_envelope_upsert_and_get() {
2330        let store = test_store();
2331        let sid = "test-budget";
2332        seed_session(&store, sid);
2333
2334        let row = BudgetEnvelopeRow {
2335            session_id: sid.to_string(),
2336            max_steps: Some(100),
2337            steps_used: 5,
2338            max_revisions: Some(10),
2339            revisions_used: 1,
2340            max_cost_usd: Some(5.0),
2341            cost_used_usd: 0.25,
2342        };
2343        store.upsert_budget_envelope(&row).unwrap();
2344
2345        let got = store.get_budget_envelope(sid).unwrap();
2346        assert!(got.is_some());
2347        let got = got.unwrap();
2348        assert_eq!(got.max_steps, Some(100));
2349        assert_eq!(got.steps_used, 5);
2350        assert_eq!(got.cost_used_usd, 0.25);
2351    }
2352
2353    #[test]
2354    fn test_budget_envelope_upsert_updates() {
2355        let store = test_store();
2356        let sid = "test-budget-up";
2357        seed_session(&store, sid);
2358
2359        let row1 = BudgetEnvelopeRow {
2360            session_id: sid.to_string(),
2361            max_steps: Some(100),
2362            steps_used: 0,
2363            max_revisions: None,
2364            revisions_used: 0,
2365            max_cost_usd: None,
2366            cost_used_usd: 0.0,
2367        };
2368        store.upsert_budget_envelope(&row1).unwrap();
2369
2370        // Update with new values
2371        let row2 = BudgetEnvelopeRow {
2372            session_id: sid.to_string(),
2373            max_steps: Some(100),
2374            steps_used: 42,
2375            max_revisions: Some(5),
2376            revisions_used: 3,
2377            max_cost_usd: Some(10.0),
2378            cost_used_usd: 4.5,
2379        };
2380        store.upsert_budget_envelope(&row2).unwrap();
2381
2382        let got = store.get_budget_envelope(sid).unwrap().unwrap();
2383        assert_eq!(got.steps_used, 42);
2384        assert_eq!(got.revisions_used, 3);
2385        assert_eq!(got.cost_used_usd, 4.5);
2386    }
2387
2388    #[test]
2389    fn test_budget_envelope_missing_returns_none() {
2390        let store = test_store();
2391        let sid = "test-budget-miss";
2392        seed_session(&store, sid);
2393
2394        let got = store.get_budget_envelope(sid).unwrap();
2395        assert!(got.is_none());
2396    }
2397
2398    #[test]
2399    fn test_read_only_store_queries_work() {
2400        let temp_dir = std::env::temp_dir();
2401        let db_path = temp_dir.join(format!("perspt_ro_test_{}.db", uuid::Uuid::new_v4()));
2402
2403        // Create and seed a normal store
2404        {
2405            let store = SessionStore::open(&db_path).unwrap();
2406            seed_session(&store, "ro-test");
2407        }
2408
2409        // Open read-only and verify queries work
2410        let ro = SessionStore::open_read_only(&db_path).unwrap();
2411        let sessions = ro.list_recent_sessions(10).unwrap();
2412        assert_eq!(sessions.len(), 1);
2413        assert_eq!(sessions[0].session_id, "ro-test");
2414    }
2415
2416    #[test]
2417    fn test_read_only_store_rejects_writes() {
2418        let temp_dir = std::env::temp_dir();
2419        let db_path = temp_dir.join(format!("perspt_ro_wr_{}.db", uuid::Uuid::new_v4()));
2420
2421        // Create the DB first
2422        {
2423            let _store = SessionStore::open(&db_path).unwrap();
2424        }
2425
2426        // Open read-only and verify writes fail
2427        let ro = SessionStore::open_read_only(&db_path).unwrap();
2428        let record = SessionRecord {
2429            session_id: "should-fail".to_string(),
2430            task: "test".to_string(),
2431            working_dir: "/tmp".to_string(),
2432            merkle_root: None,
2433            detected_toolchain: None,
2434            status: "RUNNING".to_string(),
2435        };
2436        assert!(ro.create_session(&record).is_err());
2437    }
2438}