use tokio::sync::mpsc;
use crate::agent::r#loop::AgentEvent;
use crate::agent::subagent::SubagentResult;
use crate::agent::swarm::config::ConflictResolution;
use crate::agent::swarm::conflict;
use crate::api::Content;
use crate::api::models::{ChatRequest, Message};
use crate::api::provider::OpenAiCompatibleProvider;
use super::*;
impl SwarmCoordinator {
pub(super) async fn create_worktree(&self, task_id: &str) -> Option<String> {
let project_name = std::path::Path::new(&self.working_dir)
.file_name()
.map(|n| n.to_string_lossy().into_owned())
.unwrap_or_else(|| "project".to_string());
let wt_path = self
.config
.collet_home
.join("worktrees")
.join(&project_name)
.join(task_id);
if let Some(parent) = wt_path.parent() {
let _ = tokio::fs::create_dir_all(parent).await;
}
let output = tokio::process::Command::new("git")
.args(["worktree", "add", "--detach", wt_path.to_str()?])
.current_dir(&self.working_dir)
.output()
.await;
match output {
Ok(o) if o.status.success() => {
tracing::debug!(
"Created worktree for task '{task_id}' at {}",
wt_path.display()
);
Some(wt_path.to_string_lossy().into_owned())
}
Ok(o) => {
tracing::debug!(
"git worktree add failed for {task_id}: {}",
String::from_utf8_lossy(&o.stderr).trim()
);
None
}
Err(e) => {
tracing::debug!("git not available, skipping worktree: {e}");
None
}
}
}
pub(super) async fn merge_and_remove_worktree(&self, worktree_path: &str) -> Vec<String> {
let diff = tokio::process::Command::new("git")
.args(["diff", "HEAD", "--name-only"])
.current_dir(worktree_path)
.output()
.await;
let untracked = tokio::process::Command::new("git")
.args(["ls-files", "--others", "--exclude-standard"])
.current_dir(worktree_path)
.output()
.await;
let mut changed_files: Vec<String> = Vec::new();
if let Ok(o) = diff {
for line in String::from_utf8_lossy(&o.stdout).lines() {
if !line.is_empty() {
changed_files.push(line.to_string());
}
}
}
if let Ok(o) = untracked {
for line in String::from_utf8_lossy(&o.stdout).lines() {
if !line.is_empty() {
changed_files.push(line.to_string());
}
}
}
for rel_path in &changed_files {
let src = std::path::PathBuf::from(worktree_path).join(rel_path);
let dst = std::path::PathBuf::from(&self.working_dir).join(rel_path);
if src.exists() {
if let Some(parent) = dst.parent() {
let _ = tokio::fs::create_dir_all(parent).await;
}
if let Err(e) = tokio::fs::copy(&src, &dst).await {
tracing::warn!("Failed to merge {rel_path} from worktree: {e}");
}
} else if dst.exists() {
let _ = tokio::fs::remove_file(&dst).await;
tracing::debug!("Removed {rel_path} (deleted in agent's worktree)");
}
}
tracing::debug!(
"Merged {} file(s) from worktree {}",
changed_files.len(),
worktree_path
);
let remove_result = tokio::process::Command::new("git")
.args(["worktree", "remove", worktree_path, "--force"])
.current_dir(&self.working_dir)
.output()
.await;
if let Err(e) = remove_result {
tracing::warn!("Failed to remove worktree {}: {}", worktree_path, e);
}
changed_files
}
pub(super) async fn resolve_conflicts(
&self,
conflicts: &mut [conflict::FileConflict],
event_tx: &mpsc::UnboundedSender<AgentEvent>,
) {
if conflicts.is_empty() {
return;
}
let auto_merged = conflict::try_auto_merge_conflicts(conflicts, &self.knowledge).await;
if auto_merged > 0 {
tracing::info!(
auto_merged,
remaining = conflicts.iter().filter(|c| c.resolution.is_none()).count(),
"Auto-merged sequential conflicts"
);
}
let summary = conflict::conflicts_summary(conflicts);
if !summary.is_empty() {
tracing::debug!(conflicts = ?summary, "Unresolved conflicts before resolution strategy");
}
let unresolved: Vec<_> = conflicts
.iter()
.filter(|c| c.resolution.is_none())
.map(|c| (c.path.clone(), c.agents.clone()))
.collect();
if unresolved.is_empty() {
return;
}
let _ = event_tx.send(AgentEvent::SwarmConflict {
conflicts: unresolved,
});
match self.hive_config.conflict_resolution {
ConflictResolution::UserResolves => {
tracing::info!(
"conflict_resolution=user_resolves: {} conflict(s) left for user",
conflicts.len()
);
}
ConflictResolution::LastWriterWins => {
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — resolving conflicts (last-writer-wins)...",
self.mode_label()
),
});
for c in conflicts.iter_mut() {
let mods = self.knowledge.file_modifications(&c.path).await;
if let Some(winner) = mods.iter().max_by_key(|m| m.timestamp) {
tracing::info!("last_writer_wins: {} → agent {}", c.path, winner.agent_id);
c.resolution = Some(conflict::ConflictResolutionOutcome::AutoMerged);
}
}
}
ConflictResolution::CoordinatorResolves => {
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — resolving conflicts (coordinator)...",
self.mode_label()
),
});
let coord_client = self.hive_config.coordinator_client(&self.client);
for c in conflicts.iter_mut() {
let mods = self.knowledge.file_modifications(&c.path).await;
let can_auto = mods.len() == 2 && conflict::can_auto_merge(&mods[0], &mods[1]);
if can_auto {
tracing::info!("auto-merged sequential edits: {}", c.path);
c.resolution = Some(conflict::ConflictResolutionOutcome::AutoMerged);
continue;
}
match self.coordinator_resolve_file(&coord_client, c, &mods).await {
Ok(explanation) => {
tracing::info!(path = %c.path, %explanation, "Coordinator resolved conflict");
c.resolution =
Some(conflict::ConflictResolutionOutcome::CoordinatorResolved {
explanation,
});
}
Err(e) => {
tracing::warn!(
"coordinator failed to resolve {}: {e}; leaving unresolved",
c.path
);
}
}
}
}
}
for c in conflicts.iter() {
match &c.resolution {
Some(conflict::ConflictResolutionOutcome::CoordinatorResolved { explanation }) => {
tracing::debug!(path = %c.path, explanation, "Coordinator resolution recorded");
}
Some(conflict::ConflictResolutionOutcome::UserResolved { choice }) => {
tracing::debug!(path = %c.path, choice, "User-resolved conflict recorded");
}
_ => {}
}
}
}
pub(super) async fn read_content_snapshot(path: &str, working_dir: &str) -> Option<String> {
let full_path = if std::path::Path::new(path).is_absolute() {
std::path::PathBuf::from(path)
} else {
std::path::PathBuf::from(working_dir).join(path)
};
match tokio::fs::read_to_string(&full_path).await {
Ok(content) => {
const MAX_SNAPSHOT: usize = 8192;
if content.len() <= MAX_SNAPSHOT {
Some(content)
} else {
Some(content[..MAX_SNAPSHOT].to_string())
}
}
Err(_) => None,
}
}
pub(super) async fn coordinator_resolve_file(
&self,
coord_client: &OpenAiCompatibleProvider,
file_conflict: &conflict::FileConflict,
mods: &[crate::agent::swarm::knowledge::FileModification],
) -> crate::common::Result<String> {
let mut version_sections = Vec::new();
for (i, m) in mods.iter().enumerate() {
let content_preview = m
.content_snapshot
.as_deref()
.map(|s| truncate(s, 3000))
.unwrap_or_else(|| "(snapshot not available)".to_string());
version_sections.push(format!(
"### Version {} — Agent `{}`\n\
Modification: {:?}\n\
```\n{}\n```",
i + 1,
m.agent_id,
m.modification_type,
content_preview,
));
}
let prompt = format!(
"You are a code merge coordinator. Multiple agents modified the same file \
and their changes conflict.\n\n\
**File:** `{}`\n\n\
{}\n\n\
Compare the versions above. Decide which version to keep, or describe how to \
combine them. Respond with a brief explanation (1-3 sentences) of your decision.",
file_conflict.path,
version_sections.join("\n\n"),
);
let req = ChatRequest {
model: coord_client.model.clone(),
messages: vec![Message {
role: "user".to_string(),
content: Some(Content::text(prompt)),
reasoning_content: None,
tool_calls: None,
tool_call_id: None,
}],
tools: None,
tool_choice: None,
temperature: Some(0.2),
max_tokens: 500,
stream: false,
thinking_budget_tokens: None,
reasoning_effort: None,
};
let resp = coord_client
.chat(&req)
.await
.map_err(|e| crate::common::AgentError::Transport(e.to_string()))?;
let explanation = resp
.choices
.first()
.and_then(|c| c.message.content.as_ref())
.map(|c| c.text_content())
.unwrap_or_else(|| "No explanation provided.".to_string());
Ok(explanation)
}
pub(super) async fn verify_merge(
&self,
event_tx: &mpsc::UnboundedSender<AgentEvent>,
) -> Option<VerificationResult> {
let cargo_toml = std::path::PathBuf::from(&self.working_dir).join("Cargo.toml");
if !cargo_toml.exists() {
return None;
}
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — running verification gate (cargo check)...",
self.mode_label()
),
});
let output = tokio::process::Command::new("cargo")
.arg("check")
.arg("--message-format=short")
.current_dir(&self.working_dir)
.output()
.await;
match output {
Ok(out) => {
let stdout = String::from_utf8_lossy(&out.stdout).to_string();
let stderr = String::from_utf8_lossy(&out.stderr).to_string();
let combined = format!("{stdout}\n{stderr}");
let passed = out.status.success();
if passed {
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"{} — ✓ verification passed (cargo check)",
self.mode_label()
),
});
}
Some(VerificationResult {
passed,
output: combined,
command: "cargo check".to_string(),
})
}
Err(e) => {
tracing::warn!("Verification gate failed to run: {e}");
None
}
}
}
pub(super) fn merge_results(
&self,
results: &[SubagentResult],
conflicts: &[conflict::FileConflict],
verification: Option<&VerificationResult>,
) -> String {
let mut parts = Vec::new();
let successful: Vec<&SubagentResult> = results.iter().filter(|r| r.success).collect();
let failed: Vec<&SubagentResult> = results.iter().filter(|r| !r.success).collect();
parts.push(format!(
"## {} Execution Summary\n\n{} agents completed ({} ok, {} failed).",
self.mode_label(),
results.len(),
successful.len(),
failed.len(),
));
for result in &successful {
parts.push(format!(
"\n### Agent `{}` [OK]\n\n{}\n\nFiles modified: {}\nTool calls: {}",
result.id,
truncate(&result.response, 500),
if result.modified_files.is_empty() {
"none".to_string()
} else {
result.modified_files.join(", ")
},
result.tool_calls,
));
}
if !failed.is_empty() {
parts.push("\n### ⚠ Failed Agents (excluded from merge)\n".to_string());
for result in &failed {
parts.push(format!(
"- `{}`: {} (files: {}, tool calls: {})",
result.id,
truncate(&result.response, 200),
if result.modified_files.is_empty() {
"none".to_string()
} else {
result.modified_files.join(", ")
},
result.tool_calls,
));
}
}
if !conflicts.is_empty() {
parts.push("\n### Conflicts Detected\n".to_string());
for conflict in conflicts {
parts.push(format!(
"- `{}` modified by: {}{}",
conflict.path,
conflict.agents.join(", "),
conflict
.resolution
.as_ref()
.map(|r| format!(" (resolved: {:?})", r))
.unwrap_or_default()
));
}
}
if let Some(vr) = verification {
parts.push(format!(
"\n### Verification Gate ({})\n\n{}: {}",
vr.command,
if vr.passed {
"✓ PASSED"
} else {
"✗ FAILED"
},
truncate(&vr.output, 500),
));
}
parts.join("\n")
}
}