use crate::agent::AgentRunner;
use crate::ai_command_runner::{AiCommandRunner, SharedStaggerState};
use crate::command_queue::CommandQueueConfig;
use crate::config::defaults::*;
use crate::config::OrchestratorConfig;
use crate::error::Result;
use crate::openspec::Change;
use crate::events::EventSink;
use crate::orchestration::output::{ChannelOutputHandler, ContextualOutputHandler, OutputMessage};
use crate::serial_run_service::SerialRunService;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use super::events::{LogEntry, OrchestratorEvent, TuiEventSink};
use super::queue::DynamicQueue;
/// Decides whether archiving `change_id` must be followed by a deferred-merge event.
///
/// Returns `Some(OrchestratorEvent::MergeDeferred { auto_resumable: true, .. })` when
/// `state.has_other_post_archive_lane_blocker(change_id)` reports a blocker, and `None`
/// otherwise.
fn post_archive_dispatch_event(
    state: &crate::orchestration::state::OrchestratorState,
    change_id: &str,
) -> Option<OrchestratorEvent> {
    state
        .has_other_post_archive_lane_blocker(change_id)
        .then(|| OrchestratorEvent::MergeDeferred {
            change_id: change_id.to_owned(),
            reason: String::from(
                "Resolve lane occupied by active resolving/rejecting change; auto-queue archived change",
            ),
            auto_resumable: true,
        })
}
/// Publishes `event` on the TUI channel and folds it into the shared state.
///
/// Steps, in order:
/// 1. a clone of the event is sent on `tx` (the send result is discarded, so the state is
///    still updated when the receiver is gone);
/// 2. the event is applied to `shared_state` under its write lock;
/// 3. with the `web-monitoring` feature and a present `web_state`, the event is applied to
///    the web state as well, after the write lock has been released.
async fn dispatch_event(
    tx: &mpsc::Sender<OrchestratorEvent>,
    shared_state: &Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
    #[cfg(feature = "web-monitoring")] web_state: Option<&Arc<crate::web::WebState>>,
    event: OrchestratorEvent,
) {
    use crate::orchestration::state::OrchestratorState;

    let _ = tx.send(event.clone()).await;

    let mut guard = shared_state.write().await;
    OrchestratorState::apply_execution_event(&mut guard, &event);
    // Release the write lock before any further awaiting.
    drop(guard);

    #[cfg(feature = "web-monitoring")]
    if let Some(web) = web_state {
        crate::web::WebState::apply_execution_event(web, &event).await;
    }
}
/// Prepares `shared_state` for a parallel run and reports whether existing state was kept.
///
/// When `change_ids` is empty and the current state still lists changes from
/// `resolve_wait_change_ids()`, the state is preserved and only switched to
/// `ExecutionMode::Parallel`; the function then returns `true`.
///
/// In every other case the state is replaced by a fresh parallel-mode state built from
/// `change_ids` and `max_iterations`, each id is queued via `ReducerCommand::AddToQueue`,
/// and the function returns `false`.
async fn initialize_parallel_shared_state(
    shared_state: &Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
    change_ids: &[String],
    max_iterations: u32,
) -> bool {
    use crate::orchestration::state::{ExecutionMode, OrchestratorState, ReducerCommand};

    let mut guard = shared_state.write().await;
    let resolve_wait_ids = guard.resolve_wait_change_ids();

    // Anything other than "nothing selected, but resolve-wait work exists" starts from scratch.
    if !change_ids.is_empty() || resolve_wait_ids.is_empty() {
        *guard = OrchestratorState::with_mode(
            change_ids.to_vec(),
            max_iterations,
            ExecutionMode::Parallel,
        );
        for queued_id in change_ids {
            guard.apply_command(ReducerCommand::AddToQueue(queued_id.clone()));
        }
        return false;
    }

    tracing::info!(
        resolve_wait_ids = ?resolve_wait_ids,
        "Preserving reducer-owned ResolveWait during empty manual resolve scheduler startup"
    );
    guard.set_execution_mode(ExecutionMode::Parallel);
    true
}
#[allow(clippy::too_many_arguments)]
pub async fn run_orchestrator(
change_ids: Vec<String>,
config: OrchestratorConfig,
tx: mpsc::Sender<OrchestratorEvent>,
cancel_token: CancellationToken,
dynamic_queue: DynamicQueue,
_graceful_stop_flag: Arc<AtomicBool>,
shared_state: Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
#[cfg(feature = "web-monitoring")] web_state: Option<Arc<crate::web::WebState>>,
) -> Result<()> {
use crate::hooks::{HookContext, HookRunner, HookType};
use crate::openspec;
let repo_root = std::env::current_dir()?;
let hooks = HookRunner::with_event_tx(config.get_hooks(), &repo_root, tx.clone());
let max_iterations = config.get_max_iterations();
let mut agent = AgentRunner::new(config.clone());
let shared_stagger_state: SharedStaggerState = Arc::new(Mutex::new(None));
let queue_config = CommandQueueConfig {
stagger_delay_ms: config
.command_queue_stagger_delay_ms
.unwrap_or(DEFAULT_STAGGER_DELAY_MS),
max_retries: config
.command_queue_max_retries
.unwrap_or(DEFAULT_MAX_RETRIES),
retry_delay_ms: config
.command_queue_retry_delay_ms
.unwrap_or(DEFAULT_RETRY_DELAY_MS),
retry_error_patterns: config
.command_queue_retry_patterns
.clone()
.unwrap_or_else(default_retry_patterns),
retry_if_duration_under_secs: config
.command_queue_retry_if_duration_under_secs
.unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS),
inactivity_timeout_secs: config.get_command_inactivity_timeout_secs(),
inactivity_kill_grace_secs: config.get_command_inactivity_kill_grace_secs(),
inactivity_timeout_max_retries: config.get_command_inactivity_timeout_max_retries(),
strict_process_cleanup: config.get_command_strict_process_cleanup(),
};
let stream_json_textify = config.get_stream_json_textify();
let mut ai_runner = AiCommandRunner::new(queue_config, shared_stagger_state);
ai_runner.set_stream_json_textify(stream_json_textify);
ai_runner.set_strict_process_cleanup(config.get_command_strict_process_cleanup());
let repo_root = std::env::current_dir()?;
let mut serial_service = SerialRunService::new(repo_root, config);
let mut sinks: Vec<Arc<dyn EventSink>> = vec![Arc::new(TuiEventSink::new(tx.clone()))];
#[cfg(feature = "web-monitoring")]
if let Some(ws) = &web_state {
sinks.push(Arc::new(crate::web::state::WebEventSink::new(ws.clone())));
}
{
let mut state = shared_state.write().await;
*state = crate::orchestration::state::OrchestratorState::new(change_ids, max_iterations);
}
let total_changes = shared_state.read().await.total_changes();
let start_context = HookContext::new(0, total_changes, total_changes, false);
if let Err(e) = hooks.run_hook(HookType::OnStart, &start_context).await {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::warn(format!(
"on_start hook failed: {}",
e
))))
.await;
}
loop {
if cancel_token.is_cancelled() {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::warn(
"Processing cancelled".to_string(),
)))
.await;
break;
}
if _graceful_stop_flag.load(Ordering::SeqCst) {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::info(
"Graceful stop: stopping after current change".to_string(),
)))
.await;
let _ = tx.send(OrchestratorEvent::Stopped).await;
break;
}
let current_iteration = serial_service.iteration();
if max_iterations > 0 && current_iteration >= max_iterations {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::warn(format!(
"Max iterations ({}) reached, stopping orchestration",
max_iterations
))))
.await;
let _ = tx.send(OrchestratorEvent::AllCompleted).await;
break;
}
if max_iterations > 0 {
let warning_threshold = (max_iterations as f32 * 0.8) as u32;
if current_iteration == warning_threshold {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::warn(format!(
"Approaching max iterations: {}/{}",
current_iteration, max_iterations
))))
.await;
}
}
while let Some(dynamic_id) = dynamic_queue.pop().await {
let should_add = {
let state = shared_state.read().await;
!state.is_archived(&dynamic_id) && !state.is_pending(&dynamic_id)
};
if should_add {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::info(format!(
"Processing dynamically added: {}",
dynamic_id
))))
.await;
shared_state
.write()
.await
.add_dynamic_change(dynamic_id.clone());
}
}
let removed_ids = dynamic_queue.drain_removed().await;
if !removed_ids.is_empty() {
let mut removed_pending = Vec::new();
{
let mut state = shared_state.write().await;
for id in removed_ids {
if state.drop_pending_change(&id) {
removed_pending.push(id);
}
}
}
for id in removed_pending {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::info(format!(
"Removed from pending queue: {}",
id
))))
.await;
}
}
if shared_state.read().await.is_complete() {
break;
}
if cancel_token.is_cancelled() {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::warn(
"Processing cancelled".to_string(),
)))
.await;
break;
}
let changes = openspec::list_changes_native()?;
let eligible_changes: Vec<_> = {
let state = shared_state.read().await;
changes
.iter()
.filter(|c| state.is_pending(&c.id))
.cloned()
.collect()
};
let next_change = serial_service.select_next_change(&eligible_changes);
let Some(change) = next_change else {
continue;
};
let change_id = change.id.clone();
let change = change.clone();
if dynamic_queue.is_stopped(&change_id).await {
dynamic_queue.clear_stopped(&change_id).await;
shared_state.write().await.drop_pending_change(&change_id);
let change_stopped_event = OrchestratorEvent::ChangeDequeued {
change_id: change_id.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
change_stopped_event,
)
.await;
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::info(format!(
"Change stopped: {}",
change_id
))))
.await;
continue;
}
let processing_started_event = OrchestratorEvent::ProcessingStarted(change_id.clone());
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
processing_started_event,
)
.await;
let remaining_changes = shared_state.read().await.remaining_changes();
let apply_count_before = shared_state.read().await.apply_count(&change_id);
let tx_clone = tx.clone();
let change_id_clone = change_id.clone();
let apply_count_for_output = apply_count_before + 1; let current_operation = std::sync::Arc::new(std::sync::RwLock::new("apply".to_string()));
let current_operation_clone = current_operation.clone();
let output = ChannelOutputHandler::new(move |msg: OutputMessage| {
let tx = tx_clone.clone();
let change_id = change_id_clone.clone();
let apply_count = apply_count_for_output;
let operation = current_operation_clone.read().unwrap().clone();
tokio::spawn(async move {
match msg {
OutputMessage::Stdout(s) => {
let _ = tx
.send(OrchestratorEvent::Log(
LogEntry::info(s)
.with_change_id(&change_id)
.with_operation(&operation)
.with_iteration(apply_count),
))
.await;
}
OutputMessage::Stderr(s) => {
let _ = tx
.send(OrchestratorEvent::Log(
LogEntry::warn(s)
.with_change_id(&change_id)
.with_operation(&operation)
.with_iteration(apply_count),
))
.await;
}
OutputMessage::AgentStderr(s) => {
let _ = tx
.send(OrchestratorEvent::Log(
LogEntry::info(s)
.with_change_id(&change_id)
.with_operation(&operation)
.with_iteration(apply_count),
))
.await;
}
OutputMessage::Info(s) => {
let _ = tx
.send(OrchestratorEvent::Log(
LogEntry::info(s)
.with_change_id(&change_id)
.with_operation(&operation)
.with_iteration(apply_count),
))
.await;
}
OutputMessage::Warn(s) => {
let _ = tx
.send(OrchestratorEvent::Log(
LogEntry::warn(s)
.with_change_id(&change_id)
.with_operation(&operation)
.with_iteration(apply_count),
))
.await;
}
OutputMessage::Error(s) => {
let _ = tx
.send(OrchestratorEvent::Log(
LogEntry::error(s)
.with_change_id(&change_id)
.with_operation(&operation)
.with_iteration(apply_count),
))
.await;
}
OutputMessage::Success(s) => {
let _ = tx
.send(OrchestratorEvent::Log(
LogEntry::success(s)
.with_change_id(&change_id)
.with_operation(&operation)
.with_iteration(apply_count),
))
.await;
}
}
});
});
let output = ContextualOutputHandler::new(output, current_operation.clone());
let acceptance_tail = agent.peek_acceptance_tail_context_for_apply(&change_id);
let apply_template = agent.config().get_apply_command()?;
let apply_user_prompt = agent.config().get_apply_prompt();
let apply_history_context = agent.format_apply_history(&change_id);
let apply_full_prompt = crate::agent::build_apply_prompt_with_skill(
agent.config().get_apply_skill(),
&change_id,
apply_user_prompt,
&apply_history_context,
&acceptance_tail,
);
let apply_expanded_command =
OrchestratorConfig::expand_change_id(apply_template, &change_id);
let apply_expanded_command =
OrchestratorConfig::expand_prompt(&apply_expanded_command, &apply_full_prompt);
let apply_started_event = OrchestratorEvent::ApplyStarted {
change_id: change_id.to_string(),
command: apply_expanded_command,
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
apply_started_event,
)
.await;
use crate::serial_run_service::ChangeProcessResult;
let cancel_token_clone = cancel_token.clone();
let dynamic_queue_clone = dynamic_queue.clone();
let change_id_for_cancel = change_id.clone();
let cancel_check = move || {
if cancel_token_clone.is_cancelled() {
return true;
}
dynamic_queue_clone.try_is_stopped(&change_id_for_cancel)
};
let dynamic_queue_clone2 = dynamic_queue.clone();
let change_id_for_single_stop = change_id.clone();
let is_single_change_stopped =
move || dynamic_queue_clone2.try_is_stopped(&change_id_for_single_stop);
let total_changes = shared_state.read().await.total_changes();
let result = serial_service
.process_change(
&change,
&mut agent,
&ai_runner,
&hooks,
&output,
total_changes,
remaining_changes,
cancel_check,
is_single_change_stopped,
Some(current_operation.clone()),
)
.await;
let apply_count = serial_service.apply_count(&change_id);
let apply_output_event = OrchestratorEvent::ApplyOutput {
change_id: change_id.clone(),
output: String::new(),
iteration: Some(apply_count),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
apply_output_event,
)
.await;
match result {
Ok(ChangeProcessResult::Cancelled) => {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::warn(
"Processing cancelled".to_string(),
)))
.await;
shared_state.write().await.clear_pending_changes();
break;
}
Ok(ChangeProcessResult::ChangeStopped) => {
dynamic_queue.clear_stopped(&change_id).await;
let change_stopped_event = OrchestratorEvent::ChangeDequeued {
change_id: change_id.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
change_stopped_event,
)
.await;
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::info(format!(
"Change {} stopped, continuing with other queued changes",
change_id
))))
.await;
shared_state.write().await.remove_from_pending(&change_id);
continue;
}
Ok(ChangeProcessResult::AcceptancePassed) => {
let apply_completed_event = OrchestratorEvent::ApplyCompleted {
change_id: change_id.clone(),
revision: String::new(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
apply_completed_event,
)
.await;
let acceptance_started_event = OrchestratorEvent::AcceptanceStarted {
change_id: change_id.clone(),
command: format!("opencode acceptance {}", change_id),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
acceptance_started_event,
)
.await;
let acceptance_completed_event = OrchestratorEvent::AcceptanceCompleted {
change_id: change_id.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
acceptance_completed_event,
)
.await;
let processing_completed_event =
OrchestratorEvent::ProcessingCompleted(change_id.clone());
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
processing_completed_event,
)
.await;
}
Ok(ChangeProcessResult::ApplySuccessIncomplete) => {
let apply_completed_event = OrchestratorEvent::ApplyCompleted {
change_id: change_id.clone(),
revision: String::new(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
apply_completed_event,
)
.await;
}
Ok(ChangeProcessResult::AcceptanceContinue) => {
let apply_completed_event = OrchestratorEvent::ApplyCompleted {
change_id: change_id.clone(),
revision: String::new(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
apply_completed_event,
)
.await;
let acceptance_completed_event = OrchestratorEvent::AcceptanceCompleted {
change_id: change_id.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
acceptance_completed_event,
)
.await;
}
Ok(ChangeProcessResult::AcceptanceContinueExceeded) => {
let apply_completed_event = OrchestratorEvent::ApplyCompleted {
change_id: change_id.clone(),
revision: String::new(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
apply_completed_event,
)
.await;
let acceptance_completed_event = OrchestratorEvent::AcceptanceCompleted {
change_id: change_id.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
acceptance_completed_event,
)
.await;
}
Ok(ChangeProcessResult::Rejected { reason }) => {
let apply_completed_event = OrchestratorEvent::ApplyCompleted {
change_id: change_id.clone(),
revision: String::new(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
apply_completed_event,
)
.await;
let acceptance_completed_event = OrchestratorEvent::AcceptanceCompleted {
change_id: change_id.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
acceptance_completed_event,
)
.await;
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::warn(format!(
"Acceptance gated - rejection flow completed: {}",
reason
))))
.await;
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
OrchestratorEvent::ChangeRejected {
change_id: change_id.clone(),
reason,
},
)
.await;
}
Ok(ChangeProcessResult::AcceptanceFailed { .. }) => {
let apply_completed_event = OrchestratorEvent::ApplyCompleted {
change_id: change_id.clone(),
revision: String::new(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
apply_completed_event,
)
.await;
let acceptance_completed_event = OrchestratorEvent::AcceptanceCompleted {
change_id: change_id.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
acceptance_completed_event,
)
.await;
}
Ok(ChangeProcessResult::AcceptanceCommandFailed { error }) => {
let apply_completed_event = OrchestratorEvent::ApplyCompleted {
change_id: change_id.clone(),
revision: String::new(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
apply_completed_event,
)
.await;
let acceptance_completed_event = OrchestratorEvent::AcceptanceCompleted {
change_id: change_id.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
acceptance_completed_event,
)
.await;
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::error(format!(
"Acceptance command failed: {}",
error
))))
.await;
}
Ok(ChangeProcessResult::ApplyFailed { error }) => {
let processing_error_event = OrchestratorEvent::ProcessingError {
id: change_id.clone(),
error: error.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
processing_error_event,
)
.await;
}
Ok(ChangeProcessResult::Archived) => {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::success(format!(
"Change {} archived successfully",
change_id
))))
.await;
let change_archived_event = OrchestratorEvent::ChangeArchived(change_id.clone());
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
change_archived_event,
)
.await;
let post_archive_event = {
let state = shared_state.read().await;
post_archive_dispatch_event(&state, &change_id)
};
if let Some(post_event) = post_archive_event {
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
post_event,
)
.await;
}
}
Ok(ChangeProcessResult::Stalled { error }) => {
let processing_error_event = OrchestratorEvent::ProcessingError {
id: change_id.clone(),
error: error.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
processing_error_event,
)
.await;
shared_state.write().await.remove_from_pending(&change_id);
}
Ok(ChangeProcessResult::Failed { error }) => {
let processing_error_event = OrchestratorEvent::ProcessingError {
id: change_id.clone(),
error: error.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
processing_error_event,
)
.await;
}
Err(e) => {
let error_str = e.to_string();
if error_str.contains("Cancelled") && dynamic_queue.try_is_stopped(&change_id) {
dynamic_queue.clear_stopped(&change_id).await;
shared_state.write().await.drop_pending_change(&change_id);
let change_stopped_event2 = OrchestratorEvent::ChangeDequeued {
change_id: change_id.clone(),
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
change_stopped_event2,
)
.await;
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::info(format!(
"Change stopped during execution: {}",
change_id
))))
.await;
continue;
} else {
let error_msg = format!("Processing error for {}: {}", change_id, e);
let processing_error_event = OrchestratorEvent::ProcessingError {
id: change_id.clone(),
error: error_msg,
};
dispatch_event(
&tx,
&shared_state,
#[cfg(feature = "web-monitoring")]
web_state.as_ref(),
processing_error_event,
)
.await;
break;
}
}
}
}
let state = shared_state.read().await;
let complete_context =
HookContext::new(state.changes_processed(), state.total_changes(), 0, false);
if let Err(e) = hooks.run_hook(HookType::OnFinish, &complete_context).await {
let _ = tx
.send(OrchestratorEvent::Log(LogEntry::warn(format!(
"on_finish hook failed: {}",
e
))))
.await;
}
let _ = tx.send(OrchestratorEvent::AllCompleted).await;
Ok(())
}
/// Runs the selected changes through `ParallelRunService` and relays its events to the TUI.
///
/// After preparing the shared state via `initialize_parallel_shared_state`, the function
/// publishes an initial `ChangesRefreshed` snapshot, spawns a forwarding task that copies
/// every `ParallelEvent` to `tx`, the shared state and (with `web-monitoring`) the web
/// state, and then races the parallel run against `cancel_token`.
///
/// `AllCompleted` is sent at the end unless the run reported `Stopped` or finished while
/// resolve-wait / reject-wait work or an occupied base-mutating lane remains in the state.
///
/// # Errors
///
/// Returns an error when the current directory cannot be determined, when
/// `check_vcs_available` fails, or when `list_changes_native` fails. A failure of the
/// parallel run itself (including cancellation) is logged, not returned.
#[allow(clippy::too_many_arguments)]
pub async fn run_orchestrator_parallel(
    change_ids: Vec<String>,
    config: OrchestratorConfig,
    tx: mpsc::Sender<OrchestratorEvent>,
    cancel_token: CancellationToken,
    dynamic_queue: DynamicQueue,
    _graceful_stop_flag: Arc<AtomicBool>,
    shared_state: Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
    manual_resolve_counter: Arc<std::sync::atomic::AtomicUsize>,
    #[cfg(feature = "web-monitoring")] web_state: Option<Arc<crate::web::WebState>>,
) -> Result<()> {
    use crate::openspec::list_changes_native;
    use crate::parallel::ParallelEvent;
    use crate::parallel_run_service::ParallelRunService;

    let _ = tx
        .send(OrchestratorEvent::Log(LogEntry::info(format!(
            "Starting parallel processing of {} change(s)",
            change_ids.len()
        ))))
        .await;

    let repo_root = std::env::current_dir()?;
    let mut service = ParallelRunService::new(repo_root.clone(), config.clone());
    service.set_shared_orchestrator_state(shared_state.clone());
    service.check_vcs_available().await?;
    initialize_parallel_shared_state(&shared_state, &change_ids, config.get_max_iterations()).await;

    let shared_queue_change = Arc::new(tokio::sync::Mutex::new(None::<std::time::Instant>));
    let mut stopped_or_cancelled = false;
    let mut had_errors = false;

    let all_changes = list_changes_native()?;
    // If the committed snapshot cannot be read, every known change is treated as committed.
    let committed_change_ids: HashSet<String> =
        match crate::vcs::git::commands::list_changes_in_head(&repo_root).await {
            Ok(ids) => ids.into_iter().collect(),
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "Failed to load committed change snapshot for parallel start"
                );
                all_changes.iter().map(|change| change.id.clone()).collect()
            }
        };
    // If uncommitted files cannot be detected, no change is flagged.
    let uncommitted_file_change_ids: HashSet<String> =
        match crate::vcs::git::commands::list_changes_with_uncommitted_files(&repo_root).await {
            Ok(ids) => ids.into_iter().collect(),
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "Failed to detect uncommitted files in changes for parallel start"
                );
                HashSet::new()
            }
        };
    let changes_to_process: Vec<Change> = all_changes
        .iter()
        .filter(|c| change_ids.contains(&c.id))
        .cloned()
        .collect();

    let _ = tx
        .send(OrchestratorEvent::ChangesRefreshed {
            changes: all_changes,
            committed_change_ids,
            uncommitted_file_change_ids,
            worktree_change_ids: HashSet::new(),
            worktree_paths: HashMap::new(),
            worktree_not_ahead_ids: HashSet::new(),
            merge_wait_ids: HashSet::new(),
        })
        .await;

    // Web state updates run on their own task, fed through an unbounded channel, and stop
    // after a terminal event.
    #[cfg(feature = "web-monitoring")]
    let (web_event_tx, web_event_handle) = if let Some(web_state) = web_state.clone() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let handle = tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                crate::web::WebState::apply_execution_event(&web_state, &event).await;
                if matches!(
                    event,
                    crate::events::ExecutionEvent::AllCompleted
                        | crate::events::ExecutionEvent::Stopped
                ) {
                    break;
                }
            }
        });
        (Some(tx), Some(handle))
    } else {
        (None, None)
    };

    let (parallel_tx, mut parallel_rx) = mpsc::channel::<ParallelEvent>(100);
    let forward_tx = tx.clone();
    let forward_cancel = cancel_token.clone();
    let merge_deferred_stop = Arc::new(AtomicBool::new(false));
    let forward_merge_stop = merge_deferred_stop.clone();
    let forward_shared_state = shared_state.clone();
    #[cfg(feature = "web-monitoring")]
    let forward_web_tx = web_event_tx.clone();

    // Forwarding task: relays service events until a terminal event, cancellation, or the
    // sender side closing.
    let forward_handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = forward_cancel.cancelled() => {
                    break;
                }
                event = parallel_rx.recv() => {
                    match event {
                        // Not forwarded to the TUI here; the caller sends its own
                        // `AllCompleted` once the run has been summarised.
                        Some(ParallelEvent::AllCompleted) => {
                            #[cfg(feature = "web-monitoring")]
                            if let Some(tx) = &forward_web_tx {
                                let _ = tx.send(ParallelEvent::AllCompleted);
                            }
                            break;
                        }
                        Some(ParallelEvent::Stopped) => {
                            // Remember the stop so the caller suppresses `AllCompleted`.
                            forward_merge_stop.store(true, Ordering::SeqCst);
                            let _ = forward_tx.send(ParallelEvent::Stopped).await;
                            #[cfg(feature = "web-monitoring")]
                            if let Some(tx) = &forward_web_tx {
                                let _ = tx.send(ParallelEvent::Stopped);
                            }
                            break;
                        }
                        Some(parallel_event) => {
                            let _ = forward_tx.send(parallel_event.clone()).await;
                            {
                                let mut state = forward_shared_state.write().await;
                                crate::orchestration::state::OrchestratorState::apply_execution_event(
                                    &mut state,
                                    &parallel_event,
                                );
                            }
                            #[cfg(feature = "web-monitoring")]
                            if let Some(tx) = &forward_web_tx {
                                let _ = tx.send(parallel_event);
                            }
                        }
                        None => {
                            break;
                        }
                    }
                }
            }
        }
    });

    // Race the run against cancellation; cancellation is reported as an error result.
    let result = tokio::select! {
        _ = cancel_token.cancelled() => {
            let change_ids: Vec<String> = changes_to_process.iter().map(|c| c.id.clone()).collect();
            let cancel_msg = format!(
                "Cancelled parallel execution ({} changes: {})",
                change_ids.len(),
                change_ids.join(", ")
            );
            let _ = tx
                .send(OrchestratorEvent::Log(LogEntry::warn(
                    cancel_msg.clone(),
                )))
                .await;
            Err(crate::error::OrchestratorError::AgentCommand(cancel_msg))
        }
        result = service.run_parallel_with_channel_and_queue_state(
            changes_to_process.clone(),
            parallel_tx,
            Some(cancel_token.clone()),
            Some(shared_queue_change.clone()),
            Some(Arc::new(dynamic_queue.clone())),
            Some(manual_resolve_counter.clone()),
            Some(shared_state.clone()),
        ) => {
            result
        }
    };

    // Wait for the forwarder so every relayed event has been applied before summarising.
    let _ = forward_handle.await;
    if merge_deferred_stop.load(Ordering::SeqCst) {
        stopped_or_cancelled = true;
    }
    let has_reducer_owned_lane_wait_or_active = {
        let state = shared_state.read().await;
        !state.resolve_wait_change_ids().is_empty()
            || !state.reject_wait_change_ids().is_empty()
            || state.is_base_mutating_lane_occupied()
    };

    match result {
        Ok(_) => {
            if merge_deferred_stop.load(Ordering::SeqCst) {
                let _ = tx
                    .send(OrchestratorEvent::Log(LogEntry::warn(format!(
                        "Execution stopped with deferred merges ({} changes processed)",
                        changes_to_process.len()
                    ))))
                    .await;
            } else if has_reducer_owned_lane_wait_or_active {
                let _ = tx
                    .send(OrchestratorEvent::Log(LogEntry::warn(format!(
                        "Execution paused with reducer-owned lane retry work still pending or active ({} changes processed)",
                        changes_to_process.len()
                    ))))
                    .await;
                stopped_or_cancelled = true;
            } else {
                let _ = tx
                    .send(OrchestratorEvent::Log(LogEntry::success(format!(
                        "Execution completed ({} changes processed)",
                        changes_to_process.len()
                    ))))
                    .await;
            }
        }
        Err(e) => {
            // NOTE(review): cancellation also lands here and does not set
            // `stopped_or_cancelled`, so `AllCompleted` is still sent — confirm intended.
            had_errors = true;
            let _ = tx
                .send(OrchestratorEvent::Log(LogEntry::error(format!(
                    "Execution failed: {}",
                    e
                ))))
                .await;
        }
    }

    // Close our sender so the web task can finish even without a terminal event.
    #[cfg(feature = "web-monitoring")]
    if let Some(handle) = web_event_handle {
        drop(web_event_tx);
        let _ = handle.await;
    }

    if !stopped_or_cancelled {
        if had_errors {
            let _ = tx
                .send(OrchestratorEvent::Log(LogEntry::warn(
                    "Processing completed with errors".to_string(),
                )))
                .await;
        } else {
            let _ = tx
                .send(OrchestratorEvent::Log(LogEntry::success(
                    "All parallel changes completed".to_string(),
                )))
                .await;
        }
        let _ = tx.send(OrchestratorEvent::AllCompleted).await;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
use std::path::Path;
#[test]
fn test_archive_path_structure() {
let change_id = "test-change";
let change_path = Path::new("openspec/changes").join(change_id);
let archive_path = Path::new("openspec/changes/archive").join(change_id);
assert_eq!(
change_path.to_str().unwrap(),
"openspec/changes/test-change"
);
assert_eq!(
archive_path.to_str().unwrap(),
"openspec/changes/archive/test-change"
);
assert!(archive_path.starts_with("openspec/changes/archive"));
assert!(!archive_path.starts_with("openspec/archive/"));
}
#[test]
fn test_archive_verification_logic() {
use std::fs;
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let base = temp_dir.path();
let changes_dir = base.join("openspec/changes");
let archive_dir = base.join("openspec/changes/archive");
fs::create_dir_all(&changes_dir).unwrap();
fs::create_dir_all(&archive_dir).unwrap();
let change_id = "my-change";
let change_path = changes_dir.join(change_id);
let archive_path = archive_dir.join(change_id);
fs::create_dir(&change_path).unwrap();
assert!(change_path.exists());
assert!(!archive_path.exists());
let archive_failed = change_path.exists() && !archive_path.exists();
assert!(archive_failed);
fs::remove_dir(&change_path).unwrap();
fs::create_dir(&archive_path).unwrap();
assert!(!change_path.exists());
assert!(archive_path.exists());
let archive_succeeded = !change_path.exists() || archive_path.exists();
assert!(archive_succeeded);
fs::create_dir(&change_path).unwrap();
assert!(change_path.exists());
assert!(archive_path.exists());
let archive_ok = archive_path.exists();
assert!(archive_ok);
}
#[tokio::test]
async fn test_parallel_startup_preserves_empty_manual_resolve_wait_state() {
use crate::orchestration::state::{
ExecutionMode, OrchestratorState, ReducerCommand, WorkspaceObservation,
};
let shared_state = std::sync::Arc::new(tokio::sync::RwLock::new(
OrchestratorState::with_mode(vec!["alpha".to_string()], 3, ExecutionMode::Serial),
));
{
let mut state = shared_state.write().await;
state.apply_observation("alpha", WorkspaceObservation::WorkspaceArchived);
state.apply_command(ReducerCommand::ResolveMerge("alpha".to_string()));
}
let preserved = super::initialize_parallel_shared_state(&shared_state, &[], 7).await;
let state = shared_state.read().await;
assert!(
preserved,
"empty ResolveWait startup should skip replacement"
);
assert_eq!(state.execution_mode(), ExecutionMode::Parallel);
assert_eq!(state.display_status("alpha"), "resolve pending");
assert_eq!(state.resolve_wait_change_ids(), vec!["alpha".to_string()]);
}
#[tokio::test]
async fn test_parallel_startup_resets_selected_run_and_drops_stale_resolve_wait() {
use crate::orchestration::state::{
ExecutionMode, OrchestratorState, ReducerCommand, WorkspaceObservation,
};
let shared_state = std::sync::Arc::new(tokio::sync::RwLock::new(
OrchestratorState::with_mode(vec!["stale".to_string()], 3, ExecutionMode::Parallel),
));
{
let mut state = shared_state.write().await;
state.apply_observation("stale", WorkspaceObservation::WorkspaceArchived);
state.apply_command(ReducerCommand::ResolveMerge("stale".to_string()));
}
let selected = vec!["fresh".to_string()];
let preserved = super::initialize_parallel_shared_state(&shared_state, &selected, 7).await;
let state = shared_state.read().await;
assert!(!preserved, "selected startup must create a fresh run state");
assert_eq!(state.execution_mode(), ExecutionMode::Parallel);
assert_eq!(state.display_status("fresh"), "queued");
assert_eq!(state.display_status("stale"), "not queued");
assert!(state.resolve_wait_change_ids().is_empty());
assert!(state.pending_changes().contains("fresh"));
assert!(!state.pending_changes().contains("stale"));
}
#[tokio::test]
async fn test_parallel_startup_empty_without_resolve_wait_resets_to_noop_state() {
use crate::orchestration::state::{ExecutionMode, OrchestratorState};
let shared_state = std::sync::Arc::new(tokio::sync::RwLock::new(
OrchestratorState::with_mode(vec!["old".to_string()], 3, ExecutionMode::Parallel),
));
let preserved = super::initialize_parallel_shared_state(&shared_state, &[], 7).await;
let state = shared_state.read().await;
assert!(
!preserved,
"empty startup without ResolveWait remains ordinary no-op"
);
assert_eq!(state.execution_mode(), ExecutionMode::Parallel);
assert!(state.pending_changes().is_empty());
assert_eq!(state.display_status("old"), "not queued");
assert!(state.resolve_wait_change_ids().is_empty());
}
/// Clearing pending changes through the shared lock is visible to a later
/// reader: two pending changes before the call, none afterwards.
#[tokio::test]
async fn test_tui_shared_state_pending_changes_decrease_when_cleared() {
    use crate::orchestration::state::OrchestratorState;

    let initial_ids = vec![String::from("change-a"), String::from("change-b")];
    let shared_state = std::sync::Arc::new(tokio::sync::RwLock::new(OrchestratorState::new(
        initial_ids,
        3,
    )));

    // The read guard is a temporary, so it is released before the write below.
    let pending_before = shared_state.read().await.pending_changes().len();
    assert_eq!(pending_before, 2);

    {
        let mut writer = shared_state.write().await;
        writer.clear_pending_changes();
    }

    let state = shared_state.read().await;
    let pending_after = state.pending_changes();
    assert_eq!(pending_after.len(), 0);
    assert!(pending_after.is_empty());
}
/// When "change-b" is archived after a `ResolveStarted` event for "change-a",
/// the post-archive hook yields an auto-resumable `MergeDeferred` for
/// "change-b", and applying that event leaves "change-b" in `ResolveWait`.
#[test]
fn test_tui_archived_during_resolve() {
    use crate::events::ExecutionEvent;
    use crate::orchestration::state::{ExecutionMode, OrchestratorState, WaitState};

    let change_ids = vec![String::from("change-a"), String::from("change-b")];
    let mut state = OrchestratorState::with_mode(change_ids, 3, ExecutionMode::Parallel);

    // Replay the lead-up: change-a starts resolving, then change-b is archived.
    let lead_up = [
        ExecutionEvent::ResolveStarted {
            change_id: String::from("change-a"),
            command: String::from("resolve change-a"),
        },
        ExecutionEvent::ChangeArchived(String::from("change-b")),
    ];
    for event in &lead_up {
        OrchestratorState::apply_execution_event(&mut state, event);
    }

    let deferred = super::post_archive_dispatch_event(&state, "change-b");
    let is_auto_resumable_deferral = matches!(
        &deferred,
        Some(ExecutionEvent::MergeDeferred {
            change_id,
            auto_resumable: true,
            ..
        }) if change_id == "change-b"
    );
    assert!(is_auto_resumable_deferral);

    // Feed the deferral back into the state before inspecting change-b.
    if let Some(event) = deferred.as_ref() {
        OrchestratorState::apply_execution_event(&mut state, event);
    }

    let runtime = state
        .change_runtime("change-b")
        .expect("change-b runtime should exist");
    assert_eq!(runtime.wait_state, WaitState::ResolveWait);
}
/// With a single change and nothing else applied, archiving it produces no
/// deferral event; its runtime shows no wait state and `Resolving` activity.
#[test]
fn test_tui_archived_no_active_resolve_or_rejecting() {
    use crate::events::ExecutionEvent;
    use crate::orchestration::state::{
        ActivityState, ExecutionMode, OrchestratorState, WaitState,
    };

    let change_ids = vec![String::from("change-a")];
    let mut state = OrchestratorState::with_mode(change_ids, 3, ExecutionMode::Parallel);

    let archived = ExecutionEvent::ChangeArchived(String::from("change-a"));
    OrchestratorState::apply_execution_event(&mut state, &archived);

    let deferred = super::post_archive_dispatch_event(&state, "change-a");
    assert!(deferred.is_none());

    let runtime = state
        .change_runtime("change-a")
        .expect("change-a runtime should exist");
    assert_eq!(runtime.wait_state, WaitState::None);
    assert_eq!(runtime.activity, ActivityState::Resolving);
}
/// When "change-a" has been reported with workspace status `Rejecting`,
/// archiving "change-b" yields an auto-resumable `MergeDeferred` for "change-b".
#[test]
fn test_tui_archived_during_rejecting_emits_auto_resumable_deferred() {
    use crate::events::ExecutionEvent;
    use crate::orchestration::state::{ExecutionMode, OrchestratorState};
    use crate::vcs::WorkspaceStatus;

    let change_ids = vec![String::from("change-a"), String::from("change-b")];
    let mut state = OrchestratorState::with_mode(change_ids, 3, ExecutionMode::Parallel);

    // change-a is reported as rejecting before change-b gets archived.
    let rejecting = ExecutionEvent::WorkspaceStatusUpdated {
        change_id: String::from("change-a"),
        workspace_name: String::from("ws-a"),
        status: WorkspaceStatus::Rejecting,
    };
    let archived = ExecutionEvent::ChangeArchived(String::from("change-b"));
    OrchestratorState::apply_execution_event(&mut state, &rejecting);
    OrchestratorState::apply_execution_event(&mut state, &archived);

    let deferred = super::post_archive_dispatch_event(&state, "change-b");
    let is_auto_resumable_deferral = matches!(
        &deferred,
        Some(ExecutionEvent::MergeDeferred {
            change_id,
            auto_resumable: true,
            ..
        }) if change_id == "change-b"
    );
    assert!(is_auto_resumable_deferral);
}
/// An `ApplyStarted` event for "change-a" does not cause a deferral: archiving
/// "change-b" afterwards yields no post-archive event.
#[test]
fn test_tui_archived_during_applying_does_not_emit_auto_resumable_deferred() {
    use crate::events::ExecutionEvent;
    use crate::orchestration::state::{ExecutionMode, OrchestratorState};

    let change_ids = vec![String::from("change-a"), String::from("change-b")];
    let mut state = OrchestratorState::with_mode(change_ids, 3, ExecutionMode::Parallel);

    // change-a starts applying, then change-b is archived.
    let lead_up = [
        ExecutionEvent::ApplyStarted {
            change_id: String::from("change-a"),
            command: String::from("apply change-a"),
        },
        ExecutionEvent::ChangeArchived(String::from("change-b")),
    ];
    for event in &lead_up {
        OrchestratorState::apply_execution_event(&mut state, event);
    }

    let deferred = super::post_archive_dispatch_event(&state, "change-b");
    assert!(
        deferred.is_none(),
        "applying blocker must not trigger resolve-pending auto dispatch"
    );
}
/// A `ChangeRejected` event for "change-a" does not cause a deferral: archiving
/// "change-b" afterwards yields no post-archive event.
#[test]
fn test_tui_archived_with_terminal_rejected_change_does_not_emit_auto_resumable_deferred() {
    use crate::events::ExecutionEvent;
    use crate::orchestration::state::{ExecutionMode, OrchestratorState};

    let change_ids = vec![String::from("change-a"), String::from("change-b")];
    let mut state = OrchestratorState::with_mode(change_ids, 3, ExecutionMode::Parallel);

    // change-a is rejected, then change-b is archived.
    let rejected = ExecutionEvent::ChangeRejected {
        change_id: String::from("change-a"),
        reason: String::from("blocked"),
    };
    let archived = ExecutionEvent::ChangeArchived(String::from("change-b"));
    OrchestratorState::apply_execution_event(&mut state, &rejected);
    OrchestratorState::apply_execution_event(&mut state, &archived);

    let deferred = super::post_archive_dispatch_event(&state, "change-b");
    assert!(
        deferred.is_none(),
        "terminal rejected blocker must not trigger resolve-pending auto dispatch"
    );
}
/// Marks one change as stalled on a `SerialRunService` and removes it from a
/// local pending set, then checks that only that change is affected.
/// The pending set here is a plain `HashSet` owned by the test.
#[tokio::test]
async fn test_tui_rejection_removes_from_pending_selection() {
    use crate::serial_run_service::SerialRunService;
    use std::collections::HashSet;
    use tempfile::TempDir;

    let blocked_change_id = "blocked-change";
    let other_change_id = "other-change";
    let reason = "Implementation blocker detected - requires manual intervention";

    // The temporary directory must outlive the service that was given its path.
    let temp_dir = TempDir::new().unwrap();
    let mut serial_service = SerialRunService::new(
        temp_dir.path().to_path_buf(),
        crate::config::OrchestratorConfig::default(),
    );

    let mut pending_changes: HashSet<String> = [blocked_change_id, other_change_id]
        .iter()
        .copied()
        .map(String::from)
        .collect();

    serial_service.mark_stalled(blocked_change_id, reason);
    pending_changes.remove(blocked_change_id);

    // Only the blocked change left the pending set.
    assert!(!pending_changes.contains(blocked_change_id));
    assert!(pending_changes.contains(other_change_id));

    // Only the blocked change is reported as stalled.
    assert!(serial_service.is_stalled(blocked_change_id));
    assert!(!serial_service.is_stalled(other_change_id));

    assert_eq!(pending_changes.len(), 1);
    assert!(pending_changes.contains(other_change_id));
}
}