Skip to main content

perspt_agent/
ledger.rs

1//! DuckDB Merkle Ledger
2//!
3//! Persistent storage for session history, commits, and Merkle proofs.
4
5use anyhow::{Context, Result};
6pub use perspt_store::{LlmRequestRecord, NodeStateRecord, SessionRecord, SessionStore};
7use std::path::{Path, PathBuf};
8
9/// Full commit payload collected by the orchestrator at commit time.
10///
11/// Bundles graph-structural fields, retry/error metadata, and merkle
12/// material so that `commit_node_snapshot()` can persist a complete
13/// node record in a single call.
14#[derive(Debug, Clone)]
15pub struct NodeCommitPayload {
16    pub node_id: String,
17    pub state: String,
18    pub v_total: f32,
19    pub merkle_hash: Option<Vec<u8>>,
20    pub attempt_count: i32,
21    pub node_class: Option<String>,
22    pub owner_plugin: Option<String>,
23    pub goal: Option<String>,
24    pub parent_id: Option<String>,
25    /// JSON-serialized `Vec<String>` of child node IDs
26    pub children: Option<String>,
27    pub last_error_type: Option<String>,
28}
29
30/// Merkle commit record (Legacy wrapper for compatibility)
31#[derive(Debug, Clone)]
32pub struct MerkleCommit {
33    pub commit_id: String,
34    pub session_id: String,
35    pub node_id: String,
36    pub merkle_root: [u8; 32],
37    pub parent_hash: Option<[u8; 32]>,
38    pub timestamp: i64,
39    pub energy: f32,
40    pub stable: bool,
41}
42
43/// Session record (Legacy wrapper for compatibility)
44#[derive(Debug, Clone)]
45pub struct SessionRecordLegacy {
46    pub session_id: String,
47    pub task: String,
48    pub started_at: i64,
49    pub ended_at: Option<i64>,
50    pub status: String,
51    pub total_nodes: usize,
52    pub completed_nodes: usize,
53}
54
55/// Merkle Ledger using DuckDB for persistence
56pub struct MerkleLedger {
57    /// Session store from perspt-store
58    store: SessionStore,
59    /// Current session metadata (legacy cache)
60    pub(crate) current_session: Option<SessionRecordLegacy>,
61    /// Session artifact directory
62    session_dir: Option<PathBuf>,
63}
64
65impl MerkleLedger {
66    /// Create a new ledger (opens or creates database)
67    pub fn new() -> Result<Self> {
68        let store = SessionStore::new().context("Failed to initialize session store")?;
69        Ok(Self {
70            store,
71            current_session: None,
72            session_dir: None,
73        })
74    }
75
76    /// Create an in-memory ledger (for testing)
77    pub fn in_memory() -> Result<Self> {
78        // Use a unique temp db for testing to avoid collisions
79        let temp_dir = std::env::temp_dir();
80        let db_path = temp_dir.join(format!("perspt_test_{}.db", uuid::Uuid::new_v4()));
81        let store = SessionStore::open(&db_path)?;
82        Ok(Self {
83            store,
84            current_session: None,
85            session_dir: None,
86        })
87    }
88
89    /// Start a new session
90    pub fn start_session(&mut self, session_id: &str, task: &str, working_dir: &str) -> Result<()> {
91        let record = SessionRecord {
92            session_id: session_id.to_string(),
93            task: task.to_string(),
94            working_dir: working_dir.to_string(),
95            merkle_root: None,
96            detected_toolchain: None,
97            status: "RUNNING".to_string(),
98        };
99
100        self.store.create_session(&record)?;
101
102        // Create physical artifact directory
103        let dir = self.store.create_session_dir(session_id)?;
104        self.session_dir = Some(dir);
105
106        let legacy_record = SessionRecordLegacy {
107            session_id: session_id.to_string(),
108            task: task.to_string(),
109            started_at: chrono_timestamp(),
110            ended_at: None,
111            status: "RUNNING".to_string(),
112            total_nodes: 0,
113            completed_nodes: 0,
114        };
115        self.current_session = Some(legacy_record);
116
117        log::info!("Started persistent session: {}", session_id);
118        Ok(())
119    }
120
121    /// Record energy measurement
122    pub fn record_energy(
123        &self,
124        node_id: &str,
125        energy: &crate::types::EnergyComponents,
126        total_energy: f32,
127    ) -> Result<()> {
128        let session_id = self
129            .current_session
130            .as_ref()
131            .map(|s| s.session_id.clone())
132            .context("No active session to record energy")?;
133
134        let record = perspt_store::EnergyRecord {
135            node_id: node_id.to_string(),
136            session_id,
137            v_syn: energy.v_syn,
138            v_str: energy.v_str,
139            v_log: energy.v_log,
140            v_boot: energy.v_boot,
141            v_sheaf: energy.v_sheaf,
142            v_total: total_energy,
143        };
144
145        self.store.record_energy(&record)?;
146        Ok(())
147    }
148
149    /// Commit a stable node state
150    pub fn commit_node(
151        &mut self,
152        node_id: &str,
153        merkle_root: [u8; 32],
154        _parent_hash: Option<[u8; 32]>,
155        energy: f32,
156        state_json: String,
157    ) -> Result<String> {
158        let session_id = self
159            .current_session
160            .as_ref()
161            .map(|s| s.session_id.clone())
162            .context("No active session to commit")?;
163
164        let commit_id = generate_commit_id();
165
166        let record = NodeStateRecord {
167            node_id: node_id.to_string(),
168            session_id: session_id.clone(),
169            state: state_json,
170            v_total: energy,
171            merkle_hash: Some(merkle_root.to_vec()),
172            attempt_count: 1, // Placeholder
173            // Phase 8 fields — populated properly via commit_node_snapshot
174            node_class: None,
175            owner_plugin: None,
176            goal: None,
177            parent_id: None,
178            children: None,
179            last_error_type: None,
180            committed_at: None,
181        };
182
183        self.store.record_node_state(&record)?;
184        self.store.update_merkle_root(&session_id, &merkle_root)?;
185
186        log::info!("Committed node {} to store", node_id);
187
188        // Update session progress
189        if let Some(ref mut session) = self.current_session {
190            session.completed_nodes += 1;
191        }
192
193        Ok(commit_id)
194    }
195
196    /// Commit a full node snapshot with all Phase 8 metadata.
197    ///
198    /// This is the preferred commit API for the orchestrator. It records the
199    /// complete node state, graph-structural fields, retry/error metadata,
200    /// and merkle material in a single durable write. Returns the commit ID.
201    pub fn commit_node_snapshot(&mut self, payload: &NodeCommitPayload) -> Result<String> {
202        let session_id = self
203            .current_session
204            .as_ref()
205            .map(|s| s.session_id.clone())
206            .context("No active session to commit")?;
207
208        let commit_id = generate_commit_id();
209
210        let record = NodeStateRecord {
211            node_id: payload.node_id.clone(),
212            session_id: session_id.clone(),
213            state: payload.state.clone(),
214            v_total: payload.v_total,
215            merkle_hash: payload.merkle_hash.clone(),
216            attempt_count: payload.attempt_count,
217            node_class: payload.node_class.clone(),
218            owner_plugin: payload.owner_plugin.clone(),
219            goal: payload.goal.clone(),
220            parent_id: payload.parent_id.clone(),
221            children: payload.children.clone(),
222            last_error_type: payload.last_error_type.clone(),
223            committed_at: Some(chrono_iso_now()),
224        };
225
226        self.store.record_node_state(&record)?;
227
228        // Update merkle root if hash is present
229        if let Some(ref hash) = payload.merkle_hash {
230            if hash.len() == 32 {
231                let mut root = [0u8; 32];
232                root.copy_from_slice(hash);
233                self.store.update_merkle_root(&session_id, &root)?;
234            }
235        }
236
237        log::info!(
238            "Committed node snapshot '{}' (state={}, attempts={})",
239            payload.node_id,
240            payload.state,
241            payload.attempt_count
242        );
243
244        if let Some(ref mut session) = self.current_session {
245            session.completed_nodes += 1;
246        }
247
248        Ok(commit_id)
249    }
250
251    /// End the current session
252    pub fn end_session(&mut self, status: &str) -> Result<()> {
253        if let Some(ref mut session) = self.current_session {
254            session.ended_at = Some(chrono_timestamp());
255            session.status = status.to_string();
256            log::info!(
257                "Ended session {} with status: {}",
258                session.session_id,
259                status
260            );
261        }
262        Ok(())
263    }
264
265    /// Get artifacts directory
266    pub fn artifacts_dir(&self) -> Option<&Path> {
267        self.session_dir.as_deref()
268    }
269
270    /// Get session statistics (legacy facade)
271    pub fn get_stats(&self) -> LedgerStats {
272        LedgerStats {
273            total_sessions: 0, // Would query store.count_sessions()
274            total_commits: 0,
275            db_size_bytes: 0,
276        }
277    }
278
279    /// Get the current merkle root (legacy facade)
280    pub fn current_merkle_root(&self) -> [u8; 32] {
281        [0u8; 32] // Placeholder
282    }
283
284    /// Record an LLM request/response for debugging and cost tracking
285    pub fn record_llm_request(
286        &self,
287        model: &str,
288        prompt: &str,
289        response: &str,
290        node_id: Option<&str>,
291        latency_ms: i32,
292    ) -> Result<()> {
293        let session_id = self
294            .current_session
295            .as_ref()
296            .map(|s| s.session_id.clone())
297            .context("No active session to record LLM request")?;
298
299        let record = LlmRequestRecord {
300            session_id,
301            node_id: node_id.map(|s| s.to_string()),
302            model: model.to_string(),
303            prompt: prompt.to_string(),
304            response: response.to_string(),
305            tokens_in: 0, // TODO: Extract from provider response if available
306            tokens_out: 0,
307            latency_ms,
308        };
309
310        self.store.record_llm_request(&record)?;
311        log::debug!(
312            "Recorded LLM request: model={}, prompt_len={}, response_len={}",
313            model,
314            prompt.len(),
315            response.len()
316        );
317        Ok(())
318    }
319
320    /// Get access to the underlying store (for direct queries)
321    pub fn store(&self) -> &SessionStore {
322        &self.store
323    }
324
325    // =========================================================================
326    // PSP-5 Phase 3: Structural Digests & Context Provenance
327    // =========================================================================
328
329    /// Record a structural digest for a node
330    pub fn record_structural_digest(
331        &self,
332        node_id: &str,
333        source_path: &str,
334        artifact_kind: &str,
335        hash: &[u8],
336        version: i32,
337    ) -> Result<()> {
338        let session_id = self
339            .current_session
340            .as_ref()
341            .map(|s| s.session_id.clone())
342            .context("No active session to record structural digest")?;
343
344        let record = perspt_store::StructuralDigestRecord {
345            digest_id: format!("sd-{}-{}", node_id, uuid::Uuid::new_v4()),
346            session_id,
347            node_id: node_id.to_string(),
348            source_path: source_path.to_string(),
349            artifact_kind: artifact_kind.to_string(),
350            hash: hash.to_vec(),
351            version,
352        };
353
354        self.store.record_structural_digest(&record)?;
355        log::debug!(
356            "Recorded structural digest for {} at {}",
357            node_id,
358            source_path
359        );
360        Ok(())
361    }
362
363    /// Get structural digests for a specific node in the current session
364    pub fn get_structural_digests(
365        &self,
366        node_id: &str,
367    ) -> Result<Vec<perspt_store::StructuralDigestRecord>> {
368        let session_id = self
369            .current_session
370            .as_ref()
371            .map(|s| s.session_id.clone())
372            .context("No active session to query structural digests")?;
373
374        self.store.get_structural_digests(&session_id, node_id)
375    }
376
377    /// Record context provenance for a node
378    pub fn record_context_provenance(
379        &self,
380        provenance: &perspt_core::types::ContextProvenance,
381    ) -> Result<()> {
382        let session_id = self
383            .current_session
384            .as_ref()
385            .map(|s| s.session_id.clone())
386            .context("No active session to record context provenance")?;
387
388        let to_hex_32 =
389            |bytes: &[u8; 32]| -> String { bytes.iter().map(|b| format!("{:02x}", b)).collect() };
390        let to_hex_vec =
391            |bytes: &[u8]| -> String { bytes.iter().map(|b| format!("{:02x}", b)).collect() };
392        let structural_hashes: Vec<String> = provenance
393            .structural_digest_hashes
394            .iter()
395            .map(|(id, hash)| format!("{}:{}", id, to_hex_32(hash)))
396            .collect();
397        let summary_hashes: Vec<String> = provenance
398            .summary_digest_hashes
399            .iter()
400            .map(|(id, hash)| format!("{}:{}", id, to_hex_32(hash)))
401            .collect();
402        let dep_hashes: Vec<String> = provenance
403            .dependency_commit_hashes
404            .iter()
405            .map(|(id, hash)| format!("{}:{}", id, to_hex_vec(hash)))
406            .collect();
407
408        let record = perspt_store::ContextProvenanceRecord {
409            session_id,
410            node_id: provenance.node_id.clone(),
411            context_package_id: provenance.context_package_id.clone(),
412            structural_hashes: serde_json::to_string(&structural_hashes).unwrap_or_default(),
413            summary_hashes: serde_json::to_string(&summary_hashes).unwrap_or_default(),
414            dependency_hashes: serde_json::to_string(&dep_hashes).unwrap_or_default(),
415            included_file_count: provenance.included_file_count as i32,
416            total_bytes: provenance.total_bytes as i32,
417        };
418
419        self.store.record_context_provenance(&record)?;
420        log::debug!(
421            "Recorded context provenance for node '{}' (package '{}')",
422            provenance.node_id,
423            provenance.context_package_id
424        );
425        Ok(())
426    }
427
428    /// Get context provenance for a specific node in the current session
429    pub fn get_context_provenance(
430        &self,
431        node_id: &str,
432    ) -> Result<Option<perspt_store::ContextProvenanceRecord>> {
433        let session_id = self
434            .current_session
435            .as_ref()
436            .map(|s| s.session_id.clone())
437            .context("No active session to query context provenance")?;
438
439        self.store.get_context_provenance(&session_id, node_id)
440    }
441
442    // =========================================================================
443    // PSP-5 Phase 5: Escalation and Rewrite Persistence
444    // =========================================================================
445
446    /// Record an escalation report for a non-convergent node
447    pub fn record_escalation_report(
448        &self,
449        report: &perspt_core::types::EscalationReport,
450    ) -> Result<()> {
451        let session_id = self
452            .current_session
453            .as_ref()
454            .map(|s| s.session_id.clone())
455            .context("No active session to record escalation report")?;
456
457        let record = perspt_store::EscalationReportRecord {
458            session_id,
459            node_id: report.node_id.clone(),
460            category: report.category.to_string(),
461            action: serde_json::to_string(&report.action).unwrap_or_default(),
462            energy_snapshot: serde_json::to_string(&report.energy_snapshot).unwrap_or_default(),
463            stage_outcomes: serde_json::to_string(&report.stage_outcomes).unwrap_or_default(),
464            evidence: report.evidence.clone(),
465            affected_node_ids: serde_json::to_string(&report.affected_node_ids).unwrap_or_default(),
466        };
467
468        self.store.record_escalation_report(&record)?;
469        log::debug!(
470            "Recorded escalation report for node '{}': {} → {}",
471            report.node_id,
472            report.category,
473            report.action
474        );
475        Ok(())
476    }
477
478    /// Record a local graph rewrite
479    pub fn record_rewrite(&self, record: &perspt_core::types::RewriteRecord) -> Result<()> {
480        let session_id = self
481            .current_session
482            .as_ref()
483            .map(|s| s.session_id.clone())
484            .context("No active session to record rewrite")?;
485
486        let row = perspt_store::RewriteRecordRow {
487            session_id,
488            node_id: record.node_id.clone(),
489            action: serde_json::to_string(&record.action).unwrap_or_default(),
490            category: record.category.to_string(),
491            requeued_nodes: serde_json::to_string(&record.requeued_nodes).unwrap_or_default(),
492            inserted_nodes: serde_json::to_string(&record.inserted_nodes).unwrap_or_default(),
493        };
494
495        self.store.record_rewrite(&row)?;
496        log::debug!(
497            "Recorded rewrite for node '{}': {} ({} requeued, {} inserted)",
498            record.node_id,
499            record.action,
500            record.requeued_nodes.len(),
501            record.inserted_nodes.len()
502        );
503        Ok(())
504    }
505
506    /// PSP-5 Phase 5: Count rewrite records matching a lineage prefix.
507    ///
508    /// A lineage is identified by the base node ID (before any `__split_` or
509    /// `__iface` suffixes). This count is used as a churn guardrail to prevent
510    /// infinite rewrite loops.
511    pub fn get_rewrite_count_for_lineage(&self, lineage_base: &str) -> Result<usize> {
512        let session_id = self
513            .current_session
514            .as_ref()
515            .map(|s| s.session_id.clone())
516            .context("No active session to query rewrite count")?;
517
518        let records = self.store.get_rewrite_records(&session_id)?;
519        let count = records
520            .iter()
521            .filter(|r| r.node_id.starts_with(lineage_base))
522            .count();
523        Ok(count)
524    }
525
526    /// Record a sheaf validation result
527    pub fn record_sheaf_validation(
528        &self,
529        node_id: &str,
530        result: &perspt_core::types::SheafValidationResult,
531    ) -> Result<()> {
532        let session_id = self
533            .current_session
534            .as_ref()
535            .map(|s| s.session_id.clone())
536            .context("No active session to record sheaf validation")?;
537
538        let row = perspt_store::SheafValidationRow {
539            session_id,
540            node_id: node_id.to_string(),
541            validator_class: result.validator_class.to_string(),
542            plugin_source: result.plugin_source.clone(),
543            passed: result.passed,
544            evidence_summary: result.evidence_summary.clone(),
545            affected_files: serde_json::to_string(&result.affected_files).unwrap_or_default(),
546            v_sheaf_contribution: result.v_sheaf_contribution,
547            requeue_targets: serde_json::to_string(&result.requeue_targets).unwrap_or_default(),
548        };
549
550        self.store.record_sheaf_validation(&row)?;
551        log::debug!(
552            "Recorded sheaf validation for node '{}': {} → {}",
553            node_id,
554            result.validator_class,
555            if result.passed { "pass" } else { "fail" }
556        );
557        Ok(())
558    }
559
560    /// Get escalation reports for the current session
561    pub fn get_escalation_reports(&self) -> Result<Vec<perspt_store::EscalationReportRecord>> {
562        let session_id = self
563            .current_session
564            .as_ref()
565            .map(|s| s.session_id.clone())
566            .context("No active session to query escalation reports")?;
567
568        self.store.get_escalation_reports(&session_id)
569    }
570
571    // =========================================================================
572    // PSP-5 Phase 8: Verification Result and Artifact Bundle Facades
573    // =========================================================================
574
575    /// Record a verification result snapshot for a node
576    pub fn record_verification_result(
577        &self,
578        node_id: &str,
579        result: &perspt_core::types::VerificationResult,
580    ) -> Result<()> {
581        let session_id = self.session_id()?;
582
583        let result_json = serde_json::to_string(result).unwrap_or_default();
584        let row = perspt_store::VerificationResultRow {
585            session_id,
586            node_id: node_id.to_string(),
587            result_json,
588            syntax_ok: result.syntax_ok,
589            build_ok: result.build_ok,
590            tests_ok: result.tests_ok,
591            lint_ok: result.lint_ok,
592            diagnostics_count: result.diagnostics_count as i32,
593            tests_passed: result.tests_passed as i32,
594            tests_failed: result.tests_failed as i32,
595            degraded: result.degraded,
596            degraded_reason: result.degraded_reason.clone(),
597        };
598
599        self.store.record_verification_result(&row)?;
600        log::debug!(
601            "Recorded verification result for node '{}': syn={} build={} test={} degraded={}",
602            node_id,
603            result.syntax_ok,
604            result.build_ok,
605            result.tests_ok,
606            result.degraded
607        );
608        Ok(())
609    }
610
611    /// Get the latest verification result for a node
612    pub fn get_verification_result(
613        &self,
614        node_id: &str,
615    ) -> Result<Option<perspt_store::VerificationResultRow>> {
616        let session_id = self.session_id()?;
617        self.store.get_verification_result(&session_id, node_id)
618    }
619
620    /// Record an artifact bundle snapshot for a node
621    pub fn record_artifact_bundle(
622        &self,
623        node_id: &str,
624        bundle: &perspt_core::types::ArtifactBundle,
625    ) -> Result<()> {
626        let session_id = self.session_id()?;
627
628        let bundle_json = serde_json::to_string(bundle).unwrap_or_default();
629        let touched_files: Vec<String> = bundle
630            .artifacts
631            .iter()
632            .map(|a| a.path().to_string())
633            .collect();
634
635        let row = perspt_store::ArtifactBundleRow {
636            session_id,
637            node_id: node_id.to_string(),
638            bundle_json,
639            artifact_count: bundle.artifacts.len() as i32,
640            command_count: bundle.commands.len() as i32,
641            touched_files: serde_json::to_string(&touched_files).unwrap_or_default(),
642        };
643
644        self.store.record_artifact_bundle(&row)?;
645        log::debug!(
646            "Recorded artifact bundle for node '{}': {} artifacts, {} commands",
647            node_id,
648            bundle.artifacts.len(),
649            bundle.commands.len()
650        );
651        Ok(())
652    }
653
654    /// Get the latest artifact bundle for a node
655    pub fn get_artifact_bundle(
656        &self,
657        node_id: &str,
658    ) -> Result<Option<perspt_store::ArtifactBundleRow>> {
659        let session_id = self.session_id()?;
660        self.store.get_artifact_bundle(&session_id, node_id)
661    }
662
663    // =========================================================================
664    // PSP-5 Phase 8: Task Graph & Session Rehydration
665    // =========================================================================
666
667    /// Record a task-graph edge (parent→child dependency)
668    pub fn record_task_graph_edge(
669        &self,
670        parent_node_id: &str,
671        child_node_id: &str,
672        edge_type: &str,
673    ) -> Result<()> {
674        let session_id = self.session_id()?;
675        let row = perspt_store::TaskGraphEdgeRow {
676            session_id,
677            parent_node_id: parent_node_id.to_string(),
678            child_node_id: child_node_id.to_string(),
679            edge_type: edge_type.to_string(),
680        };
681        self.store.record_task_graph_edge(&row)?;
682        log::debug!(
683            "Recorded task graph edge: {} → {} ({})",
684            parent_node_id,
685            child_node_id,
686            edge_type
687        );
688        Ok(())
689    }
690
691    /// Get all task graph edges for the current session
692    pub fn get_task_graph_edges(&self) -> Result<Vec<perspt_store::TaskGraphEdgeRow>> {
693        let session_id = self.session_id()?;
694        self.store.get_task_graph_edges(&session_id)
695    }
696
697    /// Get sheaf validations for a specific node
698    pub fn get_sheaf_validations(
699        &self,
700        node_id: &str,
701    ) -> Result<Vec<perspt_store::SheafValidationRow>> {
702        let session_id = self.session_id()?;
703        self.store.get_sheaf_validations(&session_id, node_id)
704    }
705
706    /// Load a complete session snapshot for rehydration/resume.
707    ///
708    /// Aggregates the latest node states, graph topology, energy history,
709    /// verification results, artifact bundles, sheaf validations,
710    /// provisional branches, interface seals, context provenance, and
711    /// escalation reports into a single `SessionSnapshot`.
712    pub fn load_session_snapshot(&self) -> Result<SessionSnapshot> {
713        let session_id = self.session_id()?;
714
715        let node_states = self
716            .store
717            .get_latest_node_states(&session_id)
718            .unwrap_or_default();
719
720        let graph_edges = self
721            .store
722            .get_task_graph_edges(&session_id)
723            .unwrap_or_default();
724
725        let branches = self
726            .store
727            .get_provisional_branches(&session_id)
728            .unwrap_or_default();
729
730        let escalation_reports = self
731            .store
732            .get_escalation_reports(&session_id)
733            .unwrap_or_default();
734
735        let flushes = self
736            .store
737            .get_branch_flushes(&session_id)
738            .unwrap_or_default();
739
740        // Collect per-node evidence
741        let mut node_details: Vec<NodeSnapshotDetail> = Vec::with_capacity(node_states.len());
742        for ns in &node_states {
743            let nid = &ns.node_id;
744
745            let energy_history = self
746                .store
747                .get_energy_history(&session_id, nid)
748                .unwrap_or_default();
749
750            let verification = self
751                .store
752                .get_verification_result(&session_id, nid)
753                .ok()
754                .flatten();
755
756            let artifact_bundle = self
757                .store
758                .get_artifact_bundle(&session_id, nid)
759                .ok()
760                .flatten();
761
762            let sheaf_validations = self
763                .store
764                .get_sheaf_validations(&session_id, nid)
765                .unwrap_or_default();
766
767            let interface_seals = self
768                .store
769                .get_interface_seals(&session_id, nid)
770                .unwrap_or_default();
771
772            let context_provenance = self
773                .store
774                .get_context_provenance(&session_id, nid)
775                .ok()
776                .flatten();
777
778            node_details.push(NodeSnapshotDetail {
779                record: ns.clone(),
780                energy_history,
781                verification,
782                artifact_bundle,
783                sheaf_validations,
784                interface_seals,
785                context_provenance,
786            });
787        }
788
789        log::info!(
790            "Loaded session snapshot: {} nodes, {} edges, {} branches",
791            node_details.len(),
792            graph_edges.len(),
793            branches.len()
794        );
795
796        Ok(SessionSnapshot {
797            session_id,
798            node_details,
799            graph_edges,
800            branches,
801            escalation_reports,
802            flushes,
803        })
804    }
805
806    // =========================================================================
807    // PSP-5 Phase 6: Provisional Branch, Interface Seal, Branch Flush Facades
808    // =========================================================================
809
810    /// Get the current session ID (helper for Phase 6 methods)
811    fn session_id(&self) -> Result<String> {
812        self.current_session
813            .as_ref()
814            .map(|s| s.session_id.clone())
815            .context("No active session")
816    }
817
818    /// Record a new provisional branch for speculative child work
819    pub fn record_provisional_branch(
820        &self,
821        branch: &perspt_core::types::ProvisionalBranch,
822    ) -> Result<()> {
823        let row = perspt_store::ProvisionalBranchRow {
824            branch_id: branch.branch_id.clone(),
825            session_id: branch.session_id.clone(),
826            node_id: branch.node_id.clone(),
827            parent_node_id: branch.parent_node_id.clone(),
828            state: branch.state.to_string(),
829            parent_seal_hash: branch.parent_seal_hash.map(|h| h.to_vec()),
830            sandbox_dir: branch.sandbox_dir.clone(),
831        };
832
833        self.store.record_provisional_branch(&row)?;
834        log::debug!(
835            "Recorded provisional branch '{}' for node '{}' (parent: '{}')",
836            branch.branch_id,
837            branch.node_id,
838            branch.parent_node_id
839        );
840        Ok(())
841    }
842
843    /// Update a provisional branch state
844    pub fn update_branch_state(&self, branch_id: &str, new_state: &str) -> Result<()> {
845        self.store.update_branch_state(branch_id, new_state)?;
846        log::debug!("Updated branch '{}' state to '{}'", branch_id, new_state);
847        Ok(())
848    }
849
850    /// Get all provisional branches for the current session
851    pub fn get_provisional_branches(&self) -> Result<Vec<perspt_store::ProvisionalBranchRow>> {
852        let session_id = self.session_id()?;
853        self.store.get_provisional_branches(&session_id)
854    }
855
856    /// Get live (active/sealed) branches depending on a parent node
857    pub fn get_live_branches_for_parent(
858        &self,
859        parent_node_id: &str,
860    ) -> Result<Vec<perspt_store::ProvisionalBranchRow>> {
861        let session_id = self.session_id()?;
862        self.store
863            .get_live_branches_for_parent(&session_id, parent_node_id)
864    }
865
866    /// Flush all live branches for a parent node and return flushed branch IDs
867    pub fn flush_branches_for_parent(&self, parent_node_id: &str) -> Result<Vec<String>> {
868        let session_id = self.session_id()?;
869        self.store
870            .flush_branches_for_parent(&session_id, parent_node_id)
871    }
872
873    /// Record a branch lineage edge (parent branch → child branch)
874    pub fn record_branch_lineage(&self, lineage: &perspt_core::types::BranchLineage) -> Result<()> {
875        let row = perspt_store::BranchLineageRow {
876            lineage_id: lineage.lineage_id.clone(),
877            parent_branch_id: lineage.parent_branch_id.clone(),
878            child_branch_id: lineage.child_branch_id.clone(),
879            depends_on_seal: lineage.depends_on_seal,
880        };
881
882        self.store.record_branch_lineage(&row)?;
883        log::debug!(
884            "Recorded branch lineage: {} → {}",
885            lineage.parent_branch_id,
886            lineage.child_branch_id
887        );
888        Ok(())
889    }
890
891    /// Get child branch IDs for a parent branch
892    pub fn get_child_branches(&self, parent_branch_id: &str) -> Result<Vec<String>> {
893        self.store.get_child_branches(parent_branch_id)
894    }
895
896    /// Record an interface seal for a node
897    pub fn record_interface_seal(
898        &self,
899        seal: &perspt_core::types::InterfaceSealRecord,
900    ) -> Result<()> {
901        let row = perspt_store::InterfaceSealRow {
902            seal_id: seal.seal_id.clone(),
903            session_id: seal.session_id.clone(),
904            node_id: seal.node_id.clone(),
905            sealed_path: seal.sealed_path.clone(),
906            artifact_kind: seal.artifact_kind.to_string(),
907            seal_hash: seal.seal_hash.to_vec(),
908            version: seal.version as i32,
909        };
910
911        self.store.record_interface_seal(&row)?;
912        log::debug!(
913            "Recorded interface seal '{}' for node '{}' at '{}'",
914            seal.seal_id,
915            seal.node_id,
916            seal.sealed_path
917        );
918        Ok(())
919    }
920
921    /// Get all interface seals for a node in the current session
922    pub fn get_interface_seals(
923        &self,
924        node_id: &str,
925    ) -> Result<Vec<perspt_store::InterfaceSealRow>> {
926        let session_id = self.session_id()?;
927        self.store.get_interface_seals(&session_id, node_id)
928    }
929
930    /// Check whether a node has any interface seals
931    pub fn has_interface_seals(&self, node_id: &str) -> Result<bool> {
932        let session_id = self.session_id()?;
933        self.store.has_interface_seals(&session_id, node_id)
934    }
935
936    /// Record a branch flush decision
937    pub fn record_branch_flush(&self, flush: &perspt_core::types::BranchFlushRecord) -> Result<()> {
938        let row = perspt_store::BranchFlushRow {
939            flush_id: flush.flush_id.clone(),
940            session_id: flush.session_id.clone(),
941            parent_node_id: flush.parent_node_id.clone(),
942            flushed_branch_ids: serde_json::to_string(&flush.flushed_branch_ids)
943                .unwrap_or_default(),
944            requeue_node_ids: serde_json::to_string(&flush.requeue_node_ids).unwrap_or_default(),
945            reason: flush.reason.clone(),
946        };
947
948        self.store.record_branch_flush(&row)?;
949        log::debug!(
950            "Recorded branch flush for parent '{}': {} branches flushed",
951            flush.parent_node_id,
952            flush.flushed_branch_ids.len()
953        );
954        Ok(())
955    }
956
957    /// Get all branch flush records for the current session
958    pub fn get_branch_flushes(&self) -> Result<Vec<perspt_store::BranchFlushRow>> {
959        let session_id = self.session_id()?;
960        self.store.get_branch_flushes(&session_id)
961    }
962
963    // =========================================================================
964    // PSP-5 Phase 7: Review Outcome Persistence
965    // =========================================================================
966
967    /// Persist a review decision as an audit record.
968    pub fn record_review_outcome(
969        &self,
970        node_id: &str,
971        outcome: &str,
972        reviewer_note: Option<&str>,
973        energy_at_review: Option<f64>,
974        degraded: Option<bool>,
975        escalation_category: Option<&str>,
976    ) -> Result<()> {
977        let session_id = self.session_id()?;
978        let row = perspt_store::ReviewOutcomeRow {
979            session_id,
980            node_id: node_id.to_string(),
981            outcome: outcome.to_string(),
982            reviewer_note: reviewer_note.map(|s| s.to_string()),
983            energy_at_review,
984            degraded,
985            escalation_category: escalation_category.map(|s| s.to_string()),
986        };
987        self.store.record_review_outcome(&row)
988    }
989
990    /// Get all review outcomes for a node.
991    pub fn get_review_outcomes(
992        &self,
993        node_id: &str,
994    ) -> Result<Vec<perspt_store::ReviewOutcomeRow>> {
995        let session_id = self.session_id()?;
996        self.store.get_review_outcomes(&session_id, node_id)
997    }
998
999    /// Get all review outcomes across the session.
1000    pub fn get_all_review_outcomes(&self) -> Result<Vec<perspt_store::ReviewOutcomeRow>> {
1001        let session_id = self.session_id()?;
1002        self.store.get_all_review_outcomes(&session_id)
1003    }
1004
1005    // =========================================================================
1006    // PSP-5 Phase 7: Shared Review & Provenance Aggregation Helpers
1007    // =========================================================================
1008
1009    /// Build a review-ready summary for a single node.
1010    ///
1011    /// Aggregates energy history, escalation reports, sheaf validations,
1012    /// context provenance, interface seals, and branch state from the store
1013    /// into a single struct consumable by both TUI and CLI surfaces.
1014    pub fn node_review_summary(&self, node_id: &str) -> Result<NodeReviewSummary> {
1015        let session_id = self.session_id()?;
1016
1017        let energy_history = self
1018            .store
1019            .get_energy_history(&session_id, node_id)
1020            .unwrap_or_default();
1021
1022        let latest_energy = energy_history.last().cloned();
1023
1024        let escalation_reports = self
1025            .store
1026            .get_escalation_reports(&session_id)
1027            .unwrap_or_default()
1028            .into_iter()
1029            .filter(|r| r.node_id == node_id)
1030            .collect::<Vec<_>>();
1031
1032        let sheaf_validations = self
1033            .store
1034            .get_sheaf_validations(&session_id, node_id)
1035            .unwrap_or_default();
1036
1037        let interface_seals = self
1038            .store
1039            .get_interface_seals(&session_id, node_id)
1040            .unwrap_or_default();
1041
1042        let context_provenance = self
1043            .store
1044            .get_context_provenance(&session_id, node_id)
1045            .ok()
1046            .flatten()
1047            .into_iter()
1048            .collect::<Vec<_>>();
1049
1050        let branches: Vec<_> = self
1051            .store
1052            .get_provisional_branches(&session_id)
1053            .unwrap_or_default()
1054            .into_iter()
1055            .filter(|b| b.node_id == node_id)
1056            .collect();
1057
1058        let attempt_count = energy_history.len().max(1) as u32;
1059
1060        Ok(NodeReviewSummary {
1061            node_id: node_id.to_string(),
1062            latest_energy,
1063            energy_history,
1064            attempt_count,
1065            escalation_reports,
1066            sheaf_validations,
1067            interface_seals,
1068            context_provenance,
1069            branches,
1070        })
1071    }
1072
1073    /// Build a session-level summary aggregating lifecycle counts, energy
1074    /// stats, escalation activity, and branch provenance.
1075    pub fn session_summary(&self) -> Result<SessionReviewSummary> {
1076        let session_id = self.session_id()?;
1077
1078        let node_states = self.store.get_node_states(&session_id).unwrap_or_default();
1079        let total_nodes = node_states.len();
1080        let completed = node_states
1081            .iter()
1082            .filter(|n| n.state == "COMPLETED" || n.state == "STABLE")
1083            .count();
1084        let failed = node_states.iter().filter(|n| n.state == "FAILED").count();
1085        let escalated = node_states
1086            .iter()
1087            .filter(|n| n.state == "Escalated")
1088            .count();
1089
1090        // Collect latest energy per node
1091        let mut total_energy: f32 = 0.0;
1092        let mut node_energies: Vec<(String, perspt_store::EnergyRecord)> = Vec::new();
1093        for ns in &node_states {
1094            if let Ok(history) = self.store.get_energy_history(&session_id, &ns.node_id) {
1095                if let Some(latest) = history.last() {
1096                    total_energy += latest.v_total;
1097                    node_energies.push((ns.node_id.clone(), latest.clone()));
1098                }
1099            }
1100        }
1101
1102        let escalation_reports = self
1103            .store
1104            .get_escalation_reports(&session_id)
1105            .unwrap_or_default();
1106
1107        let branches = self
1108            .store
1109            .get_provisional_branches(&session_id)
1110            .unwrap_or_default();
1111
1112        let active_branches = branches.iter().filter(|b| b.state == "active").count();
1113        let sealed_branches = branches.iter().filter(|b| b.state == "sealed").count();
1114        let merged_branches = branches.iter().filter(|b| b.state == "merged").count();
1115        let flushed_branches = branches.iter().filter(|b| b.state == "flushed").count();
1116
1117        let flushes = self
1118            .store
1119            .get_branch_flushes(&session_id)
1120            .unwrap_or_default();
1121
1122        // Review audit aggregation
1123        let review_outcomes = self
1124            .store
1125            .get_all_review_outcomes(&session_id)
1126            .unwrap_or_default();
1127        let review_total = review_outcomes.len();
1128        let reviews_approved = review_outcomes
1129            .iter()
1130            .filter(|r| r.outcome.starts_with("approved") || r.outcome == "auto_approved")
1131            .count();
1132        let reviews_rejected = review_outcomes
1133            .iter()
1134            .filter(|r| r.outcome == "rejected" || r.outcome == "aborted")
1135            .count();
1136        let reviews_corrected = review_outcomes
1137            .iter()
1138            .filter(|r| r.outcome == "correction_requested")
1139            .count();
1140
1141        Ok(SessionReviewSummary {
1142            session_id,
1143            total_nodes,
1144            completed,
1145            failed,
1146            escalated,
1147            total_energy,
1148            node_energies,
1149            escalation_reports,
1150            branches_total: branches.len(),
1151            active_branches,
1152            sealed_branches,
1153            merged_branches,
1154            flushed_branches,
1155            flush_decisions: flushes,
1156            review_total,
1157            reviews_approved,
1158            reviews_rejected,
1159            reviews_corrected,
1160        })
1161    }
1162}
1163
1164/// PSP-5 Phase 7: Aggregated review summary for a single node.
1165///
1166/// Consumed by both TUI review modal and CLI status/resume commands.
1167#[derive(Debug, Clone)]
1168pub struct NodeReviewSummary {
1169    pub node_id: String,
1170    pub latest_energy: Option<perspt_store::EnergyRecord>,
1171    pub energy_history: Vec<perspt_store::EnergyRecord>,
1172    pub attempt_count: u32,
1173    pub escalation_reports: Vec<perspt_store::EscalationReportRecord>,
1174    pub sheaf_validations: Vec<perspt_store::SheafValidationRow>,
1175    pub interface_seals: Vec<perspt_store::InterfaceSealRow>,
1176    pub context_provenance: Vec<perspt_store::ContextProvenanceRecord>,
1177    pub branches: Vec<perspt_store::ProvisionalBranchRow>,
1178}
1179
1180/// PSP-5 Phase 7: Aggregated session-level review summary.
1181///
1182/// Consumed by both TUI dashboard and CLI status/resume commands.
1183#[derive(Debug, Clone)]
1184pub struct SessionReviewSummary {
1185    pub session_id: String,
1186    pub total_nodes: usize,
1187    pub completed: usize,
1188    pub failed: usize,
1189    pub escalated: usize,
1190    pub total_energy: f32,
1191    pub node_energies: Vec<(String, perspt_store::EnergyRecord)>,
1192    pub escalation_reports: Vec<perspt_store::EscalationReportRecord>,
1193    pub branches_total: usize,
1194    pub active_branches: usize,
1195    pub sealed_branches: usize,
1196    pub merged_branches: usize,
1197    pub flushed_branches: usize,
1198    pub flush_decisions: Vec<perspt_store::BranchFlushRow>,
1199    /// Review audit: total decisions and breakdown
1200    pub review_total: usize,
1201    pub reviews_approved: usize,
1202    pub reviews_rejected: usize,
1203    pub reviews_corrected: usize,
1204}
1205
1206/// Ledger statistics (Legacy)
1207#[derive(Debug, Clone)]
1208pub struct LedgerStats {
1209    pub total_sessions: usize,
1210    pub total_commits: usize,
1211    pub db_size_bytes: u64,
1212}
1213
1214/// PSP-5 Phase 8: Per-node evidence bundle for session rehydration.
1215#[derive(Debug, Clone)]
1216pub struct NodeSnapshotDetail {
1217    pub record: NodeStateRecord,
1218    pub energy_history: Vec<perspt_store::EnergyRecord>,
1219    pub verification: Option<perspt_store::VerificationResultRow>,
1220    pub artifact_bundle: Option<perspt_store::ArtifactBundleRow>,
1221    pub sheaf_validations: Vec<perspt_store::SheafValidationRow>,
1222    pub interface_seals: Vec<perspt_store::InterfaceSealRow>,
1223    pub context_provenance: Option<perspt_store::ContextProvenanceRecord>,
1224}
1225
1226/// PSP-5 Phase 8: Complete session snapshot for resume/rehydration.
1227///
1228/// Aggregates all persisted state needed to reconstruct the orchestrator
1229/// DAG, restore node states, and resume execution from the last durable
1230/// boundary.
1231#[derive(Debug, Clone)]
1232pub struct SessionSnapshot {
1233    pub session_id: String,
1234    pub node_details: Vec<NodeSnapshotDetail>,
1235    pub graph_edges: Vec<perspt_store::TaskGraphEdgeRow>,
1236    pub branches: Vec<perspt_store::ProvisionalBranchRow>,
1237    pub escalation_reports: Vec<perspt_store::EscalationReportRecord>,
1238    pub flushes: Vec<perspt_store::BranchFlushRow>,
1239}
1240
1241/// Generate a unique commit ID
1242fn generate_commit_id() -> String {
1243    use std::time::{SystemTime, UNIX_EPOCH};
1244    let now = SystemTime::now()
1245        .duration_since(UNIX_EPOCH)
1246        .unwrap()
1247        .as_nanos();
1248    format!("{:x}", now)
1249}
1250
1251/// Get current timestamp
1252fn chrono_timestamp() -> i64 {
1253    use std::time::{SystemTime, UNIX_EPOCH};
1254    SystemTime::now()
1255        .duration_since(UNIX_EPOCH)
1256        .unwrap()
1257        .as_secs() as i64
1258}
1259
1260/// ISO-8601 timestamp for committed_at fields
1261fn chrono_iso_now() -> String {
1262    use std::time::{SystemTime, UNIX_EPOCH};
1263    let secs = SystemTime::now()
1264        .duration_since(UNIX_EPOCH)
1265        .unwrap()
1266        .as_secs();
1267    // Simple UTC timestamp — YYYY-MM-DDTHH:MM:SSZ
1268    let days = secs / 86400;
1269    let time = secs % 86400;
1270    let h = time / 3600;
1271    let m = (time % 3600) / 60;
1272    let s = time % 60;
1273    // Days since 1970-01-01 to y/m/d (civil calendar)
1274    let (y, mo, d) = days_to_ymd(days);
1275    format!("{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z", y, mo, d, h, m, s)
1276}
1277
1278/// Convert days since Unix epoch to (year, month, day)
1279fn days_to_ymd(days: u64) -> (u64, u64, u64) {
1280    // Algorithm from Howard Hinnant's date library
1281    let z = days + 719468;
1282    let era = z / 146097;
1283    let doe = z - era * 146097;
1284    let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
1285    let y = yoe + era * 400;
1286    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
1287    let mp = (5 * doy + 2) / 153;
1288    let d = doy - (153 * mp + 2) / 5 + 1;
1289    let m = if mp < 10 { mp + 3 } else { mp - 9 };
1290    let y = if m <= 2 { y + 1 } else { y };
1291    (y, m, d)
1292}