use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::agent::r#loop::AgentEvent;
use crate::agent::subagent::SubagentTask;
use crate::agent::swarm::knowledge::BlackboardKind;
use super::*;
impl SwarmCoordinator {
pub async fn run_plan_review(
&self,
user_msg: &str,
system_prompt: &str,
event_tx: &mpsc::UnboundedSender<AgentEvent>,
cancel: &CancellationToken,
) -> Option<String> {
let planner_task = SubagentTask {
id: "planner".to_string(),
prompt: format!(
"[PLANNING PHASE] Analyze the codebase thoroughly and produce a detailed \
implementation plan for: {user_msg}"
),
agent_name: None,
available_agents: self.available_agents.clone(),
model_override: self.hive_config.coordinator_model.clone(),
working_dir_override: None,
outer_event_tx: None,
cancel_token: None,
iteration_budget: None,
instruction_rx: None,
};
let _ = event_tx.send(AgentEvent::SwarmAgentStarted {
agent_id: "planner".to_string(),
agent_name: "planner".to_string(),
task_preview: truncate(user_msg, 80),
});
let plan_result = crate::agent::subagent::spawn(
planner_task,
self.client.clone(),
self.config.clone(),
system_prompt.to_string(),
self.working_dir.clone(),
self.lsp_manager.clone(),
None,
)
.await;
if cancel.is_cancelled() || !plan_result.success {
return None;
}
let mut current_plan = plan_result.response;
if self.hive_config.mode.has_consensus() {
self.knowledge
.post_to_blackboard(
"plan:main",
¤t_plan,
"planner",
BlackboardKind::Proposal,
)
.await;
}
let review_aspects = [
"correctness and completeness",
"risks and conflicts",
"performance",
];
let max_reviewers = (self.hive_config.max_agents - 1).min(review_aspects.len());
let make_reviewer_tasks = |plan: &str, suffix: &str| -> Vec<SubagentTask> {
review_aspects[..max_reviewers]
.iter()
.enumerate()
.map(|(i, aspect)| SubagentTask {
id: format!("reviewer-{i}{suffix}"),
prompt: format!(
"[REVIEW PHASE] Review this implementation plan for {aspect}.\n\n\
Mark issues as CRITICAL (must fix) or SUGGESTION (optional).\n\n\
Plan:\n{plan}"
),
agent_name: None,
available_agents: self.available_agents.clone(),
model_override: self.hive_config.worker_model.clone(),
working_dir_override: None,
outer_event_tx: None,
cancel_token: None,
iteration_budget: None,
instruction_rx: None,
})
.collect()
};
let reviewer_tasks = make_reviewer_tasks(¤t_plan, "");
for task in &reviewer_tasks {
let _ = event_tx.send(AgentEvent::SwarmAgentStarted {
agent_id: task.id.clone(),
agent_name: task.id.clone(),
task_preview: "Reviewing plan".to_string(),
});
}
let mut reviews = crate::agent::subagent::spawn_parallel(
reviewer_tasks,
self.client.clone(),
self.config.clone(),
system_prompt.to_string(),
self.working_dir.clone(),
self.lsp_manager.clone(),
None,
)
.await;
if self.hive_config.mode.has_consensus() {
for review in &reviews {
let any_critical = review.response.to_uppercase().contains("CRITICAL");
self.knowledge
.vote_on_proposal("plan:main", &review.id, !any_critical)
.await;
}
let total = reviews.len() + 1;
if let Some(result) = self.knowledge.check_consensus("plan:main", total).await {
tracing::debug!(proposal = %result.key, total_agents = result.total_agents, approved = result.approved, "Flock consensus result");
if result.approved {
self.knowledge.finalize_proposal("plan:main").await;
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — initial plan approved ({}/{} votes for)",
self.mode_label(),
result.votes_for,
total
),
});
} else {
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — initial plan rejected ({}/{} against) — revising…",
self.mode_label(),
result.votes_against,
total
),
});
}
}
}
let mut has_critical = reviews
.iter()
.any(|r| r.response.to_uppercase().contains("CRITICAL"));
const MAX_REVISIONS: u32 = 3;
let mut revision = 0u32;
while has_critical && revision < MAX_REVISIONS && self.hive_config.require_consensus {
if cancel.is_cancelled() {
return None;
}
revision += 1;
let critical_feedback: String = reviews
.iter()
.filter(|r| r.response.to_uppercase().contains("CRITICAL"))
.map(|r| format!("Reviewer ({}): {}", r.id, truncate(&r.response, 800)))
.collect::<Vec<_>>()
.join("\n\n---\n\n");
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — plan revision {revision}/{MAX_REVISIONS} (addressing critical issues)…",
self.mode_label()
),
});
let revision_task = SubagentTask {
id: format!("planner-r{revision}"),
prompt: format!(
"[REVISION {revision}/{MAX_REVISIONS}] Your implementation plan has CRITICAL issues \
that must be fixed before execution.\n\n\
CRITICAL FEEDBACK:\n{critical_feedback}\n\n\
Produce a revised plan that addresses ALL critical issues.\n\
Preserve correct parts of the original plan.\n\n\
Original task: {user_msg}\n\n\
Previous plan:\n{current_plan}"
),
agent_name: None,
available_agents: self.available_agents.clone(),
model_override: self.hive_config.coordinator_model.clone(),
working_dir_override: None,
outer_event_tx: None,
cancel_token: None,
iteration_budget: None,
instruction_rx: None,
};
let _ = event_tx.send(AgentEvent::SwarmAgentStarted {
agent_id: format!("planner-r{revision}"),
agent_name: "planner".to_string(),
task_preview: format!("Revising plan (attempt {revision}/{MAX_REVISIONS})"),
});
let revised = crate::agent::subagent::spawn(
revision_task,
self.client.clone(),
self.config.clone(),
system_prompt.to_string(),
self.working_dir.clone(),
self.lsp_manager.clone(),
None,
)
.await;
if !revised.success {
break;
}
current_plan = revised.response;
if self.hive_config.mode.has_consensus() {
let rev_key = format!("plan:revision-{revision}");
self.knowledge
.post_to_blackboard(
&rev_key,
¤t_plan,
&format!("planner-r{revision}"),
BlackboardKind::Proposal,
)
.await;
}
let re_review_tasks = make_reviewer_tasks(¤t_plan, &format!("-r{revision}"));
for task in &re_review_tasks {
let _ = event_tx.send(AgentEvent::SwarmAgentStarted {
agent_id: task.id.clone(),
agent_name: task.id.clone(),
task_preview: format!("Re-reviewing revised plan (r{revision})"),
});
}
reviews = crate::agent::subagent::spawn_parallel(
re_review_tasks,
self.client.clone(),
self.config.clone(),
system_prompt.to_string(),
self.working_dir.clone(),
self.lsp_manager.clone(),
None,
)
.await;
if self.hive_config.mode.has_consensus() {
let rev_key = format!("plan:revision-{revision}");
for review in &reviews {
let any_critical = review.response.to_uppercase().contains("CRITICAL");
self.knowledge
.vote_on_proposal(&rev_key, &review.id, !any_critical)
.await;
}
let total = reviews.len() + 1;
if let Some(result) = self.knowledge.check_consensus(&rev_key, total).await {
if result.approved {
self.knowledge.finalize_proposal(&rev_key).await;
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — revised plan approved (r{revision}, {}/{} for)",
self.mode_label(),
result.votes_for,
total
),
});
} else {
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — revised plan still has issues (r{revision}, {}/{} against)",
self.mode_label(),
result.votes_against,
total
),
});
}
}
}
has_critical = reviews
.iter()
.any(|r| r.response.to_uppercase().contains("CRITICAL"));
}
if has_critical {
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — plan has unresolved critical issues after {revision} revision(s); proceeding with best effort",
self.mode_label()
),
});
} else {
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — plan finalized, ready for execution",
self.mode_label()
),
});
}
Some(current_plan)
}
}