use crate::config::OrchestratorConfig;
use crate::error::{OrchestratorError, Result};
use crate::history::{ResolveAttempt, ResolveContext};
use crate::vcs::git::commands as git_commands;
use crate::vcs::{VcsBackend, WorkspaceManager};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::{info, warn};
use super::events::{send_event, ParallelEvent};
/// Scope guard that marks one auto-resolve operation as in flight.
///
/// Creating the guard adds one to the shared counter; dropping it subtracts
/// one again, so the counter reflects how many guards are currently alive.
struct AutoResolveGuard {
    counter: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}

impl AutoResolveGuard {
    /// Increments `counter` and returns the guard that will undo the increment on drop.
    fn new(counter: std::sync::Arc<std::sync::atomic::AtomicUsize>) -> Self {
        use std::sync::atomic::Ordering;

        let guard = Self { counter };
        guard.counter.fetch_add(1, Ordering::SeqCst);
        guard
    }
}

impl Drop for AutoResolveGuard {
    /// Releases this guard's slot by decrementing the shared counter.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        self.counter.fetch_sub(1, Ordering::SeqCst);
    }
}
/// Asks the workspace manager for its current list of conflicts.
///
/// Callers in this file treat the returned strings as conflicting file names.
///
/// # Errors
/// Returns the workspace manager's error converted into [`OrchestratorError`].
pub async fn detect_conflicts(workspace_manager: &dyn WorkspaceManager) -> Result<Vec<String>> {
    let detected = workspace_manager.detect_conflicts().await;
    detected.map_err(OrchestratorError::from)
}
/// Fetches the VCS status text from the workspace manager.
///
/// # Errors
/// Returns the workspace manager's error converted into [`OrchestratorError`].
pub async fn get_vcs_status(workspace_manager: &dyn WorkspaceManager) -> Result<String> {
    match workspace_manager.get_status().await {
        Ok(status) => Ok(status),
        Err(err) => Err(OrchestratorError::from(err)),
    }
}
/// Fetches the VCS log text covering `revisions` from the workspace manager.
///
/// # Errors
/// Returns the workspace manager's error converted into [`OrchestratorError`].
pub async fn get_vcs_log_for_revisions(
    workspace_manager: &dyn WorkspaceManager,
    revisions: &[String],
) -> Result<String> {
    let log_result = workspace_manager.get_log_for_revisions(revisions).await;
    log_result.map_err(OrchestratorError::from)
}
/// Resolves VCS conflicts by running the configured resolve command, retrying up to
/// `max_retries` times.
///
/// Every attempt streams the command's output as [`ParallelEvent::ResolveOutput`] events and
/// then re-runs conflict detection. The first attempt after which no conflicts remain ends the
/// call successfully, even if the command itself exited non-zero. Unsuccessful attempts are
/// recorded in a [`ResolveContext`]; its continuation context is appended to the prompt of the
/// following attempt.
///
/// # Arguments
/// * `workspace_manager` - source of conflict detection, status, log and the repository root
///   in which the command runs.
/// * `config` - provides the resolve command template and the command-queue settings.
/// * `event_tx` - optional channel on which progress events are emitted.
/// * `revisions` - revisions involved in the conflict; listed in the prompt.
/// * `change_ids` - change identifiers; each receives a `ResolveStarted` event, and streamed
///   output is attributed to their `+`-joined form.
/// * `vcs_error` - error text from the failed VCS operation; embedded in the prompt.
/// * `max_retries` - maximum number of resolve attempts.
/// * `shared_stagger_state` - stagger state handed to the AI command runner.
/// * `auto_resolve_count` - counter kept incremented while this function is running.
///
/// # Errors
/// Fails if conflict detection fails, if the resolve command template cannot be obtained, if
/// the command cannot be started or waited on, or with [`OrchestratorError::GitConflict`] when
/// conflicts remain after all attempts.
// The flat parameter list is part of the public signature used by callers.
#[allow(clippy::too_many_arguments)]
pub async fn resolve_conflicts_with_retry(
    workspace_manager: &dyn WorkspaceManager,
    config: &OrchestratorConfig,
    event_tx: &Option<mpsc::Sender<ParallelEvent>>,
    revisions: &[String],
    change_ids: &[String],
    vcs_error: &str,
    max_retries: u32,
    shared_stagger_state: crate::ai_command_runner::SharedStaggerState,
    auto_resolve_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
) -> Result<()> {
    use crate::ai_command_runner::AiCommandRunner;
    use crate::command_queue::CommandQueueConfig;
    use crate::config::defaults::*;

    // Counted as an active auto-resolve until this function returns (the guard decrements on drop).
    let _guard = AutoResolveGuard::new(auto_resolve_count);
    send_event(event_tx, ParallelEvent::ConflictResolutionStarted).await;

    let conflict_files = detect_conflicts(workspace_manager).await?;
    let conflict_files_str = conflict_files.join(", ");
    // Status and log only enrich the prompt, so a failure to fetch them degrades to "".
    let vcs_status = get_vcs_status(workspace_manager).await.unwrap_or_default();
    let vcs_log = get_vcs_log_for_revisions(workspace_manager, revisions)
        .await
        .unwrap_or_default();
    let vcs_prompt_prefix = workspace_manager.conflict_resolution_prompt();

    let mut resolve_context = ResolveContext::new(max_retries);
    let combined_change_id = change_ids.join("+");

    let queue_config = CommandQueueConfig {
        stagger_delay_ms: config
            .command_queue_stagger_delay_ms
            .unwrap_or(DEFAULT_STAGGER_DELAY_MS),
        max_retries: config
            .command_queue_max_retries
            .unwrap_or(DEFAULT_MAX_RETRIES),
        retry_delay_ms: config
            .command_queue_retry_delay_ms
            .unwrap_or(DEFAULT_RETRY_DELAY_MS),
        retry_error_patterns: config
            .command_queue_retry_patterns
            .clone()
            .unwrap_or_else(default_retry_patterns),
        retry_if_duration_under_secs: config
            .command_queue_retry_if_duration_under_secs
            .unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS),
        inactivity_timeout_secs: config.get_command_inactivity_timeout_secs(),
        inactivity_kill_grace_secs: config.get_command_inactivity_kill_grace_secs(),
        inactivity_timeout_max_retries: config.get_command_inactivity_timeout_max_retries(),
        strict_process_cleanup: config.get_command_strict_process_cleanup(),
    };
    let stream_json_textify = config.get_stream_json_textify();
    let mut ai_runner = AiCommandRunner::new(queue_config, shared_stagger_state);
    ai_runner.set_stream_json_textify(stream_json_textify);
    ai_runner.set_strict_process_cleanup(config.get_command_strict_process_cleanup());

    // The base prompt depends only on values captured before the loop, so build it once.
    let revision_refs: Vec<&str> = revisions.iter().map(String::as_str).collect();
    let base_prompt = build_conflict_resolve_prompt(
        vcs_prompt_prefix,
        &revision_refs,
        vcs_error,
        &vcs_status,
        &vcs_log,
        &conflict_files_str,
    );

    let template = config.get_resolve_command()?;
    let initial_command =
        crate::config::OrchestratorConfig::expand_prompt(template, &base_prompt);
    let initial_command =
        crate::config::expand::expand_conflict_files(&initial_command, &conflict_files_str);
    for change_id in change_ids {
        send_event(
            event_tx,
            ParallelEvent::ResolveStarted {
                change_id: change_id.to_string(),
                command: initial_command.clone(),
            },
        )
        .await;
    }

    for attempt in 1..=max_retries {
        let start = Instant::now();
        info!(
            "Conflict resolution attempt {}/{} for files: {}",
            attempt, max_retries, conflict_files_str
        );

        // Later attempts carry a summary of what went wrong previously.
        let continuation_context = resolve_context.format_continuation_context();
        let resolve_prompt = if continuation_context.is_empty() {
            base_prompt.clone()
        } else {
            format!("{}\n\n{}", base_prompt, continuation_context)
        };

        let template = config.get_resolve_command()?;
        let command = crate::config::OrchestratorConfig::expand_prompt(template, &resolve_prompt);
        // Apply the same conflict-file expansion as the announced command so the command that
        // runs matches the one reported in `ResolveStarted`.
        let command = crate::config::expand::expand_conflict_files(&command, &conflict_files_str);

        let (mut child, mut rx) = ai_runner
            .execute_streaming_with_retry(
                &command,
                Some(workspace_manager.repo_root()),
                Some("resolve"),
                None,
            )
            .await?;

        // Forward every output line as an event while keeping tails for the attempt record.
        let mut output_collector = crate::history::OutputCollector::new();
        while let Some(line) = rx.recv().await {
            let text = match &line {
                crate::ai_command_runner::OutputLine::Stdout(s) => {
                    output_collector.add_stdout(s);
                    s.clone()
                }
                crate::ai_command_runner::OutputLine::Stderr(s) => {
                    output_collector.add_stderr(s);
                    s.clone()
                }
            };
            send_event(
                event_tx,
                ParallelEvent::ResolveOutput {
                    change_id: combined_change_id.clone(),
                    output: text,
                    iteration: Some(attempt),
                },
            )
            .await;
        }

        let status = child.wait().await.map_err(|e| {
            OrchestratorError::AgentCommand(format!(
                "Resolve command failed in workspace '{}' (attempt {}): {}",
                workspace_manager.repo_root().display(),
                attempt,
                e
            ))
        })?;
        let status_success = status.success();
        let remaining_conflicts = detect_conflicts(workspace_manager).await?;
        let duration = start.elapsed();

        // Success is judged by the repository state, not by the command's exit status.
        if remaining_conflicts.is_empty() {
            if !status_success {
                warn!(
                    "Resolve command exited non-zero but conflicts cleared (attempt {}/{})",
                    attempt, max_retries
                );
            }
            resolve_context.record(ResolveAttempt {
                attempt,
                command_success: status_success,
                verification_success: true,
                duration,
                continuation_reason: None,
                exit_code: status.code(),
                stdout_tail: output_collector.stdout_tail(),
                stderr_tail: output_collector.stderr_tail(),
            });
            send_event(event_tx, ParallelEvent::ConflictResolutionCompleted).await;
            return Ok(());
        }

        let continuation_reason = if status_success {
            let reason = format!(
                "Conflicts still present after resolution attempt: {}",
                remaining_conflicts.join(", ")
            );
            warn!("{}", reason);
            Some(reason)
        } else {
            let reason = format!(
                "Resolution command failed with exit code: {:?}",
                status.code()
            );
            warn!(
                "Resolution attempt {} failed with exit code: {:?}",
                attempt,
                status.code()
            );
            Some(reason)
        };
        resolve_context.record(ResolveAttempt {
            attempt,
            command_success: status_success,
            verification_success: false,
            duration,
            continuation_reason,
            exit_code: status.code(),
            stdout_tail: output_collector.stdout_tail(),
            stderr_tail: output_collector.stderr_tail(),
        });
    }

    let error_msg = format!("Failed to resolve conflicts after {} attempts", max_retries);
    send_event(
        event_tx,
        ParallelEvent::ConflictResolutionFailed {
            error: error_msg.clone(),
        },
    )
    .await;
    // Exhaustive match on purpose: adding a backend variant forces a decision here.
    match workspace_manager.backend_type() {
        VcsBackend::Git | VcsBackend::Auto => Err(OrchestratorError::GitConflict(error_msg)),
    }
}
/// Argument bundle for [`resolve_merges_with_retry`].
///
/// `revisions` and `change_ids` are paired positionally (they are zipped together when the
/// merge plan and worktree list are built), so callers are expected to pass them in matching
/// order.
#[derive(Clone)]
pub struct ResolveMergesWithRetryArgs<'a> {
/// Source of conflict detection, VCS status/log, the repository root and the workspace list.
pub workspace_manager: &'a dyn WorkspaceManager,
/// Provides the resolve command template and the command-queue settings.
pub config: &'a OrchestratorConfig,
/// Optional channel on which progress events are emitted.
pub event_tx: &'a Option<mpsc::Sender<ParallelEvent>>,
/// Revisions to merge; also used as keys when looking up workspaces by name.
pub revisions: &'a [String],
/// Change identifiers, one per entry of `revisions`.
pub change_ids: &'a [String],
/// Reported in the prompt as the target branch.
pub target_branch: &'a str,
/// Reported in the prompt as the base revision before the merges, and passed as the
/// starting point to the post-attempt commit checks.
pub base_revision: &'a str,
/// Maximum number of resolve attempts.
pub max_retries: u32,
/// Stagger state handed to the AI command runner.
pub shared_stagger_state: crate::ai_command_runner::SharedStaggerState,
/// Counter kept incremented while `resolve_merges_with_retry` is running.
pub auto_resolve_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}
pub async fn resolve_merges_with_retry(args: ResolveMergesWithRetryArgs<'_>) -> Result<()> {
let ResolveMergesWithRetryArgs {
workspace_manager,
config,
event_tx,
revisions,
change_ids,
target_branch,
base_revision,
max_retries,
shared_stagger_state,
auto_resolve_count,
} = args;
let _guard = AutoResolveGuard::new(auto_resolve_count);
send_event(event_tx, ParallelEvent::ConflictResolutionStarted).await;
let conflict_files = detect_conflicts(workspace_manager).await?;
if conflict_files.is_empty()
&& matches!(
workspace_manager.backend_type(),
VcsBackend::Git | VcsBackend::Auto
)
{
let merge_in_progress = git_commands::is_merge_in_progress(workspace_manager.repo_root())
.await
.map_err(OrchestratorError::from)?;
if merge_in_progress {
let merge_subject = if change_ids.len() == 1 {
format!("Merge change: {}", change_ids[0])
} else {
format!("Merge changes: {}", change_ids.join(", "))
};
info!(
"No unresolved conflicts and merge is in-progress (MERGE_HEAD exists); committing conflictless merge-ready state with subject '{}' and skipping AI resolve path for revisions: {}",
merge_subject,
revisions.join(", ")
);
git_commands::run_git(
&["commit", "-m", merge_subject.as_str()],
workspace_manager.repo_root(),
)
.await
.map_err(OrchestratorError::from)?;
send_event(event_tx, ParallelEvent::ConflictResolutionCompleted).await;
for change_id in change_ids {
send_event(
event_tx,
ParallelEvent::ResolveCompleted {
change_id: change_id.to_string(),
worktree_change_ids: None,
},
)
.await;
}
return Ok(());
}
let mut not_integrated_revisions: Vec<String> = Vec::new();
for revision in revisions {
let integrated =
git_commands::is_ancestor(workspace_manager.repo_root(), revision, "HEAD")
.await
.map_err(OrchestratorError::from)?;
if !integrated {
not_integrated_revisions.push(revision.clone());
}
}
if not_integrated_revisions.is_empty() {
info!(
"No unresolved conflicts, no merge in progress, and all revisions are already integrated into HEAD; skipping AI resolve path for revisions: {}",
revisions.join(", ")
);
send_event(event_tx, ParallelEvent::ConflictResolutionCompleted).await;
for change_id in change_ids {
send_event(
event_tx,
ParallelEvent::ResolveCompleted {
change_id: change_id.to_string(),
worktree_change_ids: None,
},
)
.await;
}
return Ok(());
}
info!(
"No unresolved conflicts and no merge in progress, but revisions are not integrated into HEAD yet ({}); continuing resolve path",
not_integrated_revisions.join(", ")
);
}
let conflict_files_str = conflict_files.join(", ");
let vcs_status = get_vcs_status(workspace_manager).await.unwrap_or_default();
let vcs_log = get_vcs_log_for_revisions(workspace_manager, revisions)
.await
.unwrap_or_default();
let vcs_prompt_prefix = workspace_manager.conflict_resolution_prompt();
let merge_plan = revisions
.iter()
.zip(change_ids.iter())
.map(|(rev, change_id)| format!("- {} => {}", rev, change_id))
.collect::<Vec<_>>()
.join("\n");
let workspaces = workspace_manager.workspaces();
let workspace_paths: HashMap<String, PathBuf> = workspaces
.iter()
.map(|workspace| (workspace.name.clone(), workspace.path.clone()))
.collect();
let workspace_base_revisions: HashMap<String, String> = workspaces
.into_iter()
.map(|workspace| (workspace.name, workspace.base_revision))
.collect();
let worktree_locations = revisions
.iter()
.zip(change_ids.iter())
.map(|(rev, change_id)| {
let path = workspace_paths
.get(rev)
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|| "(unknown)".to_string());
format!("- {} => {} (change_id: {})", rev, path, change_id)
})
.collect::<Vec<_>>()
.join("\n");
let mut resolve_context = ResolveContext::new(max_retries);
let combined_change_id = change_ids.join("+");
use crate::ai_command_runner::AiCommandRunner;
use crate::command_queue::CommandQueueConfig;
use crate::config::defaults::*;
let queue_config = CommandQueueConfig {
stagger_delay_ms: config
.command_queue_stagger_delay_ms
.unwrap_or(DEFAULT_STAGGER_DELAY_MS),
max_retries: config
.command_queue_max_retries
.unwrap_or(DEFAULT_MAX_RETRIES),
retry_delay_ms: config
.command_queue_retry_delay_ms
.unwrap_or(DEFAULT_RETRY_DELAY_MS),
retry_error_patterns: config
.command_queue_retry_patterns
.clone()
.unwrap_or_else(default_retry_patterns),
retry_if_duration_under_secs: config
.command_queue_retry_if_duration_under_secs
.unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS),
inactivity_timeout_secs: config.get_command_inactivity_timeout_secs(),
inactivity_kill_grace_secs: config.get_command_inactivity_kill_grace_secs(),
inactivity_timeout_max_retries: config.get_command_inactivity_timeout_max_retries(),
strict_process_cleanup: config.get_command_strict_process_cleanup(),
};
let stream_json_textify = config.get_stream_json_textify();
let mut ai_runner = AiCommandRunner::new(queue_config, shared_stagger_state.clone());
ai_runner.set_stream_json_textify(stream_json_textify);
ai_runner.set_strict_process_cleanup(config.get_command_strict_process_cleanup());
let initial_resolve_prompt = build_sequential_merge_resolve_prompt(
vcs_prompt_prefix,
target_branch,
base_revision,
&merge_plan,
&worktree_locations,
&vcs_status,
&vcs_log,
&conflict_files_str,
);
let template = config.get_resolve_command()?;
let initial_command =
crate::config::OrchestratorConfig::expand_prompt(template, &initial_resolve_prompt);
let initial_command =
crate::config::expand::expand_conflict_files(&initial_command, &conflict_files_str);
for change_id in change_ids {
send_event(
event_tx,
ParallelEvent::ResolveStarted {
change_id: change_id.to_string(),
command: initial_command.clone(),
},
)
.await;
}
for attempt in 1..=max_retries {
let start = Instant::now();
info!(
"Merge resolution attempt {}/{} for branches: {}",
attempt,
max_retries,
revisions.join(", ")
);
let mut resolve_prompt = build_sequential_merge_resolve_prompt(
vcs_prompt_prefix,
target_branch,
base_revision,
&merge_plan,
&worktree_locations,
&vcs_status,
&vcs_log,
&conflict_files_str,
);
let continuation_context = resolve_context.format_continuation_context();
if !continuation_context.is_empty() {
resolve_prompt = format!("{}\n\n{}", resolve_prompt, continuation_context);
}
let template = config.get_resolve_command()?;
let command = crate::config::OrchestratorConfig::expand_prompt(template, &resolve_prompt);
let (mut child, mut rx) = ai_runner
.execute_streaming_with_retry(
&command,
Some(workspace_manager.repo_root()),
Some("resolve"),
None,
)
.await?;
let mut output_collector = crate::history::OutputCollector::new();
while let Some(line) = rx.recv().await {
let text = match &line {
crate::ai_command_runner::OutputLine::Stdout(s) => {
output_collector.add_stdout(s);
s.clone()
}
crate::ai_command_runner::OutputLine::Stderr(s) => {
output_collector.add_stderr(s);
s.clone()
}
};
send_event(
event_tx,
ParallelEvent::ResolveOutput {
change_id: combined_change_id.clone(),
output: text.clone(),
iteration: Some(attempt),
},
)
.await;
}
let status = child.wait().await.map_err(|e| {
OrchestratorError::AgentCommand(format!(
"Resolve command failed in workspace '{}' (attempt {}): {}",
workspace_manager.repo_root().display(),
attempt,
e
))
})?;
let status_success = status.success();
let duration = start.elapsed();
let remaining_conflicts = detect_conflicts(workspace_manager).await?;
if remaining_conflicts.is_empty() {
if matches!(
workspace_manager.backend_type(),
VcsBackend::Git | VcsBackend::Auto
) {
let repo_root = workspace_manager.repo_root();
let merge_in_progress = git_commands::is_merge_in_progress(repo_root)
.await
.map_err(OrchestratorError::from)?;
if merge_in_progress {
let reason =
"Merge still in progress (MERGE_HEAD exists); retrying resolve".to_string();
warn!(
"Merge still in progress after resolve attempt {}/{}",
attempt, max_retries
);
send_event(
event_tx,
ParallelEvent::ResolveOutput {
change_id: combined_change_id.clone(),
output: reason.clone(),
iteration: Some(attempt),
},
)
.await;
resolve_context.record(ResolveAttempt {
attempt,
command_success: status_success,
verification_success: false,
duration,
continuation_reason: Some(reason),
exit_code: status.code(),
stdout_tail: output_collector.stdout_tail(),
stderr_tail: output_collector.stderr_tail(),
});
continue;
}
let mut retry_reason: Option<String> = None;
for revision in revisions {
if let Some(worktree_path) = workspace_paths.get(revision) {
let worktree_merge_in_progress =
git_commands::is_merge_in_progress(worktree_path)
.await
.map_err(OrchestratorError::from)?;
if worktree_merge_in_progress {
retry_reason = Some(format!(
"Worktree merge still in progress for '{}' (MERGE_HEAD exists); retrying resolve",
revision
));
break;
}
let worktree_conflicts = git_commands::get_conflict_files(worktree_path)
.await
.map_err(OrchestratorError::from)?;
if !worktree_conflicts.is_empty() {
retry_reason = Some(format!(
"Worktree conflicts still present for '{}' ({}); retrying resolve",
revision,
worktree_conflicts.join(", ")
));
break;
}
}
}
if let Some(reason) = retry_reason {
warn!("{}", reason);
send_event(
event_tx,
ParallelEvent::ResolveOutput {
change_id: combined_change_id.clone(),
output: reason.clone(),
iteration: Some(attempt),
},
)
.await;
resolve_context.record(ResolveAttempt {
attempt,
command_success: status_success,
verification_success: false,
duration,
continuation_reason: Some(reason),
exit_code: status.code(),
stdout_tail: output_collector.stdout_tail(),
stderr_tail: output_collector.stderr_tail(),
});
continue;
}
let mut presync_retry_reason: Option<String> = None;
for (revision, change_id) in revisions.iter().zip(change_ids.iter()) {
let Some(worktree_path) = workspace_paths.get(revision) else {
continue;
};
let mismatches = git_commands::presync_merge_subject_mismatches_since(
worktree_path,
base_revision,
change_id,
)
.await
.map_err(OrchestratorError::from)?;
if !mismatches.is_empty() {
presync_retry_reason = Some(format!(
"Invalid pre-sync merge commit subject in worktree '{}' (expected: 'Pre-sync base into {}', got: {}); retrying resolve",
revision,
change_id,
mismatches.join("; ")
));
break;
}
}
if let Some(reason) = presync_retry_reason {
warn!("{}", reason);
send_event(
event_tx,
ParallelEvent::ResolveOutput {
change_id: combined_change_id.clone(),
output: reason.clone(),
iteration: Some(attempt),
},
)
.await;
resolve_context.record(ResolveAttempt {
attempt,
command_success: status_success,
verification_success: false,
duration,
continuation_reason: Some(reason),
exit_code: status.code(),
stdout_tail: output_collector.stdout_tail(),
stderr_tail: output_collector.stderr_tail(),
});
continue;
}
let missing_commits =
git_commands::missing_merge_commits_since(repo_root, base_revision, change_ids)
.await
.map_err(OrchestratorError::from)?;
let mut truly_missing: Vec<String> = Vec::new();
for missing_id in &missing_commits {
let revision = revisions
.iter()
.zip(change_ids.iter())
.find(|(_, cid)| *cid == missing_id)
.map(|(rev, _)| rev.as_str());
if let Some(rev) = revision {
let is_integrated = git_commands::is_ancestor(repo_root, rev, "HEAD")
.await
.unwrap_or(false);
if is_integrated {
info!(
"Change '{}' (branch '{}') integrated via fast-forward; skipping merge commit check",
missing_id, rev
);
} else {
truly_missing.push(missing_id.clone());
}
} else {
truly_missing.push(missing_id.clone());
}
}
if !truly_missing.is_empty() {
let reason = format!(
"Missing merge commits for change_ids ({}); retrying resolve",
truly_missing.join(", ")
);
warn!(
"Missing merge commits after resolve attempt {}/{}: {:?}",
attempt, max_retries, truly_missing
);
send_event(
event_tx,
ParallelEvent::ResolveOutput {
change_id: combined_change_id.clone(),
output: reason.clone(),
iteration: Some(attempt),
},
)
.await;
resolve_context.record(ResolveAttempt {
attempt,
command_success: status_success,
verification_success: false,
duration,
continuation_reason: Some(reason),
exit_code: status.code(),
stdout_tail: output_collector.stdout_tail(),
stderr_tail: output_collector.stderr_tail(),
});
continue;
}
let mut presync_missing_reason: Option<String> = None;
for (revision, change_id) in revisions.iter().zip(change_ids.iter()) {
let expected_subject = format!("Merge change: {}", change_id);
let merge_commit = git_commands::merge_commit_hash_by_subject_since(
repo_root,
base_revision,
expected_subject.as_str(),
)
.await
.map_err(OrchestratorError::from)?;
let Some(merge_commit) = merge_commit else {
continue;
};
let pre_merge_base =
git_commands::first_parent_of(repo_root, merge_commit.trim())
.await
.map_err(OrchestratorError::from)?;
let pre_merge_base = pre_merge_base.trim();
let includes_presync_base =
git_commands::is_ancestor(repo_root, pre_merge_base, revision)
.await
.map_err(OrchestratorError::from)?;
if !includes_presync_base {
let already_merged = git_commands::is_ancestor(repo_root, revision, "HEAD")
.await
.map_err(OrchestratorError::from)?;
if !already_merged {
let short = &pre_merge_base[..8.min(pre_merge_base.len())];
presync_missing_reason = Some(format!(
"Worktree branch '{}' does not include pre-merge base '{}' for change_id '{}' (pre-sync may have been skipped); retrying resolve",
revision, short, change_id
));
break;
}
info!(
"Skipping pre-sync base ancestry retry for '{}' because branch is already integrated into HEAD",
revision
);
}
let (Some(worktree_path), Some(worktree_base_revision)) = (
workspace_paths.get(revision),
workspace_base_revisions.get(revision),
) else {
continue;
};
if worktree_base_revision.trim() != pre_merge_base {
let expected_presync_subject = format!("Pre-sync base into {}", change_id);
let presync_commit = git_commands::merge_commit_hash_by_subject_since(
worktree_path,
worktree_base_revision.trim(),
expected_presync_subject.as_str(),
)
.await
.map_err(OrchestratorError::from)?;
if presync_commit.is_none() {
presync_missing_reason = Some(format!(
"Missing pre-sync merge commit in worktree '{}' (expected subject: 'Pre-sync base into {}'); retrying resolve",
revision, change_id
));
break;
}
}
}
if let Some(reason) = presync_missing_reason {
warn!("{}", reason);
send_event(
event_tx,
ParallelEvent::ResolveOutput {
change_id: combined_change_id.clone(),
output: reason.clone(),
iteration: Some(attempt),
},
)
.await;
resolve_context.record(ResolveAttempt {
attempt,
command_success: status_success,
verification_success: false,
duration,
continuation_reason: Some(reason),
exit_code: status.code(),
stdout_tail: output_collector.stdout_tail(),
stderr_tail: output_collector.stderr_tail(),
});
continue;
}
}
if !status_success {
warn!(
"Resolve command exited non-zero but goals met (attempt {}/{})",
attempt, max_retries
);
}
resolve_context.record(ResolveAttempt {
attempt,
command_success: status_success,
verification_success: true,
duration,
continuation_reason: None,
exit_code: status.code(),
stdout_tail: output_collector.stdout_tail(),
stderr_tail: output_collector.stderr_tail(),
});
send_event(event_tx, ParallelEvent::ConflictResolutionCompleted).await;
for change_id in change_ids {
send_event(
event_tx,
ParallelEvent::ResolveCompleted {
change_id: change_id.to_string(),
worktree_change_ids: None,
},
)
.await;
}
return Ok(());
}
let continuation_reason = if status_success {
let reason = format!(
"Conflicts still present after merge resolution attempt: {}",
remaining_conflicts.join(", ")
);
warn!("{}", reason);
Some(reason)
} else {
let reason = format!(
"Merge resolution command failed with exit code: {:?}",
status.code()
);
warn!(
"Merge resolution attempt {} failed with exit code: {:?}",
attempt,
status.code()
);
Some(reason)
};
resolve_context.record(ResolveAttempt {
attempt,
command_success: status_success,
verification_success: false,
duration,
continuation_reason,
exit_code: status.code(),
stdout_tail: output_collector.stdout_tail(),
stderr_tail: output_collector.stderr_tail(),
});
}
let error_msg = format!("Failed to resolve merges after {} attempts", max_retries);
send_event(
event_tx,
ParallelEvent::ConflictResolutionFailed {
error: error_msg.clone(),
},
)
.await;
for change_id in change_ids {
send_event(
event_tx,
ParallelEvent::ResolveFailed {
change_id: change_id.to_string(),
error: error_msg.clone(),
},
)
.await;
}
match workspace_manager.backend_type() {
VcsBackend::Git | VcsBackend::Auto => Err(OrchestratorError::GitConflict(error_msg)),
}
}
/// Assembles the prompt sent to the resolve command for a plain conflict.
///
/// The prompt is a fixed skill-loading line followed by the VCS-specific prefix, the
/// comma-separated revisions, the VCS error, status, log and the conflicting files, with a
/// blank line between sections.
fn build_conflict_resolve_prompt(
    vcs_prompt_prefix: &str,
    revisions: &[&str],
    vcs_error: &str,
    vcs_status: &str,
    vcs_log: &str,
    conflict_files_str: &str,
) -> String {
    let sections = [
        "load skills: cflx-resolve".to_string(),
        vcs_prompt_prefix.to_string(),
        format!("Conflicting revisions: {}", revisions.join(", ")),
        format!("VCS error output:\n{}", vcs_error),
        format!("Current VCS status:\n{}", vcs_status),
        format!("VCS log for conflicting changes:\n{}", vcs_log),
        format!("Conflicting files: {}", conflict_files_str),
    ];
    // Sections are separated by exactly one blank line.
    sections.join("\n\n")
}
/// Assembles the prompt sent to the resolve command for a sequential merge.
///
/// The prompt starts with a fixed skill-loading line and the VCS-specific prefix, then lists
/// the operation, target branch, base revision, merge plan, worktree directories, VCS status,
/// VCS log and the conflicting files found in the repository root.
// Each argument is one independent prompt section.
#[allow(clippy::too_many_arguments)]
fn build_sequential_merge_resolve_prompt(
    vcs_prompt_prefix: &str,
    target_branch: &str,
    base_revision: &str,
    merge_plan: &str,
    worktree_locations: &str,
    vcs_status: &str,
    vcs_log: &str,
    conflict_files_str: &str,
) -> String {
    // Each pair is the literal text that precedes a value, followed by that value.
    let labelled_values = [
        ("load skills: cflx-resolve\n\n", vcs_prompt_prefix),
        (
            "\n\nOperation: sequential merge\n\nTarget branch: ",
            target_branch,
        ),
        ("\nBase revision before merges: ", base_revision),
        ("\nMerge plan (branch => change_id):\n", merge_plan),
        (
            "\n\nWorktree directories (branch => path):\n",
            worktree_locations,
        ),
        ("\n\nCurrent VCS status:\n", vcs_status),
        ("\n\nVCS log for branches:\n", vcs_log),
        (
            "\n\nConflicting files (repo root, if any): ",
            conflict_files_str,
        ),
    ];

    let mut prompt = String::new();
    for (label, value) in labelled_values.iter() {
        prompt.push_str(label);
        prompt.push_str(value);
    }
    prompt
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The conflict prompt starts from the skill prelude and echoes its inputs.
    #[test]
    fn test_conflict_resolve_prompt_has_skill_prelude() {
        let revisions = ["branch-a", "branch-b"];
        let prompt = build_conflict_resolve_prompt(
            "Git conflict resolution:",
            &revisions,
            "merge failed",
            "UU file.rs",
            "commit log here",
            "file.rs",
        );

        assert!(prompt.contains("load skills: cflx-resolve"));
        assert!(prompt.contains("branch-a, branch-b"));
        assert!(prompt.contains("file.rs"));
    }

    /// Resolution guidance belongs to the skill, so the Rust-built prompt must not carry it.
    #[test]
    fn test_conflict_resolve_prompt_no_fixed_guidance() {
        let prompt =
            build_conflict_resolve_prompt("prefix", &["rev1"], "err", "status", "log", "files");

        let forbidden = [
            (
                "Please resolve the merge conflicts",
                "Resolution instruction must not be in Rust prompt",
            ),
            (
                "Safety Constraints",
                "Safety constraints must not be in Rust prompt",
            ),
            ("--no-verify", "Safety rules must not be in Rust prompt"),
        ];
        for (phrase, message) in forbidden.iter() {
            assert!(!prompt.contains(phrase), "{}", message);
        }
    }

    /// The sequential-merge prompt starts from the skill prelude and echoes its inputs.
    #[test]
    fn test_sequential_merge_prompt_has_skill_prelude() {
        let merge_plan = "- branch-a => change-1";
        let worktree_locations = "- branch-a => /path (change_id: change-1)";
        let prompt = build_sequential_merge_resolve_prompt(
            "Git conflict resolution:",
            "main",
            "abc123",
            merge_plan,
            worktree_locations,
            "clean",
            "log entries",
            "(none)",
        );

        assert!(prompt.contains("load skills: cflx-resolve"));
        assert!(prompt.contains("Operation: sequential merge"));
        assert!(prompt.contains("Target branch: main"));
        assert!(prompt.contains("abc123"));
        assert!(prompt.contains("change-1"));
    }

    /// Merge conventions and step-by-step guidance must not appear in the Rust-built prompt.
    #[test]
    fn test_sequential_merge_prompt_no_fixed_guidance() {
        let prompt = build_sequential_merge_resolve_prompt(
            "prefix",
            "main",
            "base",
            "plan",
            "locations",
            "status",
            "log",
            "files",
        );

        let forbidden = [
            (
                "Requirements:",
                "Requirements section must not be in Rust prompt",
            ),
            (
                "Instructions (repeat",
                "Step-by-step instructions must not be in Rust prompt",
            ),
            (
                "Pre-sync base into",
                "Commit convention must not be in Rust prompt",
            ),
            (
                "Merge change:",
                "Merge commit convention must not be in Rust prompt",
            ),
            (
                "git merge --no-ff",
                "Git commands must not be in Rust prompt",
            ),
            (
                "git rm -rf",
                "Git cleanup commands must not be in Rust prompt",
            ),
            ("--no-verify", "Safety rules must not be in Rust prompt"),
            (
                "Complete the merges",
                "Completion instruction must not be in Rust prompt",
            ),
        ];
        for (phrase, message) in forbidden.iter() {
            assert!(!prompt.contains(phrase), "{}", message);
        }
    }
}