#![allow(dead_code)]
use crate::agent::{AgentRunner, OutputLine};
use crate::config::OrchestratorConfig;
use crate::error::{OrchestratorError, Result};
use crate::history::OutputCollector;
use crate::hooks::{HookContext, HookRunner, HookType};
use crate::stall::{StallDetector, StallPhase};
use crate::task_parser::TaskProgress;
use crate::vcs::{VcsBackend, VcsResult, WorkspaceManager};
use std::fs;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
const APPLY_COMPLETION_GRACE_DEFAULT_SECS: u64 = 30;
const APPLY_COMPLETION_CHECK_INTERVAL_SECS: u64 = 5;
tokio::task_local! {
pub(crate) static APPLY_COMPLETION_GRACE_OVERRIDE_SECS: u64;
}
tokio::task_local! {
pub(crate) static APPLY_COMPLETION_CHECK_INTERVAL_OVERRIDE_MS: u64;
}
/// Returns how long a lingering apply child may keep running after
/// completion has been observed.
///
/// Uses the task-local `APPLY_COMPLETION_GRACE_OVERRIDE_SECS` when it is set
/// in the current task and non-zero; otherwise falls back to
/// `APPLY_COMPLETION_GRACE_DEFAULT_SECS`.
pub(crate) fn apply_completion_grace_period() -> Duration {
    match APPLY_COMPLETION_GRACE_OVERRIDE_SECS.try_with(|secs| *secs) {
        Ok(override_secs) if override_secs > 0 => Duration::from_secs(override_secs),
        _ => Duration::from_secs(APPLY_COMPLETION_GRACE_DEFAULT_SECS),
    }
}
/// Returns how often the apply loop polls the workspace for completion.
///
/// Uses the task-local `APPLY_COMPLETION_CHECK_INTERVAL_OVERRIDE_MS`
/// (milliseconds) when it is set in the current task and non-zero;
/// otherwise falls back to `APPLY_COMPLETION_CHECK_INTERVAL_SECS` seconds.
pub(crate) fn apply_completion_check_interval() -> Duration {
    APPLY_COMPLETION_CHECK_INTERVAL_OVERRIDE_MS
        .try_with(|ms| *ms)
        .ok()
        .filter(|ms| *ms > 0)
        .map_or_else(
            || Duration::from_secs(APPLY_COMPLETION_CHECK_INTERVAL_SECS),
            Duration::from_millis,
        )
}
/// Test helper: awaits `fut` with the completion grace period overridden to
/// `secs` seconds for the duration of that future.
#[cfg(test)]
pub(crate) async fn scoped_apply_completion_grace_secs_for_test<F, R>(secs: u64, fut: F) -> R
where
    F: Future<Output = R>,
{
    let scoped = APPLY_COMPLETION_GRACE_OVERRIDE_SECS.scope(secs, fut);
    scoped.await
}
/// Test helper: awaits `fut` with the completion check interval overridden
/// to `ms` milliseconds for the duration of that future.
#[cfg(test)]
pub(crate) async fn scoped_apply_completion_check_interval_ms_for_test<F, R>(ms: u64, fut: F) -> R
where
    F: Future<Output = R>,
{
    let scoped = APPLY_COMPLETION_CHECK_INTERVAL_OVERRIDE_MS.scope(ms, fut);
    scoped.await
}
/// What an in-flight completion check found in the workspace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApplyCompletionKind {
    /// Task progress reports every task as done.
    TasksComplete,
    /// An `APPLY_BLOCKED/marker.md` handoff marker is present.
    BlockedHandoff,
    /// A `REJECTED.md` handoff marker is present.
    RejectingHandoff,
}
fn detect_apply_completion(workspace_path: &Path, change_id: &str) -> Option<ApplyCompletionKind> {
if detect_apply_blocked_handoff(workspace_path, change_id).is_some() {
return Some(ApplyCompletionKind::BlockedHandoff);
}
if detect_apply_rejected_handoff(workspace_path, change_id).is_some() {
return Some(ApplyCompletionKind::RejectingHandoff);
}
match check_task_progress(workspace_path, change_id) {
Ok(progress) if is_progress_complete(&progress) => Some(ApplyCompletionKind::TasksComplete),
_ => None,
}
}
/// Default value for [`ApplyConfig::max_iterations`].
pub const DEFAULT_MAX_ITERATIONS: u32 = 50;

/// Apply-run settings, assembled with chained `with_*` setters.
#[derive(Debug, Clone)]
pub struct ApplyConfig {
    /// Iteration cap (defaults to [`DEFAULT_MAX_ITERATIONS`]).
    pub max_iterations: u32,
    /// Progress-commit flag (defaults to `true`).
    pub progress_commits_enabled: bool,
    /// Streaming flag (defaults to `false`).
    pub streaming_enabled: bool,
}

impl Default for ApplyConfig {
    fn default() -> Self {
        ApplyConfig {
            streaming_enabled: false,
            progress_commits_enabled: true,
            max_iterations: DEFAULT_MAX_ITERATIONS,
        }
    }
}

impl ApplyConfig {
    /// Equivalent to [`ApplyConfig::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns a copy of `self` with `max_iterations` replaced by `max`.
    pub fn with_max_iterations(self, max: u32) -> Self {
        Self {
            max_iterations: max,
            ..self
        }
    }

    /// Returns a copy of `self` with `progress_commits_enabled` set to `enabled`.
    pub fn with_progress_commits(self, enabled: bool) -> Self {
        Self {
            progress_commits_enabled: enabled,
            ..self
        }
    }

    /// Returns a copy of `self` with `streaming_enabled` set to `enabled`.
    pub fn with_streaming(self, enabled: bool) -> Self {
        Self {
            streaming_enabled: enabled,
            ..self
        }
    }
}
/// Outcome classification for one apply iteration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ApplyIterationResult {
    /// The change finished.
    Complete,
    /// Task counts advanced (counts carried as reported by the producer).
    Progress { completed: u32, total: u32 },
    /// Task counts did not advance (counts carried as reported by the producer).
    NoProgress { completed: u32, total: u32 },
    /// The iteration failed with the given message.
    Failed { error: String },
}

impl ApplyIterationResult {
    /// `true` only for [`ApplyIterationResult::Complete`].
    pub fn is_complete(&self) -> bool {
        *self == Self::Complete
    }

    /// `true` only for [`ApplyIterationResult::Failed`], whatever its message.
    pub fn is_failed(&self) -> bool {
        matches!(*self, Self::Failed { .. })
    }
}
/// Reads task progress for `change_id` from `workspace_path`.
///
/// The active `openspec/changes/<change_id>/tasks.md` is preferred. When it is
/// missing, the newest (lexicographically greatest) archived directory that
/// belongs to the change is used instead. The archive is searched under
/// `openspec/changes/<change_id>/archive` when the change directory still
/// exists, and under `openspec/changes/archive` otherwise.
///
/// # Errors
///
/// Returns an error when a tasks file fails to parse or the archive root
/// cannot be listed. Returns `OrchestratorError::AgentCommand` describing the
/// searched paths when no tasks file is found in either location.
pub fn check_task_progress(workspace_path: &Path, change_id: &str) -> Result<TaskProgress> {
    let change_dir = workspace_path.join("openspec/changes").join(change_id);
    let tasks_path = change_dir.join("tasks.md");
    debug!(
        change_id = change_id,
        workspace_path = %workspace_path.display(),
        tasks_path = %tasks_path.display(),
        "Checking tasks path in workspace"
    );
    if tasks_path.exists() {
        let progress = crate::task_parser::parse_file(&tasks_path, Some(change_id))?;
        debug!(
            "Tasks file found for {}: {}/{} complete",
            change_id, progress.completed, progress.total
        );
        return Ok(progress);
    }
    let archive_root = if change_dir.is_dir() {
        change_dir.join("archive")
    } else {
        workspace_path.join("openspec/changes/archive")
    };
    let archive_root_exists = archive_root.is_dir();
    let latest_archive_dir = if archive_root_exists {
        latest_archive_dir_for_change(&archive_root, change_id)?
    } else {
        None
    };
    if let Some(latest_dir) = latest_archive_dir {
        let archive_tasks_path = archive_root.join(latest_dir).join("tasks.md");
        if archive_tasks_path.exists() {
            let progress = crate::task_parser::parse_file(&archive_tasks_path, Some(change_id))?;
            warn!(
                "Tasks file for '{}' not found in active change directory; \
                 falling back to archived copy at '{}' ({}/{} tasks complete). \
                 This is expected for Archiving state but unexpected for fresh workspaces.",
                change_id,
                archive_tasks_path.display(),
                progress.completed,
                progress.total
            );
            return Ok(progress);
        }
    }
    let change_dir_exists = change_dir.is_dir();
    Err(OrchestratorError::AgentCommand(format!(
        "Tasks file not found; change_id={}; workspace_path=\"{}\"; tasks_path=\"{}\"; change_dir_exists={}; archive_root=\"{}\"; archive_root_exists={}; exists=false",
        change_id,
        workspace_path.display(),
        tasks_path.display(),
        change_dir_exists,
        archive_root.display(),
        archive_root_exists
    )))
}

/// Returns the lexicographically greatest directory name under `archive_root`
/// that belongs to `change_id`, skipping non-directories and non-UTF-8 names.
fn latest_archive_dir_for_change(
    archive_root: &Path,
    change_id: &str,
) -> std::io::Result<Option<String>> {
    let mut latest: Option<String> = None;
    for entry in fs::read_dir(archive_root)? {
        let entry = entry?;
        if !entry.file_type()?.is_dir() {
            continue;
        }
        let file_name = entry.file_name();
        let Some(name) = file_name.to_str() else {
            continue;
        };
        if !archive_dir_matches_change(name, change_id) {
            continue;
        }
        if latest.as_deref().is_none_or(|current| name > current) {
            latest = Some(name.to_string());
        }
    }
    Ok(latest)
}

/// Returns `true` when archive directory `name` belongs to `change_id`.
///
/// A match is either exactly `change_id`, or `change_id` preceded by a prefix
/// made only of ASCII digits and `-` that ends in `-` (for example a
/// `YYYY-MM-DD-` date stamp). A plain `ends_with` check is not enough: it
/// would treat `2024-01-01-quick-fix` as an archive of change `fix`.
/// NOTE(review): assumes archive prefixes are date-like (digits and dashes);
/// revisit if another archive naming scheme is introduced.
fn archive_dir_matches_change(name: &str, change_id: &str) -> bool {
    match name.strip_suffix(change_id) {
        Some("") => true,
        Some(prefix) => {
            prefix.ends_with('-') && prefix.chars().all(|c| c.is_ascii_digit() || c == '-')
        }
        None => false,
    }
}
/// Builds the WIP commit message for an apply iteration.
///
/// Example: `WIP: add-feature (5/10 tasks, apply#3)`.
pub fn format_wip_commit_message(
    change_id: &str,
    progress: &TaskProgress,
    iteration: u32,
) -> String {
    let (done, all) = (progress.completed, progress.total);
    format!("WIP: {change_id} ({done}/{all} tasks, apply#{iteration})")
}
/// Snapshots the working copy, then records an iteration snapshot for
/// `change_id` with `iteration` and the `progress` counts.
///
/// The formatted WIP message is only logged here; the workspace manager
/// receives the raw counts.
///
/// # Errors
///
/// Propagates the first VCS error from either step.
pub async fn create_progress_commit<W: WorkspaceManager + ?Sized>(
    workspace_manager: &W,
    workspace_path: &Path,
    change_id: &str,
    progress: &TaskProgress,
    iteration: u32,
) -> VcsResult<()> {
    let wip_message = format_wip_commit_message(change_id, progress, iteration);
    debug!("Creating progress commit for {}: {}", change_id, wip_message);

    workspace_manager.snapshot_working_copy(workspace_path).await?;

    let (completed, total) = (progress.completed, progress.total);
    workspace_manager
        .create_iteration_snapshot(workspace_path, change_id, iteration, completed, total)
        .await?;

    let backend = workspace_manager.backend_type();
    debug!("Progress commit created for {} ({})", change_id, backend);
    Ok(())
}
/// Snapshots the working copy and sets its commit message to
/// `Apply: <change_id>`.
///
/// # Errors
///
/// Propagates the first VCS error from either step.
pub async fn create_final_commit<W: WorkspaceManager + ?Sized>(
    workspace_manager: &W,
    workspace_path: &Path,
    change_id: &str,
) -> VcsResult<()> {
    let final_message = format!("Apply: {change_id}");
    debug!("Creating final commit for {}: {}", change_id, final_message);

    workspace_manager.snapshot_working_copy(workspace_path).await?;
    workspace_manager
        .set_commit_message(workspace_path, &final_message)
        .await?;

    let backend = workspace_manager.backend_type();
    info!("Final commit created for {} ({})", change_id, backend);
    Ok(())
}
/// Returns the current revision of the workspace at `workspace_path`, as
/// reported by `workspace_manager`.
pub async fn get_workspace_revision<W: WorkspaceManager + ?Sized>(
    workspace_manager: &W,
    workspace_path: &Path,
) -> VcsResult<String> {
    let revision = workspace_manager
        .get_revision_in_workspace(workspace_path)
        .await?;
    Ok(revision)
}
/// Builds the apply prompt for `change_id`, combining the configured user
/// apply prompt with `history` and `acceptance_tail`.
pub fn build_apply_prompt(
    config: &OrchestratorConfig,
    change_id: &str,
    history: &str,
    acceptance_tail: &str,
) -> String {
    crate::agent::build_apply_prompt(
        change_id,
        config.get_apply_prompt(),
        history,
        acceptance_tail,
    )
}
/// Expands an apply command `template`.
///
/// The change-id placeholder is substituted first and the prompt second, so
/// text introduced by `prompt` is not itself re-expanded for the change id.
pub fn expand_apply_command(template: &str, change_id: &str, prompt: &str) -> String {
    OrchestratorConfig::expand_prompt(
        &OrchestratorConfig::expand_change_id(template, change_id),
        prompt,
    )
}
/// Returns `true` when there is at least one task and the completed count
/// has reached the total. An empty task list is never complete.
pub fn is_progress_complete(progress: &TaskProgress) -> bool {
    match progress.total {
        0 => false,
        total => progress.completed >= total,
    }
}
/// Returns `true` only when `new` has strictly more completed tasks than `old`.
pub fn progress_increased(old: &TaskProgress, new: &TaskProgress) -> bool {
    old.completed < new.completed
}
/// Shortens long command output for display.
///
/// Returns `output` unchanged when it has at most `max_lines` lines (empty
/// input yields an empty string). Longer output is replaced by a
/// `... (N lines) ...` header followed by its last five lines.
pub fn summarize_output(output: &str, max_lines: usize) -> String {
    const TAIL_LINES: usize = 5;

    if output.is_empty() {
        return String::new();
    }
    let all_lines: Vec<&str> = output.lines().collect();
    let line_count = all_lines.len();
    if line_count <= max_lines {
        return output.to_string();
    }
    let tail_start = line_count.saturating_sub(TAIL_LINES);
    format!(
        "... ({} lines) ...\n{}",
        line_count,
        all_lines[tail_start..].join("\n")
    )
}
/// Observer for apply-loop lifecycle events.
///
/// Every callback receives the id of the change being applied.
pub trait ApplyEventHandler {
    /// The first apply command of a loop has been spawned; `command` is its
    /// command string.
    fn on_apply_started(&self, change_id: &str, command: &str);
    /// Task counts were (re)read for the change.
    fn on_progress_updated(&self, change_id: &str, completed: u32, total: u32);
    /// A hook named `hook_type` (e.g. `"pre_apply"`) is about to run.
    fn on_hook_started(&self, change_id: &str, hook_type: &str);
    /// The hook named `hook_type` finished successfully.
    fn on_hook_completed(&self, change_id: &str, hook_type: &str);
    /// The hook named `hook_type` failed with the rendered `error`.
    fn on_hook_failed(&self, change_id: &str, hook_type: &str, error: &str);
    /// One line of agent output, tagged with the apply iteration number.
    fn on_apply_output(&self, change_id: &str, line: &OutputLine, iteration: u32);
}
/// An [`ApplyEventHandler`] that discards every event.
pub struct NoOpEventHandler;
impl ApplyEventHandler for NoOpEventHandler {
    fn on_apply_started(&self, _: &str, _: &str) {}
    fn on_progress_updated(&self, _: &str, _: u32, _: u32) {}
    fn on_hook_started(&self, _: &str, _: &str) {}
    fn on_hook_completed(&self, _: &str, _: &str) {}
    fn on_hook_failed(&self, _: &str, _: &str, _: &str) {}
    fn on_apply_output(&self, _: &str, _: &OutputLine, _: u32) {}
}
/// Run-level counters and optional parallel-execution metadata that the
/// apply loop folds into every hook context it builds.
pub struct ApplyLoopHookContext {
    pub changes_processed: usize,
    pub total_changes: usize,
    pub remaining_changes: usize,
    /// Workspace path; set only by [`ApplyLoopHookContext::parallel`].
    pub workspace_path: Option<String>,
    /// Group index; set only by [`ApplyLoopHookContext::parallel`].
    pub group_index: Option<usize>,
}
impl ApplyLoopHookContext {
    /// Context for serial execution: no workspace path, no group index.
    pub fn serial(
        changes_processed: usize,
        total_changes: usize,
        remaining_changes: usize,
    ) -> Self {
        Self {
            workspace_path: None,
            group_index: None,
            changes_processed,
            total_changes,
            remaining_changes,
        }
    }

    /// Context for parallel execution, carrying the workspace path and group
    /// index.
    pub fn parallel(
        changes_processed: usize,
        total_changes: usize,
        remaining_changes: usize,
        workspace_path: String,
        group_index: usize,
    ) -> Self {
        Self {
            workspace_path: Some(workspace_path),
            group_index: Some(group_index),
            changes_processed,
            total_changes,
            remaining_changes,
        }
    }

    /// Builds a [`HookContext`] for `change_id` with the given task counts
    /// and apply count.
    ///
    /// Parallel context is attached only when BOTH `workspace_path` and
    /// `group_index` are set. The fourth `HookContext::new` argument is
    /// always `false` here (its meaning is defined by `HookContext`).
    fn build_hook_context(
        &self,
        change_id: &str,
        completed: u32,
        total: u32,
        apply_count: u32,
    ) -> HookContext {
        let base = HookContext::new(
            self.changes_processed,
            self.total_changes,
            self.remaining_changes,
            false,
        )
        .with_change(change_id, completed, total)
        .with_apply_count(apply_count);

        match (&self.workspace_path, self.group_index) {
            (Some(workspace_path), Some(group_index)) => {
                base.with_parallel_context(workspace_path, Some(group_index as u32))
            }
            _ => base,
        }
    }
}
/// Summary returned by `execute_apply_loop` when it exits without error.
#[derive(Debug)]
pub struct ApplyLoopResult {
    /// Workspace revision after the loop; empty when it could not be read or
    /// no workspace manager was supplied.
    pub revision: String,
    /// `true` when the change's tasks were completed.
    pub completed: bool,
    /// Number of loop iterations that were started.
    pub iterations: u32,
    /// `APPLY_BLOCKED` marker found at exit; always `None` when `completed`.
    pub blocked_handoff: Option<ApplyBlockedHandoff>,
    /// `REJECTED.md` marker found at exit; always `None` when `completed`.
    pub rejected_handoff: Option<ApplyRejectedHandoff>,
}
/// Location of an `APPLY_BLOCKED/marker.md` handoff marker.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ApplyBlockedHandoff {
    pub blocker_path: PathBuf,
}
/// Location of a `REJECTED.md` handoff marker.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ApplyRejectedHandoff {
    pub rejected_path: PathBuf,
}
/// Returns the blocked handoff when
/// `<workspace>/openspec/changes/<change_id>/APPLY_BLOCKED/marker.md` is a
/// regular file.
fn detect_apply_blocked_handoff(
    workspace_path: &Path,
    change_id: &str,
) -> Option<ApplyBlockedHandoff> {
    let mut blocker_path = workspace_path.to_path_buf();
    for component in ["openspec", "changes", change_id, "APPLY_BLOCKED", "marker.md"] {
        blocker_path.push(component);
    }
    if blocker_path.is_file() {
        Some(ApplyBlockedHandoff { blocker_path })
    } else {
        None
    }
}
/// Returns the rejected handoff when
/// `<workspace>/openspec/changes/<change_id>/REJECTED.md` is a regular file.
fn detect_apply_rejected_handoff(
    workspace_path: &Path,
    change_id: &str,
) -> Option<ApplyRejectedHandoff> {
    let mut rejected_path = workspace_path.to_path_buf();
    for component in ["openspec", "changes", change_id, "REJECTED.md"] {
        rejected_path.push(component);
    }
    if rejected_path.is_file() {
        Some(ApplyRejectedHandoff { rejected_path })
    } else {
        None
    }
}
#[allow(clippy::too_many_arguments)]
pub async fn execute_apply_loop<E, F, Fut>(
change_id: &str,
workspace_path: &Path,
config: &OrchestratorConfig,
agent: &mut AgentRunner,
vcs_backend: VcsBackend,
workspace_manager: Option<&dyn WorkspaceManager>,
hooks: Option<&HookRunner>,
hook_ctx: &ApplyLoopHookContext,
event_handler: &E,
cancel_token: Option<&CancellationToken>,
ai_runner: &crate::ai_command_runner::AiCommandRunner,
mut output_handler: F,
) -> Result<ApplyLoopResult>
where
E: ApplyEventHandler,
F: FnMut(OutputLine) -> Fut,
Fut: Future<Output = ()>,
{
let max_iterations = config.get_max_iterations();
let mut iteration = 0;
let mut first_apply = true;
let stall_config = config.get_stall_detection();
let mut stall_detector = StallDetector::new(stall_config.clone());
let mut apply_escalation_uses_for_current_stall = 0_u32;
let mut apply_escalation_started = false;
let is_git = matches!(vcs_backend, VcsBackend::Git);
let apply_succeeded = loop {
iteration += 1;
if cancel_token.is_some_and(|token| token.is_cancelled()) {
return Err(OrchestratorError::AgentCommand(format!(
"Cancelled apply for '{}' in workspace '{}'",
change_id,
workspace_path.display()
)));
}
if iteration > max_iterations {
let error_msg = format!(
"Max iterations ({}) reached for change '{}' in workspace '{}'",
max_iterations,
change_id,
workspace_path.display()
);
if let Some(hook_runner) = hooks {
let progress = check_task_progress(workspace_path, change_id)
.unwrap_or_else(|_| TaskProgress::default());
let error_ctx = hook_ctx
.build_hook_context(change_id, progress.completed, progress.total, iteration)
.with_error(&error_msg);
if let Err(e) = hook_runner.run_hook(HookType::OnError, &error_ctx).await {
error!("on_error hook failed: {}", e);
}
}
return Err(OrchestratorError::AgentCommand(error_msg));
}
let progress = check_task_progress(workspace_path, change_id)?;
if progress.total > 0 {
event_handler.on_progress_updated(change_id, progress.completed, progress.total);
}
if let Some(blocked_handoff) = detect_apply_blocked_handoff(workspace_path, change_id) {
info!(
change_id = change_id,
blocker_path = %blocked_handoff.blocker_path.display(),
completed = progress.completed,
total = progress.total,
"Apply stalled handoff detected via APPLY_BLOCKED marker; exiting apply loop as stalled"
);
break false;
}
if is_progress_complete(&progress) {
info!(
"Change {} is already complete ({}/{})",
change_id, progress.completed, progress.total
);
break true;
}
let current_empty_wip_count = stall_detector.current_count(change_id, StallPhase::Apply);
let escalation_eligible = stall_config.enabled
&& stall_config.apply_escalation_policy_enabled()
&& config.get_apply_escalation_command().is_some()
&& current_empty_wip_count
>= stall_config
.apply_escalation_after_empty_wip
.unwrap_or(u32::MAX)
&& apply_escalation_uses_for_current_stall
< stall_config
.apply_escalation_max_uses_per_stall
.unwrap_or(0);
if escalation_eligible && !apply_escalation_started {
apply_escalation_started = true;
info!(
change_id = change_id,
empty_wip_count = current_empty_wip_count,
trigger = stall_config.apply_escalation_after_empty_wip,
max_uses = stall_config.apply_escalation_max_uses_per_stall,
"Apply empty-WIP escalation starting"
);
}
let stage_label = if escalation_eligible {
"apply_escalation"
} else {
"apply"
};
info!(
"Executing {} #{} for {} ({}/{} tasks, empty_wip_count={}, escalation_uses={})",
stage_label,
iteration,
change_id,
progress.completed,
progress.total,
current_empty_wip_count,
apply_escalation_uses_for_current_stall
);
let (mut child, mut rx, start_time, command) = if escalation_eligible {
apply_escalation_uses_for_current_stall =
apply_escalation_uses_for_current_stall.saturating_add(1);
info!(
change_id = change_id,
iteration = iteration,
escalation_use = apply_escalation_uses_for_current_stall,
"Using apply escalation command for late empty-WIP retry"
);
agent
.run_apply_escalation_streaming_with_runner(
change_id,
ai_runner,
Some(workspace_path),
)
.await?
} else {
agent
.run_apply_streaming_with_runner(change_id, ai_runner, Some(workspace_path))
.await?
};
if first_apply {
first_apply = false;
event_handler.on_apply_started(change_id, &command);
}
if let Some(hook_runner) = hooks {
let current_hook_ctx = hook_ctx.build_hook_context(
change_id,
progress.completed,
progress.total,
iteration,
);
event_handler.on_hook_started(change_id, "pre_apply");
match hook_runner
.run_hook(HookType::PreApply, ¤t_hook_ctx)
.await
{
Ok(()) => {
event_handler.on_hook_completed(change_id, "pre_apply");
}
Err(e) => {
error!("pre_apply hook failed for {}: {}", change_id, e);
event_handler.on_hook_failed(change_id, "pre_apply", &e.to_string());
return Err(e);
}
}
}
let mut output_collector = OutputCollector::new();
let grace_period = apply_completion_grace_period();
let check_interval = apply_completion_check_interval();
let mut completion_kind: Option<ApplyCompletionKind> = None;
let mut completion_deadline: Option<tokio::time::Instant> = None;
let mut early_terminated = false;
let mut next_check_at = tokio::time::Instant::now() + check_interval;
loop {
if completion_kind.is_none() && tokio::time::Instant::now() >= next_check_at {
completion_kind = detect_apply_completion(workspace_path, change_id);
next_check_at = tokio::time::Instant::now() + check_interval;
if let Some(kind) = completion_kind {
completion_deadline = Some(tokio::time::Instant::now() + grace_period);
info!(
change_id = change_id,
kind = ?kind,
grace_secs = grace_period.as_secs(),
"Apply completion observed; starting grace period before terminating lingering apply child"
);
}
}
let wait_deadline = match completion_deadline {
Some(deadline) => deadline,
None => next_check_at,
};
let recv_result = tokio::time::timeout_at(wait_deadline, rx.recv()).await;
match recv_result {
Ok(Some(line)) => {
match &line {
OutputLine::Stdout(s) => output_collector.add_stdout(s),
OutputLine::Stderr(s) => output_collector.add_stderr(s),
}
event_handler.on_apply_output(change_id, &line, iteration);
output_handler(line).await;
}
Ok(None) => break,
Err(_) => {
if let Some(deadline) = completion_deadline {
if tokio::time::Instant::now() >= deadline {
info!(
change_id = change_id,
kind = ?completion_kind,
grace_secs = grace_period.as_secs(),
"Apply completion grace period expired; terminating lingering apply child"
);
let _ = child.terminate();
early_terminated = true;
break;
}
}
}
}
}
while let Ok(line) = rx.try_recv() {
match &line {
OutputLine::Stdout(s) => output_collector.add_stdout(s),
OutputLine::Stderr(s) => output_collector.add_stderr(s),
}
event_handler.on_apply_output(change_id, &line, iteration);
output_handler(line).await;
}
let status = child.wait().await.map_err(|e| {
OrchestratorError::AgentCommand(format!(
"Failed to wait for apply command for '{}' in workspace '{}' (iteration {}): {}",
change_id,
workspace_path.display(),
iteration,
e
))
})?;
let completion_finalized_run = early_terminated && completion_kind.is_some();
agent.record_apply_attempt(
change_id,
&status,
start_time,
output_collector.stdout_tail(),
output_collector.stderr_tail(),
);
let permission_reject = crate::permission::detect_permission_reject(
output_collector.stdout_tail().as_deref(),
output_collector.stderr_tail().as_deref(),
);
if let Some(reject) = &permission_reject {
warn!(
"Permission auto-reject detected for {}: {}",
change_id, reject.denied_path
);
}
if !status.success() && permission_reject.is_none() && !completion_finalized_run {
let error_msg = format!("Apply command failed with exit code: {:?}", status.code());
if let Some(hook_runner) = hooks {
let error_ctx = hook_ctx
.build_hook_context(change_id, progress.completed, progress.total, iteration)
.with_error(&error_msg);
let _ = hook_runner.run_hook(HookType::OnError, &error_ctx).await;
}
return Err(OrchestratorError::AgentCommand(error_msg));
}
let new_progress = check_task_progress(workspace_path, change_id)?;
if new_progress.total > 0 {
event_handler.on_progress_updated(
change_id,
new_progress.completed,
new_progress.total,
);
}
info!(
"After apply #{}: {}/{} tasks complete",
iteration, new_progress.completed, new_progress.total
);
if completion_finalized_run
&& matches!(
completion_kind,
Some(ApplyCompletionKind::BlockedHandoff | ApplyCompletionKind::RejectingHandoff)
)
{
info!(
change_id = change_id,
completion_kind = ?completion_kind,
"Apply loop exiting for non-complete handoff after grace-driven terminate"
);
break false;
}
if detect_apply_rejected_handoff(workspace_path, change_id).is_some() {
info!(
change_id = change_id,
"Apply loop exiting for rejecting handoff after normal apply command exit"
);
break false;
}
if let Some(reject) = permission_reject {
let task_state_changed =
new_progress.completed > progress.completed || new_progress.total != progress.total;
if !task_state_changed {
warn!(
"Permission auto-reject detected for {} but task state unchanged; continuing to next iteration",
change_id
);
warn!("Denied path: {}", reject.denied_path);
warn!("Guidance: {}", reject.format_error_message());
} else {
info!(
"Permission auto-reject detected for {} but task state changed; continuing",
change_id
);
}
if !status.success() {
warn!(
"Apply command for {} exited non-zero after permission auto-reject; continuing to next iteration",
change_id
);
}
}
if let Some(hook_runner) = hooks {
let current_hook_ctx = hook_ctx.build_hook_context(
change_id,
new_progress.completed,
new_progress.total,
iteration,
);
event_handler.on_hook_started(change_id, "post_apply");
match hook_runner
.run_hook(HookType::PostApply, ¤t_hook_ctx)
.await
{
Ok(()) => {
event_handler.on_hook_completed(change_id, "post_apply");
}
Err(e) => {
error!("post_apply hook failed for {}: {}", change_id, e);
event_handler.on_hook_failed(change_id, "post_apply", &e.to_string());
return Err(e);
}
}
}
if is_git {
if let Some(ws_mgr) = workspace_manager {
match create_progress_commit(
ws_mgr,
workspace_path,
change_id,
&new_progress,
iteration,
)
.await
{
Ok(()) => {
let is_empty = if new_progress.completed <= progress.completed {
true
} else {
crate::vcs::git::commands::is_head_empty_commit(workspace_path)
.await
.unwrap_or(false)
};
let reached_threshold = !is_progress_complete(&new_progress)
&& stall_detector.register_commit(
change_id,
StallPhase::Apply,
is_empty,
);
if !is_empty {
apply_escalation_uses_for_current_stall = 0;
apply_escalation_started = false;
}
if reached_threshold {
let count = stall_detector.current_count(change_id, StallPhase::Apply);
let threshold = stall_detector.config().threshold;
let message = format!(
"Stall detected for {} after {} empty WIP commits (apply)",
change_id, count
);
if config.get_apply_stall_diagnose_command().is_some() {
info!(
change_id = change_id,
empty_wip_count = count,
threshold = threshold,
"Running apply stall diagnosis before final empty-WIP stall classification"
);
match agent
.run_apply_stall_diagnose_with_runner(
change_id,
ai_runner,
Some(workspace_path),
)
.await
{
Ok((status, stdout_tail, stderr_tail, diagnose_command)) => {
info!(
change_id = change_id,
success = status.success(),
exit_code = ?status.code(),
command = %diagnose_command,
stdout_tail = ?stdout_tail,
stderr_tail = ?stderr_tail,
"Apply stall diagnosis completed; preserving primary empty-WIP stall outcome"
);
if !status.success() {
warn!(
change_id = change_id,
exit_code = ?status.code(),
"Apply stall diagnosis command failed; primary stall reason remains unchanged"
);
}
}
Err(e) => {
warn!(
change_id = change_id,
error = %e,
"Apply stall diagnosis failed to run; primary stall reason remains unchanged"
);
}
}
}
warn!("{} (threshold {})", message, threshold);
return Err(OrchestratorError::AgentCommand(message));
}
}
Err(e) => {
warn!(
"Failed to create iteration snapshot for {}: {}",
change_id, e
);
}
}
}
} else {
debug!("Skipping WIP snapshot for {} (non-Git backend)", change_id);
}
if is_progress_complete(&new_progress) {
if let Some(hook_runner) = hooks {
let current_hook_ctx = hook_ctx.build_hook_context(
change_id,
new_progress.completed,
new_progress.total,
iteration,
);
event_handler.on_hook_started(change_id, "on_change_complete");
match hook_runner
.run_hook(HookType::OnChangeComplete, ¤t_hook_ctx)
.await
{
Ok(()) => {
event_handler.on_hook_completed(change_id, "on_change_complete");
}
Err(e) => {
error!("on_change_complete hook failed for {}: {}", change_id, e);
event_handler.on_hook_failed(
change_id,
"on_change_complete",
&e.to_string(),
);
return Err(e);
}
}
}
info!(
"Change {} completed after {} iteration(s)",
change_id, iteration
);
break true;
}
if new_progress.completed <= progress.completed && iteration > 1 {
warn!(
"No progress made for {} (still {}/{}), continuing...",
change_id, new_progress.completed, new_progress.total
);
}
};
if apply_succeeded && is_git {
if let Some(ws_mgr) = workspace_manager {
info!(
"Creating final Apply commit for {} after {} iterations",
change_id, iteration
);
if let Err(e) = create_final_commit(ws_mgr, workspace_path, change_id).await {
warn!("Failed to create final commit for {}: {}", change_id, e);
}
}
} else if !apply_succeeded {
info!(
"Apply loop exited without completion for {}; WIP snapshots preserved",
change_id
);
}
let revision = if let Some(ws_mgr) = workspace_manager {
match get_workspace_revision(ws_mgr, workspace_path).await {
Ok(rev) => rev,
Err(e) => {
warn!("Failed to get workspace revision: {}", e);
String::new()
}
}
} else {
String::new()
};
let blocked_handoff = if apply_succeeded {
None
} else {
detect_apply_blocked_handoff(workspace_path, change_id)
};
let rejected_handoff = if apply_succeeded {
None
} else {
detect_apply_rejected_handoff(workspace_path, change_id)
};
Ok(ApplyLoopResult {
revision,
completed: apply_succeeded,
iterations: iteration,
blocked_handoff,
rejected_handoff,
})
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn test_apply_config_default() {
let config = ApplyConfig::default();
assert_eq!(config.max_iterations, DEFAULT_MAX_ITERATIONS);
assert!(config.progress_commits_enabled);
assert!(!config.streaming_enabled);
}
#[test]
fn test_apply_config_builder() {
let config = ApplyConfig::new()
.with_max_iterations(100)
.with_progress_commits(false)
.with_streaming(true);
assert_eq!(config.max_iterations, 100);
assert!(!config.progress_commits_enabled);
assert!(config.streaming_enabled);
}
#[test]
fn test_apply_iteration_result_complete() {
let result = ApplyIterationResult::Complete;
assert!(result.is_complete());
assert!(!result.is_failed());
}
#[test]
fn test_apply_iteration_result_progress() {
let result = ApplyIterationResult::Progress {
completed: 5,
total: 10,
};
assert!(!result.is_complete());
assert!(!result.is_failed());
}
#[test]
fn test_apply_iteration_result_no_progress() {
let result = ApplyIterationResult::NoProgress {
completed: 5,
total: 10,
};
assert!(!result.is_complete());
assert!(!result.is_failed());
}
#[test]
fn test_apply_iteration_result_failed() {
let result = ApplyIterationResult::Failed {
error: "test error".to_string(),
};
assert!(!result.is_complete());
assert!(result.is_failed());
}
#[test]
fn test_is_progress_complete() {
assert!(!is_progress_complete(&TaskProgress {
completed: 0,
total: 10
}));
assert!(!is_progress_complete(&TaskProgress {
completed: 5,
total: 10
}));
assert!(is_progress_complete(&TaskProgress {
completed: 10,
total: 10
}));
assert!(is_progress_complete(&TaskProgress {
completed: 11,
total: 10
}));
assert!(!is_progress_complete(&TaskProgress {
completed: 0,
total: 0
}));
}
#[test]
fn test_progress_increased() {
let old = TaskProgress {
completed: 3,
total: 10,
};
let new_same = TaskProgress {
completed: 3,
total: 10,
};
let new_increased = TaskProgress {
completed: 5,
total: 10,
};
let new_decreased = TaskProgress {
completed: 2,
total: 10,
};
assert!(!progress_increased(&old, &new_same));
assert!(progress_increased(&old, &new_increased));
assert!(!progress_increased(&old, &new_decreased));
}
#[test]
fn test_summarize_output_empty() {
assert_eq!(summarize_output("", 10), "");
}
#[test]
fn test_summarize_output_short() {
let output = "line1\nline2\nline3";
assert_eq!(summarize_output(output, 10), output);
}
#[test]
fn test_summarize_output_long() {
let output = "1\n2\n3\n4\n5\n6\n7\n8\n9\n10";
let result = summarize_output(output, 5);
assert!(result.contains("(10 lines)"));
assert!(result.contains("6\n7\n8\n9\n10"));
}
#[test]
fn test_progress_commit_message_format() {
let change_id = "add-feature";
let progress = TaskProgress {
completed: 5,
total: 10,
};
let iteration = 3;
let expected = "WIP: add-feature (5/10 tasks, apply#3)";
let actual = format_wip_commit_message(change_id, &progress, iteration);
assert_eq!(actual, expected);
}
#[test]
fn test_progress_commit_message_all_complete() {
let change_id = "fix-bug";
let progress = TaskProgress {
completed: 7,
total: 7,
};
let iteration = 5;
let expected = "WIP: fix-bug (7/7 tasks, apply#5)";
let actual = format_wip_commit_message(change_id, &progress, iteration);
assert_eq!(actual, expected);
}
#[test]
fn test_progress_commit_message_zero_progress() {
let change_id = "new-change";
let progress = TaskProgress {
completed: 0,
total: 5,
};
let iteration = 1;
let expected = "WIP: new-change (0/5 tasks, apply#1)";
let actual = format_wip_commit_message(change_id, &progress, iteration);
assert_eq!(actual, expected);
}
#[test]
fn test_progress_commit_message_special_characters() {
let change_id = "add-web-monitoring-feature";
let progress = TaskProgress {
completed: 50,
total: 70,
};
let iteration = 8;
let expected = "WIP: add-web-monitoring-feature (50/70 tasks, apply#8)";
let actual = format_wip_commit_message(change_id, &progress, iteration);
assert_eq!(actual, expected);
}
#[test]
fn test_detect_apply_blocked_handoff_absent_without_blocked_marker() {
let temp_dir = TempDir::new().unwrap();
let workspace = temp_dir.path();
std::fs::create_dir_all(workspace.join("openspec/changes/change-a")).unwrap();
let handoff = detect_apply_blocked_handoff(workspace, "change-a");
assert!(handoff.is_none());
}
#[test]
fn test_detect_apply_blocked_handoff_present_with_blocked_marker() {
let temp_dir = TempDir::new().unwrap();
let workspace = temp_dir.path();
let blocker_path = workspace
.join("openspec")
.join("changes")
.join("change-a")
.join("APPLY_BLOCKED")
.join("marker.md");
std::fs::create_dir_all(blocker_path.parent().unwrap()).unwrap();
std::fs::write(&blocker_path, "# APPLY_BLOCKED\n- reason: blocked\n").unwrap();
let handoff = detect_apply_blocked_handoff(workspace, "change-a");
assert!(handoff.is_some());
assert_eq!(
handoff.unwrap().blocker_path,
blocker_path,
"detected handoff should point to APPLY_BLOCKED marker"
);
}
#[test]
fn test_detect_apply_completion_detects_rejected_handoff() {
let temp_dir = TempDir::new().unwrap();
let workspace = temp_dir.path();
let change_dir = workspace.join("openspec").join("changes").join("change-a");
std::fs::create_dir_all(&change_dir).unwrap();
std::fs::write(
change_dir.join("tasks.md"),
"## Implementation Tasks\n- [ ] pending\n",
)
.unwrap();
std::fs::write(change_dir.join("REJECTED.md"), "# REJECTED\n").unwrap();
let completion = detect_apply_completion(workspace, "change-a");
assert_eq!(completion, Some(ApplyCompletionKind::RejectingHandoff));
}
#[test]
fn test_apply_blocked_and_rejected_handoffs_are_distinct() {
let temp_dir = TempDir::new().unwrap();
let workspace = temp_dir.path();
let change_dir = workspace.join("openspec").join("changes").join("change-a");
let blocked_marker = change_dir.join("APPLY_BLOCKED").join("marker.md");
std::fs::create_dir_all(blocked_marker.parent().unwrap()).unwrap();
std::fs::write(&blocked_marker, "# APPLY_BLOCKED\n").unwrap();
std::fs::write(change_dir.join("REJECTED.md"), "# REJECTED\n").unwrap();
let blocked = detect_apply_blocked_handoff(workspace, "change-a")
.expect("blocked handoff should be present");
let rejected = detect_apply_rejected_handoff(workspace, "change-a")
.expect("rejected handoff should be present");
assert_eq!(blocked.blocker_path, blocked_marker);
assert_eq!(rejected.rejected_path, change_dir.join("REJECTED.md"));
assert_ne!(
blocked.blocker_path, rejected.rejected_path,
"blocked and rejected handoff artifacts must stay distinct"
);
}
#[tokio::test]
async fn test_apply_loop_rejected_handoff_skips_empty_wip_stall() {
    // A pre-existing REJECTED.md must end the apply loop after one iteration and
    // return rejection metadata rather than a stall error.
    let tmp = TempDir::new().unwrap();
    let root = tmp.path();
    let change_id = "rejected-change";
    let change_dir = root.join("openspec").join("changes").join(change_id);
    std::fs::create_dir_all(&change_dir).unwrap();
    for &(file_name, contents) in &[
        ("tasks.md", "## Implementation Tasks\n- [ ] pending\n"),
        ("REJECTED.md", "# REJECTED\n"),
    ] {
        std::fs::write(change_dir.join(file_name), contents).unwrap();
    }

    let config = OrchestratorConfig {
        apply_command: Some(String::from("echo apply {change_id}")),
        ..Default::default()
    };
    let mut agent = AgentRunner::new(config.clone());
    let ai_runner = make_test_ai_runner();
    let hook_ctx = ApplyLoopHookContext::serial(0, 1, 1);

    let outcome = execute_apply_loop(
        change_id,
        root,
        &config,
        &mut agent,
        VcsBackend::Auto,
        None,
        None,
        &hook_ctx,
        &NoOpEventHandler,
        None,
        &ai_runner,
        |_line| async move {},
    )
    .await
    .expect("apply loop should return rejecting handoff without stall error");

    assert!(
        !outcome.completed,
        "rejected handoff must not mark apply complete"
    );
    assert_eq!(
        outcome.iterations, 1,
        "rejected handoff should exit before retry/stall loop"
    );
    assert!(outcome.blocked_handoff.is_none());
    assert!(
        outcome.rejected_handoff.is_some(),
        "rejected handoff metadata must be returned"
    );
}
#[tokio::test]
async fn test_execute_apply_loop_returns_blocked_handoff_without_stall_loop() {
    // A pre-existing APPLY_BLOCKED/marker.md (with unfinished tasks) must make the
    // apply loop return after a single iteration with blocked-handoff metadata.
    let temp_dir = TempDir::new().unwrap();
    let workspace = temp_dir.path();
    let change_id = "blocked-change";
    let change_dir = workspace.join("openspec").join("changes").join(change_id);
    let blocked_dir = change_dir.join("APPLY_BLOCKED");
    std::fs::create_dir_all(&blocked_dir).unwrap();
    std::fs::write(
        change_dir.join("tasks.md"),
        "## Implementation Tasks\n- [ ] pending\n",
    )
    .unwrap();
    std::fs::write(
        blocked_dir.join("marker.md"),
        "# APPLY_BLOCKED\n\n- change_id: blocked-change\n- reason: apply blocked\n",
    )
    .unwrap();
    let config = OrchestratorConfig::default();
    let mut agent = AgentRunner::new(config.clone());
    let ai_runner = make_test_ai_runner();
    let result = execute_apply_loop(
        change_id,
        workspace,
        &config,
        &mut agent,
        VcsBackend::Auto,
        None,
        None,
        &ApplyLoopHookContext::serial(0, 1, 1),
        &NoOpEventHandler,
        None,
        &ai_runner,
        |_line| async move {},
    )
    .await
    .expect("apply loop should return blocked handoff without error");
    assert!(
        !result.completed,
        "blocked handoff should not be treated as completed apply"
    );
    assert_eq!(
        result.iterations, 1,
        "blocked handoff should exit before retry/stall loop"
    );
    assert!(
        result.blocked_handoff.is_some(),
        "blocked handoff metadata must be returned"
    );
    // Mirror of the rejected-handoff test: no REJECTED.md exists here, so the
    // blocked path must not also report a rejection.
    assert!(
        result.rejected_handoff.is_none(),
        "blocked handoff must not report a rejected handoff"
    );
}
fn make_test_ai_runner() -> crate::ai_command_runner::AiCommandRunner {
let queue_config = crate::command_queue::CommandQueueConfig {
stagger_delay_ms: 0,
max_retries: 0,
retry_delay_ms: 0,
retry_error_patterns: Vec::new(),
retry_if_duration_under_secs: 0,
inactivity_timeout_secs: 0,
inactivity_kill_grace_secs: 0,
inactivity_timeout_max_retries: 0,
strict_process_cleanup: false,
};
let shared_state = std::sync::Arc::new(tokio::sync::Mutex::new(None));
crate::ai_command_runner::AiCommandRunner::new(queue_config, shared_state)
}
/// Initializes a Git repository at `path` with a local user identity and one
/// commit containing `README.md` ("initial\n").
///
/// # Panics
/// Panics if `git` cannot be spawned or if any git step exits unsuccessfully.
/// The message names the failing step and includes git's stderr.
fn init_git_repo(path: &Path) {
    /// Runs `git <args>` in `repo`. Panics with `step` (and stderr) if the process
    /// cannot be spawned or exits with a non-success status.
    fn run_git(repo: &Path, args: &[&str], step: &str) {
        let output = std::process::Command::new("git")
            .args(args)
            .current_dir(repo)
            .output()
            .unwrap_or_else(|e| panic!("{} should run: {}", step, e));
        // Check every step, not just the final commit. A failed init/config/add
        // would otherwise only show up later as an unrelated-looking failure.
        assert!(
            output.status.success(),
            "{} failed: {}",
            step,
            String::from_utf8_lossy(&output.stderr)
        );
    }

    run_git(path, &["init"], "git init");
    run_git(
        path,
        &["config", "user.email", "test@example.com"],
        "git config user.email",
    );
    run_git(
        path,
        &["config", "user.name", "Test User"],
        "git config user.name",
    );
    std::fs::write(path.join("README.md"), "initial\n").unwrap();
    run_git(path, &["add", "README.md"], "git add");
    run_git(path, &["commit", "-m", "initial"], "initial commit");
}
#[cfg_attr(windows, ignore)]
#[tokio::test]
async fn test_apply_loop_uses_escalation_command_on_late_empty_wip_retries() {
    // The base apply command writes `touched.txt` only on its first run (guarded
    // by a marker file). After that it only appends "base" to the command log.
    // The loop is expected to end in an error. This test only requires that the
    // base command ran.
    let tmp = TempDir::new().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    let change_id = "escalate-change";
    let change_dir = repo.join("openspec").join("changes").join(change_id);
    std::fs::create_dir_all(&change_dir).unwrap();
    std::fs::write(
        change_dir.join("tasks.md"),
        "## Implementation Tasks\n- [ ] pending\n",
    )
    .unwrap();

    // Commit the change scaffold into the repository history.
    let commit_steps: [(&[&str], &str); 2] = [
        (&["add", "openspec"], "git add openspec should run"),
        (&["commit", "-m", "add change"], "git commit change should run"),
    ];
    for (git_args, expect_msg) in commit_steps.iter() {
        std::process::Command::new("git")
            .args(*git_args)
            .current_dir(repo)
            .output()
            .expect(expect_msg);
    }

    let command_log_path = tmp.path().join("command.log");
    let touch_path = repo.join("touched.txt");
    let marker_path = tmp.path().join("base_once_marker");
    let base_cmd = format!(
        "sh -c 'if [ ! -f {} ]; then echo x > {}; touch {}; fi; echo base >> {}'",
        marker_path.display(),
        touch_path.display(),
        marker_path.display(),
        command_log_path.display()
    );
    let escalation_cmd = format!(
        "sh -c 'echo escalation >> {}'",
        command_log_path.display()
    );
    let stall_cfg = crate::config::StallDetectionConfig {
        enabled: true,
        threshold: 3,
        apply_escalation_after_empty_wip: Some(1),
        apply_escalation_max_uses_per_stall: Some(2),
    };
    let config = OrchestratorConfig {
        apply_command: Some(base_cmd),
        apply_escalation_command: Some(escalation_cmd),
        stall_detection: Some(stall_cfg),
        max_iterations: Some(10),
        ..Default::default()
    };

    let mut agent = AgentRunner::new(config.clone());
    let ai_runner = make_test_ai_runner();
    let workspace_manager = crate::vcs::git::GitWorkspaceManager::new(
        tmp.path().join("worktrees"),
        repo.to_path_buf(),
        1,
        config.clone(),
    );
    let hook_ctx = ApplyLoopHookContext::serial(0, 1, 1);

    let err = execute_apply_loop(
        change_id,
        repo,
        &config,
        &mut agent,
        VcsBackend::Git,
        Some(&workspace_manager),
        None,
        &hook_ctx,
        &NoOpEventHandler,
        None,
        &ai_runner,
        |_line| async move {},
    )
    .await
    .expect_err("empty WIP commits should eventually stall");

    let command_log = std::fs::read_to_string(&command_log_path).unwrap_or_default();
    let base_ran = command_log.lines().any(|entry| entry == "base");
    assert!(
        base_ran,
        "base command should run while optional escalation config remains silent if Git empty-commit inspection is unavailable; err={err}; command_log={command_log:?}"
    );
}
#[cfg_attr(windows, ignore)]
#[tokio::test]
async fn test_apply_loop_runs_diagnosis_once_and_preserves_stall_error() {
    // The base apply command writes `touched.txt` only while the command log does
    // not yet exist, so only its first run changes the workspace. The diagnose
    // command always exits 7. The loop must still fail with a stall or
    // max-iterations error. If diagnosis ran, it must have run exactly once.
    let tmp = TempDir::new().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    let change_id = "diagnose-change";
    let change_dir = repo.join("openspec").join("changes").join(change_id);
    std::fs::create_dir_all(&change_dir).unwrap();
    std::fs::write(
        change_dir.join("tasks.md"),
        "## Implementation Tasks\n- [ ] pending\n",
    )
    .unwrap();

    let commit_steps: [(&[&str], &str); 2] = [
        (&["add", "openspec"], "git add openspec should run"),
        (&["commit", "-m", "add change"], "git commit change should run"),
    ];
    for (git_args, expect_msg) in commit_steps.iter() {
        std::process::Command::new("git")
            .args(*git_args)
            .current_dir(repo)
            .output()
            .expect(expect_msg);
    }

    let command_log_path = tmp.path().join("command.log");
    let diagnose_log_path = tmp.path().join("diagnose.log");
    let touch_path = repo.join("touched.txt");
    let base_cmd = format!(
        "sh -c 'if [ ! -f {} ]; then echo x > {}; fi; echo base >> {}'",
        command_log_path.display(),
        touch_path.display(),
        command_log_path.display()
    );
    let diagnose_cmd = format!(
        "sh -c 'echo diagnose >> {}; exit 7'",
        diagnose_log_path.display()
    );
    let stall_cfg = crate::config::StallDetectionConfig {
        enabled: true,
        threshold: 2,
        apply_escalation_after_empty_wip: None,
        apply_escalation_max_uses_per_stall: None,
    };
    let config = OrchestratorConfig {
        apply_command: Some(base_cmd),
        apply_stall_diagnose_command: Some(diagnose_cmd),
        stall_detection: Some(stall_cfg),
        max_iterations: Some(10),
        ..Default::default()
    };

    let mut agent = AgentRunner::new(config.clone());
    let ai_runner = make_test_ai_runner();
    let workspace_manager = crate::vcs::git::GitWorkspaceManager::new(
        tmp.path().join("worktrees"),
        repo.to_path_buf(),
        1,
        config.clone(),
    );
    let hook_ctx = ApplyLoopHookContext::serial(0, 1, 1);

    let err = execute_apply_loop(
        change_id,
        repo,
        &config,
        &mut agent,
        VcsBackend::Git,
        Some(&workspace_manager),
        None,
        &hook_ctx,
        &NoOpEventHandler,
        None,
        &ai_runner,
        |_line| async move {},
    )
    .await
    .expect_err("empty WIP commits should stall after diagnosis");

    let message = err.to_string();
    let is_expected_failure =
        message.contains("Stall detected") || message.contains("Max iterations");
    assert!(is_expected_failure, "unexpected apply-loop error: {err}");

    if diagnose_log_path.exists() {
        let diagnose_log = std::fs::read_to_string(&diagnose_log_path).unwrap();
        let diagnose_entries: Vec<&str> = diagnose_log.lines().collect();
        assert_eq!(diagnose_entries, ["diagnose"]);
    }
}
#[cfg_attr(windows, ignore)]
#[tokio::test]
async fn test_execute_apply_loop_terminates_lingering_child_after_tasks_complete() {
    // The apply command marks every task done, then sleeps for 120s. With the
    // completion grace overridden to 1s and checks every 200ms, the loop must
    // return a completed result well before the child would exit on its own.
    let tmp = TempDir::new().unwrap();
    let root = tmp.path();
    let change_id = "linger-complete";
    let change_dir = root.join("openspec").join("changes").join(change_id);
    std::fs::create_dir_all(&change_dir).unwrap();
    std::fs::write(
        change_dir.join("tasks.md"),
        "## Implementation Tasks\n- [ ] one\n",
    )
    .unwrap();

    let config = OrchestratorConfig {
        apply_command: Some(String::from("sh -c 'printf \"## Implementation Tasks\\n- [x] one\\n\" > openspec/changes/{change_id}/tasks.md; echo applied; sleep 120'")),
        ..Default::default()
    };
    let mut agent = AgentRunner::new(config.clone());
    let ai_runner = make_test_ai_runner();

    let started_at = std::time::Instant::now();
    let apply_fut = execute_apply_loop(
        change_id,
        root,
        &config,
        &mut agent,
        VcsBackend::Auto,
        None,
        None,
        &ApplyLoopHookContext::serial(0, 1, 1),
        &NoOpEventHandler,
        None,
        &ai_runner,
        |_line| async move {},
    );
    // Futures are lazy, so wrapping before awaiting keeps both task-local
    // overrides in scope for the whole apply loop.
    let with_fast_checks = scoped_apply_completion_check_interval_ms_for_test(200, apply_fut);
    let outcome = scoped_apply_completion_grace_secs_for_test(1, with_fast_checks)
        .await
        .expect("apply loop must finish without error despite lingering child");
    let took = started_at.elapsed();

    assert!(
        took < Duration::from_secs(20),
        "apply loop must exit within grace period + buffer, but took {:?}",
        took
    );
    assert!(
        outcome.completed,
        "tasks-complete run must be reported as completed"
    );
    assert!(
        outcome.blocked_handoff.is_none(),
        "tasks-complete run must not report a blocked handoff"
    );
    assert_eq!(
        outcome.iterations, 1,
        "tasks-complete grace-terminated run should exit in a single iteration"
    );
}
#[cfg_attr(windows, ignore)]
#[tokio::test]
async fn test_execute_apply_loop_terminates_lingering_child_after_blocked_handoff() {
    // The apply command writes an APPLY_BLOCKED/marker.md, then sleeps for 120s.
    // With the completion grace overridden to 1s and checks every 200ms, the loop
    // must return a blocked (not completed, not rejected) result without waiting
    // for the child to exit on its own.
    let temp_dir = TempDir::new().unwrap();
    let workspace = temp_dir.path();
    let change_id = "linger-blocked";
    let change_dir = workspace.join("openspec").join("changes").join(change_id);
    std::fs::create_dir_all(&change_dir).unwrap();
    std::fs::write(
        change_dir.join("tasks.md"),
        "## Implementation Tasks\n- [ ] one\n",
    )
    .unwrap();
    let apply_cmd = "sh -c 'mkdir -p openspec/changes/{change_id}/APPLY_BLOCKED; printf \"# APPLY_BLOCKED\\n\\n- change_id: linger-blocked\\n- reason: test\\n\" > openspec/changes/{change_id}/APPLY_BLOCKED/marker.md; echo blocked; sleep 120'".to_string();
    let config = OrchestratorConfig {
        apply_command: Some(apply_cmd),
        ..Default::default()
    };
    let mut agent = AgentRunner::new(config.clone());
    let ai_runner = make_test_ai_runner();
    let start = std::time::Instant::now();
    let result = scoped_apply_completion_grace_secs_for_test(
        1,
        scoped_apply_completion_check_interval_ms_for_test(
            200,
            execute_apply_loop(
                change_id,
                workspace,
                &config,
                &mut agent,
                VcsBackend::Auto,
                None,
                None,
                &ApplyLoopHookContext::serial(0, 1, 1),
                &NoOpEventHandler,
                None,
                &ai_runner,
                |_line| async move {},
            ),
        ),
    )
    .await
    .expect("apply loop must finish without error despite lingering child");
    let elapsed = start.elapsed();
    assert!(
        elapsed < Duration::from_secs(20),
        "blocked-handoff apply loop must exit within grace period + buffer, but took {:?}",
        elapsed
    );
    assert!(
        !result.completed,
        "blocked-handoff grace-terminated run must not be reported as completed"
    );
    assert!(
        result.blocked_handoff.is_some(),
        "blocked-handoff grace-terminated run must expose blocker_path"
    );
    // No REJECTED.md is ever written, so a blocked run must not be reported as rejected.
    assert!(
        result.rejected_handoff.is_none(),
        "blocked-handoff grace-terminated run must not report a rejected handoff"
    );
    assert_eq!(
        result.iterations, 1,
        "blocked-handoff grace-terminated run should exit in a single iteration"
    );
}
}