Skip to main content

perspt_store/
store.rs

1//! Session Store Implementation
2//!
3//! Provides CRUD operations for SRBN sessions, node states, and energy history.
4
5use anyhow::{Context, Result};
6use duckdb::Connection;
7use serde::{Deserialize, Serialize};
8use sha2::{Digest, Sha256};
9use std::path::PathBuf;
10
11use crate::schema::init_schema;
12
13/// Record for a session
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct SessionRecord {
16    pub session_id: String,
17    pub task: String,
18    pub working_dir: String,
19    pub merkle_root: Option<Vec<u8>>,
20    pub detected_toolchain: Option<String>,
21    pub status: String,
22}
23
24/// Record for node state
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct NodeStateRecord {
27    pub node_id: String,
28    pub session_id: String,
29    pub state: String,
30    pub v_total: f32,
31    pub merkle_hash: Option<Vec<u8>>,
32    pub attempt_count: i32,
33    // PSP-5 Phase 8: Richer node snapshot for resume reconstruction
34    pub node_class: Option<String>,
35    pub owner_plugin: Option<String>,
36    pub goal: Option<String>,
37    pub parent_id: Option<String>,
38    /// JSON-serialized `Vec<String>`
39    pub children: Option<String>,
40    pub last_error_type: Option<String>,
41    pub committed_at: Option<String>,
42}
43
44/// Record for energy history
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct EnergyRecord {
47    pub node_id: String,
48    pub session_id: String,
49    pub v_syn: f32,
50    pub v_str: f32,
51    pub v_log: f32,
52    pub v_boot: f32,
53    pub v_sheaf: f32,
54    pub v_total: f32,
55}
56
57/// Record for LLM request/response logging
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct LlmRequestRecord {
60    pub session_id: String,
61    pub node_id: Option<String>,
62    pub model: String,
63    pub prompt: String,
64    pub response: String,
65    pub tokens_in: i32,
66    pub tokens_out: i32,
67    pub latency_ms: i32,
68}
69
70/// PSP-5 Phase 3: Record for structural digest persistence
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct StructuralDigestRecord {
73    pub digest_id: String,
74    pub session_id: String,
75    pub node_id: String,
76    pub source_path: String,
77    pub artifact_kind: String,
78    pub hash: Vec<u8>,
79    pub version: i32,
80}
81
82/// PSP-5 Phase 3: Record for context provenance persistence
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct ContextProvenanceRecord {
85    pub session_id: String,
86    pub node_id: String,
87    pub context_package_id: String,
88    /// JSON-serialized structural digest hashes
89    pub structural_hashes: String,
90    /// JSON-serialized summary hashes
91    pub summary_hashes: String,
92    /// JSON-serialized dependency commit hashes
93    pub dependency_hashes: String,
94    pub included_file_count: i32,
95    pub total_bytes: i32,
96}
97
98/// PSP-5 Phase 5: Record for escalation report persistence
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct EscalationReportRecord {
101    pub session_id: String,
102    pub node_id: String,
103    /// Serialized EscalationCategory
104    pub category: String,
105    /// JSON-serialized RewriteAction
106    pub action: String,
107    /// JSON-serialized EnergyComponents
108    pub energy_snapshot: String,
109    /// JSON-serialized `Vec<StageOutcome>`
110    pub stage_outcomes: String,
111    /// Human-readable evidence
112    pub evidence: String,
113    /// JSON-serialized `Vec<String>`
114    pub affected_node_ids: String,
115}
116
117/// PSP-5 Phase 5: Record for local graph rewrite persistence
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct RewriteRecordRow {
120    pub session_id: String,
121    pub node_id: String,
122    /// JSON-serialized RewriteAction
123    pub action: String,
124    /// Serialized EscalationCategory
125    pub category: String,
126    /// JSON-serialized `Vec<String>`
127    pub requeued_nodes: String,
128    /// JSON-serialized `Vec<String>`
129    pub inserted_nodes: String,
130}
131
132/// PSP-5 Phase 5: Record for sheaf validation result persistence
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct SheafValidationRow {
135    pub session_id: String,
136    pub node_id: String,
137    pub validator_class: String,
138    pub plugin_source: Option<String>,
139    pub passed: bool,
140    pub evidence_summary: String,
141    /// JSON-serialized `Vec<String>`
142    pub affected_files: String,
143    pub v_sheaf_contribution: f32,
144    /// JSON-serialized `Vec<String>`
145    pub requeue_targets: String,
146}
147
148// =============================================================================
149// PSP-5 Phase 6: Provisional Branch, Interface Seal, Branch Flush Records
150// =============================================================================
151
152/// PSP-5 Phase 6: Record for provisional branch persistence
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct ProvisionalBranchRow {
155    pub branch_id: String,
156    pub session_id: String,
157    pub node_id: String,
158    pub parent_node_id: String,
159    pub state: String,
160    pub parent_seal_hash: Option<Vec<u8>>,
161    pub sandbox_dir: Option<String>,
162}
163
164/// PSP-5 Phase 6: Record for branch lineage persistence
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct BranchLineageRow {
167    pub lineage_id: String,
168    pub parent_branch_id: String,
169    pub child_branch_id: String,
170    pub depends_on_seal: bool,
171}
172
173/// PSP-5 Phase 6: Record for interface seal persistence
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct InterfaceSealRow {
176    pub seal_id: String,
177    pub session_id: String,
178    pub node_id: String,
179    pub sealed_path: String,
180    pub artifact_kind: String,
181    pub seal_hash: Vec<u8>,
182    pub version: i32,
183}
184
185/// PSP-5 Phase 6: Record for branch flush decision persistence
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct BranchFlushRow {
188    pub flush_id: String,
189    pub session_id: String,
190    pub parent_node_id: String,
191    /// JSON-serialized `Vec<String>`
192    pub flushed_branch_ids: String,
193    /// JSON-serialized `Vec<String>`
194    pub requeue_node_ids: String,
195    pub reason: String,
196}
197
198// =============================================================================
199// PSP-5 Phase 8: Task Graph and Review Outcome Records
200// =============================================================================
201
202/// PSP-5 Phase 8: Record for task graph edges (DAG reconstruction on resume)
203#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct TaskGraphEdgeRow {
205    pub session_id: String,
206    pub parent_node_id: String,
207    pub child_node_id: String,
208    pub edge_type: String,
209}
210
211/// PSP-5 Phase 8: Record for review outcome persistence
212#[derive(Debug, Clone, Serialize, Deserialize)]
213pub struct ReviewOutcomeRow {
214    pub session_id: String,
215    pub node_id: String,
216    /// One of: "approved", "rejected", "edit_requested", "correction_requested", "skipped"
217    pub outcome: String,
218    pub reviewer_note: Option<String>,
219    /// Energy at time of review decision
220    pub energy_at_review: Option<f64>,
221    /// Whether verification was degraded when decision was made
222    pub degraded: Option<bool>,
223    /// Escalation category if the node had been classified
224    pub escalation_category: Option<String>,
225}
226
227/// PSP-5 Phase 8: Record for verification result snapshot persistence
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct VerificationResultRow {
230    pub session_id: String,
231    pub node_id: String,
232    /// JSON-serialized VerificationResult (full data for resume reconstruction)
233    pub result_json: String,
234    // Query-friendly summary fields
235    pub syntax_ok: bool,
236    pub build_ok: bool,
237    pub tests_ok: bool,
238    pub lint_ok: bool,
239    pub diagnostics_count: i32,
240    pub tests_passed: i32,
241    pub tests_failed: i32,
242    pub degraded: bool,
243    pub degraded_reason: Option<String>,
244}
245
246/// PSP-5 Phase 8: Record for artifact bundle snapshot persistence
247#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct ArtifactBundleRow {
249    pub session_id: String,
250    pub node_id: String,
251    /// JSON-serialized ArtifactBundle (full data for resume reconstruction)
252    pub bundle_json: String,
253    pub artifact_count: i32,
254    pub command_count: i32,
255    /// JSON-serialized `Vec<String>` of touched file paths
256    pub touched_files: String,
257}
258
259// =========================================================================
260// Plan Revision, Feature Charter, and Repair Footprint Row Types
261// =========================================================================
262
263/// Row type for feature_charters table
264#[derive(Debug, Clone)]
265pub struct FeatureCharterRow {
266    pub charter_id: String,
267    pub session_id: String,
268    pub scope_description: String,
269    pub max_modules: Option<i32>,
270    pub max_files: Option<i32>,
271    pub max_revisions: Option<i32>,
272    pub language_constraint: Option<String>,
273}
274
275/// Row type for plan_revisions table
276#[derive(Debug, Clone)]
277pub struct PlanRevisionRow {
278    pub revision_id: String,
279    pub session_id: String,
280    pub sequence: i32,
281    pub plan_json: String,
282    pub reason: String,
283    pub supersedes: Option<String>,
284    pub status: String,
285}
286
287/// Row type for repair_footprints table
288#[derive(Debug, Clone)]
289pub struct RepairFootprintRow {
290    pub footprint_id: String,
291    pub session_id: String,
292    pub node_id: String,
293    pub revision_id: String,
294    pub attempt: i32,
295    pub affected_files: String,
296    pub bundle_json: String,
297    pub diagnosis: String,
298    pub resolved: bool,
299}
300
301/// Row type for budget_envelopes table
302#[derive(Debug, Clone)]
303pub struct BudgetEnvelopeRow {
304    pub session_id: String,
305    pub max_steps: Option<i32>,
306    pub steps_used: i32,
307    pub max_revisions: Option<i32>,
308    pub revisions_used: i32,
309    pub max_cost_usd: Option<f64>,
310    pub cost_used_usd: f64,
311}
312
313use std::sync::Mutex;
314
315/// Session store for SRBN persistence
316pub struct SessionStore {
317    conn: Mutex<Connection>,
318}
319
320impl SessionStore {
321    /// Create a new session store with default path
322    pub fn new() -> Result<Self> {
323        let db_path = Self::default_db_path()?;
324        Self::open(&db_path)
325    }
326
327    /// Open a session store at the given path
328    pub fn open(path: &PathBuf) -> Result<Self> {
329        // Ensure parent directory exists
330        if let Some(parent) = path.parent() {
331            std::fs::create_dir_all(parent)?;
332        }
333
334        let conn = Connection::open(path).context("Failed to open DuckDB")?;
335        init_schema(&conn)?;
336
337        Ok(Self {
338            conn: Mutex::new(conn),
339        })
340    }
341
342    /// Open a session store in read-only mode for concurrent dashboard reads.
343    ///
344    /// Uses `AccessMode::ReadOnly` so the dashboard can read alongside the
345    /// agent's write lock. Does **not** call `init_schema()` (a write op).
346    /// The database file must already exist.
347    pub fn open_read_only(path: &std::path::Path) -> Result<Self> {
348        let config = duckdb::Config::default()
349            .access_mode(duckdb::AccessMode::ReadOnly)
350            .context("Failed to configure DuckDB read-only mode")?;
351        let conn = Connection::open_with_flags(path, config)
352            .context("Failed to open DuckDB in read-only mode")?;
353        Ok(Self {
354            conn: Mutex::new(conn),
355        })
356    }
357
358    /// Get the default database path (~/.local/share/perspt/perspt.db or similar)
359    pub fn default_db_path() -> Result<PathBuf> {
360        perspt_core::paths::database_path().context("Could not determine platform data directory")
361    }
362
363    /// Create a new session
364    pub fn create_session(&self, session: &SessionRecord) -> Result<()> {
365        self.conn.lock().unwrap().execute(
366            r#"
367            INSERT INTO sessions (session_id, task, working_dir, merkle_root, detected_toolchain, status)
368            VALUES (?, ?, ?, ?, ?, ?)
369            "#,
370            [
371                &session.session_id,
372                &session.task,
373                &session.working_dir,
374                &session.merkle_root.as_ref().map(hex::encode).unwrap_or_default(),
375                &session.detected_toolchain.clone().unwrap_or_default(),
376                &session.status,
377            ],
378        )?;
379        Ok(())
380    }
381
382    /// Update session merkle root
383    pub fn update_merkle_root(&self, session_id: &str, merkle_root: &[u8]) -> Result<()> {
384        self.conn.lock().unwrap().execute(
385            "UPDATE sessions SET merkle_root = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
386            [hex::encode(merkle_root), session_id.to_string()],
387        )?;
388        Ok(())
389    }
390
391    /// Record node state
392    pub fn record_node_state(&self, record: &NodeStateRecord) -> Result<()> {
393        let v_total = record.v_total.to_string();
394        let merkle_hash = record
395            .merkle_hash
396            .as_ref()
397            .map(hex::encode)
398            .unwrap_or_default();
399        let attempt_count = record.attempt_count.to_string();
400        let node_class = record.node_class.clone().unwrap_or_default();
401        let owner_plugin = record.owner_plugin.clone().unwrap_or_default();
402        let goal = record.goal.clone().unwrap_or_default();
403        let parent_id = record.parent_id.clone().unwrap_or_default();
404        let children = record.children.clone().unwrap_or_default();
405        let last_error_type = record.last_error_type.clone().unwrap_or_default();
406        let committed_at = record.committed_at.clone().unwrap_or_default();
407
408        self.conn.lock().unwrap().execute(
409            r#"
410            INSERT INTO node_states (node_id, session_id, state, v_total, merkle_hash, attempt_count,
411                                     node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at)
412            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
413            "#,
414            [
415                &record.node_id,
416                &record.session_id,
417                &record.state,
418                &v_total,
419                &merkle_hash,
420                &attempt_count,
421                &node_class,
422                &owner_plugin,
423                &goal,
424                &parent_id,
425                &children,
426                &last_error_type,
427                &committed_at,
428            ],
429        )?;
430        Ok(())
431    }
432
433    /// Record energy measurement
434    pub fn record_energy(&self, record: &EnergyRecord) -> Result<()> {
435        self.conn.lock().unwrap().execute(
436            r#"
437            INSERT INTO energy_history (node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total)
438            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
439            "#,
440            [
441                &record.node_id,
442                &record.session_id,
443                &record.v_syn.to_string(),
444                &record.v_str.to_string(),
445                &record.v_log.to_string(),
446                &record.v_boot.to_string(),
447                &record.v_sheaf.to_string(),
448                &record.v_total.to_string(),
449            ],
450        )?;
451        Ok(())
452    }
453
454    /// Calculate Merkle hash for content
455    pub fn calculate_hash(content: &[u8]) -> Vec<u8> {
456        let mut hasher = Sha256::new();
457        hasher.update(content);
458        hasher.finalize().to_vec()
459    }
460
461    /// Get session by ID
462    pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>> {
463        let conn = self.conn.lock().unwrap();
464        let mut stmt = conn.prepare(
465            "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status FROM sessions WHERE session_id = ?"
466        )?;
467
468        let mut rows = stmt.query([session_id])?;
469        if let Some(row) = rows.next()? {
470            // merkle_root is stored as BLOB; read directly as Option<Vec<u8>>
471            // to match list_recent_sessions and avoid type mismatch on Blob columns.
472            let merkle_root: Option<Vec<u8>> = row.get(3).ok();
473
474            Ok(Some(SessionRecord {
475                session_id: row.get(0)?,
476                task: row.get(1)?,
477                working_dir: row.get(2)?,
478                merkle_root,
479                detected_toolchain: row.get(4)?,
480                status: row.get(5)?,
481            }))
482        } else {
483            Ok(None)
484        }
485    }
486
487    /// Get the directory for session artifacts (`~/.local/share/perspt/sessions/<id>`)
488    pub fn get_session_dir(&self, session_id: &str) -> Result<PathBuf> {
489        let data_dir = dirs::data_local_dir()
490            .context("Could not find local data directory")?
491            .join("perspt")
492            .join("sessions")
493            .join(session_id);
494        Ok(data_dir)
495    }
496
497    /// Ensure a session directory exists and return the path
498    pub fn create_session_dir(&self, session_id: &str) -> Result<PathBuf> {
499        let dir = self.get_session_dir(session_id)?;
500        if !dir.exists() {
501            std::fs::create_dir_all(&dir).context("Failed to create session directory")?;
502        }
503        Ok(dir)
504    }
505
506    /// Get energy history for a node (query)
507    pub fn get_energy_history(&self, session_id: &str, node_id: &str) -> Result<Vec<EnergyRecord>> {
508        let conn = self.conn.lock().unwrap();
509        let mut stmt = conn.prepare(
510            "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? AND node_id = ? ORDER BY timestamp"
511        )?;
512
513        let mut rows = stmt.query([session_id, node_id])?;
514        let mut records = Vec::new();
515
516        while let Some(row) = rows.next()? {
517            records.push(EnergyRecord {
518                node_id: row.get(0)?,
519                session_id: row.get(1)?,
520                v_syn: row.get::<_, f64>(2)? as f32,
521                v_str: row.get::<_, f64>(3)? as f32,
522                v_log: row.get::<_, f64>(4)? as f32,
523                v_boot: row.get::<_, f64>(5)? as f32,
524                v_sheaf: row.get::<_, f64>(6)? as f32,
525                v_total: row.get::<_, f64>(7)? as f32,
526            });
527        }
528
529        Ok(records)
530    }
531
532    /// Get all energy history for a session (all nodes)
533    pub fn get_session_energy_history(&self, session_id: &str) -> Result<Vec<EnergyRecord>> {
534        let conn = self.conn.lock().unwrap();
535        let mut stmt = conn.prepare(
536            "SELECT node_id, session_id, v_syn, v_str, v_log, v_boot, v_sheaf, v_total FROM energy_history WHERE session_id = ? ORDER BY timestamp"
537        )?;
538
539        let mut rows = stmt.query([session_id])?;
540        let mut records = Vec::new();
541
542        while let Some(row) = rows.next()? {
543            records.push(EnergyRecord {
544                node_id: row.get(0)?,
545                session_id: row.get(1)?,
546                v_syn: row.get::<_, f64>(2)? as f32,
547                v_str: row.get::<_, f64>(3)? as f32,
548                v_log: row.get::<_, f64>(4)? as f32,
549                v_boot: row.get::<_, f64>(5)? as f32,
550                v_sheaf: row.get::<_, f64>(6)? as f32,
551                v_total: row.get::<_, f64>(7)? as f32,
552            });
553        }
554
555        Ok(records)
556    }
557
558    /// List recent sessions (newest first)
559    pub fn list_recent_sessions(&self, limit: usize) -> Result<Vec<SessionRecord>> {
560        self.list_sessions_paginated(limit, 0)
561    }
562
563    /// List sessions with pagination (most recent first).
564    pub fn list_sessions_paginated(
565        &self,
566        limit: usize,
567        offset: usize,
568    ) -> Result<Vec<SessionRecord>> {
569        let conn = self.conn.lock().unwrap();
570        let mut stmt = conn.prepare(
571            "SELECT session_id, task, working_dir, merkle_root, detected_toolchain, status
572             FROM sessions ORDER BY created_at DESC LIMIT ? OFFSET ?",
573        )?;
574
575        let mut rows = stmt.query([limit.to_string(), offset.to_string()])?;
576        let mut records = Vec::new();
577
578        while let Some(row) = rows.next()? {
579            let merkle_root: Option<Vec<u8>> = row.get(3).ok();
580
581            records.push(SessionRecord {
582                session_id: row.get(0)?,
583                task: row.get(1)?,
584                working_dir: row.get(2)?,
585                merkle_root,
586                detected_toolchain: row.get(4)?,
587                status: row.get(5)?,
588            });
589        }
590
591        Ok(records)
592    }
593
594    /// Count total number of sessions.
595    pub fn count_sessions(&self) -> Result<usize> {
596        let conn = self.conn.lock().unwrap();
597        let mut stmt = conn.prepare("SELECT COUNT(*) FROM sessions")?;
598        let mut rows = stmt.query([])?;
599        if let Some(row) = rows.next()? {
600            let count: i64 = row.get(0)?;
601            Ok(count as usize)
602        } else {
603            Ok(0)
604        }
605    }
606
607    /// Get all node states for a session
608    pub fn get_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
609        let conn = self.conn.lock().unwrap();
610        let mut stmt = conn.prepare(
611            "SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
612                    node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
613             FROM node_states WHERE session_id = ? ORDER BY created_at",
614        )?;
615
616        let mut rows = stmt.query([session_id])?;
617        let mut records = Vec::new();
618
619        while let Some(row) = rows.next()? {
620            records.push(NodeStateRecord {
621                node_id: row.get(0)?,
622                session_id: row.get(1)?,
623                state: row.get(2)?,
624                v_total: row.get::<_, f64>(3)? as f32,
625                merkle_hash: row
626                    .get::<_, Option<String>>(4)?
627                    .and_then(|s| hex::decode(s).ok()),
628                attempt_count: row.get(5)?,
629                node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
630                owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
631                goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
632                parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
633                children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
634                last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
635                committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
636            });
637        }
638
639        Ok(records)
640    }
641
642    /// Update session status
643    pub fn update_session_status(&self, session_id: &str, status: &str) -> Result<()> {
644        self.conn.lock().unwrap().execute(
645            "UPDATE sessions SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
646            [status, session_id],
647        )?;
648        Ok(())
649    }
650
651    /// Record an LLM request/response
652    pub fn record_llm_request(&self, record: &LlmRequestRecord) -> Result<()> {
653        let conn = self.conn.lock().unwrap();
654        conn.execute(
655            r#"
656            INSERT INTO llm_requests (session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms)
657            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
658            "#,
659            [
660                &record.session_id,
661                &record.node_id.clone().unwrap_or_default(),
662                &record.model,
663                &record.prompt,
664                &record.response,
665                &record.tokens_in.to_string(),
666                &record.tokens_out.to_string(),
667                &record.latency_ms.to_string(),
668            ],
669        )?;
670        Ok(())
671    }
672
673    /// Get LLM requests for a session
674    pub fn get_llm_requests(&self, session_id: &str) -> Result<Vec<LlmRequestRecord>> {
675        let conn = self.conn.lock().unwrap();
676        let mut stmt = conn.prepare(
677            "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
678             FROM llm_requests WHERE session_id = ? ORDER BY timestamp",
679        )?;
680
681        let mut rows = stmt.query([session_id])?;
682        let mut records = Vec::new();
683
684        while let Some(row) = rows.next()? {
685            let node_id: Option<String> = row.get(1)?;
686            records.push(LlmRequestRecord {
687                session_id: row.get(0)?,
688                node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
689                    None
690                } else {
691                    node_id
692                },
693                model: row.get(2)?,
694                prompt: row.get(3)?,
695                response: row.get(4)?,
696                tokens_in: row.get(5)?,
697                tokens_out: row.get(6)?,
698                latency_ms: row.get(7)?,
699            });
700        }
701
702        Ok(records)
703    }
704
705    /// Count all LLM requests in the database (for debugging)
706    pub fn count_all_llm_requests(&self) -> Result<i64> {
707        let conn = self.conn.lock().unwrap();
708        let mut stmt = conn.prepare("SELECT COUNT(*) FROM llm_requests")?;
709        let count: i64 = stmt.query_row([], |row| row.get(0))?;
710        Ok(count)
711    }
712
713    /// Aggregate LLM statistics across all sessions: (count, sum_tokens_in, sum_tokens_out, sum_latency_ms)
714    pub fn get_global_llm_summary(&self) -> Result<(i64, i64, i64, i64)> {
715        let conn = self.conn.lock().unwrap();
716        let mut stmt = conn.prepare(
717            "SELECT COUNT(*), \
718             COALESCE(SUM(CASE WHEN tokens_in > 0 THEN tokens_in ELSE (LENGTH(prompt) + 3) / 4 END), 0), \
719             COALESCE(SUM(CASE WHEN tokens_out > 0 THEN tokens_out ELSE (LENGTH(response) + 3) / 4 END), 0), \
720             COALESCE(MEDIAN(latency_ms), 0) \
721             FROM llm_requests",
722        )?;
723        let result = stmt.query_row([], |row| {
724            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
725        })?;
726        Ok(result)
727    }
728
729    /// Get all LLM requests (for debugging)
730    pub fn get_all_llm_requests(&self, limit: usize) -> Result<Vec<LlmRequestRecord>> {
731        let conn = self.conn.lock().unwrap();
732        let mut stmt = conn.prepare(
733            "SELECT session_id, node_id, model, prompt, response, tokens_in, tokens_out, latency_ms
734             FROM llm_requests ORDER BY timestamp DESC LIMIT ?",
735        )?;
736
737        let mut rows = stmt.query([limit as i64])?;
738        let mut records = Vec::new();
739
740        while let Some(row) = rows.next()? {
741            let node_id: Option<String> = row.get(1)?;
742            records.push(LlmRequestRecord {
743                session_id: row.get(0)?,
744                node_id: if node_id.as_ref().map(|s| s.is_empty()).unwrap_or(true) {
745                    None
746                } else {
747                    node_id
748                },
749                model: row.get(2)?,
750                prompt: row.get(3)?,
751                response: row.get(4)?,
752                tokens_in: row.get(5)?,
753                tokens_out: row.get(6)?,
754                latency_ms: row.get(7)?,
755            });
756        }
757
758        Ok(records)
759    }
760
761    // =========================================================================
762    // PSP-5 Phase 3: Structural Digest & Context Provenance Persistence
763    // =========================================================================
764
765    /// Record a structural digest
766    pub fn record_structural_digest(&self, record: &StructuralDigestRecord) -> Result<()> {
767        self.conn.lock().unwrap().execute(
768            r#"
769            INSERT INTO structural_digests (digest_id, session_id, node_id, source_path, artifact_kind, hash, version)
770            VALUES (?, ?, ?, ?, ?, ?, ?)
771            "#,
772            [
773                &record.digest_id,
774                &record.session_id,
775                &record.node_id,
776                &record.source_path,
777                &record.artifact_kind,
778                &hex::encode(&record.hash),
779                &record.version.to_string(),
780            ],
781        )?;
782        Ok(())
783    }
784
785    /// Get structural digests for a session and node
786    pub fn get_structural_digests(
787        &self,
788        session_id: &str,
789        node_id: &str,
790    ) -> Result<Vec<StructuralDigestRecord>> {
791        let conn = self.conn.lock().unwrap();
792        let mut stmt = conn.prepare(
793            "SELECT digest_id, session_id, node_id, source_path, artifact_kind, hash, version
794             FROM structural_digests WHERE session_id = ? AND node_id = ? ORDER BY created_at",
795        )?;
796
797        let mut rows = stmt.query([session_id, node_id])?;
798        let mut records = Vec::new();
799
800        while let Some(row) = rows.next()? {
801            records.push(StructuralDigestRecord {
802                digest_id: row.get(0)?,
803                session_id: row.get(1)?,
804                node_id: row.get(2)?,
805                source_path: row.get(3)?,
806                artifact_kind: row.get(4)?,
807                hash: row
808                    .get::<_, String>(5)
809                    .ok()
810                    .and_then(|s| hex::decode(s).ok())
811                    .unwrap_or_default(),
812                version: row.get(5)?,
813            });
814        }
815
816        Ok(records)
817    }
818
819    /// Record context provenance for a node
820    pub fn record_context_provenance(&self, record: &ContextProvenanceRecord) -> Result<()> {
821        self.conn.lock().unwrap().execute(
822            r#"
823            INSERT INTO context_provenance (session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes)
824            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
825            "#,
826            [
827                &record.session_id,
828                &record.node_id,
829                &record.context_package_id,
830                &record.structural_hashes,
831                &record.summary_hashes,
832                &record.dependency_hashes,
833                &record.included_file_count.to_string(),
834                &record.total_bytes.to_string(),
835            ],
836        )?;
837        Ok(())
838    }
839
840    /// Get context provenance for a session and node
841    pub fn get_context_provenance(
842        &self,
843        session_id: &str,
844        node_id: &str,
845    ) -> Result<Option<ContextProvenanceRecord>> {
846        let conn = self.conn.lock().unwrap();
847        let mut stmt = conn.prepare(
848            "SELECT session_id, node_id, context_package_id, structural_hashes, summary_hashes, dependency_hashes, included_file_count, total_bytes
849             FROM context_provenance WHERE session_id = ? AND node_id = ? ORDER BY created_at DESC LIMIT 1",
850        )?;
851
852        let mut rows = stmt.query([session_id, node_id])?;
853        if let Some(row) = rows.next()? {
854            Ok(Some(ContextProvenanceRecord {
855                session_id: row.get(0)?,
856                node_id: row.get(1)?,
857                context_package_id: row.get(2)?,
858                structural_hashes: row.get(3)?,
859                summary_hashes: row.get(4)?,
860                dependency_hashes: row.get(5)?,
861                included_file_count: row.get(6)?,
862                total_bytes: row.get(7)?,
863            }))
864        } else {
865            Ok(None)
866        }
867    }
868
869    // =========================================================================
870    // PSP-5 Phase 5: Escalation, Rewrite, and Sheaf Validation Persistence
871    // =========================================================================
872
873    /// Record an escalation report
874    pub fn record_escalation_report(&self, record: &EscalationReportRecord) -> Result<()> {
875        self.conn.lock().unwrap().execute(
876            r#"
877            INSERT INTO escalation_reports (session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids)
878            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
879            "#,
880            [
881                &record.session_id,
882                &record.node_id,
883                &record.category,
884                &record.action,
885                &record.energy_snapshot,
886                &record.stage_outcomes,
887                &record.evidence,
888                &record.affected_node_ids,
889            ],
890        )?;
891        Ok(())
892    }
893
894    /// Get escalation reports for a session
895    pub fn get_escalation_reports(&self, session_id: &str) -> Result<Vec<EscalationReportRecord>> {
896        let conn = self.conn.lock().unwrap();
897        let mut stmt = conn.prepare(
898            "SELECT session_id, node_id, category, action, energy_snapshot, stage_outcomes, evidence, affected_node_ids
899             FROM escalation_reports WHERE session_id = ? ORDER BY created_at",
900        )?;
901        let mut rows = stmt.query([session_id])?;
902        let mut records = Vec::new();
903        while let Some(row) = rows.next()? {
904            records.push(EscalationReportRecord {
905                session_id: row.get(0)?,
906                node_id: row.get(1)?,
907                category: row.get(2)?,
908                action: row.get(3)?,
909                energy_snapshot: row.get(4)?,
910                stage_outcomes: row.get(5)?,
911                evidence: row.get(6)?,
912                affected_node_ids: row.get(7)?,
913            });
914        }
915        Ok(records)
916    }
917
918    /// Record a local graph rewrite
919    pub fn record_rewrite(&self, record: &RewriteRecordRow) -> Result<()> {
920        self.conn.lock().unwrap().execute(
921            r#"
922            INSERT INTO rewrite_records (session_id, node_id, action, category, requeued_nodes, inserted_nodes)
923            VALUES (?, ?, ?, ?, ?, ?)
924            "#,
925            [
926                &record.session_id,
927                &record.node_id,
928                &record.action,
929                &record.category,
930                &record.requeued_nodes,
931                &record.inserted_nodes,
932            ],
933        )?;
934        Ok(())
935    }
936
937    /// Get rewrite records for a session
938    pub fn get_rewrite_records(&self, session_id: &str) -> Result<Vec<RewriteRecordRow>> {
939        let conn = self.conn.lock().unwrap();
940        let mut stmt = conn.prepare(
941            "SELECT session_id, node_id, action, category, requeued_nodes, inserted_nodes
942             FROM rewrite_records WHERE session_id = ? ORDER BY created_at",
943        )?;
944        let mut rows = stmt.query([session_id])?;
945        let mut records = Vec::new();
946        while let Some(row) = rows.next()? {
947            records.push(RewriteRecordRow {
948                session_id: row.get(0)?,
949                node_id: row.get(1)?,
950                action: row.get(2)?,
951                category: row.get(3)?,
952                requeued_nodes: row.get(4)?,
953                inserted_nodes: row.get(5)?,
954            });
955        }
956        Ok(records)
957    }
958
959    /// Record a sheaf validation result
960    pub fn record_sheaf_validation(&self, record: &SheafValidationRow) -> Result<()> {
961        self.conn.lock().unwrap().execute(
962            r#"
963            INSERT INTO sheaf_validations (session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets)
964            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
965            "#,
966            [
967                &record.session_id,
968                &record.node_id,
969                &record.validator_class,
970                &record.plugin_source.clone().unwrap_or_default(),
971                &record.passed.to_string(),
972                &record.evidence_summary,
973                &record.affected_files,
974                &record.v_sheaf_contribution.to_string(),
975                &record.requeue_targets,
976            ],
977        )?;
978        Ok(())
979    }
980
981    /// Get sheaf validation results for a session and node
982    pub fn get_sheaf_validations(
983        &self,
984        session_id: &str,
985        node_id: &str,
986    ) -> Result<Vec<SheafValidationRow>> {
987        let conn = self.conn.lock().unwrap();
988        let mut stmt = conn.prepare(
989            "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
990             FROM sheaf_validations WHERE session_id = ? AND node_id = ? ORDER BY created_at",
991        )?;
992        let mut rows = stmt.query([session_id, node_id])?;
993        let mut records = Vec::new();
994        while let Some(row) = rows.next()? {
995            records.push(SheafValidationRow {
996                session_id: row.get(0)?,
997                node_id: row.get(1)?,
998                validator_class: row.get(2)?,
999                plugin_source: row.get::<_, Option<String>>(3)?,
1000                passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
1001                evidence_summary: row.get(5)?,
1002                affected_files: row.get(6)?,
1003                v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
1004                requeue_targets: row.get(8)?,
1005            });
1006        }
1007        Ok(records)
1008    }
1009
1010    /// Get all sheaf validations for a session (all nodes).
1011    pub fn get_all_sheaf_validations(&self, session_id: &str) -> Result<Vec<SheafValidationRow>> {
1012        let conn = self.conn.lock().unwrap();
1013        let mut stmt = conn.prepare(
1014            "SELECT session_id, node_id, validator_class, plugin_source, passed, evidence_summary, affected_files, v_sheaf_contribution, requeue_targets
1015             FROM sheaf_validations WHERE session_id = ? ORDER BY created_at",
1016        )?;
1017        let mut rows = stmt.query([session_id])?;
1018        let mut records = Vec::new();
1019        while let Some(row) = rows.next()? {
1020            records.push(SheafValidationRow {
1021                session_id: row.get(0)?,
1022                node_id: row.get(1)?,
1023                validator_class: row.get(2)?,
1024                plugin_source: row.get::<_, Option<String>>(3)?,
1025                passed: row.get::<_, String>(4)?.parse().unwrap_or(false),
1026                evidence_summary: row.get(5)?,
1027                affected_files: row.get(6)?,
1028                v_sheaf_contribution: row.get::<_, f64>(7)? as f32,
1029                requeue_targets: row.get(8)?,
1030            });
1031        }
1032        Ok(records)
1033    }
1034
1035    // =========================================================================
1036    // PSP-5 Phase 6: Provisional Branch CRUD
1037    // =========================================================================
1038
1039    /// Record a new provisional branch
1040    pub fn record_provisional_branch(&self, record: &ProvisionalBranchRow) -> Result<()> {
1041        self.conn.lock().unwrap().execute(
1042            r#"
1043            INSERT INTO provisional_branches (branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir)
1044            VALUES (?, ?, ?, ?, ?, ?, ?)
1045            "#,
1046            [
1047                &record.branch_id,
1048                &record.session_id,
1049                &record.node_id,
1050                &record.parent_node_id,
1051                &record.state,
1052                &record.parent_seal_hash.as_ref().map(hex::encode).unwrap_or_default(),
1053                &record.sandbox_dir.clone().unwrap_or_default(),
1054            ],
1055        )?;
1056        Ok(())
1057    }
1058
1059    /// Update a provisional branch state
1060    pub fn update_branch_state(&self, branch_id: &str, new_state: &str) -> Result<()> {
1061        self.conn.lock().unwrap().execute(
1062            "UPDATE provisional_branches SET state = ?, updated_at = CURRENT_TIMESTAMP WHERE branch_id = ?",
1063            [new_state, branch_id],
1064        )?;
1065        Ok(())
1066    }
1067
1068    /// Get all provisional branches for a session
1069    pub fn get_provisional_branches(&self, session_id: &str) -> Result<Vec<ProvisionalBranchRow>> {
1070        let conn = self.conn.lock().unwrap();
1071        let mut stmt = conn.prepare(
1072            "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
1073             FROM provisional_branches WHERE session_id = ? ORDER BY created_at",
1074        )?;
1075        let mut rows = stmt.query([session_id])?;
1076        let mut records = Vec::new();
1077        while let Some(row) = rows.next()? {
1078            // parent_seal_hash is BLOB; read directly as Option<Vec<u8>>
1079            let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
1080            records.push(ProvisionalBranchRow {
1081                branch_id: row.get(0)?,
1082                session_id: row.get(1)?,
1083                node_id: row.get(2)?,
1084                parent_node_id: row.get(3)?,
1085                state: row.get(4)?,
1086                parent_seal_hash,
1087                sandbox_dir: row.get::<_, Option<String>>(6)?,
1088            });
1089        }
1090        Ok(records)
1091    }
1092
1093    /// Get live (active/sealed) provisional branches depending on a parent node
1094    pub fn get_live_branches_for_parent(
1095        &self,
1096        session_id: &str,
1097        parent_node_id: &str,
1098    ) -> Result<Vec<ProvisionalBranchRow>> {
1099        let conn = self.conn.lock().unwrap();
1100        let mut stmt = conn.prepare(
1101            "SELECT branch_id, session_id, node_id, parent_node_id, state, parent_seal_hash, sandbox_dir
1102             FROM provisional_branches
1103             WHERE session_id = ? AND parent_node_id = ? AND state IN ('active', 'sealed')
1104             ORDER BY created_at",
1105        )?;
1106        let mut rows = stmt.query([session_id, parent_node_id])?;
1107        let mut records = Vec::new();
1108        while let Some(row) = rows.next()? {
1109            // parent_seal_hash is BLOB; read directly as Option<Vec<u8>>
1110            let parent_seal_hash: Option<Vec<u8>> = row.get(5).ok();
1111            records.push(ProvisionalBranchRow {
1112                branch_id: row.get(0)?,
1113                session_id: row.get(1)?,
1114                node_id: row.get(2)?,
1115                parent_node_id: row.get(3)?,
1116                state: row.get(4)?,
1117                parent_seal_hash,
1118                sandbox_dir: row.get::<_, Option<String>>(6)?,
1119            });
1120        }
1121        Ok(records)
1122    }
1123
1124    /// Mark all live branches for a parent as flushed
1125    pub fn flush_branches_for_parent(
1126        &self,
1127        session_id: &str,
1128        parent_node_id: &str,
1129    ) -> Result<Vec<String>> {
1130        let live = self.get_live_branches_for_parent(session_id, parent_node_id)?;
1131        let branch_ids: Vec<String> = live.iter().map(|b| b.branch_id.clone()).collect();
1132        for bid in &branch_ids {
1133            self.update_branch_state(bid, "flushed")?;
1134        }
1135        Ok(branch_ids)
1136    }
1137
1138    // =========================================================================
1139    // PSP-5 Phase 6: Branch Lineage CRUD
1140    // =========================================================================
1141
1142    /// Record a branch lineage edge
1143    pub fn record_branch_lineage(&self, record: &BranchLineageRow) -> Result<()> {
1144        self.conn.lock().unwrap().execute(
1145            r#"
1146            INSERT INTO branch_lineage (lineage_id, parent_branch_id, child_branch_id, depends_on_seal)
1147            VALUES (?, ?, ?, ?)
1148            "#,
1149            [
1150                &record.lineage_id,
1151                &record.parent_branch_id,
1152                &record.child_branch_id,
1153                &record.depends_on_seal.to_string(),
1154            ],
1155        )?;
1156        Ok(())
1157    }
1158
1159    /// Get child branch IDs for a parent branch
1160    pub fn get_child_branches(&self, parent_branch_id: &str) -> Result<Vec<String>> {
1161        let conn = self.conn.lock().unwrap();
1162        let mut stmt =
1163            conn.prepare("SELECT child_branch_id FROM branch_lineage WHERE parent_branch_id = ?")?;
1164        let mut rows = stmt.query([parent_branch_id])?;
1165        let mut ids = Vec::new();
1166        while let Some(row) = rows.next()? {
1167            ids.push(row.get(0)?);
1168        }
1169        Ok(ids)
1170    }
1171
1172    // =========================================================================
1173    // PSP-5 Phase 6: Interface Seal CRUD
1174    // =========================================================================
1175
1176    /// Record an interface seal
1177    pub fn record_interface_seal(&self, record: &InterfaceSealRow) -> Result<()> {
1178        self.conn.lock().unwrap().execute(
1179            r#"
1180            INSERT INTO interface_seals (seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version)
1181            VALUES (?, ?, ?, ?, ?, ?, ?)
1182            "#,
1183            [
1184                &record.seal_id,
1185                &record.session_id,
1186                &record.node_id,
1187                &record.sealed_path,
1188                &record.artifact_kind,
1189                &hex::encode(&record.seal_hash),
1190                &record.version.to_string(),
1191            ],
1192        )?;
1193        Ok(())
1194    }
1195
1196    /// Get all interface seals for a node
1197    pub fn get_interface_seals(
1198        &self,
1199        session_id: &str,
1200        node_id: &str,
1201    ) -> Result<Vec<InterfaceSealRow>> {
1202        let conn = self.conn.lock().unwrap();
1203        let mut stmt = conn.prepare(
1204            "SELECT seal_id, session_id, node_id, sealed_path, artifact_kind, seal_hash, version
1205             FROM interface_seals WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1206        )?;
1207        let mut rows = stmt.query([session_id, node_id])?;
1208        let mut records = Vec::new();
1209        while let Some(row) = rows.next()? {
1210            records.push(InterfaceSealRow {
1211                seal_id: row.get(0)?,
1212                session_id: row.get(1)?,
1213                node_id: row.get(2)?,
1214                sealed_path: row.get(3)?,
1215                artifact_kind: row.get(4)?,
1216                seal_hash: row
1217                    .get::<_, String>(5)
1218                    .ok()
1219                    .and_then(|h| hex::decode(h).ok())
1220                    .unwrap_or_default(),
1221                version: row.get::<_, i32>(6)?,
1222            });
1223        }
1224        Ok(records)
1225    }
1226
1227    /// Check whether a node has any interface seals
1228    pub fn has_interface_seals(&self, session_id: &str, node_id: &str) -> Result<bool> {
1229        let conn = self.conn.lock().unwrap();
1230        let count: i64 = conn.query_row(
1231            "SELECT COUNT(*) FROM interface_seals WHERE session_id = ? AND node_id = ?",
1232            [session_id, node_id],
1233            |row| row.get(0),
1234        )?;
1235        Ok(count > 0)
1236    }
1237
1238    // =========================================================================
1239    // PSP-5 Phase 6: Branch Flush CRUD
1240    // =========================================================================
1241
1242    /// Record a branch flush decision
1243    pub fn record_branch_flush(&self, record: &BranchFlushRow) -> Result<()> {
1244        self.conn.lock().unwrap().execute(
1245            r#"
1246            INSERT INTO branch_flushes (flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason)
1247            VALUES (?, ?, ?, ?, ?, ?)
1248            "#,
1249            [
1250                &record.flush_id,
1251                &record.session_id,
1252                &record.parent_node_id,
1253                &record.flushed_branch_ids,
1254                &record.requeue_node_ids,
1255                &record.reason,
1256            ],
1257        )?;
1258        Ok(())
1259    }
1260
1261    /// Get all branch flush records for a session
1262    pub fn get_branch_flushes(&self, session_id: &str) -> Result<Vec<BranchFlushRow>> {
1263        let conn = self.conn.lock().unwrap();
1264        let mut stmt = conn.prepare(
1265            "SELECT flush_id, session_id, parent_node_id, flushed_branch_ids, requeue_node_ids, reason
1266             FROM branch_flushes WHERE session_id = ? ORDER BY created_at",
1267        )?;
1268        let mut rows = stmt.query([session_id])?;
1269        let mut records = Vec::new();
1270        while let Some(row) = rows.next()? {
1271            records.push(BranchFlushRow {
1272                flush_id: row.get(0)?,
1273                session_id: row.get(1)?,
1274                parent_node_id: row.get(2)?,
1275                flushed_branch_ids: row.get(3)?,
1276                requeue_node_ids: row.get(4)?,
1277                reason: row.get(5)?,
1278            });
1279        }
1280        Ok(records)
1281    }
1282
1283    // =========================================================================
1284    // PSP-5 Phase 8: Node Snapshot, Task Graph, and Review Outcome Persistence
1285    // =========================================================================
1286
1287    /// Get the latest node state snapshot per node for a session (for resume reconstruction).
1288    ///
1289    /// Returns at most one record per node_id, picking the most recently created row.
1290    pub fn get_latest_node_states(&self, session_id: &str) -> Result<Vec<NodeStateRecord>> {
1291        let conn = self.conn.lock().unwrap();
1292        let mut stmt = conn.prepare(
1293            "WITH ranked AS ( \
1294                 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1295                 FROM node_states WHERE session_id = ? \
1296             ) \
1297             SELECT node_id, session_id, state, v_total, CAST(merkle_hash AS VARCHAR), attempt_count, \
1298                    node_class, owner_plugin, goal, parent_id, children, last_error_type, committed_at \
1299             FROM ranked WHERE rn = 1 ORDER BY created_at",
1300        )?;
1301
1302        let mut rows = stmt.query([session_id])?;
1303        let mut records = Vec::new();
1304
1305        while let Some(row) = rows.next()? {
1306            records.push(NodeStateRecord {
1307                node_id: row.get(0)?,
1308                session_id: row.get(1)?,
1309                state: row.get(2)?,
1310                v_total: row.get::<_, f64>(3)? as f32,
1311                merkle_hash: row
1312                    .get::<_, Option<String>>(4)?
1313                    .and_then(|s| hex::decode(s).ok()),
1314                attempt_count: row.get(5)?,
1315                node_class: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1316                owner_plugin: row.get::<_, Option<String>>(7)?.filter(|s| !s.is_empty()),
1317                goal: row.get::<_, Option<String>>(8)?.filter(|s| !s.is_empty()),
1318                parent_id: row.get::<_, Option<String>>(9)?.filter(|s| !s.is_empty()),
1319                children: row.get::<_, Option<String>>(10)?.filter(|s| !s.is_empty()),
1320                last_error_type: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1321                committed_at: row.get::<_, Option<String>>(12)?.filter(|s| !s.is_empty()),
1322            });
1323        }
1324
1325        Ok(records)
1326    }
1327
1328    /// Record a task graph edge (parent→child dependency)
1329    pub fn record_task_graph_edge(&self, record: &TaskGraphEdgeRow) -> Result<()> {
1330        self.conn.lock().unwrap().execute(
1331            r#"
1332            INSERT INTO task_graph_edges (session_id, parent_node_id, child_node_id, edge_type)
1333            VALUES (?, ?, ?, ?)
1334            "#,
1335            [
1336                &record.session_id,
1337                &record.parent_node_id,
1338                &record.child_node_id,
1339                &record.edge_type,
1340            ],
1341        )?;
1342        Ok(())
1343    }
1344
1345    /// Get all task graph edges for a session
1346    pub fn get_task_graph_edges(&self, session_id: &str) -> Result<Vec<TaskGraphEdgeRow>> {
1347        let conn = self.conn.lock().unwrap();
1348        let mut stmt = conn.prepare(
1349            "SELECT session_id, parent_node_id, child_node_id, edge_type \
1350             FROM task_graph_edges WHERE session_id = ? ORDER BY created_at",
1351        )?;
1352        let mut rows = stmt.query([session_id])?;
1353        let mut records = Vec::new();
1354        while let Some(row) = rows.next()? {
1355            records.push(TaskGraphEdgeRow {
1356                session_id: row.get(0)?,
1357                parent_node_id: row.get(1)?,
1358                child_node_id: row.get(2)?,
1359                edge_type: row.get(3)?,
1360            });
1361        }
1362        Ok(records)
1363    }
1364
1365    /// Get child node IDs for a parent in a session's task graph
1366    pub fn get_children_of_node(
1367        &self,
1368        session_id: &str,
1369        parent_node_id: &str,
1370    ) -> Result<Vec<String>> {
1371        let conn = self.conn.lock().unwrap();
1372        let mut stmt = conn.prepare(
1373            "SELECT child_node_id FROM task_graph_edges \
1374             WHERE session_id = ? AND parent_node_id = ? ORDER BY created_at",
1375        )?;
1376        let mut rows = stmt.query([session_id, parent_node_id])?;
1377        let mut ids = Vec::new();
1378        while let Some(row) = rows.next()? {
1379            ids.push(row.get(0)?);
1380        }
1381        Ok(ids)
1382    }
1383
1384    /// Record a review outcome (approval, rejection, edit request)
1385    pub fn record_review_outcome(&self, record: &ReviewOutcomeRow) -> Result<()> {
1386        let reviewer_note = record.reviewer_note.clone().unwrap_or_default();
1387        let escalation_category = record.escalation_category.clone().unwrap_or_default();
1388        self.conn.lock().unwrap().execute(
1389            r#"
1390            INSERT INTO review_outcomes (session_id, node_id, outcome, reviewer_note,
1391                                         energy_at_review, degraded, escalation_category)
1392            VALUES (?, ?, ?, ?, ?, ?, ?)
1393            "#,
1394            duckdb::params![
1395                record.session_id,
1396                record.node_id,
1397                record.outcome,
1398                reviewer_note,
1399                record.energy_at_review.unwrap_or(0.0),
1400                record.degraded.unwrap_or(false),
1401                escalation_category,
1402            ],
1403        )?;
1404        Ok(())
1405    }
1406
1407    /// Get all review outcomes for a node
1408    pub fn get_review_outcomes(
1409        &self,
1410        session_id: &str,
1411        node_id: &str,
1412    ) -> Result<Vec<ReviewOutcomeRow>> {
1413        let conn = self.conn.lock().unwrap();
1414        let mut stmt = conn.prepare(
1415            "SELECT session_id, node_id, outcome, reviewer_note, \
1416             energy_at_review, degraded, escalation_category \
1417             FROM review_outcomes WHERE session_id = ? AND node_id = ? ORDER BY created_at",
1418        )?;
1419        let mut rows = stmt.query([session_id, node_id])?;
1420        let mut records = Vec::new();
1421        while let Some(row) = rows.next()? {
1422            records.push(ReviewOutcomeRow {
1423                session_id: row.get(0)?,
1424                node_id: row.get(1)?,
1425                outcome: row.get(2)?,
1426                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1427                energy_at_review: row.get::<_, Option<f64>>(4)?,
1428                degraded: row.get::<_, Option<bool>>(5)?,
1429                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1430            });
1431        }
1432        Ok(records)
1433    }
1434
1435    /// Get the most recent review outcome for a node
1436    pub fn get_latest_review_outcome(
1437        &self,
1438        session_id: &str,
1439        node_id: &str,
1440    ) -> Result<Option<ReviewOutcomeRow>> {
1441        let conn = self.conn.lock().unwrap();
1442        let mut stmt = conn.prepare(
1443            "SELECT session_id, node_id, outcome, reviewer_note, \
1444             energy_at_review, degraded, escalation_category \
1445             FROM review_outcomes WHERE session_id = ? AND node_id = ? \
1446             ORDER BY created_at DESC LIMIT 1",
1447        )?;
1448        let mut rows = stmt.query([session_id, node_id])?;
1449        if let Some(row) = rows.next()? {
1450            Ok(Some(ReviewOutcomeRow {
1451                session_id: row.get(0)?,
1452                node_id: row.get(1)?,
1453                outcome: row.get(2)?,
1454                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1455                energy_at_review: row.get::<_, Option<f64>>(4)?,
1456                degraded: row.get::<_, Option<bool>>(5)?,
1457                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1458            }))
1459        } else {
1460            Ok(None)
1461        }
1462    }
1463
1464    /// Get all review outcomes for a session (across all nodes).
1465    pub fn get_all_review_outcomes(&self, session_id: &str) -> Result<Vec<ReviewOutcomeRow>> {
1466        let conn = self.conn.lock().unwrap();
1467        let mut stmt = conn.prepare(
1468            "SELECT session_id, node_id, outcome, reviewer_note, \
1469             energy_at_review, degraded, escalation_category \
1470             FROM review_outcomes WHERE session_id = ? ORDER BY created_at",
1471        )?;
1472        let mut rows = stmt.query([session_id])?;
1473        let mut records = Vec::new();
1474        while let Some(row) = rows.next()? {
1475            records.push(ReviewOutcomeRow {
1476                session_id: row.get(0)?,
1477                node_id: row.get(1)?,
1478                outcome: row.get(2)?,
1479                reviewer_note: row.get::<_, Option<String>>(3)?.filter(|s| !s.is_empty()),
1480                energy_at_review: row.get::<_, Option<f64>>(4)?,
1481                degraded: row.get::<_, Option<bool>>(5)?,
1482                escalation_category: row.get::<_, Option<String>>(6)?.filter(|s| !s.is_empty()),
1483            });
1484        }
1485        Ok(records)
1486    }
1487
1488    // =========================================================================
1489    // PSP-5 Phase 8: Verification Result and Artifact Bundle Persistence
1490    // =========================================================================
1491
1492    /// Record a verification result snapshot for a node
1493    pub fn record_verification_result(&self, record: &VerificationResultRow) -> Result<()> {
1494        let syntax_ok = record.syntax_ok.to_string();
1495        let build_ok = record.build_ok.to_string();
1496        let tests_ok = record.tests_ok.to_string();
1497        let lint_ok = record.lint_ok.to_string();
1498        let diagnostics_count = record.diagnostics_count.to_string();
1499        let tests_passed = record.tests_passed.to_string();
1500        let tests_failed = record.tests_failed.to_string();
1501        let degraded = record.degraded.to_string();
1502        let degraded_reason = record.degraded_reason.clone().unwrap_or_default();
1503
1504        self.conn.lock().unwrap().execute(
1505            r#"
1506            INSERT INTO verification_results (session_id, node_id, result_json,
1507                syntax_ok, build_ok, tests_ok, lint_ok,
1508                diagnostics_count, tests_passed, tests_failed, degraded, degraded_reason)
1509            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1510            "#,
1511            [
1512                &record.session_id,
1513                &record.node_id,
1514                &record.result_json,
1515                &syntax_ok,
1516                &build_ok,
1517                &tests_ok,
1518                &lint_ok,
1519                &diagnostics_count,
1520                &tests_passed,
1521                &tests_failed,
1522                &degraded,
1523                &degraded_reason,
1524            ],
1525        )?;
1526        Ok(())
1527    }
1528
1529    /// Get the latest verification result for a node
1530    pub fn get_verification_result(
1531        &self,
1532        session_id: &str,
1533        node_id: &str,
1534    ) -> Result<Option<VerificationResultRow>> {
1535        let conn = self.conn.lock().unwrap();
1536        let mut stmt = conn.prepare(
1537            "SELECT session_id, node_id, result_json, \
1538                    CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1539                    diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1540             FROM verification_results \
1541             WHERE session_id = ? AND node_id = ? \
1542             ORDER BY created_at DESC LIMIT 1",
1543        )?;
1544        let mut rows = stmt.query([session_id, node_id])?;
1545        if let Some(row) = rows.next()? {
1546            Ok(Some(VerificationResultRow {
1547                session_id: row.get(0)?,
1548                node_id: row.get(1)?,
1549                result_json: row.get(2)?,
1550                syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1551                build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1552                tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1553                lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1554                diagnostics_count: row.get(7)?,
1555                tests_passed: row.get(8)?,
1556                tests_failed: row.get(9)?,
1557                degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1558                degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1559            }))
1560        } else {
1561            Ok(None)
1562        }
1563    }
1564
1565    /// Get all verification results for a session (for status display)
1566    pub fn get_all_verification_results(
1567        &self,
1568        session_id: &str,
1569    ) -> Result<Vec<VerificationResultRow>> {
1570        let conn = self.conn.lock().unwrap();
1571        let mut stmt = conn.prepare(
1572            "WITH ranked AS ( \
1573                 SELECT *, ROW_NUMBER() OVER (PARTITION BY node_id ORDER BY created_at DESC) AS rn \
1574                 FROM verification_results WHERE session_id = ? \
1575             ) \
1576             SELECT session_id, node_id, result_json, \
1577                    CAST(syntax_ok AS VARCHAR), CAST(build_ok AS VARCHAR), CAST(tests_ok AS VARCHAR), CAST(lint_ok AS VARCHAR), \
1578                    diagnostics_count, tests_passed, tests_failed, CAST(degraded AS VARCHAR), degraded_reason \
1579             FROM ranked WHERE rn = 1 ORDER BY created_at",
1580        )?;
1581        let mut rows = stmt.query([session_id])?;
1582        let mut records = Vec::new();
1583        while let Some(row) = rows.next()? {
1584            records.push(VerificationResultRow {
1585                session_id: row.get(0)?,
1586                node_id: row.get(1)?,
1587                result_json: row.get(2)?,
1588                syntax_ok: row.get::<_, String>(3)?.parse().unwrap_or(false),
1589                build_ok: row.get::<_, String>(4)?.parse().unwrap_or(false),
1590                tests_ok: row.get::<_, String>(5)?.parse().unwrap_or(false),
1591                lint_ok: row.get::<_, String>(6)?.parse().unwrap_or(false),
1592                diagnostics_count: row.get(7)?,
1593                tests_passed: row.get(8)?,
1594                tests_failed: row.get(9)?,
1595                degraded: row.get::<_, String>(10)?.parse().unwrap_or(false),
1596                degraded_reason: row.get::<_, Option<String>>(11)?.filter(|s| !s.is_empty()),
1597            });
1598        }
1599        Ok(records)
1600    }
1601
1602    /// Record an artifact bundle snapshot for a node
1603    pub fn record_artifact_bundle(&self, record: &ArtifactBundleRow) -> Result<()> {
1604        let artifact_count = record.artifact_count.to_string();
1605        let command_count = record.command_count.to_string();
1606
1607        self.conn.lock().unwrap().execute(
1608            r#"
1609            INSERT INTO artifact_bundles (session_id, node_id, bundle_json,
1610                artifact_count, command_count, touched_files)
1611            VALUES (?, ?, ?, ?, ?, ?)
1612            "#,
1613            [
1614                &record.session_id,
1615                &record.node_id,
1616                &record.bundle_json,
1617                &artifact_count,
1618                &command_count,
1619                &record.touched_files,
1620            ],
1621        )?;
1622        Ok(())
1623    }
1624
1625    /// Get the latest artifact bundle for a node
1626    pub fn get_artifact_bundle(
1627        &self,
1628        session_id: &str,
1629        node_id: &str,
1630    ) -> Result<Option<ArtifactBundleRow>> {
1631        let conn = self.conn.lock().unwrap();
1632        let mut stmt = conn.prepare(
1633            "SELECT session_id, node_id, bundle_json, artifact_count, command_count, touched_files \
1634             FROM artifact_bundles \
1635             WHERE session_id = ? AND node_id = ? \
1636             ORDER BY created_at DESC LIMIT 1",
1637        )?;
1638        let mut rows = stmt.query([session_id, node_id])?;
1639        if let Some(row) = rows.next()? {
1640            Ok(Some(ArtifactBundleRow {
1641                session_id: row.get(0)?,
1642                node_id: row.get(1)?,
1643                bundle_json: row.get(2)?,
1644                artifact_count: row.get(3)?,
1645                command_count: row.get(4)?,
1646                touched_files: row.get(5)?,
1647            }))
1648        } else {
1649            Ok(None)
1650        }
1651    }
1652}
1653
1654// =========================================================================
1655// Plan Revision, Feature Charter, and Repair Footprint Methods
1656// =========================================================================
1657
1658impl SessionStore {
1659    /// Record a feature charter for a session.
1660    pub fn record_feature_charter(&self, row: &FeatureCharterRow) -> Result<()> {
1661        let conn = self.conn.lock().unwrap();
1662        conn.execute(
1663            "INSERT INTO feature_charters (charter_id, session_id, scope_description, max_modules, max_files, max_revisions, language_constraint) \
1664             VALUES (?, ?, ?, ?, ?, ?, ?)",
1665            duckdb::params![
1666                row.charter_id,
1667                row.session_id,
1668                row.scope_description,
1669                row.max_modules,
1670                row.max_files,
1671                row.max_revisions,
1672                row.language_constraint,
1673            ],
1674        )?;
1675        Ok(())
1676    }
1677
1678    /// Get the feature charter for a session.
1679    pub fn get_feature_charter(&self, session_id: &str) -> Result<Option<FeatureCharterRow>> {
1680        let conn = self.conn.lock().unwrap();
1681        let mut stmt = conn.prepare(
1682            "SELECT charter_id, session_id, scope_description, max_modules, max_files, max_revisions, language_constraint \
1683             FROM feature_charters WHERE session_id = ? LIMIT 1",
1684        )?;
1685        let mut rows = stmt.query([session_id])?;
1686        if let Some(row) = rows.next()? {
1687            Ok(Some(FeatureCharterRow {
1688                charter_id: row.get(0)?,
1689                session_id: row.get(1)?,
1690                scope_description: row.get(2)?,
1691                max_modules: row.get(3)?,
1692                max_files: row.get(4)?,
1693                max_revisions: row.get(5)?,
1694                language_constraint: row.get(6)?,
1695            }))
1696        } else {
1697            Ok(None)
1698        }
1699    }
1700
1701    /// Record a plan revision.
1702    pub fn record_plan_revision(&self, row: &PlanRevisionRow) -> Result<()> {
1703        let conn = self.conn.lock().unwrap();
1704        conn.execute(
1705            "INSERT INTO plan_revisions (revision_id, session_id, sequence, plan_json, reason, supersedes, status) \
1706             VALUES (?, ?, ?, ?, ?, ?, ?)",
1707            duckdb::params![
1708                row.revision_id,
1709                row.session_id,
1710                row.sequence,
1711                row.plan_json,
1712                row.reason,
1713                row.supersedes,
1714                row.status,
1715            ],
1716        )?;
1717        Ok(())
1718    }
1719
1720    /// Get the active plan revision for a session.
1721    pub fn get_active_plan_revision(&self, session_id: &str) -> Result<Option<PlanRevisionRow>> {
1722        let conn = self.conn.lock().unwrap();
1723        let mut stmt = conn.prepare(
1724            "SELECT revision_id, session_id, sequence, plan_json, reason, supersedes, status \
1725             FROM plan_revisions WHERE session_id = ? AND status = 'active' \
1726             ORDER BY sequence DESC LIMIT 1",
1727        )?;
1728        let mut rows = stmt.query([session_id])?;
1729        if let Some(row) = rows.next()? {
1730            Ok(Some(PlanRevisionRow {
1731                revision_id: row.get(0)?,
1732                session_id: row.get(1)?,
1733                sequence: row.get(2)?,
1734                plan_json: row.get(3)?,
1735                reason: row.get(4)?,
1736                supersedes: row.get(5)?,
1737                status: row.get(6)?,
1738            }))
1739        } else {
1740            Ok(None)
1741        }
1742    }
1743
1744    /// Get all plan revisions for a session, ordered by sequence.
1745    pub fn get_plan_revisions(&self, session_id: &str) -> Result<Vec<PlanRevisionRow>> {
1746        let conn = self.conn.lock().unwrap();
1747        let mut stmt = conn.prepare(
1748            "SELECT revision_id, session_id, sequence, plan_json, reason, supersedes, status \
1749             FROM plan_revisions WHERE session_id = ? ORDER BY sequence ASC",
1750        )?;
1751        let mut rows = stmt.query([session_id])?;
1752        let mut results = Vec::new();
1753        while let Some(row) = rows.next()? {
1754            results.push(PlanRevisionRow {
1755                revision_id: row.get(0)?,
1756                session_id: row.get(1)?,
1757                sequence: row.get(2)?,
1758                plan_json: row.get(3)?,
1759                reason: row.get(4)?,
1760                supersedes: row.get(5)?,
1761                status: row.get(6)?,
1762            });
1763        }
1764        Ok(results)
1765    }
1766
1767    /// Supersede a plan revision (set status to 'superseded').
1768    pub fn supersede_plan_revision(&self, revision_id: &str) -> Result<()> {
1769        let conn = self.conn.lock().unwrap();
1770        conn.execute(
1771            "UPDATE plan_revisions SET status = 'superseded' WHERE revision_id = ?",
1772            [revision_id],
1773        )?;
1774        Ok(())
1775    }
1776
1777    /// Record a repair footprint.
1778    pub fn record_repair_footprint(&self, row: &RepairFootprintRow) -> Result<()> {
1779        let conn = self.conn.lock().unwrap();
1780        conn.execute(
1781            "INSERT INTO repair_footprints (footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved) \
1782             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
1783            duckdb::params![
1784                row.footprint_id,
1785                row.session_id,
1786                row.node_id,
1787                row.revision_id,
1788                row.attempt,
1789                row.affected_files,
1790                row.bundle_json,
1791                row.diagnosis,
1792                row.resolved,
1793            ],
1794        )?;
1795        Ok(())
1796    }
1797
1798    /// Get repair footprints for a node, ordered by attempt.
1799    pub fn get_repair_footprints(
1800        &self,
1801        session_id: &str,
1802        node_id: &str,
1803    ) -> Result<Vec<RepairFootprintRow>> {
1804        let conn = self.conn.lock().unwrap();
1805        let mut stmt = conn.prepare(
1806            "SELECT footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved \
1807             FROM repair_footprints WHERE session_id = ? AND node_id = ? ORDER BY attempt ASC",
1808        )?;
1809        let mut rows = stmt.query([session_id, node_id])?;
1810        let mut results = Vec::new();
1811        while let Some(row) = rows.next()? {
1812            results.push(RepairFootprintRow {
1813                footprint_id: row.get(0)?,
1814                session_id: row.get(1)?,
1815                node_id: row.get(2)?,
1816                revision_id: row.get(3)?,
1817                attempt: row.get(4)?,
1818                affected_files: row.get(5)?,
1819                bundle_json: row.get(6)?,
1820                diagnosis: row.get(7)?,
1821                resolved: row.get(8)?,
1822            });
1823        }
1824        Ok(results)
1825    }
1826
1827    /// Get all repair footprints for a session (all nodes).
1828    pub fn get_all_repair_footprints(&self, session_id: &str) -> Result<Vec<RepairFootprintRow>> {
1829        let conn = self.conn.lock().unwrap();
1830        let mut stmt = conn.prepare(
1831            "SELECT footprint_id, session_id, node_id, revision_id, attempt, affected_files, bundle_json, diagnosis, resolved \
1832             FROM repair_footprints WHERE session_id = ? ORDER BY attempt ASC",
1833        )?;
1834        let mut rows = stmt.query([session_id])?;
1835        let mut results = Vec::new();
1836        while let Some(row) = rows.next()? {
1837            results.push(RepairFootprintRow {
1838                footprint_id: row.get(0)?,
1839                session_id: row.get(1)?,
1840                node_id: row.get(2)?,
1841                revision_id: row.get(3)?,
1842                attempt: row.get(4)?,
1843                affected_files: row.get(5)?,
1844                bundle_json: row.get(6)?,
1845                diagnosis: row.get(7)?,
1846                resolved: row.get(8)?,
1847            });
1848        }
1849        Ok(results)
1850    }
1851
1852    /// Mark a repair footprint as resolved.
1853    pub fn resolve_repair_footprint(&self, footprint_id: &str) -> Result<()> {
1854        let conn = self.conn.lock().unwrap();
1855        conn.execute(
1856            "UPDATE repair_footprints SET resolved = true WHERE footprint_id = ?",
1857            [footprint_id],
1858        )?;
1859        Ok(())
1860    }
1861
1862    /// Record or update a budget envelope for a session.
1863    pub fn upsert_budget_envelope(&self, row: &BudgetEnvelopeRow) -> Result<()> {
1864        let conn = self.conn.lock().unwrap();
1865        // Try insert first, update on conflict
1866        conn.execute(
1867            "INSERT INTO budget_envelopes (session_id, max_steps, steps_used, max_revisions, revisions_used, max_cost_usd, cost_used_usd) \
1868             VALUES (?, ?, ?, ?, ?, ?, ?) \
1869             ON CONFLICT (session_id) DO UPDATE SET \
1870             max_steps = EXCLUDED.max_steps, steps_used = EXCLUDED.steps_used, \
1871             max_revisions = EXCLUDED.max_revisions, revisions_used = EXCLUDED.revisions_used, \
1872             max_cost_usd = EXCLUDED.max_cost_usd, cost_used_usd = EXCLUDED.cost_used_usd",
1873            duckdb::params![
1874                row.session_id,
1875                row.max_steps,
1876                row.steps_used,
1877                row.max_revisions,
1878                row.revisions_used,
1879                row.max_cost_usd,
1880                row.cost_used_usd,
1881            ],
1882        )?;
1883        Ok(())
1884    }
1885
1886    /// Get the budget envelope for a session.
1887    pub fn get_budget_envelope(&self, session_id: &str) -> Result<Option<BudgetEnvelopeRow>> {
1888        let conn = self.conn.lock().unwrap();
1889        let mut stmt = conn.prepare(
1890            "SELECT session_id, max_steps, steps_used, max_revisions, revisions_used, max_cost_usd, cost_used_usd \
1891             FROM budget_envelopes WHERE session_id = ?",
1892        )?;
1893        let mut rows = stmt.query([session_id])?;
1894        if let Some(row) = rows.next()? {
1895            Ok(Some(BudgetEnvelopeRow {
1896                session_id: row.get(0)?,
1897                max_steps: row.get(1)?,
1898                steps_used: row.get(2)?,
1899                max_revisions: row.get(3)?,
1900                revisions_used: row.get(4)?,
1901                max_cost_usd: row.get(5)?,
1902                cost_used_usd: row.get(6)?,
1903            }))
1904        } else {
1905            Ok(None)
1906        }
1907    }
1908}
1909
1910#[cfg(test)]
1911mod tests {
1912    use super::*;
1913
1914    /// Create an in-memory store for testing
1915    fn test_store() -> SessionStore {
1916        let temp_dir = std::env::temp_dir();
1917        let db_path = temp_dir.join(format!("perspt_test_{}.db", uuid::Uuid::new_v4()));
1918        SessionStore::open(&db_path).expect("Failed to create test store")
1919    }
1920
1921    fn seed_session(store: &SessionStore, session_id: &str) {
1922        let record = SessionRecord {
1923            session_id: session_id.to_string(),
1924            task: "test task".to_string(),
1925            working_dir: "/tmp/test".to_string(),
1926            merkle_root: None,
1927            detected_toolchain: None,
1928            status: "RUNNING".to_string(),
1929        };
1930        store.create_session(&record).unwrap();
1931    }
1932
1933    #[test]
1934    fn test_node_state_phase8_roundtrip() {
1935        let store = test_store();
1936        let sid = "test-sess-1";
1937        seed_session(&store, sid);
1938
1939        let record = NodeStateRecord {
1940            node_id: "node-1".to_string(),
1941            session_id: sid.to_string(),
1942            state: "Completed".to_string(),
1943            v_total: 0.42,
1944            merkle_hash: Some(vec![0xab; 32]),
1945            attempt_count: 3,
1946            node_class: Some("Interface".to_string()),
1947            owner_plugin: Some("rust".to_string()),
1948            goal: Some("Implement API".to_string()),
1949            parent_id: Some("root".to_string()),
1950            children: Some(r#"["child-a","child-b"]"#.to_string()),
1951            last_error_type: Some("CompilationError".to_string()),
1952            committed_at: Some("2025-01-01T00:00:00Z".to_string()),
1953        };
1954
1955        store.record_node_state(&record).unwrap();
1956
1957        let states = store.get_latest_node_states(sid).unwrap();
1958        assert_eq!(states.len(), 1);
1959        let r = &states[0];
1960        assert_eq!(r.node_id, "node-1");
1961        assert_eq!(r.state, "Completed");
1962        assert_eq!(r.attempt_count, 3);
1963        assert_eq!(r.node_class.as_deref(), Some("Interface"));
1964        assert_eq!(r.owner_plugin.as_deref(), Some("rust"));
1965        assert_eq!(r.goal.as_deref(), Some("Implement API"));
1966        assert_eq!(r.parent_id.as_deref(), Some("root"));
1967        assert!(r.children.is_some());
1968        assert_eq!(r.last_error_type.as_deref(), Some("CompilationError"));
1969        assert_eq!(r.committed_at.as_deref(), Some("2025-01-01T00:00:00Z"));
1970    }
1971
1972    #[test]
1973    fn test_task_graph_edge_roundtrip() {
1974        let store = test_store();
1975        let sid = "test-graph-1";
1976        seed_session(&store, sid);
1977
1978        let edge = TaskGraphEdgeRow {
1979            session_id: sid.to_string(),
1980            parent_node_id: "parent-1".to_string(),
1981            child_node_id: "child-1".to_string(),
1982            edge_type: "depends_on".to_string(),
1983        };
1984        store.record_task_graph_edge(&edge).unwrap();
1985
1986        let edges = store.get_task_graph_edges(sid).unwrap();
1987        assert_eq!(edges.len(), 1);
1988        assert_eq!(edges[0].parent_node_id, "parent-1");
1989        assert_eq!(edges[0].child_node_id, "child-1");
1990        assert_eq!(edges[0].edge_type, "depends_on");
1991    }
1992
1993    #[test]
1994    fn test_verification_result_roundtrip() {
1995        let store = test_store();
1996        let sid = "test-vr-1";
1997        seed_session(&store, sid);
1998
1999        let row = VerificationResultRow {
2000            session_id: sid.to_string(),
2001            node_id: "node-v".to_string(),
2002            result_json: r#"{"syntax_ok":true}"#.to_string(),
2003            syntax_ok: true,
2004            build_ok: true,
2005            tests_ok: false,
2006            lint_ok: true,
2007            diagnostics_count: 2,
2008            tests_passed: 5,
2009            tests_failed: 1,
2010            degraded: false,
2011            degraded_reason: None,
2012        };
2013        store.record_verification_result(&row).unwrap();
2014
2015        let got = store.get_verification_result(sid, "node-v").unwrap();
2016        assert!(got.is_some());
2017        let got = got.unwrap();
2018        assert!(got.syntax_ok);
2019        assert!(got.build_ok);
2020        assert!(!got.tests_ok);
2021        assert_eq!(got.tests_passed, 5);
2022        assert_eq!(got.tests_failed, 1);
2023        assert!(!got.degraded);
2024    }
2025
2026    #[test]
2027    fn test_verification_result_degraded() {
2028        let store = test_store();
2029        let sid = "test-vr-deg";
2030        seed_session(&store, sid);
2031
2032        let row = VerificationResultRow {
2033            session_id: sid.to_string(),
2034            node_id: "node-d".to_string(),
2035            result_json: "{}".to_string(),
2036            syntax_ok: true,
2037            build_ok: false,
2038            tests_ok: false,
2039            lint_ok: false,
2040            diagnostics_count: 0,
2041            tests_passed: 0,
2042            tests_failed: 0,
2043            degraded: true,
2044            degraded_reason: Some("LSP unavailable".to_string()),
2045        };
2046        store.record_verification_result(&row).unwrap();
2047
2048        let got = store
2049            .get_verification_result(sid, "node-d")
2050            .unwrap()
2051            .unwrap();
2052        assert!(got.degraded);
2053        assert_eq!(got.degraded_reason.as_deref(), Some("LSP unavailable"));
2054    }
2055
2056    #[test]
2057    fn test_artifact_bundle_roundtrip() {
2058        let store = test_store();
2059        let sid = "test-ab-1";
2060        seed_session(&store, sid);
2061
2062        let row = ArtifactBundleRow {
2063            session_id: sid.to_string(),
2064            node_id: "node-a".to_string(),
2065            bundle_json: r#"{"artifacts":[],"commands":[]}"#.to_string(),
2066            artifact_count: 3,
2067            command_count: 1,
2068            touched_files: r#"["src/main.rs","src/lib.rs","tests/test.rs"]"#.to_string(),
2069        };
2070        store.record_artifact_bundle(&row).unwrap();
2071
2072        let got = store.get_artifact_bundle(sid, "node-a").unwrap();
2073        assert!(got.is_some());
2074        let got = got.unwrap();
2075        assert_eq!(got.artifact_count, 3);
2076        assert_eq!(got.command_count, 1);
2077        assert!(got.touched_files.contains("main.rs"));
2078    }
2079
2080    #[test]
2081    fn test_latest_node_states_dedup() {
2082        let store = test_store();
2083        let sid = "test-dedup";
2084        seed_session(&store, sid);
2085
2086        // Insert two states for the same node
2087        let r1 = NodeStateRecord {
2088            node_id: "node-x".to_string(),
2089            session_id: sid.to_string(),
2090            state: "Coding".to_string(),
2091            v_total: 0.5,
2092            merkle_hash: None,
2093            attempt_count: 1,
2094            node_class: None,
2095            owner_plugin: None,
2096            goal: None,
2097            parent_id: None,
2098            children: None,
2099            last_error_type: None,
2100            committed_at: None,
2101        };
2102        store.record_node_state(&r1).unwrap();
2103
2104        let r2 = NodeStateRecord {
2105            node_id: "node-x".to_string(),
2106            session_id: sid.to_string(),
2107            state: "Completed".to_string(),
2108            v_total: 0.3,
2109            merkle_hash: None,
2110            attempt_count: 2,
2111            node_class: Some("Implementation".to_string()),
2112            owner_plugin: None,
2113            goal: Some("Updated goal".to_string()),
2114            parent_id: None,
2115            children: None,
2116            last_error_type: None,
2117            committed_at: Some("2025-01-02T00:00:00Z".to_string()),
2118        };
2119        store.record_node_state(&r2).unwrap();
2120
2121        // get_latest should return only the last entry
2122        let latest = store.get_latest_node_states(sid).unwrap();
2123        assert_eq!(latest.len(), 1);
2124        assert_eq!(latest[0].state, "Completed");
2125        assert_eq!(latest[0].attempt_count, 2);
2126        assert_eq!(latest[0].goal.as_deref(), Some("Updated goal"));
2127    }
2128
2129    #[test]
2130    fn test_backward_compat_empty_phase8_fields() {
2131        let store = test_store();
2132        let sid = "test-compat";
2133        seed_session(&store, sid);
2134
2135        // Insert a node with all Phase 8 fields as None (pre-Phase-8 session)
2136        let r = NodeStateRecord {
2137            node_id: "old-node".to_string(),
2138            session_id: sid.to_string(),
2139            state: "COMPLETED".to_string(),
2140            v_total: 1.0,
2141            merkle_hash: None,
2142            attempt_count: 1,
2143            node_class: None,
2144            owner_plugin: None,
2145            goal: None,
2146            parent_id: None,
2147            children: None,
2148            last_error_type: None,
2149            committed_at: None,
2150        };
2151        store.record_node_state(&r).unwrap();
2152
2153        let latest = store.get_latest_node_states(sid).unwrap();
2154        assert_eq!(latest.len(), 1);
2155        assert!(latest[0].node_class.is_none());
2156        assert!(latest[0].goal.is_none());
2157        assert!(latest[0].committed_at.is_none());
2158
2159        // Verification and artifact lookups should return None
2160        let vr = store.get_verification_result(sid, "old-node").unwrap();
2161        assert!(vr.is_none());
2162        let ab = store.get_artifact_bundle(sid, "old-node").unwrap();
2163        assert!(ab.is_none());
2164    }
2165
2166    #[test]
2167    fn test_review_outcome_roundtrip() {
2168        let store = test_store();
2169        let sid = "test-review";
2170        seed_session(&store, sid);
2171
2172        let row = ReviewOutcomeRow {
2173            session_id: sid.to_string(),
2174            node_id: "node-r".to_string(),
2175            outcome: "approved".to_string(),
2176            reviewer_note: Some("LGTM".to_string()),
2177            energy_at_review: None,
2178            degraded: None,
2179            escalation_category: None,
2180        };
2181        store.record_review_outcome(&row).unwrap();
2182
2183        let outcomes = store.get_review_outcomes(sid, "node-r").unwrap();
2184        assert_eq!(outcomes.len(), 1);
2185        assert_eq!(outcomes[0].outcome, "approved");
2186        assert_eq!(outcomes[0].reviewer_note.as_deref(), Some("LGTM"));
2187    }
2188
2189    #[test]
2190    fn test_review_outcome_with_audit_fields() {
2191        let store = test_store();
2192        let sid = "test-review-audit";
2193        seed_session(&store, sid);
2194
2195        let row = ReviewOutcomeRow {
2196            session_id: sid.to_string(),
2197            node_id: "node-a".to_string(),
2198            outcome: "rejected".to_string(),
2199            reviewer_note: Some("Needs rework".to_string()),
2200            energy_at_review: Some(0.42),
2201            degraded: Some(true),
2202            escalation_category: Some("complexity".to_string()),
2203        };
2204        store.record_review_outcome(&row).unwrap();
2205
2206        let outcomes = store.get_review_outcomes(sid, "node-a").unwrap();
2207        assert_eq!(outcomes.len(), 1);
2208        assert_eq!(outcomes[0].outcome, "rejected");
2209        assert_eq!(outcomes[0].energy_at_review, Some(0.42));
2210        assert_eq!(outcomes[0].degraded, Some(true));
2211        assert_eq!(
2212            outcomes[0].escalation_category.as_deref(),
2213            Some("complexity")
2214        );
2215    }
2216
2217    #[test]
2218    fn test_get_all_review_outcomes() {
2219        let store = test_store();
2220        let sid = "test-review-all";
2221        seed_session(&store, sid);
2222
2223        for (node, outcome) in &[("n1", "approved"), ("n2", "rejected"), ("n1", "approved")] {
2224            let row = ReviewOutcomeRow {
2225                session_id: sid.to_string(),
2226                node_id: node.to_string(),
2227                outcome: outcome.to_string(),
2228                reviewer_note: None,
2229                energy_at_review: None,
2230                degraded: None,
2231                escalation_category: None,
2232            };
2233            store.record_review_outcome(&row).unwrap();
2234        }
2235
2236        let all = store.get_all_review_outcomes(sid).unwrap();
2237        assert_eq!(all.len(), 3);
2238    }
2239
2240    #[test]
2241    fn test_feature_charter_roundtrip() {
2242        let store = test_store();
2243        let sid = "test-charter";
2244        seed_session(&store, sid);
2245
2246        let row = FeatureCharterRow {
2247            charter_id: "ch-1".to_string(),
2248            session_id: sid.to_string(),
2249            scope_description: "Add authentication module".to_string(),
2250            max_modules: Some(3),
2251            max_files: Some(10),
2252            max_revisions: Some(5),
2253            language_constraint: Some("rust".to_string()),
2254        };
2255        store.record_feature_charter(&row).unwrap();
2256
2257        let got = store.get_feature_charter(sid).unwrap();
2258        assert!(got.is_some());
2259        let got = got.unwrap();
2260        assert_eq!(got.charter_id, "ch-1");
2261        assert_eq!(got.scope_description, "Add authentication module");
2262        assert_eq!(got.max_modules, Some(3));
2263        assert_eq!(got.language_constraint.as_deref(), Some("rust"));
2264    }
2265
2266    #[test]
2267    fn test_feature_charter_returns_none_for_missing() {
2268        let store = test_store();
2269        let sid = "test-charter-miss";
2270        seed_session(&store, sid);
2271
2272        let got = store.get_feature_charter(sid).unwrap();
2273        assert!(got.is_none());
2274    }
2275
2276    #[test]
2277    fn test_plan_revision_roundtrip() {
2278        let store = test_store();
2279        let sid = "test-rev";
2280        seed_session(&store, sid);
2281
2282        let row = PlanRevisionRow {
2283            revision_id: "rev-1".to_string(),
2284            session_id: sid.to_string(),
2285            sequence: 1,
2286            plan_json: r#"{"tasks":[]}"#.to_string(),
2287            reason: "initial plan".to_string(),
2288            supersedes: None,
2289            status: "active".to_string(),
2290        };
2291        store.record_plan_revision(&row).unwrap();
2292
2293        let active = store.get_active_plan_revision(sid).unwrap();
2294        assert!(active.is_some());
2295        let active = active.unwrap();
2296        assert_eq!(active.revision_id, "rev-1");
2297        assert_eq!(active.sequence, 1);
2298        assert_eq!(active.status, "active");
2299    }
2300
2301    #[test]
2302    fn test_plan_revision_supersede() {
2303        let store = test_store();
2304        let sid = "test-rev-sup";
2305        seed_session(&store, sid);
2306
2307        let r1 = PlanRevisionRow {
2308            revision_id: "rev-1".to_string(),
2309            session_id: sid.to_string(),
2310            sequence: 1,
2311            plan_json: "{}".to_string(),
2312            reason: "initial".to_string(),
2313            supersedes: None,
2314            status: "active".to_string(),
2315        };
2316        store.record_plan_revision(&r1).unwrap();
2317
2318        // Supersede rev-1
2319        store.supersede_plan_revision("rev-1").unwrap();
2320
2321        let r2 = PlanRevisionRow {
2322            revision_id: "rev-2".to_string(),
2323            session_id: sid.to_string(),
2324            sequence: 2,
2325            plan_json: r#"{"tasks":["a"]}"#.to_string(),
2326            reason: "verifier feedback".to_string(),
2327            supersedes: Some("rev-1".to_string()),
2328            status: "active".to_string(),
2329        };
2330        store.record_plan_revision(&r2).unwrap();
2331
2332        // Only rev-2 should be active
2333        let active = store.get_active_plan_revision(sid).unwrap().unwrap();
2334        assert_eq!(active.revision_id, "rev-2");
2335
2336        // All revisions returned in order
2337        let all = store.get_plan_revisions(sid).unwrap();
2338        assert_eq!(all.len(), 2);
2339        assert_eq!(all[0].status, "superseded");
2340        assert_eq!(all[1].status, "active");
2341    }
2342
2343    #[test]
2344    fn test_repair_footprint_roundtrip() {
2345        let store = test_store();
2346        let sid = "test-repair";
2347        seed_session(&store, sid);
2348
2349        let row = RepairFootprintRow {
2350            footprint_id: "fp-1".to_string(),
2351            session_id: sid.to_string(),
2352            node_id: "node-a".to_string(),
2353            revision_id: "rev-1".to_string(),
2354            attempt: 1,
2355            affected_files: r#"["src/main.rs"]"#.to_string(),
2356            bundle_json: "{}".to_string(),
2357            diagnosis: "missing import".to_string(),
2358            resolved: false,
2359        };
2360        store.record_repair_footprint(&row).unwrap();
2361
2362        let footprints = store.get_repair_footprints(sid, "node-a").unwrap();
2363        assert_eq!(footprints.len(), 1);
2364        assert_eq!(footprints[0].footprint_id, "fp-1");
2365        assert_eq!(footprints[0].diagnosis, "missing import");
2366        assert!(!footprints[0].resolved);
2367    }
2368
2369    #[test]
2370    fn test_repair_footprint_resolve() {
2371        let store = test_store();
2372        let sid = "test-repair-res";
2373        seed_session(&store, sid);
2374
2375        let row = RepairFootprintRow {
2376            footprint_id: "fp-2".to_string(),
2377            session_id: sid.to_string(),
2378            node_id: "node-b".to_string(),
2379            revision_id: "rev-1".to_string(),
2380            attempt: 1,
2381            affected_files: "[]".to_string(),
2382            bundle_json: "{}".to_string(),
2383            diagnosis: "type error".to_string(),
2384            resolved: false,
2385        };
2386        store.record_repair_footprint(&row).unwrap();
2387
2388        store.resolve_repair_footprint("fp-2").unwrap();
2389
2390        let footprints = store.get_repair_footprints(sid, "node-b").unwrap();
2391        assert_eq!(footprints.len(), 1);
2392        assert!(footprints[0].resolved);
2393    }
2394
2395    #[test]
2396    fn test_budget_envelope_upsert_and_get() {
2397        let store = test_store();
2398        let sid = "test-budget";
2399        seed_session(&store, sid);
2400
2401        let row = BudgetEnvelopeRow {
2402            session_id: sid.to_string(),
2403            max_steps: Some(100),
2404            steps_used: 5,
2405            max_revisions: Some(10),
2406            revisions_used: 1,
2407            max_cost_usd: Some(5.0),
2408            cost_used_usd: 0.25,
2409        };
2410        store.upsert_budget_envelope(&row).unwrap();
2411
2412        let got = store.get_budget_envelope(sid).unwrap();
2413        assert!(got.is_some());
2414        let got = got.unwrap();
2415        assert_eq!(got.max_steps, Some(100));
2416        assert_eq!(got.steps_used, 5);
2417        assert_eq!(got.cost_used_usd, 0.25);
2418    }
2419
2420    #[test]
2421    fn test_budget_envelope_upsert_updates() {
2422        let store = test_store();
2423        let sid = "test-budget-up";
2424        seed_session(&store, sid);
2425
2426        let row1 = BudgetEnvelopeRow {
2427            session_id: sid.to_string(),
2428            max_steps: Some(100),
2429            steps_used: 0,
2430            max_revisions: None,
2431            revisions_used: 0,
2432            max_cost_usd: None,
2433            cost_used_usd: 0.0,
2434        };
2435        store.upsert_budget_envelope(&row1).unwrap();
2436
2437        // Update with new values
2438        let row2 = BudgetEnvelopeRow {
2439            session_id: sid.to_string(),
2440            max_steps: Some(100),
2441            steps_used: 42,
2442            max_revisions: Some(5),
2443            revisions_used: 3,
2444            max_cost_usd: Some(10.0),
2445            cost_used_usd: 4.5,
2446        };
2447        store.upsert_budget_envelope(&row2).unwrap();
2448
2449        let got = store.get_budget_envelope(sid).unwrap().unwrap();
2450        assert_eq!(got.steps_used, 42);
2451        assert_eq!(got.revisions_used, 3);
2452        assert_eq!(got.cost_used_usd, 4.5);
2453    }
2454
2455    #[test]
2456    fn test_budget_envelope_missing_returns_none() {
2457        let store = test_store();
2458        let sid = "test-budget-miss";
2459        seed_session(&store, sid);
2460
2461        let got = store.get_budget_envelope(sid).unwrap();
2462        assert!(got.is_none());
2463    }
2464
2465    #[test]
2466    fn test_read_only_store_queries_work() {
2467        let temp_dir = std::env::temp_dir();
2468        let db_path = temp_dir.join(format!("perspt_ro_test_{}.db", uuid::Uuid::new_v4()));
2469
2470        // Create and seed a normal store
2471        {
2472            let store = SessionStore::open(&db_path).unwrap();
2473            seed_session(&store, "ro-test");
2474        }
2475
2476        // Open read-only and verify queries work
2477        let ro = SessionStore::open_read_only(&db_path).unwrap();
2478        let sessions = ro.list_recent_sessions(10).unwrap();
2479        assert_eq!(sessions.len(), 1);
2480        assert_eq!(sessions[0].session_id, "ro-test");
2481    }
2482
2483    #[test]
2484    fn test_read_only_store_rejects_writes() {
2485        let temp_dir = std::env::temp_dir();
2486        let db_path = temp_dir.join(format!("perspt_ro_wr_{}.db", uuid::Uuid::new_v4()));
2487
2488        // Create the DB first
2489        {
2490            let _store = SessionStore::open(&db_path).unwrap();
2491        }
2492
2493        // Open read-only and verify writes fail
2494        let ro = SessionStore::open_read_only(&db_path).unwrap();
2495        let record = SessionRecord {
2496            session_id: "should-fail".to_string(),
2497            task: "test".to_string(),
2498            working_dir: "/tmp".to_string(),
2499            merkle_root: None,
2500            detected_toolchain: None,
2501            status: "RUNNING".to_string(),
2502        };
2503        assert!(ro.create_session(&record).is_err());
2504    }
2505}