use std::path::Path;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use super::{
CheckpointTask, ExecutionCheckpoint, SwarmCoordinator, clear_checkpoint, conflict,
save_checkpoint, scheduler::truncate,
};
use crate::agent::context::ConversationContext;
use crate::agent::r#loop::AgentEvent;
use crate::api::Content;
use crate::api::models::Message;
impl SwarmCoordinator {
pub(super) async fn delegate_single_agent(
&self,
context: ConversationContext,
user_msg: String,
event_tx: mpsc::UnboundedSender<AgentEvent>,
cancel: CancellationToken,
) {
crate::agent::r#loop::run_with_mode(crate::agent::r#loop::AgentParams {
client: self.client_for_preferred_agent(),
config: self.config_for_preferred_agent(),
context,
user_msg,
working_dir: self.working_dir.clone(),
event_tx,
cancel,
lsp_manager: self.lsp_manager.clone(),
trust_level: crate::trust::TrustLevel::Full,
approval_gate: self.make_approval_gate(),
images: Vec::new(),
})
.await;
}
pub(super) async fn try_cli_passthrough(
&self,
context: ConversationContext,
user_msg: String,
event_tx: mpsc::UnboundedSender<AgentEvent>,
cancel: CancellationToken,
) -> bool {
if self.config.cli.is_none() {
return false;
}
let _ = event_tx.send(AgentEvent::SwarmResolvedToSingle {
agent_label: self.single_agent_label(),
});
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — CLI provider detected, running as single agent...",
self.mode_label()
),
});
self.delegate_single_agent(context, user_msg, event_tx, cancel)
.await;
true
}
pub(super) async fn try_trivially_sequential(
&self,
context: ConversationContext,
user_msg: String,
event_tx: mpsc::UnboundedSender<AgentEvent>,
cancel: CancellationToken,
) -> Result<
(),
(
ConversationContext,
String,
mpsc::UnboundedSender<AgentEvent>,
CancellationToken,
),
> {
if !super::is_trivially_sequential(&user_msg) {
return Err((context, user_msg, event_tx, cancel));
}
let Some(name) = self.preferred_agent.clone() else {
return Err((context, user_msg, event_tx, cancel));
};
let _ = event_tx.send(AgentEvent::SwarmResolvedToSingle {
agent_label: name.clone(),
});
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!("{} — delegating to {} agent...", self.mode_label(), name),
});
let context = self.apply_preferred_agent_prompt(context, &name);
self.delegate_single_agent(context, user_msg, event_tx, cancel)
.await;
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn finalize_swarm_results(
&self,
context: ConversationContext,
results: Vec<crate::agent::subagent::SubagentResult>,
mut checkpoint: ExecutionCheckpoint,
kb_path: &Path,
event_tx: &mpsc::UnboundedSender<AgentEvent>,
) {
let mut conflicts = conflict::detect_conflicts(&self.knowledge).await;
self.resolve_conflicts(&mut conflicts, event_tx).await;
let verification = self.verify_merge(event_tx).await;
if let Some(ref vr) = verification
&& !vr.passed
{
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — ⚠ verification failed: {}",
self.mode_label(),
truncate(&vr.output, 200)
),
});
}
let merged = self.merge_results(&results, &conflicts, verification.as_ref());
let total_tool_calls: u32 = results.iter().map(|r| r.tool_calls).sum();
checkpoint.mark_finished();
let _ = clear_checkpoint(&self.checkpoint_path());
if let Some(parent) = kb_path.parent() {
let _ = tokio::fs::create_dir_all(parent).await;
}
if let Err(e) = self.knowledge.save_to_file(kb_path).await {
tracing::debug!("Could not persist knowledge: {e}");
}
let mut merged_context = context;
merged_context.push(Message {
role: "assistant".to_string(),
content: Some(Content::text(merged.clone())),
reasoning_content: None,
tool_calls: None,
tool_call_id: None,
});
let _ = event_tx.send(AgentEvent::SwarmDone {
context: merged_context,
merged_response: merged,
agent_count: results.len(),
total_tool_calls,
conflicts_resolved: conflicts.len(),
});
}
pub(super) fn announce_tasks(
&self,
tasks: &[super::SwarmTask],
event_tx: &mpsc::UnboundedSender<AgentEvent>,
) {
for task in tasks {
let _ = event_tx.send(AgentEvent::SwarmAgentStarted {
agent_id: task.id.clone(),
agent_name: task.role.clone(),
task_preview: truncate(&task.prompt, 80),
});
}
}
pub(super) fn build_checkpoint(
user_msg: &str,
tasks: &[super::SwarmTask],
) -> ExecutionCheckpoint {
ExecutionCheckpoint::new(
user_msg.to_string(),
tasks
.iter()
.map(|t| CheckpointTask {
id: t.id.clone(),
prompt: t.prompt.clone(),
role: t.role.clone(),
agent_name: t.agent_name.clone(),
dependencies: t.dependencies.clone(),
target_files: t.target_files.clone(),
})
.collect(),
)
}
pub(super) fn send_per_agent_done(
&self,
results: &[crate::agent::subagent::SubagentResult],
event_tx: &mpsc::UnboundedSender<AgentEvent>,
) {
for result in results {
let _ = event_tx.send(AgentEvent::SwarmAgentDone {
agent_id: result.id.clone(),
agent_name: result.id.clone(),
success: result.success,
modified_files: result.modified_files.clone(),
tool_calls: result.tool_calls,
input_tokens: result.input_tokens,
output_tokens: result.output_tokens,
response: result.response.clone(),
});
}
}
pub(super) async fn run_plan_review_execute_strategy(
&self,
context: ConversationContext,
user_msg: String,
system_prompt: String,
kb_path: std::path::PathBuf,
event_tx: mpsc::UnboundedSender<AgentEvent>,
cancel: CancellationToken,
) {
let tier_label = "FORK";
let _ = event_tx.send(AgentEvent::SwarmModeSwitch {
label: tier_label.to_string(),
});
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — plan-review-execute: planning phase…",
self.mode_label()
),
});
let approved_plan = self
.run_plan_review(&user_msg, &system_prompt, &event_tx, &cancel)
.await;
let Some(plan) = approved_plan else {
let _ = event_tx.send(AgentEvent::Done {
context,
stop_reason: None,
});
return;
};
if cancel.is_cancelled() {
let _ = event_tx.send(AgentEvent::Done {
context,
stop_reason: None,
});
return;
}
let exec_label = if self.hive_config.mode.has_realtime() {
"FLOCK"
} else {
"FORK"
};
let _ = event_tx.send(AgentEvent::SwarmModeSwitch {
label: exec_label.to_string(),
});
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!("{} — executing approved plan…", self.mode_label()),
});
let execution_msg = format!(
"Execute the following approved implementation plan. \
Follow it exactly, implementing all steps:\n\n{plan}\n\n\
Original task context: {user_msg}"
);
let split_tasks = match self.analyze_and_split(&execution_msg, &system_prompt).await {
Ok(mut t) => {
t.truncate(self.hive_config.max_agents);
t
}
Err(e) => {
tracing::warn!(
"PlanReviewExecute: task split failed ({e}), falling back to single agent"
);
self.delegate_single_agent(context, execution_msg, event_tx, cancel)
.await;
return;
}
};
if split_tasks.len() <= 1 {
self.delegate_single_agent(context, execution_msg, event_tx, cancel)
.await;
return;
}
self.announce_tasks(&split_tasks, &event_tx);
let mut checkpoint = Self::build_checkpoint(&user_msg, &split_tasks);
let results = if split_tasks.len() <= 2 && !self.checkpoint_path().exists() {
self.execute_with_work_stealing(split_tasks, &system_prompt, &event_tx, &cancel)
.await
} else {
let _ = save_checkpoint(&checkpoint, &self.checkpoint_path());
self.execute_with_work_stealing_checkpointed(
split_tasks,
&system_prompt,
&event_tx,
&cancel,
&mut checkpoint,
)
.await
};
if cancel.is_cancelled() {
let _ = save_checkpoint(&checkpoint, &self.checkpoint_path());
let _ = event_tx.send(AgentEvent::Done {
context,
stop_reason: None,
});
return;
}
self.send_per_agent_done(&results, &event_tx);
self.finalize_swarm_results(context, results, checkpoint, &kb_path, &event_tx)
.await;
}
pub(super) async fn run_auto_split_strategy(
&self,
context: ConversationContext,
user_msg: String,
system_prompt: String,
kb_path: std::path::PathBuf,
event_tx: mpsc::UnboundedSender<AgentEvent>,
cancel: CancellationToken,
) {
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — analyzing task and splitting work...",
self.mode_label()
),
});
let tasks = match self.resolve_split(&user_msg, &system_prompt).await {
SplitOutcome::Multi(tasks) => tasks,
SplitOutcome::Single { agent_label } => {
let _ = event_tx.send(AgentEvent::SwarmResolvedToSingle {
agent_label: agent_label.clone(),
});
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — single subtask, running as standard agent...",
self.mode_label()
),
});
let context = self.apply_preferred_agent_prompt(context, &agent_label);
self.delegate_single_agent(context, user_msg, event_tx, cancel)
.await;
return;
}
SplitOutcome::Failed(e) => {
tracing::warn!(
"swarm analysis failed ({e}); falling back to single-agent execution"
);
let agent_label = self.single_agent_label();
let _ = event_tx.send(AgentEvent::SwarmResolvedToSingle {
agent_label: agent_label.clone(),
});
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — coordinator unavailable ({e}); running as single agent…",
self.mode_label()
),
});
let context = self.apply_preferred_agent_prompt(context, &agent_label);
self.delegate_single_agent(context, user_msg, event_tx, cancel)
.await;
return;
}
};
if cancel.is_cancelled() {
let _ = event_tx.send(AgentEvent::Done {
context,
stop_reason: None,
});
return;
}
let mut checkpoint = Self::build_checkpoint(&user_msg, &tasks);
let _ = save_checkpoint(&checkpoint, &self.checkpoint_path());
self.announce_tasks(&tasks, &event_tx);
let active_label = if self.hive_config.mode.has_realtime() {
"FLOCK"
} else {
"FORK"
};
let _ = event_tx.send(AgentEvent::SwarmModeSwitch {
label: active_label.to_string(),
});
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — executing {} subtasks...",
self.mode_label(),
tasks.len()
),
});
if self.hive_config.mode.has_consensus() {
let _ = event_tx.send(AgentEvent::SwarmWorkersDispatched);
}
let results = self
.execute_with_work_stealing_checkpointed(
tasks,
&system_prompt,
&event_tx,
&cancel,
&mut checkpoint,
)
.await;
if cancel.is_cancelled() {
let _ = save_checkpoint(&checkpoint, &self.checkpoint_path());
let _ = event_tx.send(AgentEvent::Done {
context,
stop_reason: None,
});
return;
}
self.send_per_agent_done(&results, &event_tx);
self.finalize_swarm_results(context, results, checkpoint, &kb_path, &event_tx)
.await;
}
async fn resolve_split(&self, user_msg: &str, system_prompt: &str) -> SplitOutcome {
match self.analyze_and_split(user_msg, system_prompt).await {
Ok(mut tasks) if tasks.len() > 1 => {
tasks.truncate(self.hive_config.max_agents);
SplitOutcome::Multi(tasks)
}
Ok(mut tasks) => {
let agent_label = tasks
.first_mut()
.and_then(|t| t.agent_name.take())
.unwrap_or_else(|| self.single_agent_label());
SplitOutcome::Single { agent_label }
}
Err(e) => {
tracing::warn!("analyze_and_split failed: {e}, attempting fallback...");
match self
.analyze_and_split_fallback(user_msg, system_prompt)
.await
{
Ok(mut tasks) if tasks.len() > 1 => {
tasks.truncate(self.hive_config.max_agents);
SplitOutcome::Multi(tasks)
}
Ok(mut tasks) => {
let agent_label = tasks
.first_mut()
.and_then(|t| t.agent_name.take())
.unwrap_or_else(|| self.single_agent_label());
SplitOutcome::Single { agent_label }
}
Err(_) => SplitOutcome::Failed(e.to_string()),
}
}
}
}
}
pub(super) enum SplitOutcome {
Multi(Vec<super::SwarmTask>),
Single { agent_label: String },
Failed(String),
}