use crate::agent::AgentRunner;
use crate::ai_command_runner::{AiCommandRunner, SharedStaggerState};
use crate::command_queue::CommandQueueConfig;
use crate::config::defaults::*;
use crate::config::OrchestratorConfig;
use crate::error::{OrchestratorError, Result};
use crate::error_history::CircuitBreakerConfig;
use crate::events::{cli_event_sinks, dispatch_event, ExecutionEvent};
use crate::execution::apply::{check_task_progress, create_progress_commit};
use crate::hooks::{HookContext, HookRunner, HookType};
use crate::openspec::{self, Change};
use crate::orchestration::state::OrchestratorState;
use crate::orchestration::LogOutputHandler;
use crate::parallel_run_service::ParallelRunService;
use crate::progress::ProgressDisplay;
use crate::serial_run_service::SerialRunService;
use crate::stall::StallDetector;
use crate::task_parser::TaskProgress;
use crate::tui::log_deduplicator;
use crate::vcs::git::commands as git_commands;
use crate::vcs::{GitWorkspaceManager, VcsBackend, WorkspaceManager};
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error, info, warn};
#[cfg(feature = "web-monitoring")]
use crate::web::WebState;
#[cfg(feature = "web-monitoring")]
use tokio::sync::mpsc;
/// Result of recording one serial apply iteration (produced by `snapshot_serial_iteration`).
struct SerialSnapshot {
/// Task progress read for the change; `TaskProgress::default()` when it could not be read.
progress: crate::task_parser::TaskProgress,
/// Whether the WIP commit created for this iteration is empty.
/// `None` when no commit was attempted (non-Git backend or not a Git repository),
/// when creating the commit failed, or when the emptiness check itself failed.
empty_commit: Option<bool>,
}
/// Drives OpenSpec changes through apply / acceptance / archive, either one at a
/// time (serial mode, via [`SerialRunService`]) or concurrently (parallel mode, via
/// [`ParallelRunService`]).
pub struct Orchestrator {
    /// Runner used to execute agent commands for a change.
    agent: AgentRunner,
    /// Queued AI command runner (stagger / retry / inactivity settings come from the config).
    ai_runner: AiCommandRunner,
    /// Loaded orchestrator configuration.
    config: OrchestratorConfig,
    /// Console progress display; created once the serial run knows how many changes it has.
    progress: Option<ProgressDisplay>,
    /// Change IDs explicitly requested by the caller; `None` means "every discovered change".
    target_changes: Option<Vec<String>>,
    /// IDs captured when the run started; changes that appear later are ignored.
    initial_change_ids: Option<HashSet<String>>,
    /// Runs the configured lifecycle hooks (`OnStart`, `OnFinish`, ...).
    hooks: HookRunner,
    /// Per-change stall tracking; cleared when a change is archived or marked stalled.
    stall_detector: StallDetector,
    /// Iteration cap handed to the shared state; `0` disables the serial-loop check.
    max_iterations: u32,
    /// Run changes concurrently through `ParallelRunService` instead of one by one.
    parallel: bool,
    /// Requested concurrency limit (the dry-run preview shows `4` when unset).
    max_concurrent: Option<usize>,
    /// Preview-only mode; currently honored only together with `parallel`.
    dry_run: bool,
    /// Selected VCS backend; serial WIP commits and squashing only happen for Git/Auto.
    vcs_backend: VcsBackend,
    /// Forwarded to `ParallelRunService::set_no_resume`.
    no_resume: bool,
    /// State shared with event dispatch (and the web monitor, when enabled).
    shared_state: std::sync::Arc<tokio::sync::RwLock<OrchestratorState>>,
    /// Web monitor to broadcast updates to, if one was attached.
    #[cfg(feature = "web-monitoring")]
    web_state: Option<Arc<WebState>>,
    /// Mode name broadcast to the web monitor (e.g. "select", "running", "stopped").
    #[cfg(feature = "web-monitoring")]
    execution_mode: String,
}
/// Outcome of a loop-control check: keep iterating, or leave the loop with a status
/// string that `run` passes to the `OnFinish` hook context.
enum LoopControl {
/// Proceed with the next iteration.
Continue,
/// Leave the loop; `finish_status` is reported via `HookContext::with_status`.
Break { finish_status: &'static str },
}
impl Orchestrator {
#[allow(clippy::too_many_arguments)]
pub fn new(
target_changes: Option<Vec<String>>,
config_path: Option<PathBuf>,
max_iterations_override: Option<u32>,
parallel: bool,
max_concurrent: Option<usize>,
dry_run: bool,
vcs_override: Option<VcsBackend>,
no_resume: bool,
) -> Result<Self> {
let config = OrchestratorConfig::load(config_path.as_deref())?;
log_deduplicator::configure_logging(config.get_logging());
let repo_root = std::env::current_dir()?;
let hooks = HookRunner::with_output_handler(
config.get_hooks(),
&repo_root,
Arc::new(LogOutputHandler::new()),
);
let max_iterations = max_iterations_override.unwrap_or_else(|| config.get_max_iterations());
let agent = AgentRunner::new(config.clone());
let vcs_backend = vcs_override.unwrap_or_else(|| config.get_vcs_backend());
let stall_detector = StallDetector::new(config.get_stall_detection());
let shared_stagger_state: SharedStaggerState = Arc::new(Mutex::new(None));
let queue_config = CommandQueueConfig {
stagger_delay_ms: config
.command_queue_stagger_delay_ms
.unwrap_or(DEFAULT_STAGGER_DELAY_MS),
max_retries: config
.command_queue_max_retries
.unwrap_or(DEFAULT_MAX_RETRIES),
retry_delay_ms: config
.command_queue_retry_delay_ms
.unwrap_or(DEFAULT_RETRY_DELAY_MS),
retry_error_patterns: config
.command_queue_retry_patterns
.clone()
.unwrap_or_else(default_retry_patterns),
retry_if_duration_under_secs: config
.command_queue_retry_if_duration_under_secs
.unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS),
inactivity_timeout_secs: config.get_command_inactivity_timeout_secs(),
inactivity_kill_grace_secs: config.get_command_inactivity_kill_grace_secs(),
inactivity_timeout_max_retries: config.get_command_inactivity_timeout_max_retries(),
strict_process_cleanup: config.get_command_strict_process_cleanup(),
};
let mut ai_runner = AiCommandRunner::new(queue_config, shared_stagger_state);
ai_runner.set_stream_json_textify(config.get_stream_json_textify());
ai_runner.set_strict_process_cleanup(config.get_command_strict_process_cleanup());
let shared_state = std::sync::Arc::new(tokio::sync::RwLock::new(OrchestratorState::new(
Vec::new(),
max_iterations,
)));
Ok(Self {
agent,
ai_runner,
config,
progress: None,
target_changes,
initial_change_ids: None,
hooks,
stall_detector,
max_iterations,
parallel,
max_concurrent,
dry_run,
vcs_backend,
no_resume,
shared_state,
#[cfg(feature = "web-monitoring")]
web_state: None,
#[cfg(feature = "web-monitoring")]
execution_mode: "select".to_string(),
})
}
#[cfg(feature = "web-monitoring")]
pub async fn set_web_state(&mut self, web_state: Arc<WebState>) {
web_state.set_shared_state(self.shared_state.clone()).await;
self.web_state = Some(web_state);
}
#[cfg(feature = "web-monitoring")]
async fn broadcast_state_update(&self, changes: &[Change]) {
if let Some(ref web_state) = self.web_state {
web_state
.update_with_mode(changes, &self.execution_mode)
.await;
}
}
#[cfg(not(feature = "web-monitoring"))]
async fn broadcast_state_update(&self, _changes: &[Change]) {}
#[cfg(test)]
pub fn with_config(
target_changes: Option<Vec<String>>,
config: OrchestratorConfig,
) -> Result<Self> {
log_deduplicator::configure_logging(config.get_logging());
let repo_root = std::env::current_dir()?;
let hooks = HookRunner::with_output_handler(
config.get_hooks(),
&repo_root,
Arc::new(LogOutputHandler::new()),
);
let max_iterations = config.get_max_iterations();
let agent = AgentRunner::new(config.clone());
let stall_detector = StallDetector::new(config.get_stall_detection());
let shared_stagger_state: SharedStaggerState = Arc::new(Mutex::new(None));
let queue_config = CommandQueueConfig {
stagger_delay_ms: config
.command_queue_stagger_delay_ms
.unwrap_or(DEFAULT_STAGGER_DELAY_MS),
max_retries: config
.command_queue_max_retries
.unwrap_or(DEFAULT_MAX_RETRIES),
retry_delay_ms: config
.command_queue_retry_delay_ms
.unwrap_or(DEFAULT_RETRY_DELAY_MS),
retry_error_patterns: config
.command_queue_retry_patterns
.clone()
.unwrap_or_else(default_retry_patterns),
retry_if_duration_under_secs: config
.command_queue_retry_if_duration_under_secs
.unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS),
inactivity_timeout_secs: config.get_command_inactivity_timeout_secs(),
inactivity_kill_grace_secs: config.get_command_inactivity_kill_grace_secs(),
inactivity_timeout_max_retries: config.get_command_inactivity_timeout_max_retries(),
strict_process_cleanup: config.get_command_strict_process_cleanup(),
};
let mut ai_runner = AiCommandRunner::new(queue_config, shared_stagger_state);
ai_runner.set_stream_json_textify(config.get_stream_json_textify());
ai_runner.set_strict_process_cleanup(config.get_command_strict_process_cleanup());
let shared_state = std::sync::Arc::new(tokio::sync::RwLock::new(OrchestratorState::new(
Vec::new(),
max_iterations,
)));
Ok(Self {
agent,
ai_runner,
config,
progress: None,
target_changes,
initial_change_ids: None,
hooks,
stall_detector,
max_iterations,
parallel: false,
max_concurrent: None,
dry_run: false,
vcs_backend: VcsBackend::Auto,
no_resume: false,
shared_state,
#[cfg(feature = "web-monitoring")]
web_state: None,
#[cfg(feature = "web-monitoring")]
execution_mode: "select".to_string(),
})
}
/// Records the new execution mode (e.g. "running", "stopped") and re-broadcasts the
/// current change list so web clients see the mode change immediately.
///
/// A failure to list changes is tolerated: an empty list is broadcast instead.
#[cfg(feature = "web-monitoring")]
async fn update_execution_mode(&mut self, mode: &str) {
    self.execution_mode = mode.to_string();
    let current_changes = openspec::list_changes_native().unwrap_or_default();
    self.broadcast_state_update(&current_changes).await;
}
/// Reacts to the optional graceful-stop flag.
///
/// Mode transitions ("stopping" / "running") are only announced when the flag flips
/// relative to `previous_graceful_stop`, which is then updated to the latest value.
/// While the flag is set, progress is completed and the loop is told to break with
/// status `"graceful_stop"`. Without a flag this always continues.
async fn check_graceful_stop(
    &mut self,
    graceful_stop_flag: &Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
    previous_graceful_stop: &mut bool,
) -> LoopControl {
    let flag = match graceful_stop_flag {
        Some(flag) => flag,
        None => return LoopControl::Continue,
    };
    let requested = flag.load(std::sync::atomic::Ordering::SeqCst);
    match (requested, *previous_graceful_stop) {
        (true, false) => {
            info!("Graceful stop requested, entering stopping state");
            #[cfg(feature = "web-monitoring")]
            self.update_execution_mode("stopping").await;
        }
        (false, true) => {
            info!("Graceful stop cancelled, resuming running state");
            #[cfg(feature = "web-monitoring")]
            self.update_execution_mode("running").await;
        }
        _ => {}
    }
    *previous_graceful_stop = requested;
    if !requested {
        return LoopControl::Continue;
    }
    info!("Graceful stop: stopping after current change");
    #[cfg(feature = "web-monitoring")]
    self.update_execution_mode("stopped").await;
    if let Some(display) = self.progress.as_mut() {
        display.complete_all();
    }
    LoopControl::Break {
        finish_status: "graceful_stop",
    }
}
/// Breaks the loop with status `"cancelled"` once `cancel_token` has fired.
/// Before breaking, it completes the progress display and, with web monitoring,
/// switches the mode to "stopped".
async fn check_cancellation(
    &mut self,
    cancel_token: &tokio_util::sync::CancellationToken,
) -> LoopControl {
    if !cancel_token.is_cancelled() {
        return LoopControl::Continue;
    }
    info!("Cancellation requested, stopping orchestration");
    #[cfg(feature = "web-monitoring")]
    self.update_execution_mode("stopped").await;
    if let Some(display) = self.progress.as_mut() {
        display.complete_all();
    }
    LoopControl::Break {
        finish_status: "cancelled",
    }
}
/// Advances the shared iteration counter and enforces the iteration cap.
///
/// A cap of `0` disables the check. A warning is logged when the counter reaches
/// roughly 80% of the cap (computed in `f32`, truncated). Once the counter exceeds
/// the cap, progress is completed and the loop breaks with `"iteration_limit"`.
async fn check_max_iterations(&mut self) -> LoopControl {
    // Keep the write lock scoped to the counter update only.
    let (iteration, limit) = {
        let mut state = self.shared_state.write().await;
        state.increment_iteration();
        (state.iteration(), state.max_iterations())
    };
    if limit == 0 {
        return LoopControl::Continue;
    }
    let warn_at = (limit as f32 * 0.8) as u32;
    if iteration == warn_at {
        warn!("Approaching max iterations: {}/{}", iteration, limit);
    }
    if iteration <= limit {
        return LoopControl::Continue;
    }
    info!(
        "Max iterations ({}) reached, stopping orchestration",
        limit
    );
    if let Some(display) = self.progress.as_mut() {
        display.complete_all();
    }
    LoopControl::Break {
        finish_status: "iteration_limit",
    }
}
/// Runs the per-iteration checks in priority order: graceful stop, cancellation,
/// then the iteration cap. Returns the first `Break`.
///
/// The iteration counter is only advanced when the first two checks let the loop
/// continue.
async fn check_loop_controls(
    &mut self,
    graceful_stop_flag: &Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
    previous_graceful_stop: &mut bool,
    cancel_token: &tokio_util::sync::CancellationToken,
) -> LoopControl {
    let graceful = self
        .check_graceful_stop(graceful_stop_flag, previous_graceful_stop)
        .await;
    if matches!(graceful, LoopControl::Break { .. }) {
        return graceful;
    }
    let cancellation = self.check_cancellation(cancel_token).await;
    if matches!(cancellation, LoopControl::Break { .. }) {
        return cancellation;
    }
    self.check_max_iterations().await
}
/// Dispatches `event` against the shared state and the CLI event sinks via
/// `dispatch_event`.
async fn update_shared_state(&self, event: ExecutionEvent) {
    let state = self.shared_state.as_ref();
    let cli_sinks: Vec<std::sync::Arc<dyn crate::events::EventSink>> = cli_event_sinks();
    dispatch_event(state, &cli_sinks, event).await;
}
/// Bookkeeping after a change was archived. It emits `ChangeArchived`, forgets the
/// change's stall-detector history, and marks it archived in the progress display.
async fn handle_archived(&mut self, next: &Change) {
    let change_id = &next.id;
    let archived_event = ExecutionEvent::ChangeArchived(change_id.clone());
    self.update_shared_state(archived_event).await;
    self.stall_detector.clear_change(change_id);
    if let Some(display) = self.progress.as_mut() {
        display.archive_change(change_id);
    }
}
async fn handle_stalled(&mut self, next: &Change, error: &str) -> LoopControl {
warn!("Change stalled: {} - {}", next.id, error);
self.mark_change_stalled(&next.id, error).await;
LoopControl::Continue
}
async fn handle_failed(&mut self, next: &Change, error: &str) -> Result<()> {
error!("Change failed: {} - {}", next.id, error);
if let Some(progress) = &mut self.progress {
progress.error(&format!("Failed: {}", next.id));
}
#[cfg(feature = "web-monitoring")]
self.update_execution_mode("error").await;
Err(OrchestratorError::AgentCommand(error.to_string()))
}
async fn handle_apply_success_incomplete(
&mut self,
next: &Change,
serial_service: &mut SerialRunService,
) -> LoopControl {
self.update_shared_state(ExecutionEvent::ApplyCompleted {
change_id: next.id.clone(),
revision: "serial".to_string(),
})
.await;
let apply_count = serial_service.apply_count(&next.id);
let snapshot = match self.snapshot_serial_iteration(&next.id, apply_count).await {
Ok(snapshot) => snapshot,
Err(e) => {
warn!("Failed to snapshot WIP commit for {}: {}", next.id, e);
SerialSnapshot {
progress: crate::task_parser::TaskProgress::default(),
empty_commit: None,
}
}
};
if let Some(stall_reason) = serial_service.check_stall_after_apply(
&next.id,
&snapshot.progress,
snapshot.empty_commit,
) {
warn!("{}", stall_reason);
self.mark_change_stalled(&next.id, &stall_reason).await;
return LoopControl::Continue;
}
if let Some(progress) = &mut self.progress {
progress.complete_change(&next.id);
}
LoopControl::Continue
}
/// Handles an apply failure in serial mode.
///
/// First it snapshots the WIP state (a snapshot failure is only logged) and feeds
/// the error into the circuit breaker. If the breaker trips, the change is marked
/// stalled in both the shared state and `serial_service`, and `Ok(())` lets the run
/// continue. Otherwise the failure is reported and returned as
/// `OrchestratorError::AgentCommand`, which the caller propagates.
async fn handle_apply_failed(
    &mut self,
    next: &Change,
    error: &str,
    serial_service: &mut SerialRunService,
) -> Result<()> {
    // NOTE(review): a failure path emits `ApplyStarted` with a placeholder command;
    // confirm whether a different event (e.g. an apply-failed event) was intended.
    let started = ExecutionEvent::ApplyStarted {
        change_id: next.id.clone(),
        command: "(placeholder)".to_string(),
    };
    self.update_shared_state(started).await;
    let attempt = serial_service.apply_count(&next.id);
    if let Err(e) = self.snapshot_serial_iteration(&next.id, attempt).await {
        warn!("Failed to snapshot WIP commit for {}: {}", next.id, e);
    }
    let breaker_open = self
        .record_error_and_check_circuit_breaker(&next.id, error)
        .await;
    if breaker_open {
        let message = format!(
            "Circuit breaker opened for '{}' due to repeated errors",
            next.id
        );
        warn!("{}", message);
        self.mark_change_stalled(&next.id, &message).await;
        serial_service.mark_stalled(&next.id, &message);
        return Ok(());
    }
    error!("Apply failed for {}: {}", next.id, error);
    if let Some(display) = self.progress.as_mut() {
        display.error(&format!("Apply failed: {}", next.id));
    }
    #[cfg(feature = "web-monitoring")]
    self.update_execution_mode("error").await;
    Err(OrchestratorError::AgentCommand(error.to_string()))
}
/// Handles every acceptance-related outcome for `next`.
///
/// It always emits `ApplyCompleted` (revision "serial") first and marks the change
/// complete in the progress display last. On `AcceptancePassed` the serial WIP
/// commits are squashed on a best-effort basis; a rejection additionally emits
/// `ChangeRejected`. The remaining outcomes are only logged.
async fn handle_acceptance_result(
    &mut self,
    next: &Change,
    serial_service: &mut SerialRunService,
    result: &crate::serial_run_service::ChangeProcessResult,
) {
    use crate::serial_run_service::ChangeProcessResult as Outcome;
    let change_id = &next.id;
    self.update_shared_state(ExecutionEvent::ApplyCompleted {
        change_id: change_id.clone(),
        revision: "serial".to_string(),
    })
    .await;
    match result {
        Outcome::AcceptancePassed => {
            let attempt = serial_service.apply_count(change_id);
            // Result ignored on purpose: the helper already logs squash failures.
            let _ = self.squash_serial_wip_commits(change_id, attempt).await;
            info!("Acceptance passed for {}, ready for archive", change_id);
        }
        Outcome::AcceptanceContinue => {
            info!(
                "Acceptance requires continuation for {}, retrying...",
                change_id
            );
        }
        Outcome::AcceptanceContinueExceeded => {
            warn!(
                "Acceptance CONTINUE limit exceeded for {}, treating as FAIL",
                change_id
            );
        }
        Outcome::Rejected { reason } => {
            info!(
                "Acceptance blocked for {} - rejected flow completed: {}",
                change_id, reason
            );
            let rejected = ExecutionEvent::ChangeRejected {
                change_id: change_id.clone(),
                reason: reason.clone(),
            };
            self.update_shared_state(rejected).await;
        }
        Outcome::AcceptanceFailed { .. } => {
            info!("Acceptance failed for {}, will retry apply", change_id);
        }
        Outcome::AcceptanceCommandFailed { .. } => {
            info!(
                "Acceptance command failed for {}, will retry apply",
                change_id
            );
        }
        _ => {}
    }
    if let Some(display) = self.progress.as_mut() {
        display.complete_change(change_id);
    }
}
/// Prepares the serial run. It applies the optional target filter, snapshots the
/// change IDs, resets the shared state and creates the progress display.
///
/// Target IDs are trimmed. Unknown targets are skipped with a warning, and a target
/// listed more than once is scheduled (or warned about) only once, so it cannot
/// inflate the change count. When nothing remains, the returned list is empty, the
/// total is `0`, and state and progress are left untouched.
///
/// # Errors
/// Fails if the current working directory cannot be determined.
async fn initialize_run_loop(
    &mut self,
    initial_changes: Vec<Change>,
) -> Result<(Vec<Change>, SerialRunService, usize)> {
    let filtered_initial = if let Some(targets) = &self.target_changes {
        let mut seen_targets: HashSet<&str> = HashSet::new();
        let mut found = Vec::new();
        for target in targets {
            let trimmed = target.trim();
            // Skip repeats so a change is never scheduled twice.
            if !seen_targets.insert(trimmed) {
                continue;
            }
            match initial_changes.iter().find(|c| c.id == trimmed) {
                Some(change) => found.push(change.clone()),
                None => warn!("Specified change '{}' not found, skipping", trimmed),
            }
        }
        found
    } else {
        initial_changes
    };
    if filtered_initial.is_empty() {
        let repo_root = std::env::current_dir()?;
        let serial_service = SerialRunService::new(repo_root, self.config.clone());
        return Ok((filtered_initial, serial_service, 0));
    }
    let snapshot_ids: HashSet<String> = filtered_initial.iter().map(|c| c.id.clone()).collect();
    info!(
        "Captured snapshot of {} changes: {:?}",
        snapshot_ids.len(),
        snapshot_ids
    );
    self.initial_change_ids = Some(snapshot_ids);
    let change_ids: Vec<String> = filtered_initial.iter().map(|c| c.id.clone()).collect();
    *self.shared_state.write().await = OrchestratorState::new(change_ids, self.max_iterations);
    let total_changes = filtered_initial.len();
    self.progress = Some(ProgressDisplay::new(total_changes));
    let repo_root = std::env::current_dir()?;
    let serial_service = SerialRunService::new(repo_root, self.config.clone());
    Ok((filtered_initial, serial_service, total_changes))
}
/// Maps a `ChangeProcessResult` to the next loop action.
///
/// `Cancelled` and `ChangeStopped` break the loop; `Failed` and a non-breaker
/// `ApplyFailed` propagate an error. Every other outcome continues after its
/// bookkeeping handler has run.
async fn handle_change_result(
    &mut self,
    result: crate::serial_run_service::ChangeProcessResult,
    next: &Change,
    serial_service: &mut SerialRunService,
) -> Result<LoopControl> {
    use crate::serial_run_service::ChangeProcessResult;
    let control = match result {
        ChangeProcessResult::Archived => {
            self.handle_archived(next).await;
            LoopControl::Continue
        }
        ChangeProcessResult::Stalled { error } => self.handle_stalled(next, &error).await,
        ChangeProcessResult::Failed { error } => {
            self.handle_failed(next, &error).await?;
            LoopControl::Continue
        }
        ChangeProcessResult::Cancelled => {
            info!("Processing cancelled for {}", next.id);
            LoopControl::Break {
                finish_status: "cancelled",
            }
        }
        ChangeProcessResult::ChangeStopped => {
            info!("Change {} stopped", next.id);
            LoopControl::Break {
                finish_status: "stopped",
            }
        }
        ChangeProcessResult::ApplySuccessIncomplete => {
            self.handle_apply_success_incomplete(next, serial_service)
                .await
        }
        ChangeProcessResult::ApplyFailed { error } => {
            self.handle_apply_failed(next, &error, serial_service)
                .await?;
            LoopControl::Continue
        }
        ChangeProcessResult::AcceptancePassed
        | ChangeProcessResult::AcceptanceContinue
        | ChangeProcessResult::AcceptanceContinueExceeded
        | ChangeProcessResult::AcceptanceFailed { .. }
        | ChangeProcessResult::AcceptanceCommandFailed { .. }
        | ChangeProcessResult::Rejected { .. } => {
            self.handle_acceptance_result(next, serial_service, &result)
                .await;
            LoopControl::Continue
        }
    };
    Ok(control)
}
/// Runs orchestration until every change is handled, a stop condition fires, or an
/// error aborts the run.
///
/// Parallel mode and its dry-run preview are delegated to `run_parallel` /
/// `run_parallel_dry_run`. Serial mode repeatedly re-lists changes, picks the next
/// eligible one and processes it, running the `OnStart` / `OnFinish` hooks around
/// the loop.
///
/// `cancel_token` is checked between changes and is also exposed to
/// `process_change` through its `cancel_check` callback, so a change in progress
/// can observe cancellation. `graceful_stop_flag` stops the loop after the current
/// change.
///
/// # Errors
/// Propagates failures from listing changes, hooks, change processing, and fatal
/// change results.
pub async fn run(
    &mut self,
    cancel_token: tokio_util::sync::CancellationToken,
    graceful_stop_flag: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
) -> Result<()> {
    info!("Starting orchestration loop");
    #[cfg(feature = "web-monitoring")]
    {
        self.execution_mode = "running".to_string();
    }
    let initial_changes = openspec::list_changes_native()?;
    if self.parallel && self.dry_run {
        return self.run_parallel_dry_run(&initial_changes).await;
    }
    if self.parallel {
        return self
            .run_parallel(&initial_changes, cancel_token, graceful_stop_flag)
            .await;
    }
    if initial_changes.is_empty() {
        info!("No changes found");
        return Ok(());
    }
    let (filtered_initial, mut serial_service, total_changes) =
        self.initialize_run_loop(initial_changes).await?;
    if filtered_initial.is_empty() {
        info!("No changes found matching specified targets");
        return Ok(());
    }
    let start_context = HookContext::new(0, total_changes, total_changes, false);
    self.hooks
        .run_hook(HookType::OnStart, &start_context)
        .await?;
    let finish_status;
    let mut previous_graceful_stop = false;
    loop {
        match self
            .check_loop_controls(
                &graceful_stop_flag,
                &mut previous_graceful_stop,
                &cancel_token,
            )
            .await
        {
            LoopControl::Continue => {}
            LoopControl::Break {
                finish_status: status,
            } => {
                finish_status = status;
                break;
            }
        }
        let (next, remaining_changes) =
            match self.refetch_and_select_change(&mut serial_service).await? {
                Some(result) => result,
                None => {
                    finish_status = "completed";
                    break;
                }
            };
        let is_new_change = {
            let state = self.shared_state.read().await;
            state.current_change_id() != Some(&next.id)
        };
        if is_new_change {
            self.update_shared_state(ExecutionEvent::ProcessingStarted(next.id.clone()))
                .await;
        }
        let output = LogOutputHandler::new();
        // Wire the real cancellation token into the change processor. A hard-coded
        // `false` meant cancellation was only noticed between changes. A
        // mid-change cancellation is presumably surfaced as
        // `ChangeProcessResult::Cancelled`, which breaks the loop below.
        let cancel_check = || cancel_token.is_cancelled();
        // CLI serial mode has no per-change stop control.
        let is_single_change_stopped = || false;
        let result = serial_service
            .process_change(
                &next,
                &mut self.agent,
                &self.ai_runner,
                &self.hooks,
                &output,
                total_changes,
                remaining_changes,
                cancel_check,
                is_single_change_stopped,
                None,
            )
            .await?;
        match self
            .handle_change_result(result, &next, &mut serial_service)
            .await?
        {
            LoopControl::Continue => {}
            LoopControl::Break {
                finish_status: status,
            } => {
                finish_status = status;
                break;
            }
        }
    }
    let processed = self.shared_state.read().await.changes_processed();
    let finish_context =
        HookContext::new(processed, total_changes, 0, false).with_status(finish_status);
    self.hooks
        .run_hook(HookType::OnFinish, &finish_context)
        .await?;
    #[cfg(feature = "web-monitoring")]
    self.update_execution_mode("stopped").await;
    info!("Orchestration completed");
    Ok(())
}
/// Records one serial apply iteration.
///
/// Task progress is read first; unreadable progress becomes
/// `TaskProgress::default()`. If the VCS backend is Git/Auto and the working
/// directory is a Git repository, a WIP progress commit is created and checked for
/// emptiness. Git-side failures are logged and leave `empty_commit` as `None`; only
/// a failure to read the current directory is returned as an error.
async fn snapshot_serial_iteration(
    &self,
    change_id: &str,
    iteration: u32,
) -> Result<SerialSnapshot> {
    let repo_root = std::env::current_dir()?;
    let progress = check_task_progress(&repo_root, change_id).unwrap_or_default();
    let uses_git = matches!(self.vcs_backend, VcsBackend::Git | VcsBackend::Auto);
    if !uses_git {
        return Ok(SerialSnapshot {
            progress,
            empty_commit: None,
        });
    }
    let in_repo = git_commands::check_git_repo(&repo_root)
        .await
        .unwrap_or_else(|e| {
            warn!("Failed to check Git repository status: {}", e);
            false
        });
    if !in_repo {
        return Ok(SerialSnapshot {
            progress,
            empty_commit: None,
        });
    }
    let manager = GitWorkspaceManager::new(
        repo_root.join(".openspec-worktrees"),
        repo_root.clone(),
        1,
        self.config.clone(),
    );
    let empty_commit = match create_progress_commit(
        &manager,
        &repo_root,
        change_id,
        &progress,
        iteration,
    )
    .await
    {
        Err(e) => {
            warn!(
                "Failed to create WIP commit for {} (apply#{}): {}",
                change_id, iteration, e
            );
            None
        }
        Ok(_) => match git_commands::is_head_empty_commit(&repo_root).await {
            Ok(is_empty) => Some(is_empty),
            Err(e) => {
                warn!(
                    "Failed to check WIP commit contents for {} (apply#{}): {}",
                    change_id, iteration, e
                );
                None
            }
        },
    };
    Ok(SerialSnapshot {
        progress,
        empty_commit,
    })
}
/// Squashes the serial WIP commits for `change_id` (best effort).
///
/// It does nothing unless the VCS backend is Git/Auto and the working directory is
/// a Git repository. Repository-check and squash failures are logged, not returned.
/// Only a failure to read the current directory is an error.
async fn squash_serial_wip_commits(&self, change_id: &str, iteration: u32) -> Result<()> {
    if matches!(self.vcs_backend, VcsBackend::Git | VcsBackend::Auto) {
        let repo_root = std::env::current_dir()?;
        let in_repo = match git_commands::check_git_repo(&repo_root).await {
            Ok(flag) => flag,
            Err(e) => {
                warn!("Failed to check Git repository status: {}", e);
                false
            }
        };
        if in_repo {
            let manager = GitWorkspaceManager::new(
                repo_root.join(".openspec-worktrees"),
                repo_root.clone(),
                1,
                self.config.clone(),
            );
            if let Err(e) = manager
                .squash_wip_commits(&repo_root, change_id, iteration)
                .await
            {
                warn!(
                    "Failed to squash WIP commits for {} (apply#{}): {}",
                    change_id, iteration, e
                );
            }
        }
    }
    Ok(())
}
/// Returns the changes whose IDs were captured in the run-start snapshot.
/// If no snapshot has been taken yet, every change is returned.
fn filter_to_snapshot(&self, changes: &[Change]) -> Vec<Change> {
    if let Some(snapshot) = &self.initial_change_ids {
        let mut kept = Vec::new();
        for change in changes {
            if snapshot.contains(&change.id) {
                kept.push(change.clone());
            }
        }
        kept
    } else {
        changes.to_vec()
    }
}
/// Warns about every change that is not part of the run-start snapshot.
/// Such changes are ignored by the run. Without a snapshot this does nothing.
fn log_new_changes(&self, changes: &[Change]) {
    if let Some(snapshot) = &self.initial_change_ids {
        changes
            .iter()
            .filter(|change| !snapshot.contains(&change.id))
            .for_each(|change| {
                warn!(
                    "New change '{}' detected after run started - will be ignored",
                    change.id
                );
            });
    }
}
/// Drops changes that are stalled themselves or depend on a stalled change.
///
/// A change blocked by a stalled dependency is recorded as skipped in the shared
/// state. The warning is only logged when `mark_skipped` reports true (presumably the
/// first time the change is skipped — confirm against `OrchestratorState`).
async fn filter_stalled_changes(&mut self, changes: &[Change]) -> Vec<Change> {
    let mut state = self.shared_state.write().await;
    let mut eligible = Vec::new();
    for change in changes {
        if state.stalled_change_ids().contains(&change.id) {
            continue;
        }
        let stalled_dependency = change
            .dependencies
            .iter()
            .find(|dep| state.stalled_change_ids().contains(*dep));
        match stalled_dependency {
            None => eligible.push(change.clone()),
            Some(failed_dep) => {
                if state.mark_skipped(change.id.clone()) {
                    warn!(
                        "Skipping '{}' because dependency '{}' stalled",
                        change.id, failed_dep
                    );
                }
            }
        }
    }
    eligible
}
/// Re-lists changes and picks the next one to process.
///
/// The fresh list is broadcast to the web monitor (a no-op without the feature),
/// restricted to the run-start snapshot, and stripped of stalled or
/// dependency-blocked changes. `serial_service` then chooses among the remainder.
/// Returns `None` (after completing the progress display) when nothing is left;
/// otherwise returns the chosen change and the number of eligible changes.
///
/// # Errors
/// Fails if changes cannot be listed or `serial_service` selects nothing from a
/// non-empty list.
async fn refetch_and_select_change(
    &mut self,
    serial_service: &mut SerialRunService,
) -> Result<Option<(Change, usize)>> {
    let listed = openspec::list_changes_native()?;
    self.broadcast_state_update(&listed).await;
    let in_snapshot = self.filter_to_snapshot(&listed);
    self.log_new_changes(&listed);
    if in_snapshot.is_empty() {
        info!("All changes from initial snapshot processed");
        if let Some(display) = self.progress.as_mut() {
            display.complete_all();
        }
        return Ok(None);
    }
    let eligible = self.filter_stalled_changes(&in_snapshot).await;
    let remaining = eligible.len();
    if eligible.is_empty() {
        info!("All remaining changes are blocked by stalled dependencies");
        if let Some(display) = self.progress.as_mut() {
            display.complete_all();
        }
        return Ok(None);
    }
    let next = match serial_service.select_next_change(&eligible) {
        Some(change) => change,
        None => {
            return Err(OrchestratorError::AgentCommand(
                "No eligible change found".to_string(),
            ))
        }
    };
    info!("Selected change: {}", next.id);
    if let Some(display) = self.progress.as_mut() {
        display.update_change(next);
    }
    Ok(Some((next.clone(), remaining)))
}
/// Marks `change_id` as stalled in the shared state and resets its per-change
/// tracking. That covers the state's stalled-change entry and error history, plus
/// the local stall detector. It then reports `reason` on the progress display.
async fn mark_change_stalled(&mut self, change_id: &str, reason: &str) {
    let mut state = self.shared_state.write().await;
    state.mark_stalled(change_id.to_string());
    // NOTE(review): semantics of `clear_stalled_change` are defined in
    // `OrchestratorState`; presumably it resets stall tracking, not the stalled mark.
    state.clear_stalled_change(change_id);
    state.clear_error_history(change_id);
    // Release the state lock before touching local bookkeeping.
    drop(state);
    self.stall_detector.clear_change(change_id);
    if let Some(display) = self.progress.as_mut() {
        display.error(reason);
    }
}
/// Records `error` for `change_id` and reports whether the error circuit breaker
/// has tripped, using the configured `enabled` / `threshold` settings.
///
/// When it trips, the threshold and the last recorded error are logged at error
/// level.
async fn record_error_and_check_circuit_breaker(
    &mut self,
    change_id: &str,
    error: &str,
) -> bool {
    let settings = self.config.get_error_circuit_breaker();
    let breaker = CircuitBreakerConfig {
        enabled: settings.enabled,
        threshold: settings.threshold,
    };
    let mut state = self.shared_state.write().await;
    let tripped =
        state.record_error_and_check_circuit_breaker(change_id, error, breaker.clone());
    if tripped {
        error!(
            "Circuit breaker triggered for '{}': same error occurred {} times consecutively",
            change_id, breaker.threshold
        );
        if let Some(last_err) = state.last_error(change_id) {
            error!("Last error pattern: {}", last_err);
        }
    }
    tripped
}
/// Test hook: installs `ids` as the run-start snapshot consulted by
/// `filter_to_snapshot` and `log_new_changes`.
#[cfg(test)]
pub fn set_initial_change_ids(&mut self, ids: HashSet<String>) {
    let snapshot = Some(ids);
    self.initial_change_ids = snapshot;
}
/// Prints the parallel execution plan without running anything.
///
/// Changes are grouped by `ParallelRunService::analyze_and_group_public`. Each group
/// is listed with its members' task progress (or just the ID when the change is not
/// in `changes`) and its group dependencies. The output ends with the concurrency
/// limit, shown as `4` when none was requested.
async fn run_parallel_dry_run(&self, changes: &[Change]) -> Result<()> {
    info!("Running parallel mode dry run (preview only)");
    if changes.is_empty() {
        println!("No changes found for parallel execution.");
        return Ok(());
    }
    let service = ParallelRunService::new(std::env::current_dir()?, self.config.clone());
    let groups = service.analyze_and_group_public(changes).await;
    println!("\n=== Parallel Execution Plan (Dry Run) ===\n");
    println!("Total changes: {}", changes.len());
    println!("Parallelization groups: {}\n", groups.len());
    for group in groups.iter() {
        println!("Group {} (can run in parallel):", group.id);
        for change_id in group.changes.iter() {
            match changes.iter().find(|c| c.id == *change_id) {
                Some(c) => println!(
                    " - {} ({}/{} tasks, {:.1}%)",
                    c.id,
                    c.completed_tasks,
                    c.total_tasks,
                    c.progress_percent()
                ),
                None => println!(" - {}", change_id),
            }
        }
        if !group.depends_on.is_empty() {
            println!(" (depends on group(s): {:?})", group.depends_on);
        }
        println!();
    }
    let concurrency = self.max_concurrent.unwrap_or(4);
    println!("Max concurrent workspaces: {}", concurrency);
    println!("\nTo execute, run without --dry-run flag.");
    Ok(())
}
/// Runs every change in `changes` through [`ParallelRunService`] using Git worktrees.
/// Per-change output goes to stdout; warnings and errors go to stderr or the log.
///
/// The change IDs are snapshotted and the shared state is reset in parallel mode.
/// When `graceful_stop_flag` is provided, a background poller (100 ms interval)
/// cancels `cancel_token` once the flag is set. The poller is aborted as soon as the
/// parallel run returns, so it does not keep running after this call when no stop
/// was requested. With web monitoring enabled, every parallel event is also
/// forwarded to the web state.
///
/// # Errors
/// Fails if the current directory cannot be read, the VCS check fails, or the
/// parallel run itself fails. Start-time rejection of *all* changes is reported on
/// stderr but is not an error.
async fn run_parallel(
    &mut self,
    changes: &[Change],
    cancel_token: tokio_util::sync::CancellationToken,
    graceful_stop_flag: Option<std::sync::Arc<std::sync::atomic::AtomicBool>>,
) -> Result<()> {
    info!("Running parallel execution mode");
    if changes.is_empty() {
        info!("No changes found for parallel execution");
        return Ok(());
    }
    let snapshot_ids: HashSet<String> = changes.iter().map(|c| c.id.clone()).collect();
    self.initial_change_ids = Some(snapshot_ids);
    {
        let change_ids: Vec<String> = changes.iter().map(|c| c.id.clone()).collect();
        *self.shared_state.write().await = OrchestratorState::with_mode(
            change_ids,
            self.max_iterations,
            crate::orchestration::state::ExecutionMode::Parallel,
        );
    }
    let repo_root = std::env::current_dir()?;
    let mut service = ParallelRunService::new(repo_root.clone(), self.config.clone());
    service.set_no_resume(self.no_resume);
    service.check_vcs_available().await?;
    info!("Git available, executing changes in parallel using worktrees");
    #[cfg(feature = "web-monitoring")]
    let (web_event_tx, web_event_handle) = if let Some(web_state) = self.web_state.clone() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let handle = tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                crate::web::WebState::apply_execution_event(&web_state, &event).await;
                if matches!(
                    event,
                    crate::events::ExecutionEvent::AllCompleted
                        | crate::events::ExecutionEvent::Stopped
                ) {
                    break;
                }
            }
        });
        (Some(tx), Some(handle))
    } else {
        (None, None)
    };
    #[cfg(feature = "web-monitoring")]
    let web_event_sender = web_event_tx.clone();
    // Turn a graceful-stop request into cancellation of the parallel run. Keep the
    // handle so the poller can be aborted once the run returns; a detached task
    // would otherwise poll forever when no stop is ever requested.
    let graceful_stop_monitor = graceful_stop_flag.as_ref().map(|stop_flag| {
        let monitor_token = cancel_token.clone();
        let monitor_flag = stop_flag.clone();
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                if monitor_flag.load(std::sync::atomic::Ordering::SeqCst) {
                    info!("Graceful stop requested in parallel mode, cancelling execution");
                    monitor_token.cancel();
                    break;
                }
            }
        })
    });
    let total_requested = changes.len();
    let rejected_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
    let track_rejected = rejected_count.clone();
    let result = service
        .run_parallel(changes.to_vec(), Some(cancel_token), move |event| {
            use crate::parallel::ParallelEvent;
            #[cfg(feature = "web-monitoring")]
            if let Some(tx) = &web_event_sender {
                let _ = tx.send(event.clone());
            }
            match event {
                ParallelEvent::ParallelStartRejected {
                    ref change_ids,
                    ref reason,
                } => {
                    eprintln!(
                        "WARNING: {} change(s) rejected at start-time ({}): {}",
                        change_ids.len(),
                        reason,
                        change_ids.join(", ")
                    );
                    track_rejected
                        .fetch_add(change_ids.len(), std::sync::atomic::Ordering::SeqCst);
                }
                ParallelEvent::ApplyStarted { change_id, command } => {
                    info!("Apply started for {}", change_id);
                    println!("[{} apply] {}", change_id, command);
                }
                ParallelEvent::ApplyOutput {
                    change_id,
                    output,
                    iteration,
                } => {
                    let iter = iteration
                        .map(|n| format!("#{}", n))
                        .unwrap_or_else(|| "".to_string());
                    if iter.is_empty() {
                        println!("[{} apply] {}", change_id, output);
                    } else {
                        println!("[{} apply {}] {}", change_id, iter, output);
                    }
                }
                ParallelEvent::ProgressUpdated {
                    change_id,
                    completed,
                    total,
                } if total > 0 => {
                    info!("Progress {}: {}/{}", change_id, completed, total);
                }
                ParallelEvent::ApplyCompleted { change_id, .. } => {
                    info!("Apply completed for {}", change_id);
                }
                ParallelEvent::ApplyFailed { change_id, error } => {
                    error!("Apply failed for {}: {}", change_id, error);
                }
                ParallelEvent::AcceptanceStarted { change_id, command } => {
                    info!("Acceptance started for {}", change_id);
                    println!("[{} acceptance] {}", change_id, command);
                }
                ParallelEvent::AcceptanceOutput {
                    change_id,
                    output,
                    iteration,
                } => {
                    let iter = iteration
                        .map(|n| format!("#{}", n))
                        .unwrap_or_else(|| "".to_string());
                    if iter.is_empty() {
                        println!("[{} acceptance] {}", change_id, output);
                    } else {
                        println!("[{} acceptance {}] {}", change_id, iter, output);
                    }
                }
                ParallelEvent::AcceptanceCompleted { change_id } => {
                    info!("Acceptance completed for {}", change_id);
                }
                ParallelEvent::AcceptanceFailed { change_id, error } => {
                    error!("Acceptance failed for {}: {}", change_id, error);
                }
                ParallelEvent::ArchiveStarted { change_id, command } => {
                    info!("Archive started for {}", change_id);
                    println!("[{} archive] {}", change_id, command);
                }
                ParallelEvent::ArchiveOutput {
                    change_id,
                    output,
                    iteration,
                } => {
                    println!("[{} archive #{}] {}", change_id, iteration, output);
                }
                ParallelEvent::ChangeArchived(change_id) => {
                    info!("Archived {}", change_id);
                }
                ParallelEvent::ArchiveFailed { change_id, error } => {
                    error!("Archive failed for {}: {}", change_id, error);
                }
                ParallelEvent::AllCompleted => {
                    info!("All parallel execution completed");
                }
                ParallelEvent::Error { message } => {
                    error!("Parallel execution error: {}", message);
                }
                ParallelEvent::Warning { message, .. } => {
                    eprintln!("{}", message);
                }
                ParallelEvent::Log(entry) => {
                    println!("{}", entry.message);
                }
                _ => {}
            }
        })
        .await;
    // The run is over; stop the graceful-stop poller so it does not outlive this call.
    if let Some(monitor) = graceful_stop_monitor {
        monitor.abort();
    }
    #[cfg(feature = "web-monitoring")]
    if let Some(handle) = web_event_handle {
        drop(web_event_tx);
        let _ = handle.await;
    }
    result?;
    let n_rejected = rejected_count.load(std::sync::atomic::Ordering::SeqCst);
    if n_rejected >= total_requested && total_requested > 0 {
        eprintln!(
            "ERROR: No changes started: all {} requested change(s) were rejected by \
start-time eligibility filter (uncommitted or not in HEAD). \
Commit your changes before running in parallel mode.",
            total_requested
        );
    }
    Ok(())
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for `Orchestrator`: snapshot filtering, stall propagation,
    //! construction defaults, shared-state bookkeeping, rejected-result
    //! handling, and the all-rejected start detection used in parallel mode.

    use super::*;
    use crate::openspec::ProposalMetadata;

    /// Builds a `Change` with no dependencies and default metadata.
    fn create_test_change(id: &str, completed: u32, total: u32) -> Change {
        Change {
            id: id.to_string(),
            completed_tasks: completed,
            total_tasks: total,
            last_modified: "1m ago".to_string(),
            dependencies: Vec::new(),
            metadata: ProposalMetadata::default(),
        }
    }

    /// Collects string literals into the owned id set passed to
    /// `set_initial_change_ids`.
    fn id_set(ids: &[&str]) -> HashSet<String> {
        ids.iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn test_filter_to_snapshot_filters_new_changes() {
        let config = OrchestratorConfig::default();
        let mut orchestrator = Orchestrator::with_config(None, config).unwrap();
        orchestrator.set_initial_change_ids(id_set(&["change-a", "change-b"]));
        let all_changes = vec![
            create_test_change("change-a", 2, 5),
            create_test_change("change-b", 3, 5),
            // Not part of the snapshot: expected to be filtered out.
            create_test_change("change-c", 0, 3),
        ];
        let filtered = orchestrator.filter_to_snapshot(&all_changes);
        assert_eq!(filtered.len(), 2);
        assert!(filtered.iter().any(|c| c.id == "change-a"));
        assert!(filtered.iter().any(|c| c.id == "change-b"));
        assert!(!filtered.iter().any(|c| c.id == "change-c"));
    }

    #[test]
    fn test_filter_to_snapshot_returns_all_when_no_snapshot() {
        let config = OrchestratorConfig::default();
        let orchestrator = Orchestrator::with_config(None, config).unwrap();
        let all_changes = vec![
            create_test_change("change-a", 2, 5),
            create_test_change("change-b", 3, 5),
        ];
        let filtered = orchestrator.filter_to_snapshot(&all_changes);
        assert_eq!(filtered.len(), 2);
    }

    #[test]
    fn test_filter_to_snapshot_removes_archived_changes() {
        let config = OrchestratorConfig::default();
        let mut orchestrator = Orchestrator::with_config(None, config).unwrap();
        orchestrator.set_initial_change_ids(id_set(&["change-a", "change-b", "change-c"]));
        // "change-b" is in the snapshot but no longer present in the current list.
        let current_changes = vec![
            create_test_change("change-a", 2, 5),
            create_test_change("change-c", 1, 5),
        ];
        let filtered = orchestrator.filter_to_snapshot(&current_changes);
        assert_eq!(filtered.len(), 2);
        assert!(filtered.iter().any(|c| c.id == "change-a"));
        assert!(filtered.iter().any(|c| c.id == "change-c"));
    }

    #[test]
    fn test_filter_to_snapshot_handles_empty_changes() {
        let config = OrchestratorConfig::default();
        let mut orchestrator = Orchestrator::with_config(None, config).unwrap();
        orchestrator.set_initial_change_ids(id_set(&["change-a"]));
        let current_changes: Vec<Change> = vec![];
        let filtered = orchestrator.filter_to_snapshot(&current_changes);
        assert!(filtered.is_empty());
    }

    #[test]
    fn test_snapshot_preserves_updated_progress() {
        let config = OrchestratorConfig::default();
        let mut orchestrator = Orchestrator::with_config(None, config).unwrap();
        orchestrator.set_initial_change_ids(id_set(&["change-a"]));
        // The filtered result must carry the current progress (4/5), not a stale copy.
        let current_changes = vec![create_test_change("change-a", 4, 5)];
        let filtered = orchestrator.filter_to_snapshot(&current_changes);
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].completed_tasks, 4);
    }

    #[tokio::test]
    async fn test_filter_stalled_changes_skips_dependencies() {
        let config = OrchestratorConfig::default();
        let mut orchestrator = Orchestrator::with_config(None, config).unwrap();
        orchestrator
            .shared_state
            .write()
            .await
            .mark_stalled("change-a".to_string());
        let changes = vec![
            Change {
                id: "change-a".to_string(),
                completed_tasks: 0,
                total_tasks: 3,
                last_modified: "now".to_string(),
                dependencies: Vec::new(),
                metadata: ProposalMetadata::default(),
            },
            // Depends on the stalled "change-a", so it must be skipped too.
            Change {
                id: "change-b".to_string(),
                completed_tasks: 0,
                total_tasks: 3,
                last_modified: "now".to_string(),
                dependencies: vec!["change-a".to_string()],
                metadata: ProposalMetadata::default(),
            },
            Change {
                id: "change-c".to_string(),
                completed_tasks: 0,
                total_tasks: 3,
                last_modified: "now".to_string(),
                dependencies: Vec::new(),
                metadata: ProposalMetadata::default(),
            },
        ];
        let eligible = orchestrator.filter_stalled_changes(&changes).await;
        assert_eq!(eligible.len(), 1);
        assert_eq!(eligible[0].id, "change-c");
    }

    #[tokio::test]
    async fn test_orchestrator_creation() {
        let config = OrchestratorConfig::default();
        let orchestrator = Orchestrator::with_config(None, config).unwrap();
        assert!(orchestrator.target_changes.is_none());
        assert!(orchestrator.initial_change_ids.is_none());
        let state = orchestrator.shared_state.read().await;
        assert!(state.current_change_id().is_none());
        assert_eq!(state.changes_processed(), 0);
        assert_eq!(state.iteration(), 0);
    }

    #[test]
    fn test_orchestrator_with_single_target_change() {
        let config = OrchestratorConfig::default();
        let orchestrator =
            Orchestrator::with_config(Some(vec!["my-change".to_string()]), config).unwrap();
        assert_eq!(
            orchestrator.target_changes,
            Some(vec!["my-change".to_string()])
        );
    }

    #[test]
    fn test_orchestrator_with_multiple_target_changes() {
        let config = OrchestratorConfig::default();
        let orchestrator = Orchestrator::with_config(
            Some(vec![
                "change-a".to_string(),
                "change-b".to_string(),
                "change-c".to_string(),
            ]),
            config,
        )
        .unwrap();
        assert_eq!(
            orchestrator.target_changes,
            Some(vec![
                "change-a".to_string(),
                "change-b".to_string(),
                "change-c".to_string()
            ])
        );
    }

    #[tokio::test]
    async fn test_serial_shared_state_apply_count_and_iteration_increment() {
        let config = OrchestratorConfig::default();
        let mut orchestrator = Orchestrator::with_config(None, config).unwrap();
        {
            let mut state = orchestrator.shared_state.write().await;
            *state = crate::orchestration::state::OrchestratorState::new(
                vec!["change-a".to_string()],
                3,
            );
        }
        match orchestrator.check_max_iterations().await {
            super::LoopControl::Continue => {}
            _ => panic!("iteration check should continue"),
        }
        {
            // The iteration check bumps the iteration counter but not the apply count.
            let state = orchestrator.shared_state.read().await;
            assert_eq!(state.iteration(), 1);
            assert_eq!(state.apply_count("change-a"), 0);
        }
        orchestrator
            .update_shared_state(crate::events::ExecutionEvent::ApplyCompleted {
                change_id: "change-a".to_string(),
                revision: "serial".to_string(),
            })
            .await;
        let state = orchestrator.shared_state.read().await;
        assert_eq!(state.apply_count("change-a"), 1);
    }

    #[tokio::test]
    async fn test_rejected_result_marks_change_rejected_state() {
        use crate::serial_run_service::ChangeProcessResult;
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let config = OrchestratorConfig::default();
        let mut orchestrator = Orchestrator::with_config(None, config.clone()).unwrap();
        let mut serial_service = SerialRunService::new(temp_dir.path().to_path_buf(), config);
        let blocked_change = create_test_change("blocked-change", 3, 5);
        let result = ChangeProcessResult::Rejected {
            reason: "Implementation blocker detected".to_string(),
        };
        orchestrator
            .handle_change_result(result, &blocked_change, &mut serial_service)
            .await
            .unwrap();
        let state = orchestrator.shared_state.read().await;
        assert_eq!(state.display_status(&blocked_change.id), "rejected");
    }

    /// Checks the rejected-change counting pattern used by parallel mode.
    ///
    /// NOTE(review): the counting closure and the all-rejected condition are
    /// defined locally in this test rather than calling into `Orchestrator`,
    /// so a regression in the real event handler would not be caught here.
    #[test]
    fn test_cli_all_rejected_start_detection() {
        use crate::parallel::ParallelEvent;
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        let total_requested: usize = 2;
        let rejected_count = Arc::new(AtomicUsize::new(0));
        let track_rejected = rejected_count.clone();
        let handle_event = move |event: ParallelEvent| {
            if let ParallelEvent::ParallelStartRejected { change_ids, .. } = event {
                track_rejected.fetch_add(change_ids.len(), Ordering::SeqCst);
            }
        };
        handle_event(ParallelEvent::ParallelStartRejected {
            change_ids: vec!["change-a".to_string(), "change-b".to_string()],
            reason: "uncommitted or not in HEAD".to_string(),
        });
        let n_rejected = rejected_count.load(Ordering::SeqCst);
        assert_eq!(
            n_rejected, total_requested,
            "rejected_count must equal total_requested when all changes are filtered out"
        );
        assert!(
            n_rejected >= total_requested && total_requested > 0,
            "orchestrator should detect the all-rejected condition and report no changes started"
        );
    }
}