use super::verification::severity_to_str;
use super::*;
impl SRBNOrchestrator {
/// Step 5 of the SRBN loop: the convergence check for the node at `idx`.
///
/// Computes the contract-weighted total energy V(x), records it on the
/// node's monitor, and then takes one of three paths:
/// * returns `Ok(true)` when the node is stable (V(x) < ε) and the last
///   verification pass was not degraded;
/// * returns `Ok(false)` when the monitor says to escalate (too many
///   attempts without convergence);
/// * otherwise asks the LLM for a correction, applies it (structured
///   artifact bundle, or a single-file write/diff fallback), runs any
///   dependency commands, then re-verifies and recurses.
///
/// Termination relies on `should_escalate()` eventually tripping on
/// `attempt_count` — presumably guaranteed by the monitor; TODO confirm.
pub(super) async fn step_converge(
    &mut self,
    idx: NodeIndex,
    energy: EnergyComponents,
) -> Result<bool> {
    log::info!("Step 5: Convergence check");
    // Contract-weighted total energy V(x) for this node.
    let total = {
        let node = &self.graph[idx];
        energy.total(&node.contract)
    };
    // Record the sample, then snapshot everything we need up front so the
    // later `&mut self` calls don't conflict with this graph borrow.
    let node = &mut self.graph[idx];
    node.monitor.record_energy(total);
    let node_id = node.node_id.clone();
    let goal = node.goal.clone();
    let epsilon = node.monitor.stability_epsilon;
    let attempt_count = node.monitor.attempt_count;
    let stable = node.monitor.stable;
    let should_escalate = node.monitor.should_escalate();
    if stable {
        if let Some(ref vr) = self.last_verification_result {
            if vr.has_degraded_stages() {
                // Energy is below ε but some verification sensors were
                // degraded, so stability cannot be trusted: emit the
                // degraded-verification event and fall through to retry.
                let reasons = vr.degraded_stage_reasons();
                log::warn!(
                    "Node {} energy is below ε but verification was degraded: {:?}",
                    node_id,
                    reasons
                );
                self.emit_log(format!(
                    "⚠️ V(x)={:.2} < ε but stability unconfirmed — degraded sensors: {}",
                    total,
                    reasons.join(", ")
                ));
                self.emit_event(perspt_core::AgentEvent::DegradedVerification {
                    node_id: node_id.clone(),
                    degraded_stages: reasons,
                    stability_blocked: true,
                });
            } else {
                // Stable with a clean verification result: success.
                log::info!(
                    "Node {} is stable (V(x)={:.2} < ε={:.2})",
                    node_id,
                    total,
                    epsilon
                );
                self.emit_log(format!("✅ Stable! V(x)={:.2} < ε={:.2}", total, epsilon));
                return Ok(true);
            }
        } else {
            // Stable with no verification result to cross-check: success.
            // NOTE(review): this arm duplicates the success arm above;
            // a shared helper would keep the two messages in sync.
            log::info!(
                "Node {} is stable (V(x)={:.2} < ε={:.2})",
                node_id,
                total,
                epsilon
            );
            self.emit_log(format!("✅ Stable! V(x)={:.2} < ε={:.2}", total, epsilon));
            return Ok(true);
        }
    }
    if should_escalate {
        // Give up after too many attempts; the caller handles escalation.
        log::warn!(
            "Node {} failed to converge after {} attempts (V(x)={:.2})",
            node_id,
            attempt_count,
            total
        );
        self.emit_log(format!(
            "⚠️ Escalating: failed to converge after {} attempts",
            attempt_count
        ));
        return Ok(false);
    }
    // Not stable and not escalating: mark the node as retrying and ask the
    // LLM to regenerate with error feedback.
    self.graph[idx].state = NodeState::Retry;
    self.emit_event(perspt_core::AgentEvent::TaskStatusChanged {
        node_id: self.graph[idx].node_id.clone(),
        status: perspt_core::NodeStatus::Retrying,
    });
    log::info!(
        "V(x)={:.2} > ε={:.2}, regenerating with feedback (attempt {})",
        total,
        epsilon,
        attempt_count
    );
    self.emit_log(format!(
        "🔄 V(x)={:.2} > ε={:.2}, sending errors to LLM (attempt {})",
        total, epsilon, attempt_count
    ));
    // Build a correction prompt from diagnostics + current file contents.
    let correction_prompt = self.build_correction_prompt(&node_id, &goal, &energy)?;
    log::info!(
        "--- CORRECTION PROMPT ---\n{}\n-------------------------",
        correction_prompt
    );
    self.emit_log("📤 Sending correction prompt to LLM...".to_string());
    let corrected = self.call_llm_for_correction(&correction_prompt).await?;
    let node_class = self.graph[idx].node_class;
    let attempt = self.graph[idx].monitor.attempt_count;
    let diagnosis = self.context.last_diagnostics.clone();
    // Preferred path: the LLM returned a structured artifact bundle, which
    // is applied transactionally and recorded as a repair footprint.
    if let Some(bundle) = self.parse_artifact_bundle(&corrected) {
        log::info!(
            "Applying correction bundle: {} artifact(s), {} command(s)",
            bundle.artifacts.len(),
            bundle.commands.len()
        );
        self.emit_log(format!(
            "🔧 Applying correction bundle ({} artifact(s))",
            bundle.artifacts.len()
        ));
        self.apply_bundle_transactionally(&bundle, &node_id, node_class)
            .await?;
        self.last_tool_failure = None;
        let node_workdir = self.effective_working_dir(idx);
        // Track the first artifact as "last written" for later fallbacks.
        if let Some(first_path) = bundle.artifacts.first().map(|a| a.path().to_string()) {
            self.last_written_file = Some(node_workdir.join(&first_path));
        }
        self.file_version += 1;
        self.last_applied_bundle = Some(bundle.clone());
        let diagnosis_str = format!("{:?}", diagnosis);
        // NOTE(review): phase label is always "initial" even though this is
        // a correction attempt — confirm that is intended.
        let footprint = perspt_core::RepairFootprint::new(
            &self.context.session_id,
            &node_id,
            "initial", attempt as u32,
            &bundle,
            &diagnosis_str,
        );
        self.last_repair_footprint = Some(footprint.clone());
        if let Err(e) = self.ledger.record_repair_footprint(&footprint) {
            log::warn!("Failed to record repair footprint: {}", e);
        }
        if !bundle.commands.is_empty() {
            self.emit_log(format!(
                "🔧 Executing {} bundle command(s)...",
                bundle.commands.len()
            ));
            let work_dir = self.effective_working_dir(idx);
            let is_python = self.graph[idx].owner_plugin == "python";
            for raw_command in &bundle.commands {
                // Python nodes get their commands rewritten to go through uv.
                let command = if is_python {
                    Self::normalize_command_to_uv(raw_command)
                } else {
                    raw_command.clone()
                };
                log::info!("Running correction command: {}", command);
                // Naive whitespace split — quoted arguments are not supported.
                let parts: Vec<&str> = command.split_whitespace().collect();
                if parts.is_empty() {
                    continue;
                }
                let output = tokio::process::Command::new(parts[0])
                    .args(&parts[1..])
                    .current_dir(&work_dir)
                    .stdout(std::process::Stdio::piped())
                    .stderr(std::process::Stdio::piped())
                    .output()
                    .await;
                // Command failures are logged but do not abort the step; the
                // next verification pass will surface any remaining errors.
                match output {
                    Ok(o) if o.status.success() => {
                        self.emit_log(format!("✅ {}", command));
                    }
                    Ok(o) => {
                        let stderr = String::from_utf8_lossy(&o.stderr);
                        log::warn!("Command failed: {} — {}", command, stderr);
                    }
                    Err(e) => {
                        log::warn!("Failed to run command: {} — {}", command, e);
                    }
                }
            }
        }
    } else if let Some((filename, new_code, is_diff)) =
        self.extract_code_from_response(&corrected)
    {
        // Fallback path: a single file (full content or diff) extracted from
        // free-form LLM output, applied through the tool layer.
        let node_workdir = self.effective_working_dir(idx);
        let full_path = node_workdir.join(&filename);
        let mut args = HashMap::new();
        args.insert("path".to_string(), filename.clone());
        let call = if is_diff {
            args.insert("diff".to_string(), new_code.clone());
            ToolCall {
                name: "apply_diff".to_string(),
                arguments: args,
            }
        } else {
            args.insert("content".to_string(), new_code.clone());
            ToolCall {
                name: "write_file".to_string(),
                arguments: args,
            }
        };
        let result = self.tools.execute(&call).await;
        if result.success {
            log::info!("✓ Applied correction to: {}", filename);
            self.emit_log(format!("📝 Applied correction to: {}", filename));
            self.last_tool_failure = None;
            self.last_written_file = Some(full_path.clone());
            self.file_version += 1;
            // Keep the LSP server's view of the file in sync so the next
            // verification pass sees fresh diagnostics.
            let lsp_key = self.lsp_key_for_file(&full_path.to_string_lossy());
            if let Some(client) = lsp_key.and_then(|k| self.lsp_clients.get_mut(&k)) {
                if let Ok(content) = std::fs::read_to_string(&full_path) {
                    let _ = client
                        .did_change(&full_path, &content, self.file_version)
                        .await;
                }
            }
        } else {
            self.last_tool_failure = result.error;
        }
    }
    // Dependency commands can also appear outside a bundle (e.g. a
    // "Commands:" section in the free-form response); run those too.
    let correction_cmds = Self::extract_commands_from_correction(&corrected);
    if !correction_cmds.is_empty() {
        self.emit_log(format!(
            "📦 Running {} dependency command(s) from correction...",
            correction_cmds.len()
        ));
        let work_dir = self.effective_working_dir(idx);
        for cmd in &correction_cmds {
            log::info!("Running correction command: {}", cmd);
            let parts: Vec<&str> = cmd.split_whitespace().collect();
            if parts.is_empty() {
                continue;
            }
            let output = tokio::process::Command::new(parts[0])
                .args(&parts[1..])
                .current_dir(&work_dir)
                .stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped())
                .output()
                .await;
            match output {
                Ok(o) if o.status.success() => {
                    self.emit_log(format!("✅ {}", cmd));
                }
                Ok(o) => {
                    let stderr = String::from_utf8_lossy(&o.stderr);
                    log::warn!("Command failed: {} — {}", cmd, stderr);
                }
                Err(e) => {
                    log::warn!("Failed to run command: {} — {}", cmd, e);
                }
            }
        }
    }
    // Re-verify and recurse. `Box::pin` is required because an async fn
    // cannot recursively await itself without boxing the future.
    let new_energy = self.step_verify(idx).await?;
    Box::pin(self.step_converge(idx, new_energy)).await
}
/// Builds the correction prompt sent to the LLM when a node fails to
/// converge.
///
/// The prompt contains, in order: the original goal, the current contents
/// of every relevant file (the node's output targets, files touched by the
/// last repair footprint, and the first root manifest found), the LSP
/// diagnostics with heuristic fix directions, optional restriction-map
/// context, the sandbox file tree, truncated build/test output, and the
/// required output format (single file or JSON artifact bundle).
///
/// # Errors
/// Unreadable files are silently skipped; the `Result` return matches the
/// caller's error flow rather than signalling I/O failures from here.
fn build_correction_prompt(
    &self,
    node_id: &str,
    goal: &str,
    energy: &EnergyComponents,
) -> Result<String> {
    let diagnostics = &self.context.last_diagnostics;
    // Plugin that owns the node, used to pick dependency-command examples.
    let owner_plugin = self
        .node_indices
        .get(node_id)
        .map(|idx| self.graph[*idx].owner_plugin.as_str())
        .unwrap_or("");

    // Collect (relative-path, content) pairs, deduplicated by path.
    let mut file_sections = Vec::new();
    let mut seen_paths = std::collections::HashSet::new();

    // 1) The node's declared output targets.
    if let Some(idx) = self.node_indices.get(node_id) {
        let node_workdir = self.effective_working_dir(*idx);
        for target in &self.graph[*idx].output_targets {
            let target_str = target.to_string_lossy().to_string();
            let full_path = node_workdir.join(target);
            if let Ok(content) = std::fs::read_to_string(&full_path) {
                if !content.is_empty() && seen_paths.insert(target_str.clone()) {
                    file_sections.push((target_str, content));
                }
            }
        }
    }

    // 2) Files touched by the last repair footprint. The working dir is
    // resolved best-effort when the footprint's node is no longer indexed.
    if let Some(ref footprint) = self.last_repair_footprint {
        let node_workdir = if let Some(idx) = self.node_indices.get(&footprint.node_id) {
            self.effective_working_dir(*idx)
        } else if let Some(ref path) = self.last_written_file {
            path.parent()
                .unwrap_or(std::path::Path::new("."))
                .to_path_buf()
        } else {
            self.context.working_dir.clone()
        };
        for file_path in &footprint.affected_files {
            if seen_paths.insert(file_path.clone()) {
                let full_path = node_workdir.join(file_path);
                if let Ok(content) = std::fs::read_to_string(&full_path) {
                    if !content.is_empty() {
                        file_sections.push((file_path.clone(), content));
                    }
                }
            }
        }
    }

    // 3) The first root manifest that exists (only one is attached).
    let root_manifest_names = ["Cargo.toml", "package.json", "pyproject.toml"];
    for manifest_name in &root_manifest_names {
        let manifest_path = self.context.working_dir.join(manifest_name);
        if manifest_path.exists() {
            let rel = manifest_name.to_string();
            if seen_paths.insert(rel.clone()) {
                if let Ok(content) = std::fs::read_to_string(&manifest_path) {
                    if !content.is_empty() {
                        file_sections.push((rel, content));
                    }
                }
            }
            break;
        }
    }

    // 4) Fallback: the last file written (or an "unknown" placeholder) so
    // the prompt always contains at least one code section.
    if file_sections.is_empty() {
        let current_code = if let Some(ref path) = self.last_written_file {
            std::fs::read_to_string(path).unwrap_or_default()
        } else {
            String::new()
        };
        let file_path = self
            .last_written_file
            .as_ref()
            .map(|p| {
                p.file_name()
                    .unwrap_or_default()
                    .to_string_lossy()
                    .to_string()
            })
            .unwrap_or_else(|| "unknown".to_string());
        file_sections.push((file_path, current_code));
    }

    // Code-fence language is inferred from the primary (first) file's
    // extension; secondary files reuse the same fence tag.
    let primary_path = &file_sections[0].0;
    let lang = std::path::Path::new(primary_path)
        .extension()
        .and_then(|e| e.to_str())
        .map(|ext| match ext {
            "py" => "python",
            "rs" => "rust",
            "ts" | "tsx" => "typescript",
            "js" | "jsx" => "javascript",
            "go" => "go",
            "java" => "java",
            "rb" => "ruby",
            "c" | "h" => "c",
            "cpp" | "cc" | "cxx" | "hpp" => "cpp",
            "cs" => "csharp",
            other => other,
        })
        .unwrap_or("text");

    let mut prompt = format!(
        "## Code Correction Required\n\n\
         The code you generated has {} error(s) detected by the language toolchain.\n\
         Your task is to fix ALL errors and return the complete corrected file(s).\n\n\
         ### Original Goal\n{}\n\n\
         ### Current Code (with errors)\n",
        diagnostics.len(),
        goal,
    );
    for (path, content) in &file_sections {
        prompt.push_str(&format!(
            "File: {}\n```{}\n{}\n```\n\n",
            path, lang, content
        ));
    }

    prompt.push_str(&format!(
        "### Detected Errors (V_syn = {:.2})\n",
        energy.v_syn
    ));
    for (i, diag) in diagnostics.iter().enumerate() {
        let fix_direction = self.get_fix_direction(diag);
        prompt.push_str(&format!(
            r#"
#### Error {}
- **Location**: Line {}, Column {}
- **Severity**: {}
- **Message**: {}
- **How to fix**: {}
"#,
            i + 1,
            // LSP positions are 0-based; present them 1-based.
            diag.range.start.line + 1,
            diag.range.start.character + 1,
            severity_to_str(diag.severity),
            diag.message,
            fix_direction
        ));
    }

    if !self.last_formatted_context.is_empty() {
        prompt.push_str(&format!(
            "\n### Restriction Map Context\n\n{}\n",
            self.last_formatted_context
        ));
    }

    if let Some(idx) = self.node_indices.get(node_id) {
        let wd = self.effective_working_dir(*idx);
        if let Ok(tree) = crate::tools::list_sandbox_files(&wd) {
            if !tree.is_empty() {
                prompt.push_str(&format!(
                    "\n### Current Project Tree\n\n```\n{}\n```\n",
                    tree.join("\n")
                ));
            }
        }
    }

    if let Some(ref test_output) = self.context.last_test_output {
        if !test_output.is_empty() {
            // Truncate to ~3000 bytes. Back off to a char boundary first:
            // slicing at a fixed byte offset panics when the output contains
            // multi-byte UTF-8 (e.g. rustc's '→' and smart quotes).
            let truncated = if test_output.len() > 3000 {
                let mut end = 3000;
                while !test_output.is_char_boundary(end) {
                    end -= 1;
                }
                &test_output[..end]
            } else {
                test_output.as_str()
            };
            prompt.push_str(&format!(
                "\n### Build / Test Output\nThe following is the raw output from the build toolchain (e.g. `cargo check` / `cargo build`). \
Use this to identify missing dependencies, unresolved imports, or type errors:\n```\n{}\n```\n",
                truncated
            ));
        }
    }

    // Output-format instructions depend on how many files are in play and
    // which plugin owns the node (for realistic install-command examples).
    let multi_file = file_sections.len() > 1;
    let file_instruction = if multi_file {
        "Return ALL affected files as a JSON artifact bundle"
    } else {
        "Return the COMPLETE corrected file, not just snippets"
    };
    let commands_example = match owner_plugin {
        "rust" => "cargo add thiserror\ncargo add clap --features derive",
        "python" => "uv add httpx\nuv add --dev pytest",
        "javascript" => "npm install express\nnpm install --save-dev jest",
        _ => "cargo add thiserror\nuv add httpx",
    };
    prompt.push_str(&format!(
        r#"
### Fix Requirements
1. Fix ALL errors listed above - do not leave any unfixed
2. Maintain the original functionality and goal
3. Follow {} language conventions and idioms
4. Import any missing modules or dependencies
5. {}
6. If errors mention missing crates/packages (e.g. "can't find crate", "unresolved import" for an external dependency, "ModuleNotFoundError", "No module named"), list the required install commands
### Output Format
Provide the complete corrected file(s) followed by any dependency commands needed:
File: [same filename]
```{}
[complete corrected code]
```
Commands: [optional, one per line]
```
{}
```
"#,
        lang, file_instruction, lang, commands_example
    ));
    Ok(prompt)
}
/// Maps a diagnostic message to a short, human-readable fix hint that is
/// embedded in the correction prompt. Matching is keyword-based on the
/// lowercased message; the first matching rule wins, and unmatched
/// messages fall back to a generic "Review and fix" hint.
fn get_fix_direction(&self, diag: &lsp_types::Diagnostic) -> String {
    let text = diag.message.to_lowercase();
    let has = |needle: &str| text.contains(needle);

    // Unresolved / undefined symbols (checked first: these messages often
    // also mention "import" or "type" and must not fall through).
    if has("undefined") || has("unresolved") || has("not defined") {
        return if has("crate") || has("module") {
            "The crate may not be in Cargo.toml. Add it with `cargo add <crate>` in the Commands section, or use `crate::` for intra-crate imports".into()
        } else {
            "Define the missing variable/function, or import it from the correct module".into()
        };
    }
    // Type mismatches.
    if has("type") && (has("expected") || has("incompatible")) {
        return "Change the value or add a type conversion to match the expected type".into();
    }
    // Missing imports / modules.
    if has("import") || has("no module named") {
        return "Add the correct import statement at the top of the file. For Python: use `uv add <pkg>` for external packages; use relative imports (`from . import mod`) inside package modules.".into();
    }
    // Call-site argument problems.
    if has("argument") && (has("missing") || has("expected")) {
        return "Provide all required arguments to the function call".into();
    }
    // Return-type mismatches.
    if has("return") && has("type") {
        return "Ensure the return statement returns a value of the declared return type".into();
    }
    if has("attribute") {
        return "Check if the object has this attribute, or fix the object type".into();
    }
    if has("syntax") {
        return "Fix the syntax error - check for missing colons, parentheses, or indentation".into();
    }
    if has("indentation") {
        return "Fix the indentation to match Python's indentation rules (4 spaces per level)".into();
    }
    if has("parameter") {
        return "Check the function signature and update parameter types/names".into();
    }
    // No rule matched: echo the raw message back as the direction.
    format!("Review and fix: {}", diag.message)
}
/// Two-stage correction call: the verifier model first analyzes the
/// failure, then the actuator model produces the corrected code guided by
/// that analysis. A failed verifier stage degrades gracefully to an
/// actuator-only correction rather than erroring out.
async fn call_llm_for_correction(&mut self, prompt: &str) -> Result<String> {
    // Stage 1: ask the verifier model for an analysis of the errors.
    let analysis_prompt = format!("{}{}", crate::prompts::VERIFIER_ANALYSIS_PREAMBLE, prompt);
    log::debug!(
        "Stage 1: Sending analysis to verifier model: {}",
        self.verifier_model
    );
    let verifier_model = self.verifier_model.clone();
    let guidance = match self
        .call_llm_with_logging(&verifier_model, &analysis_prompt, None)
        .await
    {
        Ok(text) => text,
        Err(e) => {
            // Best-effort stage: fall back to an empty analysis.
            log::warn!(
                "Verifier analysis failed ({}), falling back to actuator-only correction",
                e
            );
            String::new()
        }
    };

    // Stage 2: have the actuator produce the corrected code, appending the
    // verifier's analysis when one was obtained.
    let actuator_prompt = if guidance.is_empty() {
        prompt.to_string()
    } else {
        format!(
            "{}\n\n## Verifier Analysis\n{}\n\nApply the above analysis to produce corrected code.",
            prompt, guidance
        )
    };
    log::debug!(
        "Stage 2: Sending correction to actuator model: {}",
        self.actuator_model
    );
    let actuator_model = self.actuator_model.clone();
    let response = self
        .call_llm_with_logging(&actuator_model, &actuator_prompt, None)
        .await?;
    log::debug!("Received correction response with {} chars", response.len());
    Ok(response)
}
/// Sends `prompt` to `model` and returns the response text, recording
/// latency, token counts, an estimated cost against the budget, and —
/// when `context.log_llm` is set — the full request/response pair in the
/// ledger. Ledger persistence failures are logged, never fatal.
pub(super) async fn call_llm_with_logging(
    &mut self,
    model: &str,
    prompt: &str,
    node_id: Option<&str>,
) -> Result<String> {
    let started_at = Instant::now();
    let reply = self
        .provider
        .generate_response_simple(model, prompt)
        .await?;
    let elapsed_ms = started_at.elapsed().as_millis() as i32;
    let in_tokens = reply.tokens_in.unwrap_or(0);
    let out_tokens = reply.tokens_out.unwrap_or(0);

    // Persist usage metrics (best-effort).
    if let Err(e) = self
        .ledger
        .record_llm_usage(model, node_id, elapsed_ms, in_tokens, out_tokens)
    {
        log::warn!("Failed to persist LLM usage metrics: {}", e);
    }

    // Rough per-token pricing — presumably a provider-agnostic estimate;
    // confirm against actual pricing if budgets matter.
    let estimated_cost = (in_tokens as f64 * 0.00001) + (out_tokens as f64 * 0.00003);
    self.budget.record_cost(estimated_cost);

    // Optionally persist the full prompt/response for auditing.
    if self.context.log_llm {
        if let Err(e) = self.ledger.record_llm_request(
            model,
            prompt,
            &reply.text,
            node_id,
            elapsed_ms,
            in_tokens,
            out_tokens,
        ) {
            log::warn!("Failed to persist full LLM request: {}", e);
        }
    }

    log::debug!(
        "LLM call: model={}, latency={}ms, tokens_in={}, tokens_out={}, est_cost=${:.4}",
        model,
        elapsed_ms,
        in_tokens,
        out_tokens,
        estimated_cost,
    );
    Ok(reply.text)
}
/// Calls the primary model and validates its output against a
/// structured-output contract; on validation failure, retries once with
/// the tier's configured fallback model (or the primary model again when
/// no fallback is configured), emitting a `ModelFallback` event.
///
/// # Errors
/// Propagates transport/provider errors from either LLM call. The
/// fallback response is returned as-is — it is NOT re-validated here.
pub(super) async fn call_llm_with_tier_fallback<F>(
    &mut self,
    primary_model: &str,
    prompt: &str,
    node_id: Option<&str>,
    tier: ModelTier,
    validator: F,
) -> Result<String>
where
    F: Fn(&str) -> std::result::Result<(), String>,
{
    let response = self
        .call_llm_with_logging(primary_model, prompt, node_id)
        .await?;
    // Run the validator exactly once. The previous version called it twice
    // (`is_ok()` then `unwrap_err()`), doing the validation work twice and
    // panicking if a non-deterministic validator succeeded the second time.
    let validation_err = match validator(&response) {
        Ok(()) => return Ok(response),
        Err(e) => e,
    };
    log::warn!(
        "Primary model '{}' failed structured-output contract for {:?}: {}",
        primary_model,
        tier,
        validation_err
    );
    // Resolve the per-tier fallback; absent one, retry the primary model.
    let fallback_model = match tier {
        ModelTier::Architect => self.architect_fallback_model.clone(),
        ModelTier::Actuator => self.actuator_fallback_model.clone(),
        ModelTier::Verifier => self.verifier_fallback_model.clone(),
        ModelTier::Speculator => self.speculator_fallback_model.clone(),
    };
    let fallback_model = fallback_model
        .as_deref()
        .unwrap_or(primary_model)
        .to_string();
    log::info!(
        "Falling back to model '{}' for {:?} tier",
        fallback_model,
        tier
    );
    self.emit_event_ref(perspt_core::AgentEvent::ModelFallback {
        node_id: node_id.unwrap_or("").to_string(),
        tier: format!("{:?}", tier),
        primary_model: primary_model.to_string(),
        fallback_model: fallback_model.to_string(),
        reason: validation_err,
    });
    self.call_llm_with_logging(&fallback_model, prompt, node_id)
        .await
}
/// Fire-and-forget event emission that only needs `&self` (unlike sites
/// that hold a `&mut self` borrow). Send failures — e.g. no live
/// receiver — are deliberately ignored.
pub(super) fn emit_event_ref(&self, event: perspt_core::AgentEvent) {
    match self.event_sender.as_ref() {
        Some(sender) => {
            let _ = sender.send(event);
        }
        None => {}
    }
}
}