use crate::ai_command_runner::{AiCommandRunner, SharedStaggerState};
use crate::analyzer::{ParallelGroup, ParallelizationAnalyzer};
use crate::command_queue::CommandQueueConfig;
use crate::config::defaults::*;
use crate::config::OrchestratorConfig;
use crate::dependency_targets::union_metadata_dependencies;
use crate::error::Result;
use crate::hooks::HookRunner;
use crate::openspec::Change;
use crate::parallel::{ParallelEvent, ParallelExecutor};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
/// Deduplication key for an analysis-failure diagnostic: the sorted queued change
/// ids, the sorted in-flight change ids, and the trimmed error text.
type AnalyzeFailureDiagnosticSignature = (Vec<String>, Vec<String>, String);
/// Shared, async-locked set of diagnostic signatures that have already been reported.
type AnalyzeFailureDiagnosticStore = Arc<Mutex<HashSet<AnalyzeFailureDiagnosticSignature>>>;
/// Service that prepares and drives parallel execution of changes for one repository.
///
/// It bundles the orchestrator configuration with the shared handles that are
/// handed to the executors and analyzers it creates.
pub struct ParallelRunService {
/// Orchestrator configuration; source of queue settings, hooks, and the VCS backend.
config: OrchestratorConfig,
/// Root of the repository the changes live in.
repo_root: PathBuf,
/// Forwarded to every executor via `set_no_resume`.
no_resume: bool,
/// Stagger state shared between the AI command runner and the executors.
shared_stagger_state: SharedStaggerState,
/// Orchestrator state handed to executors; replaceable via
/// `set_shared_orchestrator_state`.
shared_orchestrator_state:
Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
/// Command runner cloned into each `ParallelizationAnalyzer`.
ai_runner: AiCommandRunner,
/// Signatures of analysis-failure diagnostics that were already emitted.
analyze_failure_diagnostics_seen: AnalyzeFailureDiagnosticStore,
}
impl ParallelRunService {
pub fn new(repo_root: PathBuf, config: OrchestratorConfig) -> Self {
let shared_stagger_state: SharedStaggerState = Arc::new(Mutex::new(None));
let queue_config = CommandQueueConfig {
stagger_delay_ms: config
.command_queue_stagger_delay_ms
.unwrap_or(DEFAULT_STAGGER_DELAY_MS),
max_retries: config
.command_queue_max_retries
.unwrap_or(DEFAULT_MAX_RETRIES),
retry_delay_ms: config
.command_queue_retry_delay_ms
.unwrap_or(DEFAULT_RETRY_DELAY_MS),
retry_error_patterns: config
.command_queue_retry_patterns
.clone()
.unwrap_or_else(default_retry_patterns),
retry_if_duration_under_secs: config
.command_queue_retry_if_duration_under_secs
.unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS),
inactivity_timeout_secs: config.get_command_inactivity_timeout_secs(),
inactivity_kill_grace_secs: config.get_command_inactivity_kill_grace_secs(),
inactivity_timeout_max_retries: config.get_command_inactivity_timeout_max_retries(),
strict_process_cleanup: config.get_command_strict_process_cleanup(),
};
let mut ai_runner = AiCommandRunner::new(queue_config, shared_stagger_state.clone());
ai_runner.set_stream_json_textify(config.get_stream_json_textify());
ai_runner.set_strict_process_cleanup(config.get_command_strict_process_cleanup());
let shared_orchestrator_state = Arc::new(tokio::sync::RwLock::new(
crate::orchestration::state::OrchestratorState::with_mode(
Vec::new(),
1,
crate::orchestration::state::ExecutionMode::Parallel,
),
));
Self {
config,
repo_root,
no_resume: false,
shared_stagger_state,
shared_orchestrator_state,
ai_runner,
analyze_failure_diagnostics_seen: Arc::new(Mutex::new(HashSet::new())),
}
}
/// Creates a service that reuses a caller-provided stagger state.
///
/// The command queue settings are read from `config`, falling back to the
/// crate defaults for every option that is unset. The service starts with
/// `no_resume` disabled, an empty parallel-mode orchestrator state, and an
/// empty diagnostic dedupe store.
pub fn new_with_shared_state(
    repo_root: PathBuf,
    config: OrchestratorConfig,
    shared_stagger_state: SharedStaggerState,
) -> Self {
    use crate::orchestration::state::{ExecutionMode, OrchestratorState};

    // Resolve every queue option up front, in the same order as the struct fields.
    let stagger_delay_ms = config
        .command_queue_stagger_delay_ms
        .unwrap_or(DEFAULT_STAGGER_DELAY_MS);
    let max_retries = config
        .command_queue_max_retries
        .unwrap_or(DEFAULT_MAX_RETRIES);
    let retry_delay_ms = config
        .command_queue_retry_delay_ms
        .unwrap_or(DEFAULT_RETRY_DELAY_MS);
    let retry_error_patterns = config
        .command_queue_retry_patterns
        .clone()
        .unwrap_or_else(default_retry_patterns);
    let retry_if_duration_under_secs = config
        .command_queue_retry_if_duration_under_secs
        .unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS);
    let queue_settings = CommandQueueConfig {
        stagger_delay_ms,
        max_retries,
        retry_delay_ms,
        retry_error_patterns,
        retry_if_duration_under_secs,
        inactivity_timeout_secs: config.get_command_inactivity_timeout_secs(),
        inactivity_kill_grace_secs: config.get_command_inactivity_kill_grace_secs(),
        inactivity_timeout_max_retries: config.get_command_inactivity_timeout_max_retries(),
        strict_process_cleanup: config.get_command_strict_process_cleanup(),
    };

    let mut runner = AiCommandRunner::new(queue_settings, Arc::clone(&shared_stagger_state));
    runner.set_stream_json_textify(config.get_stream_json_textify());
    runner.set_strict_process_cleanup(config.get_command_strict_process_cleanup());

    let initial_state = OrchestratorState::with_mode(Vec::new(), 1, ExecutionMode::Parallel);

    Self {
        config,
        repo_root,
        no_resume: false,
        shared_stagger_state,
        shared_orchestrator_state: Arc::new(tokio::sync::RwLock::new(initial_state)),
        ai_runner: runner,
        analyze_failure_diagnostics_seen: Arc::new(Mutex::new(HashSet::new())),
    }
}
/// Sets the `no_resume` flag that is forwarded to every executor this service creates.
pub fn set_no_resume(&mut self, no_resume: bool) {
self.no_resume = no_resume;
}
/// Replaces the orchestrator state handle held by this service with `shared_state`.
pub fn set_shared_orchestrator_state(
&mut self,
shared_state: Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
) {
self.shared_orchestrator_state = shared_state;
}
pub async fn check_vcs_available(&self) -> Result<()> {
if !crate::cli::check_parallel_available() {
return Err(crate::error::OrchestratorError::GitCommand(
"Git repository not available for parallel execution".to_string(),
));
}
Ok(())
}
/// Builds a [`ParallelExecutor`] wired to this service's configuration.
///
/// Every optional argument is applied to the executor only when it is `Some`.
/// When a dynamic queue is supplied the executor is additionally switched to
/// its persistent lifetime before the hooks are installed.
pub fn create_executor_with_queue_state(
    &self,
    event_tx: Option<mpsc::Sender<ParallelEvent>>,
    cancel_token: Option<CancellationToken>,
    shared_queue_change: Option<std::sync::Arc<tokio::sync::Mutex<Option<std::time::Instant>>>>,
    dynamic_queue: Option<std::sync::Arc<crate::tui::queue::DynamicQueue>>,
    manual_resolve_counter: Option<std::sync::Arc<std::sync::atomic::AtomicUsize>>,
    shared_orchestrator_state: Option<
        std::sync::Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
    >,
) -> ParallelExecutor {
    let backend = self.config.get_vcs_backend();

    // Hooks report through the event channel only when one was provided.
    let hook_runner = match event_tx.as_ref() {
        Some(sender) => {
            HookRunner::with_event_tx(self.config.get_hooks(), &self.repo_root, sender.clone())
        }
        None => HookRunner::new(self.config.get_hooks(), &self.repo_root),
    };

    let mut built = ParallelExecutor::with_backend_and_queue_and_stagger(
        self.repo_root.clone(),
        self.config.clone(),
        event_tx,
        backend,
        shared_queue_change,
        Some(self.shared_stagger_state.clone()),
    );

    built.set_no_resume(self.no_resume);
    if dynamic_queue.is_some() {
        built.set_persistent_lifetime();
    }
    built.set_hooks(hook_runner);

    if let Some(token) = cancel_token {
        built.set_cancel_token(token);
    }
    if let Some(queue) = dynamic_queue {
        built.set_dynamic_queue(queue);
    }
    if let Some(counter) = manual_resolve_counter {
        built.set_manual_resolve_counter(counter);
    }
    if let Some(state) = shared_orchestrator_state {
        built.set_shared_orchestrator_state(state);
    }

    built
}
/// Splits `changes` into the ones eligible for parallel execution and the ids
/// of the ones that must be skipped.
///
/// A change is eligible when it is listed in `HEAD` and has no uncommitted
/// files. If the `HEAD` listing fails, every change is treated as eligible; if
/// the uncommitted-file listing fails, no change is treated as dirty. The
/// skipped ids are returned sorted; eligible changes keep their input order.
async fn filter_committed_changes(
    &self,
    changes: Vec<Change>,
) -> Result<(Vec<Change>, Vec<String>)> {
    let in_head: HashSet<String> =
        match crate::vcs::git::commands::list_changes_in_head(&self.repo_root).await {
            Err(err) => {
                warn!(
                    "Failed to load committed change snapshot; assuming all changes are committed: {}",
                    err
                );
                return Ok((changes, Vec::new()));
            }
            Ok(ids) => ids.into_iter().collect(),
        };

    let dirty: HashSet<String> =
        crate::vcs::git::commands::list_changes_with_uncommitted_files(&self.repo_root)
            .await
            .map(|ids| ids.into_iter().collect::<HashSet<String>>())
            .unwrap_or_else(|err| {
                warn!(
                    "Failed to detect uncommitted files in changes; assuming no uncommitted files: {}",
                    err
                );
                HashSet::new()
            });

    let (eligible, rejected): (Vec<Change>, Vec<Change>) = changes
        .into_iter()
        .partition(|change| in_head.contains(&change.id) && !dirty.contains(&change.id));

    let mut skipped_ids: Vec<String> = rejected.into_iter().map(|change| change.id).collect();
    skipped_ids.sort();

    Ok((eligible, skipped_ids))
}
/// Filters `changes` down to the committed ones and reports the rest.
///
/// For skipped changes a `Warning` event followed by a `ParallelStartRejected`
/// event is sent on `event_tx` (send failures are ignored).
///
/// Returns `Ok(Some(changes))` when there is something to run, or when the
/// filtered list is empty but `allow_empty_when_resolve_wait` is set; returns
/// `Ok(None)` when the filtered list is empty and the flag is not set.
async fn prepare_parallel_execution(
    &self,
    changes: Vec<Change>,
    event_tx: &mpsc::Sender<ParallelEvent>,
    allow_empty_when_resolve_wait: bool,
) -> Result<Option<Vec<Change>>> {
    let (eligible, skipped) = self.filter_committed_changes(changes).await?;

    if !skipped.is_empty() {
        let message = format!(
            "Skipping uncommitted changes in parallel mode: {}",
            skipped.join(", ")
        );
        warn!("{}", message);

        let warning = ParallelEvent::Warning {
            title: "Uncommitted changes skipped".to_string(),
            message,
        };
        let rejection = ParallelEvent::ParallelStartRejected {
            change_ids: skipped,
            reason: "uncommitted or not in HEAD".to_string(),
        };
        // Best effort: a closed channel must not abort preparation.
        for event in [warning, rejection] {
            let _ = event_tx.send(event).await;
        }
    }

    if !eligible.is_empty() {
        return Ok(Some(eligible));
    }

    if allow_empty_when_resolve_wait {
        info!(
            "No committed changes available, but scheduler-owned ResolveWait retry is present; continuing with empty queue"
        );
        Ok(Some(eligible))
    } else {
        info!("No committed changes available for parallel execution");
        Ok(None)
    }
}
pub async fn run_parallel<F>(
&self,
changes: Vec<Change>,
cancel_token: Option<CancellationToken>,
event_handler: F,
) -> Result<()>
where
F: Fn(ParallelEvent) + Send + Sync + 'static,
{
let (event_tx, mut event_rx) = mpsc::channel::<ParallelEvent>(100);
{
let mut guard = self.shared_orchestrator_state.write().await;
for change in &changes {
guard.add_dynamic_change(change.id.clone());
}
}
let changes = match self
.prepare_parallel_execution(changes, &event_tx, true)
.await?
{
Some(changes) => changes,
None => {
drop(event_tx);
while let Some(event) = event_rx.recv().await {
event_handler(event);
}
return Ok(());
}
};
info!(
"Starting parallel execution with re-analysis for {} changes",
changes.len()
);
let forward_handle = tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
let is_completed =
matches!(event, ParallelEvent::AllCompleted | ParallelEvent::Stopped);
event_handler(event);
if is_completed {
break;
}
}
});
let mut executor = ParallelExecutor::with_backend_and_queue_and_stagger(
self.repo_root.clone(),
self.config.clone(),
Some(event_tx.clone()),
self.config.get_vcs_backend(),
None,
Some(self.shared_stagger_state.clone()),
);
executor.set_no_resume(self.no_resume);
executor.set_shared_orchestrator_state(self.shared_orchestrator_state.clone());
let hooks =
HookRunner::with_event_tx(self.config.get_hooks(), &self.repo_root, event_tx.clone());
executor.set_hooks(hooks);
if let Some(token) = cancel_token {
executor.set_cancel_token(token);
}
let config = self.config.clone();
let repo_root = self.repo_root.clone();
let shared_stagger_state = self.shared_stagger_state.clone();
let result = executor
.execute_with_order_based_reanalysis(
changes,
move |remaining, in_flight_ids, iteration| {
let config = config.clone();
let repo_root = repo_root.clone();
let event_tx = event_tx.clone();
let shared_stagger_state = shared_stagger_state.clone();
Box::pin(async move {
let service = ParallelRunService::new_with_shared_state(
repo_root,
config,
shared_stagger_state,
);
service
.analyze_order_with_sender(
remaining,
in_flight_ids,
Some(&event_tx),
iteration,
)
.await
})
},
)
.await;
let _ = forward_handle.await;
result
}
/// Runs `changes` through an executor built by
/// `create_executor_with_queue_state`, reporting events on the caller's channel.
///
/// All optional arguments are forwarded unchanged to
/// `create_executor_with_queue_state`; the resulting executor is then handed to
/// `run_parallel_order_based_with_executor` together with `changes` and `event_tx`.
// The argument list mirrors `create_executor_with_queue_state`, hence the lint allowance.
#[allow(clippy::too_many_arguments)]
pub async fn run_parallel_with_channel_and_queue_state(
&self,
changes: Vec<Change>,
event_tx: mpsc::Sender<ParallelEvent>,
cancel_token: Option<CancellationToken>,
shared_queue_change: Option<std::sync::Arc<tokio::sync::Mutex<Option<std::time::Instant>>>>,
dynamic_queue: Option<std::sync::Arc<crate::tui::queue::DynamicQueue>>,
manual_resolve_counter: Option<std::sync::Arc<std::sync::atomic::AtomicUsize>>,
shared_orchestrator_state: Option<
std::sync::Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
>,
) -> Result<()> {
let executor = self.create_executor_with_queue_state(
Some(event_tx.clone()),
cancel_token,
shared_queue_change,
dynamic_queue,
manual_resolve_counter,
shared_orchestrator_state,
);
self.run_parallel_order_based_with_executor(executor, changes, event_tx)
.await
}
pub async fn run_parallel_order_based_with_executor(
&self,
mut executor: ParallelExecutor,
changes: Vec<Change>,
event_tx: mpsc::Sender<ParallelEvent>,
) -> Result<()> {
executor.ensure_shared_orchestrator_state(self.shared_orchestrator_state.clone());
let allow_empty_when_resolve_wait = changes.is_empty() && executor.has_resolve_wait();
let changes = match self
.prepare_parallel_execution(changes, &event_tx, allow_empty_when_resolve_wait)
.await?
{
Some(changes) => changes,
None => return Ok(()),
};
info!(
"Starting order-based parallel execution with re-analysis for {} changes",
changes.len()
);
let config = self.config.clone();
let repo_root = self.repo_root.clone();
let shared_stagger_state = self.shared_stagger_state.clone();
executor
.execute_with_order_based_reanalysis(
changes,
move |remaining, in_flight_ids, iteration| {
let config = config.clone();
let repo_root = repo_root.clone();
let event_tx = event_tx.clone();
let shared_stagger_state = shared_stagger_state.clone();
Box::pin(async move {
let service = ParallelRunService::new_with_shared_state(
repo_root,
config,
shared_stagger_state,
);
service
.analyze_order_with_sender(
remaining,
in_flight_ids,
Some(&event_tx),
iteration,
)
.await
})
},
)
.await
}
/// Public entry point that forwards to the private `analyze_and_group`.
pub async fn analyze_and_group_public(&self, changes: &[Change]) -> Vec<ParallelGroup> {
self.analyze_and_group(changes).await
}
/// Groups `changes` without an event channel, using iteration number 1.
async fn analyze_and_group(&self, changes: &[Change]) -> Vec<ParallelGroup> {
self.analyze_and_group_with_sender(changes, None, 1).await
}
/// Produces the execution order and dependency map for `changes`.
///
/// When LLM analysis is enabled in the configuration it is attempted first.
/// If it is disabled, or the attempt fails, the result is derived from the
/// dependencies declared on the changes themselves. A failed attempt is logged
/// as a warning and reported on `event_tx` at most once per distinct failure.
async fn analyze_order_with_sender(
    &self,
    changes: &[Change],
    in_flight_ids: &[String],
    event_tx: Option<&mpsc::Sender<ParallelEvent>>,
    iteration: u32,
) -> crate::analyzer::AnalysisResult {
    if !self.config.use_llm_analysis() {
        info!("LLM analysis disabled, using metadata-dependency-only analysis");
        return Self::metadata_dependency_analysis_result(changes);
    }

    info!("Using LLM analysis for parallelization (analyze_command)");
    let attempt = self
        .analyze_order_with_llm_streaming(changes, in_flight_ids, event_tx, iteration)
        .await;

    match attempt {
        Ok(analysis) => {
            info!(
                "LLM analysis successful: {} changes in order",
                analysis.order.len()
            );
            analysis
        }
        Err(failure) => {
            Self::log_recoverable_analysis_fallback(&failure);
            self.emit_analysis_failure_diagnostic_once(
                changes,
                in_flight_ids,
                event_tx,
                &failure.to_string(),
            )
            .await;
            Self::metadata_dependency_analysis_result(changes)
        }
    }
}
/// Logs, at WARN level, that LLM analysis failed and the metadata-dependency
/// fallback is being used. `error` is attached as the structured `error` field.
fn log_recoverable_analysis_fallback(error: &dyn std::fmt::Display) {
warn!(
error = %error,
"LLM analysis failed; falling back to metadata-dependency-only analysis"
);
}
async fn emit_analysis_failure_diagnostic_once(
&self,
changes: &[Change],
in_flight_ids: &[String],
event_tx: Option<&mpsc::Sender<ParallelEvent>>,
error: &str,
) {
let Some(tx) = event_tx else {
return;
};
let mut queued_ids: Vec<String> = changes.iter().map(|change| change.id.clone()).collect();
queued_ids.sort();
let mut in_flight = in_flight_ids.to_vec();
in_flight.sort();
let normalized_error = error.trim().to_string();
let key = (
queued_ids.clone(),
in_flight.clone(),
normalized_error.clone(),
);
let mut seen = self.analyze_failure_diagnostics_seen.lock().await;
if !seen.insert(key) {
debug!(
queued = ?queued_ids,
in_flight = ?in_flight,
error = %normalized_error,
"Suppressing repeated analysis failure diagnostic"
);
return;
}
drop(seen);
let message = format!(
"Dependency analysis failed: error={}, queued={:?}, in_flight={:?}",
normalized_error, queued_ids, in_flight
);
let _ = tx
.send(ParallelEvent::Log(crate::events::LogEntry::warn(&message)))
.await;
let _ = tx.send(ParallelEvent::Error { message }).await;
}
/// Builds an analysis result from the changes' own metadata only.
///
/// The order is the input order of `changes`; the dependency map is filled by
/// `union_metadata_dependencies` for each change; no groups are produced.
fn metadata_dependency_analysis_result(changes: &[Change]) -> crate::analyzer::AnalysisResult {
    let order = changes.iter().map(|change| change.id.clone()).collect();

    let mut dependencies = HashMap::new();
    changes.iter().for_each(|change| {
        union_metadata_dependencies(&mut dependencies, &change.id, &change.dependencies);
    });

    crate::analyzer::AnalysisResult {
        order,
        dependencies,
        groups: None,
    }
}
async fn analyze_and_group_with_sender(
&self,
changes: &[Change],
event_tx: Option<&mpsc::Sender<ParallelEvent>>,
iteration: u32,
) -> Vec<ParallelGroup> {
if self.config.use_llm_analysis() {
info!("Using LLM analysis for parallelization (analyze_command)");
match self
.analyze_with_llm_streaming(changes, event_tx, iteration)
.await
{
Ok(groups) => {
info!("LLM analysis successful: {} groups", groups.len());
return groups;
}
Err(e) => {
error!("LLM analysis failed: {}", e);
warn!(
"Falling back to running all changes in parallel (no dependency analysis)"
);
}
}
} else {
info!("LLM analysis disabled, running all changes in parallel");
}
Self::all_parallel(changes)
}
/// Puts every change into a single group (id 1) without dependencies.
///
/// Returns an empty list when `changes` is empty.
fn all_parallel(changes: &[Change]) -> Vec<ParallelGroup> {
    match changes {
        [] => Vec::new(),
        all => {
            let single_group = ParallelGroup {
                id: 1,
                changes: all.iter().map(|change| change.id.clone()).collect(),
                depends_on: Vec::new(),
            };
            vec![single_group]
        }
    }
}
/// Runs the order analysis through a `ParallelizationAnalyzer`.
///
/// With an event channel, each piece of analyzer output is offered to the
/// channel as an `AnalysisOutput` event via `try_send`; a failed `try_send` is
/// ignored. Without a channel the analyzer is run without an output callback.
async fn analyze_order_with_llm_streaming(
    &self,
    changes: &[Change],
    in_flight_ids: &[String],
    event_tx: Option<&mpsc::Sender<ParallelEvent>>,
    iteration: u32,
) -> Result<crate::analyzer::AnalysisResult> {
    let analyzer = ParallelizationAnalyzer::new(self.ai_runner.clone(), self.config.clone());

    match event_tx.cloned() {
        None => analyzer.analyze_with_inflight(changes, in_flight_ids).await,
        Some(sender) => {
            let forward_output = move |output| {
                let _ = sender.try_send(ParallelEvent::AnalysisOutput {
                    output: output.clone(),
                    iteration,
                });
            };
            analyzer
                .analyze_with_callback(changes, in_flight_ids, forward_output)
                .await
        }
    }
}
/// Runs the group analysis through a `ParallelizationAnalyzer`.
///
/// With an event channel, each piece of analyzer output is offered to the
/// channel as an `AnalysisOutput` event via `try_send`; a failed `try_send` is
/// ignored. Without a channel the analyzer is run without an output callback.
async fn analyze_with_llm_streaming(
    &self,
    changes: &[Change],
    event_tx: Option<&mpsc::Sender<ParallelEvent>>,
    iteration: u32,
) -> Result<Vec<ParallelGroup>> {
    let analyzer = ParallelizationAnalyzer::new(self.ai_runner.clone(), self.config.clone());

    match event_tx.cloned() {
        None => analyzer.analyze_groups(changes).await,
        Some(sender) => {
            let forward_output = move |output| {
                let _ = sender.try_send(ParallelEvent::AnalysisOutput {
                    output: output.clone(),
                    iteration,
                });
            };
            analyzer
                .analyze_groups_with_callback(changes, forward_output)
                .await
        }
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::openspec::ProposalMetadata;
use tempfile::TempDir;
use tokio::process::Command;
/// Builds a `Change` fixture with the given id and dependency ids; the
/// remaining fields carry fixed placeholder values.
fn create_test_change(id: &str, dependencies: Vec<&str>) -> Change {
    let dependency_ids: Vec<String> = dependencies.iter().map(|dep| dep.to_string()).collect();
    Change {
        id: String::from(id),
        completed_tasks: 0,
        total_tasks: 5,
        last_modified: String::from("1m ago"),
        dependencies: dependency_ids,
        metadata: ProposalMetadata::default(),
    }
}
fn create_test_config() -> OrchestratorConfig {
OrchestratorConfig {
apply_command: Some("echo apply {change_id}".to_string()),
archive_command: Some("echo archive {change_id}".to_string()),
analyze_command: Some("echo '{\"order\":[\"route\",\"policy\"],\"dependencies\":{\"route\":[\"ghost\"]}}'".to_string()),
acceptance_command: Some("echo acceptance".to_string()),
resolve_command: Some("echo resolve".to_string()),
..Default::default()
}
}
/// Initializes a git repository in `temp_dir` and sets a test identity.
///
/// Returns `false` when `git init` cannot be run or exits unsuccessfully, so
/// callers can skip the test. Failures of the two `git config` calls are ignored.
async fn init_git_repo(temp_dir: &TempDir) -> bool {
    let repo = temp_dir.path();

    let initialized = match Command::new("git")
        .args(["init"])
        .current_dir(repo)
        .output()
        .await
    {
        Ok(output) => output.status.success(),
        Err(_) => false,
    };
    if !initialized {
        return false;
    }

    let identity = [
        ("user.email", "test@example.com"),
        ("user.name", "Test User"),
    ];
    for (key, value) in identity {
        let _ = Command::new("git")
            .args(["config", key, value])
            .current_dir(repo)
            .output()
            .await;
    }
    true
}
/// A change directory that was never committed must be skipped, while a
/// committed one stays eligible.
#[tokio::test]
async fn test_filter_committed_changes_skips_uncommitted() {
    let workspace = TempDir::new().expect("tempdir");
    if !init_git_repo(&workspace).await {
        return;
    }
    let repo = workspace.path();
    let changes_root = repo.join("openspec/changes");

    // change-a is written and committed.
    let committed_dir = changes_root.join("change-a");
    std::fs::create_dir_all(&committed_dir).unwrap();
    std::fs::write(committed_dir.join("proposal.md"), "test").unwrap();
    let git_steps: [&[&str]; 2] = [&["add", "."], &["commit", "-m", "add change-a"]];
    for step in git_steps {
        let _ = Command::new("git")
            .args(step)
            .current_dir(repo)
            .output()
            .await;
    }

    // change-b only exists in the working tree.
    let untracked_dir = changes_root.join("change-b");
    std::fs::create_dir_all(&untracked_dir).unwrap();
    std::fs::write(untracked_dir.join("proposal.md"), "test").unwrap();

    let service = ParallelRunService::new(repo.to_path_buf(), create_test_config());
    let requested = ["change-a", "change-b"]
        .into_iter()
        .map(|id| create_test_change(id, vec![]))
        .collect::<Vec<_>>();

    let (eligible, skipped) = service
        .filter_committed_changes(requested)
        .await
        .expect("filter changes");

    let eligible_ids: Vec<String> = eligible.into_iter().map(|change| change.id).collect();
    assert_eq!(eligible_ids, ["change-a"]);
    assert_eq!(skipped, ["change-b"]);
}
/// A committed change that gained an uncommitted file afterwards must be
/// skipped, while an untouched committed change stays eligible.
#[tokio::test]
async fn test_filter_committed_changes_skips_partially_uncommitted() {
    let workspace = TempDir::new().expect("tempdir");
    if !init_git_repo(&workspace).await {
        return;
    }
    let repo = workspace.path();
    let changes_root = repo.join("openspec/changes");

    // Both changes are committed together.
    for id in ["change-a", "change-b"] {
        let dir = changes_root.join(id);
        std::fs::create_dir_all(&dir).unwrap();
        std::fs::write(dir.join("proposal.md"), "test").unwrap();
    }
    let git_steps: [&[&str]; 2] = [&["add", "."], &["commit", "-m", "add changes"]];
    for step in git_steps {
        let _ = Command::new("git")
            .args(step)
            .current_dir(repo)
            .output()
            .await;
    }

    // change-a receives a new file that is not committed.
    std::fs::write(changes_root.join("change-a").join("tasks.md"), "new task").unwrap();

    let service = ParallelRunService::new(repo.to_path_buf(), create_test_config());
    let requested = ["change-a", "change-b"]
        .into_iter()
        .map(|id| create_test_change(id, vec![]))
        .collect::<Vec<_>>();

    let (eligible, skipped) = service
        .filter_committed_changes(requested)
        .await
        .expect("filter changes");

    let eligible_ids: Vec<String> = eligible.into_iter().map(|change| change.id).collect();
    assert_eq!(eligible_ids, ["change-b"]);
    assert_eq!(skipped, ["change-a"]);
}
/// When one of two changes is uncommitted, preparation keeps the committed one
/// and emits a `ParallelStartRejected` event naming the other.
#[tokio::test]
async fn test_prepare_parallel_execution_emits_rejection_event() {
    let workspace = TempDir::new().expect("tempdir");
    if !init_git_repo(&workspace).await {
        return;
    }
    let repo = workspace.path();
    let changes_root = repo.join("openspec/changes");

    // change-a is committed.
    let committed_dir = changes_root.join("change-a");
    std::fs::create_dir_all(&committed_dir).unwrap();
    std::fs::write(committed_dir.join("proposal.md"), "test").unwrap();
    let git_steps: [&[&str]; 2] = [&["add", "."], &["commit", "-m", "add change-a"]];
    for step in git_steps {
        let _ = Command::new("git")
            .args(step)
            .current_dir(repo)
            .output()
            .await;
    }

    // change-b stays uncommitted.
    let untracked_dir = changes_root.join("change-b");
    std::fs::create_dir_all(&untracked_dir).unwrap();
    std::fs::write(untracked_dir.join("proposal.md"), "test").unwrap();

    let service = ParallelRunService::new(repo.to_path_buf(), create_test_config());
    let requested = vec![
        create_test_change("change-a", vec![]),
        create_test_change("change-b", vec![]),
    ];
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<ParallelEvent>(32);

    let outcome = service
        .prepare_parallel_execution(requested, &event_tx, false)
        .await
        .expect("prepare_parallel_execution");
    assert!(outcome.is_some(), "change-a should still be eligible");
    let eligible = outcome.unwrap();
    assert_eq!(eligible.len(), 1);
    assert_eq!(eligible[0].id, "change-a");

    // Close the channel so the receive loop terminates.
    drop(event_tx);
    let mut rejections: Vec<Vec<String>> = Vec::new();
    while let Some(event) = event_rx.recv().await {
        if let ParallelEvent::ParallelStartRejected { change_ids, .. } = event {
            rejections.push(change_ids);
        }
    }
    for ids in &rejections {
        assert!(
            ids.iter().any(|id| id == "change-b"),
            "rejection event should include change-b"
        );
    }
    assert!(
        !rejections.is_empty(),
        "expected a ParallelStartRejected event for the uncommitted change"
    );
}
/// When every requested change is uncommitted, preparation returns `None` and
/// still emits a `ParallelStartRejected` event listing all of them.
#[tokio::test]
async fn test_prepare_parallel_execution_all_rejected_emits_rejection_event() {
    let workspace = TempDir::new().expect("tempdir");
    if !init_git_repo(&workspace).await {
        return;
    }
    let repo = workspace.path();
    let changes_root = repo.join("openspec/changes");

    // Commit an unrelated placeholder so that HEAD exists.
    let placeholder_dir = changes_root.join("placeholder");
    std::fs::create_dir_all(&placeholder_dir).unwrap();
    std::fs::write(placeholder_dir.join("proposal.md"), "placeholder").unwrap();
    let git_steps: [&[&str]; 2] = [&["add", "."], &["commit", "-m", "initial commit"]];
    for step in git_steps {
        let _ = Command::new("git")
            .args(step)
            .current_dir(repo)
            .output()
            .await;
    }

    // Both requested changes exist only in the working tree.
    for id in ["change-a", "change-b"] {
        let dir = changes_root.join(id);
        std::fs::create_dir_all(&dir).unwrap();
        std::fs::write(dir.join("proposal.md"), "test").unwrap();
    }

    let service = ParallelRunService::new(repo.to_path_buf(), create_test_config());
    let requested = vec![
        create_test_change("change-a", vec![]),
        create_test_change("change-b", vec![]),
    ];
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<ParallelEvent>(32);

    let outcome = service
        .prepare_parallel_execution(requested, &event_tx, false)
        .await
        .expect("prepare_parallel_execution");
    assert!(
        outcome.is_none(),
        "all changes were uncommitted so result should be None"
    );

    // Close the channel so the receive loop terminates.
    drop(event_tx);
    let mut last_rejected: Option<Vec<String>> = None;
    while let Some(event) = event_rx.recv().await {
        if let ParallelEvent::ParallelStartRejected { change_ids, .. } = event {
            last_rejected = Some(change_ids);
        }
    }
    assert!(
        last_rejected.is_some(),
        "expected a ParallelStartRejected event even when all changes are rejected"
    );
    let mut rejected_ids = last_rejected.unwrap_or_default();
    rejected_ids.sort();
    assert_eq!(rejected_ids, vec!["change-a", "change-b"]);
}
/// With the allow-empty flag set, an empty request yields `Some(empty list)`.
#[tokio::test]
async fn test_prepare_parallel_execution_allows_empty_when_resolve_wait_requested() {
    let workspace = TempDir::new().expect("tempdir");
    if !init_git_repo(&workspace).await {
        return;
    }
    let service = ParallelRunService::new(workspace.path().to_path_buf(), create_test_config());
    let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<ParallelEvent>(32);

    let outcome = service
        .prepare_parallel_execution(Vec::new(), &event_tx, true)
        .await
        .expect("prepare_parallel_execution");

    assert!(
        outcome.is_some(),
        "empty startup should continue when reducer-owned ResolveWait exists"
    );
    let queued = outcome.expect("checked is_some");
    assert!(
        queued.is_empty(),
        "no committed changes should still produce an empty queue"
    );
}
/// Without the allow-empty flag, an empty request yields `None`.
#[tokio::test]
async fn test_prepare_parallel_execution_empty_parallel_without_resolve_wait_is_noop() {
    let workspace = TempDir::new().expect("tempdir");
    if !init_git_repo(&workspace).await {
        return;
    }
    let service = ParallelRunService::new(workspace.path().to_path_buf(), create_test_config());
    let (event_tx, _event_rx) = tokio::sync::mpsc::channel::<ParallelEvent>(32);

    let outcome = service
        .prepare_parallel_execution(Vec::new(), &event_tx, false)
        .await
        .expect("prepare_parallel_execution");

    assert!(
        outcome.is_none(),
        "empty startup without reducer-owned ResolveWait must remain a safe no-op"
    );
}
/// With LLM analysis disabled, the order follows the input and the declared
/// metadata dependency of `route` on `policy` is kept.
#[tokio::test]
async fn test_analyze_order_fallback_preserves_metadata_dependencies_when_llm_disabled() {
    let workspace = TempDir::new().expect("tempdir");
    let config = {
        let mut cfg = create_test_config();
        cfg.use_llm_analysis = Some(false);
        cfg
    };
    let service = ParallelRunService::new(workspace.path().to_path_buf(), config);
    let requested = vec![
        create_test_change("route", vec!["policy"]),
        create_test_change("policy", vec![]),
    ];

    let analysis = service
        .analyze_order_with_sender(&requested, &[], None, 1)
        .await;

    let expected_order = vec![String::from("route"), String::from("policy")];
    assert_eq!(analysis.order, expected_order);
    let expected_route_deps = vec![String::from("policy")];
    assert_eq!(
        analysis.dependencies.get("route"),
        Some(&expected_route_deps)
    );
}
/// When the analyze command prints something that is not JSON, the fallback
/// result must still carry the declared metadata dependencies.
#[tokio::test]
async fn test_analyze_order_recoverable_fallback_preserves_metadata_dependencies() {
    let workspace = TempDir::new().expect("tempdir");
    let config = {
        let mut cfg = create_test_config();
        cfg.use_llm_analysis = Some(true);
        cfg.analyze_command = Some(String::from("printf 'not json'"));
        cfg
    };
    let service = ParallelRunService::new(workspace.path().to_path_buf(), config);
    let requested = vec![
        create_test_change("route", vec!["policy"]),
        create_test_change("policy", vec![]),
    ];

    let analysis = service
        .analyze_order_with_sender(&requested, &[], None, 1)
        .await;

    let expected_order = vec![String::from("route"), String::from("policy")];
    assert_eq!(analysis.order, expected_order);
    let expected_route_deps = vec![String::from("policy")];
    assert_eq!(
        analysis.dependencies.get("route"),
        Some(&expected_route_deps)
    );
    assert!(
        !analysis.dependencies.is_empty(),
        "recoverable fallback must not degrade to dependency-free analysis"
    );
}
/// The recoverable-fallback log helper must emit exactly one WARN record that
/// carries both the fallback notice and the original error text, and no ERROR
/// record.
#[test]
fn test_recoverable_fallback_log_uses_warn_level_only() {
use tracing::Level;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Layer;
// Subscriber layer that stores the level and the formatted fields of every event.
#[derive(Clone, Default)]
struct CaptureLayer(std::sync::Arc<std::sync::Mutex<Vec<(Level, String)>>>);
impl<S> Layer<S> for CaptureLayer
where
S: tracing::Subscriber,
{
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
// Collects every field of the event as `name=value;` text.
struct Visitor {
fields: String,
}
impl tracing::field::Visit for Visitor {
fn record_debug(
&mut self,
field: &tracing::field::Field,
value: &dyn std::fmt::Debug,
) {
self.fields
.push_str(&format!("{}={:?};", field.name(), value));
}
}
let mut visitor = Visitor {
fields: String::new(),
};
event.record(&mut visitor);
self.0
.lock()
.expect("capture layer mutex")
.push((*event.metadata().level(), visitor.fields));
}
}
let capture = CaptureLayer::default();
let subscriber = tracing_subscriber::registry().with(capture.clone());
// Install the capturing subscriber only for the duration of the closure.
tracing::subscriber::with_default(subscriber, || {
ParallelRunService::log_recoverable_analysis_fallback(&"invalid dependency graph");
});
let events = capture.0.lock().expect("capture layer mutex");
assert_eq!(events.len(), 1);
assert_eq!(events[0].0, Level::WARN);
assert!(
events[0]
.1
.contains("falling back to metadata-dependency-only analysis"),
"fallback diagnostic should remain operator-visible"
);
assert!(
events[0].1.contains("invalid dependency graph"),
"original LLM analysis failure should remain visible as warning context"
);
assert!(
events.iter().all(|(level, _)| *level != Level::ERROR),
"recoverable fallback must not emit ERROR-level records"
);
}
/// Starting with no changes on an executor whose shared state was prepared via
/// a `WorkspaceArchived` observation and a `ResolveMerge` command must run
/// successfully and must not produce a `ParallelStartRejected` event.
#[tokio::test]
async fn test_order_based_empty_resolve_wait_shared_state_enters_scheduler_path() {
    use crate::orchestration::state::{
        ExecutionMode, OrchestratorState, ReducerCommand, WorkspaceObservation,
    };
    use crate::parallel::ParallelExecutor;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    let workspace = TempDir::new().expect("tempdir");
    if !init_git_repo(&workspace).await {
        return;
    }
    let repo = workspace.path();
    let _ = Command::new("git")
        .args(["commit", "--allow-empty", "-m", "initial commit"])
        .current_dir(repo)
        .output()
        .await;

    let service = ParallelRunService::new(repo.to_path_buf(), create_test_config());
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<ParallelEvent>(32);

    // Prepare the state for `alpha` before it is shared with the executor.
    let mut initial_state =
        OrchestratorState::with_mode(vec![String::from("alpha")], 3, ExecutionMode::Parallel);
    initial_state.apply_observation("alpha", WorkspaceObservation::WorkspaceArchived);
    initial_state.apply_command(ReducerCommand::ResolveMerge(String::from("alpha")));
    let shared_state = Arc::new(RwLock::new(initial_state));

    let mut executor = ParallelExecutor::new(
        repo.to_path_buf(),
        create_test_config(),
        Some(event_tx.clone()),
    );
    executor.set_shared_orchestrator_state(Arc::clone(&shared_state));

    service
        .run_parallel_order_based_with_executor(executor, Vec::new(), event_tx.clone())
        .await
        .expect("empty ResolveWait scheduler path should run");

    // Close the channel so the receive loop terminates.
    drop(event_tx);
    let mut rejection_count = 0usize;
    while let Some(event) = event_rx.recv().await {
        if let ParallelEvent::ParallelStartRejected { .. } = event {
            rejection_count += 1;
        }
    }
    assert!(
        rejection_count == 0,
        "empty ResolveWait startup must not be treated as a zero-change start rejection"
    );
}
#[tokio::test]
async fn test_run_parallel_all_rejected_forwards_event_to_callback() {
let temp_dir = TempDir::new().expect("tempdir");
if !init_git_repo(&temp_dir).await {
return;
}
let base_dir = temp_dir.path().join("openspec/changes");
let placeholder = base_dir.join("placeholder");
std::fs::create_dir_all(&placeholder).unwrap();
std::fs::write(placeholder.join("proposal.md"), "placeholder").unwrap();
let _ = Command::new("git")
.args(["add", "."])
.current_dir(temp_dir.path())
.output()
.await;
let _ = Command::new("git")
.args(["commit", "-m", "initial commit"])
.current_dir(temp_dir.path())
.output()
.await;
for id in &["change-a", "change-b"] {
std::fs::create_dir_all(base_dir.join(id)).unwrap();
std::fs::write(base_dir.join(id).join("proposal.md"), "test").unwrap();
}
let service = ParallelRunService::new(temp_dir.path().to_path_buf(), create_test_config());
let changes = vec![
create_test_change("change-a", vec![]),
create_test_change("change-b", vec![]),
];
let collected_events: std::sync::Arc<std::sync::Mutex<Vec<ParallelEvent>>> =
std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let collected_events_clone = collected_events.clone();
service
.run_parallel(changes, None, move |event| {
collected_events_clone.lock().unwrap().push(event);
})
.await
.expect("run_parallel should succeed even when all changes are rejected");
let events = collected_events.lock().unwrap();
let got_rejection = events
.iter()
.any(|e| matches!(e, ParallelEvent::ParallelStartRejected { .. }));
assert!(
got_rejection,
"ParallelStartRejected must be forwarded to the callback when all changes are rejected at start time"
);
}
}