use super::verification::severity_to_str;
use super::*;
impl SRBNOrchestrator {
pub(super) async fn step_converge(
&mut self,
idx: NodeIndex,
energy: EnergyComponents,
) -> Result<bool> {
log::info!("Step 5: Convergence check");
let total = {
let node = &self.graph[idx];
energy.total(&node.contract)
};
let node = &mut self.graph[idx];
node.monitor.record_energy(total);
let node_id = node.node_id.clone();
let goal = node.goal.clone();
let epsilon = node.monitor.stability_epsilon;
let attempt_count = node.monitor.attempt_count;
let stable = node.monitor.stable;
let should_escalate = node.monitor.should_escalate();
if stable {
if let Some(ref vr) = self.last_verification_result {
if vr.has_degraded_stages() {
let reasons = vr.degraded_stage_reasons();
log::warn!(
"Node {} energy is below ε but verification was degraded: {:?}",
node_id,
reasons
);
self.emit_log(format!(
"⚠️ V(x)={:.2} < ε but stability unconfirmed — degraded sensors: {}",
total,
reasons.join(", ")
));
self.emit_event(perspt_core::AgentEvent::DegradedVerification {
node_id: node_id.clone(),
degraded_stages: reasons,
stability_blocked: true,
});
} else {
log::info!(
"Node {} is stable (V(x)={:.2} < ε={:.2})",
node_id,
total,
epsilon
);
self.emit_log(format!("✅ Stable! V(x)={:.2} < ε={:.2}", total, epsilon));
return Ok(true);
}
} else {
log::info!(
"Node {} is stable (V(x)={:.2} < ε={:.2})",
node_id,
total,
epsilon
);
self.emit_log(format!("✅ Stable! V(x)={:.2} < ε={:.2}", total, epsilon));
return Ok(true);
}
}
if should_escalate {
log::warn!(
"Node {} failed to converge after {} attempts (V(x)={:.2})",
node_id,
attempt_count,
total
);
self.emit_log(format!(
"⚠️ Escalating: failed to converge after {} attempts",
attempt_count
));
return Ok(false);
}
self.graph[idx].state = NodeState::Retry;
self.emit_event(perspt_core::AgentEvent::TaskStatusChanged {
node_id: self.graph[idx].node_id.clone(),
status: perspt_core::NodeStatus::Retrying,
});
log::info!(
"V(x)={:.2} > ε={:.2}, regenerating with feedback (attempt {})",
total,
epsilon,
attempt_count
);
self.emit_log(format!(
"🔄 V(x)={:.2} > ε={:.2}, sending errors to LLM (attempt {})",
total, epsilon, attempt_count
));
if self.budget.any_exhausted() {
log::warn!(
"Budget exhausted before correction (steps {}/{:?}, revisions {}/{:?}, cost ${:.2} / {:?}) — escalating node '{}'",
self.budget.steps_used,
self.budget.max_steps,
self.budget.revisions_used,
self.budget.max_revisions,
self.budget.cost_used_usd,
self.budget.max_cost_usd,
node_id
);
self.emit_log(format!(
"💰 Budget exhausted before correction (${:.2}) — escalating",
self.budget.cost_used_usd
));
return Ok(false);
}
let correction_prompt = self.build_correction_prompt(&node_id, &goal, &energy)?;
log::info!(
"--- CORRECTION PROMPT ---\n{}\n-------------------------",
correction_prompt
);
self.emit_log("📤 Sending correction prompt to LLM...".to_string());
let corrected = self.call_llm_for_correction(&correction_prompt).await?;
let node_class = self.graph[idx].node_class;
let attempt = self.graph[idx].monitor.attempt_count;
let diagnosis = self.context.last_diagnostics.clone();
let owner_plugin = self.graph[idx].owner_plugin.clone();
let (bundle_opt, parse_state, record_opt) =
self.parse_artifact_bundle_typed(&corrected, &node_id, attempt as u32);
if let Some(ref record) = record_opt {
log::info!(
"PSP-7 correction attempt {}: parse_state={}, accepted={}, rejection={:?}",
record.attempt,
record.parse_state,
record.accepted,
record.rejection_reason
);
let row = perspt_store::CorrectionAttemptRow {
session_id: self.context.session_id.clone(),
node_id: node_id.clone(),
attempt: record.attempt as i32,
parse_state: format!("{}", record.parse_state),
retry_classification: record
.retry_classification
.as_ref()
.map(|c| format!("{}", c)),
response_fingerprint: record.response_fingerprint.clone(),
response_length: record.response_length as i32,
energy_json: record
.energy_after
.as_ref()
.and_then(|e| serde_json::to_string(e).ok()),
accepted: record.accepted,
rejection_reason: record.rejection_reason.clone(),
created_at: record.created_at,
};
if let Err(e) = self.ledger.record_correction_attempt(&row) {
log::warn!("Failed to persist correction attempt: {}", e);
}
}
match parse_state {
perspt_core::types::ParseResultState::StrictJsonOk
| perspt_core::types::ParseResultState::TolerantRecoveryOk => {
let bundle = bundle_opt.expect("Accepted parse must yield a bundle");
log::info!(
"Applying correction bundle ({}): {} artifact(s), {} command(s)",
parse_state,
bundle.artifacts.len(),
bundle.commands.len()
);
self.emit_log(format!(
"🔧 Applying correction bundle ({} artifact(s))",
bundle.artifacts.len()
));
self.apply_bundle_transactionally(&bundle, &node_id, node_class)
.await?;
self.last_tool_failure = None;
let node_workdir = self.effective_working_dir(idx);
if let Some(first_path) = bundle.artifacts.first().map(|a| a.path().to_string()) {
self.last_written_file = Some(node_workdir.join(&first_path));
}
self.file_version += 1;
self.last_applied_bundle = Some(bundle.clone());
let diagnosis_str = format!("{:?}", diagnosis);
let footprint = perspt_core::RepairFootprint::new(
&self.context.session_id,
&node_id,
"initial",
attempt as u32,
&bundle,
&diagnosis_str,
);
self.last_repair_footprint = Some(footprint.clone());
if let Err(e) = self.ledger.record_repair_footprint(&footprint) {
log::warn!("Failed to record repair footprint: {}", e);
}
if !bundle.commands.is_empty() {
self.emit_log(format!(
"🔧 Executing {} bundle command(s)...",
bundle.commands.len()
));
let work_dir = self.effective_working_dir(idx);
let is_python = self.graph[idx].owner_plugin == "python";
for raw_command in &bundle.commands {
let command = if is_python {
Self::normalize_command_to_uv(raw_command)
} else {
raw_command.clone()
};
self.execute_correction_command(
idx,
&node_id,
&command,
&owner_plugin,
&work_dir,
)
.await?;
}
}
}
perspt_core::types::ParseResultState::SemanticallyRejected => {
let rejection_reason = record_opt
.as_ref()
.and_then(|r| r.rejection_reason.clone())
.unwrap_or_default();
let classification = if rejection_reason.contains("All artifacts rejected") {
perspt_core::types::RetryClassification::Retarget
} else if rejection_reason.contains("support") {
perspt_core::types::RetryClassification::SupportFileViolation
} else {
perspt_core::types::RetryClassification::Replan
};
log::warn!(
"Correction bundle semantically rejected ({:?}): {}",
classification,
rejection_reason
);
self.emit_log(format!(
"⚠️ Correction rejected ({:?}) — will retry",
classification
));
if matches!(
classification,
perspt_core::types::RetryClassification::Retarget
) {
if let Some(idx) = self.node_indices.get(&node_id) {
let expected: Vec<String> = self.graph[*idx]
.output_targets
.iter()
.map(|p| p.to_string_lossy().to_string())
.collect();
log::warn!(
"Expected targets: {}, but response targeted wrong files",
expected.join(", ")
);
}
}
}
perspt_core::types::ParseResultState::NoStructuredPayload
| perspt_core::types::ParseResultState::SchemaInvalid => {
log::warn!(
"Correction response parse failed ({}), will retry with schema guidance",
parse_state
);
self.emit_log(format!(
"⚠️ Response parse failed ({}) — will retry",
parse_state
));
}
perspt_core::types::ParseResultState::EmptyBundle => {
log::warn!("Correction produced empty bundle, will retry");
self.emit_log("⚠️ Empty correction bundle — will retry".to_string());
}
}
let correction_cmds = Self::extract_commands_from_correction(&corrected, &owner_plugin);
if !correction_cmds.is_empty() {
self.emit_log(format!(
"📦 Running {} dependency command(s) from correction...",
correction_cmds.len()
));
let work_dir = self.effective_working_dir(idx);
for cmd in &correction_cmds {
self.execute_correction_command(idx, &node_id, cmd, &owner_plugin, &work_dir)
.await?;
}
}
let new_energy = self.step_verify(idx).await?;
Box::pin(self.step_converge(idx, new_energy)).await
}
fn build_correction_prompt(
&self,
node_id: &str,
goal: &str,
energy: &EnergyComponents,
) -> Result<String> {
let diagnostics = &self.context.last_diagnostics;
let owner_plugin = self
.node_indices
.get(node_id)
.map(|idx| self.graph[*idx].owner_plugin.as_str())
.unwrap_or("")
.to_string();
let mut file_sections: Vec<(String, String)> = Vec::new();
let mut seen_paths = std::collections::HashSet::new();
if let Some(idx) = self.node_indices.get(node_id) {
let node_workdir = self.effective_working_dir(*idx);
for target in &self.graph[*idx].output_targets {
let target_str = target.to_string_lossy().to_string();
let full_path = node_workdir.join(target);
if let Ok(content) = std::fs::read_to_string(&full_path) {
if !content.is_empty() && seen_paths.insert(target_str.clone()) {
file_sections.push((target_str, content));
}
}
}
}
if let Some(ref footprint) = self.last_repair_footprint {
let node_workdir = if let Some(idx) = self.node_indices.get(&footprint.node_id) {
self.effective_working_dir(*idx)
} else if let Some(ref path) = self.last_written_file {
path.parent()
.unwrap_or(std::path::Path::new("."))
.to_path_buf()
} else {
self.context.working_dir.clone()
};
for file_path in &footprint.affected_files {
if seen_paths.insert(file_path.clone()) {
let full_path = node_workdir.join(file_path);
if let Ok(content) = std::fs::read_to_string(&full_path) {
if !content.is_empty() {
file_sections.push((file_path.clone(), content));
}
}
}
}
}
let root_manifest_names = ["Cargo.toml", "package.json", "pyproject.toml"];
for manifest_name in &root_manifest_names {
let manifest_path = self.context.working_dir.join(manifest_name);
if manifest_path.exists() {
let rel = manifest_name.to_string();
if seen_paths.insert(rel.clone()) {
if let Ok(content) = std::fs::read_to_string(&manifest_path) {
if !content.is_empty() {
file_sections.push((rel, content));
}
}
}
break;
}
}
if file_sections.is_empty() {
let current_code = if let Some(ref path) = self.last_written_file {
std::fs::read_to_string(path).unwrap_or_default()
} else {
String::new()
};
let file_path = self
.last_written_file
.as_ref()
.map(|p| {
p.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string()
})
.unwrap_or_else(|| "unknown".to_string());
file_sections.push((file_path, current_code));
}
let mut diag_text = String::new();
for (i, diag) in diagnostics.iter().enumerate() {
let fix_direction = self.get_fix_direction(diag);
diag_text.push_str(&format!(
r#"
#### Error {}
- **Location**: Line {}, Column {}
- **Severity**: {}
- **Message**: {}
- **How to fix**: {}
"#,
i + 1,
diag.range.start.line + 1,
diag.range.start.character + 1,
severity_to_str(diag.severity),
diag.message,
fix_direction
));
}
let project_tree = if let Some(idx) = self.node_indices.get(node_id) {
let wd = self.effective_working_dir(*idx);
crate::tools::list_sandbox_files(&wd)
.ok()
.filter(|t| !t.is_empty())
.map(|t| t.join("\n"))
} else {
None
};
let build_output = self
.context
.last_test_output
.as_ref()
.filter(|o| !o.is_empty())
.map(|o| {
if o.len() > 3000 {
o[..3000].to_string()
} else {
o.clone()
}
});
let previous_attempts = self
.node_indices
.get(node_id)
.map(|idx| self.graph[*idx].monitor.attempt_count)
.unwrap_or(0);
let evidence = perspt_core::types::PromptEvidence {
node_goal: Some(goal.to_string()),
existing_file_contents: file_sections,
verifier_diagnostics: if diag_text.is_empty() {
None
} else {
Some(diag_text)
},
energy_v_syn: Some(energy.v_syn),
owner_plugin: Some(owner_plugin),
restriction_map_context: if self.last_formatted_context.is_empty() {
None
} else {
Some(self.last_formatted_context.clone())
},
project_file_tree: project_tree,
build_test_output: build_output,
previous_attempt_count: previous_attempts.saturating_sub(1),
plugin_correction_fragment: {
let registry = perspt_core::plugin::PluginRegistry::new();
self.node_indices
.get(node_id)
.and_then(|idx| {
let p = self.graph[*idx].owner_plugin.as_str();
registry.get(p)
})
.and_then(|plugin| plugin.correction_prompt_fragment())
.map(|s| s.to_string())
},
..Default::default()
};
let compiled = crate::prompt_compiler::compile(
perspt_core::types::PromptIntent::CorrectionRetry,
&evidence,
);
Ok(compiled.text)
}
fn get_fix_direction(&self, diag: &lsp_types::Diagnostic) -> String {
let msg = diag.message.to_lowercase();
if msg.contains("undefined") || msg.contains("unresolved") || msg.contains("not defined") {
if msg.contains("crate") || msg.contains("module") {
"The crate may not be in Cargo.toml. Add it with `cargo add <crate>` in the Commands section, or use `crate::` for intra-crate imports".into()
} else {
"Define the missing variable/function, or import it from the correct module".into()
}
} else if msg.contains("type") && (msg.contains("expected") || msg.contains("incompatible"))
{
"Change the value or add a type conversion to match the expected type".into()
} else if msg.contains("import") || msg.contains("no module named") {
"Add the correct import statement at the top of the file. For Python: use `uv add <pkg>` for external packages; use relative imports (`from . import mod`) inside package modules.".into()
} else if msg.contains("argument") && (msg.contains("missing") || msg.contains("expected"))
{
"Provide all required arguments to the function call".into()
} else if msg.contains("return") && msg.contains("type") {
"Ensure the return statement returns a value of the declared return type".into()
} else if msg.contains("attribute") {
"Check if the object has this attribute, or fix the object type".into()
} else if msg.contains("syntax") {
"Fix the syntax error - check for missing colons, parentheses, or indentation".into()
} else if msg.contains("indentation") {
"Fix the indentation to match Python's indentation rules (4 spaces per level)".into()
} else if msg.contains("parameter") {
"Check the function signature and update parameter types/names".into()
} else {
format!("Review and fix: {}", diag.message)
}
}
async fn call_llm_for_correction(&mut self, prompt: &str) -> Result<String> {
let verifier_prompt = format!(
"{}{}",
crate::prompt_compiler::VERIFIER_ANALYSIS_PREAMBLE,
prompt
);
log::debug!(
"Stage 1: Sending analysis to verifier model: {}",
self.verifier_model
);
let guidance = self
.call_llm_with_logging(&self.verifier_model.clone(), &verifier_prompt, None)
.await
.unwrap_or_else(|e| {
log::warn!(
"Verifier analysis failed ({}), falling back to actuator-only correction",
e
);
String::new()
});
if self.budget.any_exhausted() {
log::warn!("Budget exhausted after verifier analysis; skipping actuator correction");
self.emit_log(
"💰 Budget exhausted after verifier analysis — skipping correction generation"
.to_string(),
);
return Ok(String::new());
}
let actuator_prompt = if guidance.is_empty() {
prompt.to_string()
} else {
format!(
"{}\n\n## Verifier Analysis\n{}\n\nApply the above analysis to produce corrected code.",
prompt, guidance
)
};
log::debug!(
"Stage 2: Sending correction to actuator model: {}",
self.actuator_model
);
let response = self
.call_llm_with_logging(&self.actuator_model.clone(), &actuator_prompt, None)
.await?;
log::debug!("Received correction response with {} chars", response.len());
Ok(response)
}
pub(super) async fn call_llm_with_logging(
&mut self,
model: &str,
prompt: &str,
node_id: Option<&str>,
) -> Result<String> {
if self.budget.any_exhausted() {
anyhow::bail!(
"Budget exhausted before LLM call (steps {}/{:?}, revisions {}/{:?}, cost ${:.2} / {:?})",
self.budget.steps_used,
self.budget.max_steps,
self.budget.revisions_used,
self.budget.max_revisions,
self.budget.cost_used_usd,
self.budget.max_cost_usd
);
}
let start = Instant::now();
let llm_response = self
.provider
.generate_response_simple(model, prompt)
.await?;
let latency_ms = start.elapsed().as_millis() as i32;
let tokens_in = llm_response.tokens_in.unwrap_or(0);
let tokens_out = llm_response.tokens_out.unwrap_or(0);
if !self.context.log_llm {
if let Err(e) = self
.ledger
.record_llm_usage(model, node_id, latency_ms, tokens_in, tokens_out)
{
log::warn!("Failed to persist LLM usage metrics: {}", e);
}
}
let estimated_cost = (tokens_in as f64 * 0.00001) + (tokens_out as f64 * 0.00003);
self.budget.record_cost(estimated_cost);
if self.context.log_llm {
if let Err(e) = self.ledger.record_llm_request(
model,
prompt,
&llm_response.text,
node_id,
latency_ms,
tokens_in,
tokens_out,
) {
log::warn!("Failed to persist full LLM request: {}", e);
}
}
log::debug!(
"LLM call: model={}, latency={}ms, tokens_in={}, tokens_out={}, est_cost=${:.4}",
model,
latency_ms,
tokens_in,
tokens_out,
estimated_cost,
);
Ok(llm_response.text)
}
async fn execute_correction_command(
&mut self,
idx: NodeIndex,
node_id: &str,
command: &str,
owner_plugin: &str,
work_dir: &std::path::Path,
) -> Result<()> {
let registry = perspt_core::plugin::PluginRegistry::new();
let decision = registry
.get(owner_plugin)
.map(|plugin| plugin.dependency_command_policy(command))
.unwrap_or(perspt_core::types::CommandPolicyDecision::Allow);
if decision == perspt_core::types::CommandPolicyDecision::Deny {
log::warn!(
"Correction command '{}' denied by plugin policy for '{}'",
command,
owner_plugin
);
self.emit_log(format!("⏭️ Command denied by plugin policy: {}", command));
return Ok(());
}
let approval_result = self
.await_approval_for_node(
perspt_core::ActionType::Command {
command: command.to_string(),
},
format!("Execute correction command: {}", command),
None,
Some(node_id),
)
.await;
if !matches!(
approval_result,
ApprovalResult::Approved | ApprovalResult::ApprovedWithEdit(_)
) {
self.emit_log(format!("⏭️ Correction command skipped: {}", command));
return Ok(());
}
let mut args = HashMap::new();
args.insert("command".to_string(), command.to_string());
args.insert(
"working_dir".to_string(),
work_dir.to_string_lossy().to_string(),
);
let result = self
.tools
.execute(&ToolCall {
name: "run_command".to_string(),
arguments: args,
})
.await;
if result.success {
self.emit_log(format!("✅ {}", command));
return Ok(());
}
let error = result.error.unwrap_or_else(|| result.output.clone());
log::warn!("Correction command failed: {} — {}", command, error);
self.last_tool_failure = Some(format!(
"Correction command '{}' failed: {}",
command, error
));
self.emit_log(format!("❌ Command failed: {} — {}", command, error));
self.graph[idx]
.monitor
.record_failure(perspt_core::types::ErrorType::ToolFailure);
Ok(())
}
pub(super) async fn call_llm_with_tier_fallback<F>(
&mut self,
primary_model: &str,
prompt: &str,
node_id: Option<&str>,
tier: ModelTier,
validator: F,
) -> Result<String>
where
F: Fn(&str) -> std::result::Result<(), String>,
{
let response = self
.call_llm_with_logging(primary_model, prompt, node_id)
.await?;
if validator(&response).is_ok() {
return Ok(response);
}
let validation_err = validator(&response).unwrap_err();
log::warn!(
"Primary model '{}' failed structured-output contract for {:?}: {}",
primary_model,
tier,
validation_err
);
let fallback_model = match tier {
ModelTier::Architect => self.architect_fallback_model.clone(),
ModelTier::Actuator => self.actuator_fallback_model.clone(),
ModelTier::Verifier => self.verifier_fallback_model.clone(),
ModelTier::Speculator => self.speculator_fallback_model.clone(),
};
let fallback_model = fallback_model
.as_deref()
.unwrap_or(primary_model)
.to_string();
log::info!(
"Falling back to model '{}' for {:?} tier",
fallback_model,
tier
);
self.emit_event_ref(perspt_core::AgentEvent::ModelFallback {
node_id: node_id.unwrap_or("").to_string(),
tier: format!("{:?}", tier),
primary_model: primary_model.to_string(),
fallback_model: fallback_model.to_string(),
reason: validation_err,
});
self.call_llm_with_logging(&fallback_model, prompt, node_id)
.await
}
pub(super) fn emit_event_ref(&self, event: perspt_core::AgentEvent) {
if let Some(sender) = &self.event_sender {
let _ = sender.send(event);
}
}
}