use crate::ai_command_runner::{AiCommandRunner, SharedStaggerState};
use crate::analyzer::{ParallelGroup, ParallelizationAnalyzer};
use crate::command_queue::CommandQueueConfig;
use crate::config::defaults::*;
use crate::config::OrchestratorConfig;
use crate::error::Result;
use crate::hooks::HookRunner;
use crate::openspec::Change;
use crate::parallel::{ParallelEvent, ParallelExecutor};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
/// Service that prepares changes for parallel execution and drives a `ParallelExecutor`.
///
/// It bundles the orchestrator configuration, the repository root, and the AI command
/// runner (together with the stagger state that runner was created with).
pub struct ParallelRunService {
/// Orchestrator configuration; consulted for the VCS backend, hooks and LLM analysis.
config: OrchestratorConfig,
/// Repository root handed to git helpers, hook runners and executors.
repo_root: PathBuf,
/// Flag forwarded to executors through `set_no_resume`; starts out `false`.
no_resume: bool,
/// Stagger state passed both to the AI command runner and to every executor created here.
shared_stagger_state: SharedStaggerState,
/// AI command runner that is cloned into the parallelization analyzer.
ai_runner: AiCommandRunner,
}
impl ParallelRunService {
pub fn new(repo_root: PathBuf, config: OrchestratorConfig) -> Self {
let shared_stagger_state: SharedStaggerState = Arc::new(Mutex::new(None));
let queue_config = CommandQueueConfig {
stagger_delay_ms: config
.command_queue_stagger_delay_ms
.unwrap_or(DEFAULT_STAGGER_DELAY_MS),
max_retries: config
.command_queue_max_retries
.unwrap_or(DEFAULT_MAX_RETRIES),
retry_delay_ms: config
.command_queue_retry_delay_ms
.unwrap_or(DEFAULT_RETRY_DELAY_MS),
retry_error_patterns: config
.command_queue_retry_patterns
.clone()
.unwrap_or_else(default_retry_patterns),
retry_if_duration_under_secs: config
.command_queue_retry_if_duration_under_secs
.unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS),
inactivity_timeout_secs: config.get_command_inactivity_timeout_secs(),
inactivity_kill_grace_secs: config.get_command_inactivity_kill_grace_secs(),
inactivity_timeout_max_retries: config.get_command_inactivity_timeout_max_retries(),
strict_process_cleanup: config.get_command_strict_process_cleanup(),
};
let mut ai_runner = AiCommandRunner::new(queue_config, shared_stagger_state.clone());
ai_runner.set_stream_json_textify(config.get_stream_json_textify());
ai_runner.set_strict_process_cleanup(config.get_command_strict_process_cleanup());
Self {
config,
repo_root,
no_resume: false,
shared_stagger_state,
ai_runner,
}
}
/// Creates a service whose AI command runner is built on the caller-supplied
/// `shared_stagger_state`.
///
/// Command-queue options that are unset in `config` fall back to the `DEFAULT_*`
/// constants (or `default_retry_patterns` for the retry patterns). `no_resume` starts
/// out as `false`.
pub fn new_with_shared_state(
    repo_root: PathBuf,
    config: OrchestratorConfig,
    shared_stagger_state: SharedStaggerState,
) -> Self {
    // Resolve every queue option up front, then assemble the struct via field shorthand.
    let stagger_delay_ms = config
        .command_queue_stagger_delay_ms
        .unwrap_or(DEFAULT_STAGGER_DELAY_MS);
    let max_retries = config
        .command_queue_max_retries
        .unwrap_or(DEFAULT_MAX_RETRIES);
    let retry_delay_ms = config
        .command_queue_retry_delay_ms
        .unwrap_or(DEFAULT_RETRY_DELAY_MS);
    let retry_error_patterns = config
        .command_queue_retry_patterns
        .clone()
        .unwrap_or_else(default_retry_patterns);
    let retry_if_duration_under_secs = config
        .command_queue_retry_if_duration_under_secs
        .unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS);
    let inactivity_timeout_secs = config.get_command_inactivity_timeout_secs();
    let inactivity_kill_grace_secs = config.get_command_inactivity_kill_grace_secs();
    let inactivity_timeout_max_retries = config.get_command_inactivity_timeout_max_retries();
    let strict_process_cleanup = config.get_command_strict_process_cleanup();

    let queue_config = CommandQueueConfig {
        stagger_delay_ms,
        max_retries,
        retry_delay_ms,
        retry_error_patterns,
        retry_if_duration_under_secs,
        inactivity_timeout_secs,
        inactivity_kill_grace_secs,
        inactivity_timeout_max_retries,
        strict_process_cleanup,
    };

    // The runner receives its own handle to the stagger state; the service keeps the original.
    let mut runner = AiCommandRunner::new(queue_config, shared_stagger_state.clone());
    runner.set_stream_json_textify(config.get_stream_json_textify());
    runner.set_strict_process_cleanup(config.get_command_strict_process_cleanup());

    Self {
        config,
        repo_root,
        no_resume: false,
        shared_stagger_state,
        ai_runner: runner,
    }
}
/// Stores the `no_resume` flag; it is forwarded to each executor this service configures
/// via the executor's own `set_no_resume`.
pub fn set_no_resume(&mut self, no_resume: bool) {
self.no_resume = no_resume;
}
pub async fn check_vcs_available(&self) -> Result<()> {
if !crate::cli::check_parallel_available() {
return Err(crate::error::OrchestratorError::GitCommand(
"Git repository not available for parallel execution".to_string(),
));
}
Ok(())
}
/// Builds a `ParallelExecutor` for this service's repository and configuration and
/// applies the optional collaborators passed in.
///
/// * `event_tx` — when present, is given to the executor and (as a clone) to the hook runner.
/// * `cancel_token`, `dynamic_queue`, `manual_resolve_counter` — each is installed on the
///   executor only when `Some`.
/// * `shared_queue_change` — passed straight through to the executor constructor.
///
/// When a dynamic queue is supplied the executor is additionally switched to its
/// persistent lifetime before the hooks are installed.
pub fn create_executor_with_queue_state(
    &self,
    event_tx: Option<mpsc::Sender<ParallelEvent>>,
    cancel_token: Option<CancellationToken>,
    shared_queue_change: Option<std::sync::Arc<tokio::sync::Mutex<Option<std::time::Instant>>>>,
    dynamic_queue: Option<std::sync::Arc<crate::tui::queue::DynamicQueue>>,
    manual_resolve_counter: Option<std::sync::Arc<std::sync::atomic::AtomicUsize>>,
) -> ParallelExecutor {
    let backend = self.config.get_vcs_backend();

    // The hook runner only gets an event sender when the caller provided one.
    let hook_runner = match event_tx.as_ref() {
        Some(tx) => {
            HookRunner::with_event_tx(self.config.get_hooks(), &self.repo_root, tx.clone())
        }
        None => HookRunner::new(self.config.get_hooks(), &self.repo_root),
    };

    let mut executor = ParallelExecutor::with_backend_and_queue_and_stagger(
        self.repo_root.clone(),
        self.config.clone(),
        event_tx,
        backend,
        shared_queue_change,
        Some(self.shared_stagger_state.clone()),
    );

    executor.set_no_resume(self.no_resume);
    if dynamic_queue.is_some() {
        executor.set_persistent_lifetime();
    }
    executor.set_hooks(hook_runner);
    if let Some(token) = cancel_token {
        executor.set_cancel_token(token);
    }
    if let Some(queue) = dynamic_queue {
        executor.set_dynamic_queue(queue);
    }
    if let Some(counter) = manual_resolve_counter {
        executor.set_manual_resolve_counter(counter);
    }
    executor
}
/// Splits `changes` into those eligible for parallel execution and the ids of those skipped.
///
/// A change is kept only if its id is listed by `list_changes_in_head` and is not listed
/// by `list_changes_with_uncommitted_files`. Kept changes retain their input order; the
/// skipped ids are returned sorted.
///
/// Both lookups are best-effort: if the HEAD listing fails, every change is returned as
/// eligible; if the uncommitted-file listing fails, no change is treated as dirty.
async fn filter_committed_changes(
    &self,
    changes: Vec<Change>,
) -> Result<(Vec<Change>, Vec<String>)> {
    let head_listing = crate::vcs::git::commands::list_changes_in_head(&self.repo_root).await;
    let in_head: HashSet<String> = match head_listing {
        Err(err) => {
            warn!(
                "Failed to load committed change snapshot; assuming all changes are committed: {}",
                err
            );
            return Ok((changes, Vec::new()));
        }
        Ok(ids) => ids.into_iter().collect(),
    };

    let dirty_listing =
        crate::vcs::git::commands::list_changes_with_uncommitted_files(&self.repo_root).await;
    let dirty: HashSet<String> = match dirty_listing {
        Err(err) => {
            warn!(
                "Failed to detect uncommitted files in changes; assuming no uncommitted files: {}",
                err
            );
            HashSet::new()
        }
        Ok(ids) => ids.into_iter().collect(),
    };

    // Eligible means: present in HEAD and free of uncommitted files.
    let (eligible, rejected): (Vec<Change>, Vec<Change>) = changes
        .into_iter()
        .partition(|change| in_head.contains(&change.id) && !dirty.contains(&change.id));

    let mut skipped: Vec<String> = rejected.into_iter().map(|change| change.id).collect();
    skipped.sort();
    Ok((eligible, skipped))
}
/// Filters `changes` down to the ones eligible for parallel execution and reports the rest.
///
/// When any change is skipped, a warning is logged and two events are sent on `event_tx`:
/// a `Warning` naming the skipped ids and a `ParallelStartRejected` carrying them. Send
/// failures are deliberately ignored so a closed receiver does not abort preparation.
///
/// Returns `Ok(None)` when no eligible change remains, otherwise `Ok(Some(changes))`.
///
/// # Errors
///
/// Propagates any error returned by `filter_committed_changes`.
async fn prepare_parallel_execution(
    &self,
    changes: Vec<Change>,
    event_tx: &mpsc::Sender<ParallelEvent>,
) -> Result<Option<Vec<Change>>> {
    let (changes, skipped) = self.filter_committed_changes(changes).await?;
    if !skipped.is_empty() {
        let message = format!(
            "Skipping uncommitted changes in parallel mode: {}",
            skipped.join(", ")
        );
        warn!("{}", message);
        let _ = event_tx
            .send(ParallelEvent::Warning {
                title: "Uncommitted changes skipped".to_string(),
                message,
            })
            .await;
        // `skipped` is not needed after this point, so it is moved instead of cloned.
        let _ = event_tx
            .send(ParallelEvent::ParallelStartRejected {
                change_ids: skipped,
                reason: "uncommitted or not in HEAD".to_string(),
            })
            .await;
    }
    if changes.is_empty() {
        info!("No committed changes available for parallel execution");
        return Ok(None);
    }
    Ok(Some(changes))
}
/// Runs `changes` through a `ParallelExecutor` with order-based re-analysis, delivering
/// every emitted `ParallelEvent` to `event_handler`.
///
/// Changes rejected during preparation are reported through the handler as well. When no
/// eligible change remains, the already-buffered events are flushed to the handler and
/// `Ok(())` is returned without creating an executor.
///
/// `cancel_token`, when provided, is installed on the executor.
///
/// # Errors
///
/// Propagates errors from preparation and from the executor run.
pub async fn run_parallel<F>(
    &self,
    changes: Vec<Change>,
    cancel_token: Option<CancellationToken>,
    event_handler: F,
) -> Result<()>
where
    F: Fn(ParallelEvent) + Send + Sync + 'static,
{
    let (event_tx, mut event_rx) = mpsc::channel::<ParallelEvent>(100);
    let changes = match self.prepare_parallel_execution(changes, &event_tx).await? {
        Some(changes) => changes,
        None => {
            // Close the channel first so the drain loop ends once the buffer is empty.
            drop(event_tx);
            while let Some(event) = event_rx.recv().await {
                event_handler(event);
            }
            return Ok(());
        }
    };
    info!(
        "Starting parallel execution with re-analysis for {} changes",
        changes.len()
    );
    // Forward events to the handler until a terminal event arrives or the channel closes.
    let forward_handle = tokio::spawn(async move {
        while let Some(event) = event_rx.recv().await {
            let is_completed =
                matches!(event, ParallelEvent::AllCompleted | ParallelEvent::Stopped);
            event_handler(event);
            if is_completed {
                break;
            }
        }
    });
    let mut executor = ParallelExecutor::with_backend_and_queue_and_stagger(
        self.repo_root.clone(),
        self.config.clone(),
        Some(event_tx.clone()),
        self.config.get_vcs_backend(),
        None,
        Some(self.shared_stagger_state.clone()),
    );
    executor.set_no_resume(self.no_resume);
    let hooks =
        HookRunner::with_event_tx(self.config.get_hooks(), &self.repo_root, event_tx.clone());
    executor.set_hooks(hooks);
    if let Some(token) = cancel_token {
        executor.set_cancel_token(token);
    }
    let config = self.config.clone();
    let repo_root = self.repo_root.clone();
    let shared_stagger_state = self.shared_stagger_state.clone();
    let result = executor
        .execute_with_order_based_reanalysis(
            changes,
            move |remaining, in_flight_ids, iteration| {
                let config = config.clone();
                let repo_root = repo_root.clone();
                let event_tx = event_tx.clone();
                let shared_stagger_state = shared_stagger_state.clone();
                Box::pin(async move {
                    // A fresh service per analysis pass, sharing the same stagger state.
                    let service = ParallelRunService::new_with_shared_state(
                        repo_root,
                        config,
                        shared_stagger_state,
                    );
                    service
                        .analyze_order_with_sender(
                            remaining,
                            in_flight_ids,
                            Some(&event_tx),
                            iteration,
                        )
                        .await
                })
            },
        )
        .await;
    // The executor was handed clones of the sender. Release it before waiting so the
    // forwarder also terminates when the run ended without a terminal event
    // (`AllCompleted`/`Stopped`), e.g. on an error return; otherwise the channel would
    // stay open and this await could block forever. Buffered events are still delivered.
    drop(executor);
    let _ = forward_handle.await;
    result
}
/// Creates an executor wired to `event_tx` and the optional queue-related collaborators,
/// then runs `changes` through `run_parallel_order_based_with_executor`.
///
/// # Errors
///
/// Propagates whatever error the order-based run returns.
pub async fn run_parallel_with_channel_and_queue_state(
    &self,
    changes: Vec<Change>,
    event_tx: mpsc::Sender<ParallelEvent>,
    cancel_token: Option<CancellationToken>,
    shared_queue_change: Option<std::sync::Arc<tokio::sync::Mutex<Option<std::time::Instant>>>>,
    dynamic_queue: Option<std::sync::Arc<crate::tui::queue::DynamicQueue>>,
    manual_resolve_counter: Option<std::sync::Arc<std::sync::atomic::AtomicUsize>>,
) -> Result<()> {
    // The executor gets its own sender handle; the original is kept for the run itself.
    let executor_tx = event_tx.clone();
    let executor = self.create_executor_with_queue_state(
        Some(executor_tx),
        cancel_token,
        shared_queue_change,
        dynamic_queue,
        manual_resolve_counter,
    );
    let outcome = self
        .run_parallel_order_based_with_executor(executor, changes, event_tx)
        .await;
    outcome
}
/// Runs `changes` on the supplied `executor` using order-based re-analysis.
///
/// Changes are first filtered by `prepare_parallel_execution`; if none remain the method
/// returns `Ok(())` without touching the executor. For every re-analysis pass a new
/// `ParallelRunService` (sharing this service's stagger state) computes the order and
/// streams analysis output to `event_tx`.
///
/// # Errors
///
/// Propagates errors from preparation and from the executor run.
pub async fn run_parallel_order_based_with_executor(
    &self,
    mut executor: ParallelExecutor,
    changes: Vec<Change>,
    event_tx: mpsc::Sender<ParallelEvent>,
) -> Result<()> {
    let prepared = self.prepare_parallel_execution(changes, &event_tx).await?;
    let eligible = match prepared {
        None => return Ok(()),
        Some(eligible) => eligible,
    };
    info!(
        "Starting order-based parallel execution with re-analysis for {} changes",
        eligible.len()
    );

    // Owned copies captured by the re-analysis callback.
    let base_config = self.config.clone();
    let base_root = self.repo_root.clone();
    let base_stagger = self.shared_stagger_state.clone();

    executor
        .execute_with_order_based_reanalysis(
            eligible,
            move |remaining, in_flight_ids, iteration| {
                // Each pass works on its own clones so the callback can be invoked repeatedly.
                let pass_config = base_config.clone();
                let pass_root = base_root.clone();
                let pass_tx = event_tx.clone();
                let pass_stagger = base_stagger.clone();
                Box::pin(async move {
                    let analysis_service = ParallelRunService::new_with_shared_state(
                        pass_root,
                        pass_config,
                        pass_stagger,
                    );
                    analysis_service
                        .analyze_order_with_sender(
                            remaining,
                            in_flight_ids,
                            Some(&pass_tx),
                            iteration,
                        )
                        .await
                })
            },
        )
        .await
}
/// Public entry point for grouping `changes`; forwards to the private `analyze_and_group`.
pub async fn analyze_and_group_public(&self, changes: &[Change]) -> Vec<ParallelGroup> {
self.analyze_and_group(changes).await
}
/// Groups `changes` without an event sender, reporting the analysis as iteration `1`.
async fn analyze_and_group(&self, changes: &[Change]) -> Vec<ParallelGroup> {
self.analyze_and_group_with_sender(changes, None, 1).await
}
/// Determines the execution order for `changes`, using LLM analysis when the
/// configuration enables it.
///
/// If LLM analysis is disabled or fails, the fallback result lists every change id in
/// input order with no dependencies and no groups. Analysis output is streamed to
/// `event_tx` (tagged with `iteration`) when a sender is given.
async fn analyze_order_with_sender(
    &self,
    changes: &[Change],
    in_flight_ids: &[String],
    event_tx: Option<&mpsc::Sender<ParallelEvent>>,
    iteration: u32,
) -> crate::analyzer::AnalysisResult {
    let llm_outcome = if self.config.use_llm_analysis() {
        info!("Using LLM analysis for parallelization (analyze_command)");
        Some(
            self.analyze_order_with_llm_streaming(changes, in_flight_ids, event_tx, iteration)
                .await,
        )
    } else {
        info!("LLM analysis disabled, running all changes in parallel");
        None
    };

    match llm_outcome {
        Some(Ok(analysis)) => {
            info!(
                "LLM analysis successful: {} changes in order",
                analysis.order.len()
            );
            return analysis;
        }
        Some(Err(e)) => {
            error!("LLM analysis failed: {}", e);
            warn!(
                "Falling back to running all changes in parallel (no dependency analysis)"
            );
        }
        None => {}
    }

    // Fallback: every change, in input order, with no dependency information.
    let order = changes.iter().map(|c| c.id.clone()).collect();
    crate::analyzer::AnalysisResult {
        order,
        dependencies: HashMap::new(),
        groups: None,
    }
}
/// Partitions `changes` into parallel groups, using LLM analysis when the configuration
/// enables it.
///
/// If LLM analysis is disabled or fails, the result of `all_parallel` is returned instead.
/// Analysis output is streamed to `event_tx` (tagged with `iteration`) when a sender is given.
async fn analyze_and_group_with_sender(
    &self,
    changes: &[Change],
    event_tx: Option<&mpsc::Sender<ParallelEvent>>,
    iteration: u32,
) -> Vec<ParallelGroup> {
    if !self.config.use_llm_analysis() {
        info!("LLM analysis disabled, running all changes in parallel");
        return Self::all_parallel(changes);
    }

    info!("Using LLM analysis for parallelization (analyze_command)");
    let llm_outcome = self
        .analyze_with_llm_streaming(changes, event_tx, iteration)
        .await;
    match llm_outcome {
        Ok(groups) => {
            info!("LLM analysis successful: {} groups", groups.len());
            groups
        }
        Err(e) => {
            error!("LLM analysis failed: {}", e);
            warn!(
                "Falling back to running all changes in parallel (no dependency analysis)"
            );
            Self::all_parallel(changes)
        }
    }
}
/// Builds the fallback grouping: one group (id `1`, no dependencies) holding every change
/// id in input order, or no groups at all when `changes` is empty.
fn all_parallel(changes: &[Change]) -> Vec<ParallelGroup> {
    match changes {
        [] => Vec::new(),
        all => {
            let single_group = ParallelGroup {
                id: 1,
                changes: all.iter().map(|c| c.id.clone()).collect(),
                depends_on: Vec::new(),
            };
            vec![single_group]
        }
    }
}
/// Runs the parallelization analyzer to obtain an ordered analysis of `changes`, taking
/// `in_flight_ids` into account.
///
/// With a sender, each analyzer output is forwarded as `ParallelEvent::AnalysisOutput`
/// using `try_send`, so a full or closed channel drops the output rather than blocking.
///
/// # Errors
///
/// Returns whatever error the analyzer reports.
async fn analyze_order_with_llm_streaming(
    &self,
    changes: &[Change],
    in_flight_ids: &[String],
    event_tx: Option<&mpsc::Sender<ParallelEvent>>,
    iteration: u32,
) -> Result<crate::analyzer::AnalysisResult> {
    let analyzer = ParallelizationAnalyzer::new(self.ai_runner.clone(), self.config.clone());
    match event_tx.cloned() {
        None => analyzer.analyze_with_inflight(changes, in_flight_ids).await,
        Some(sender) => {
            analyzer
                .analyze_with_callback(changes, in_flight_ids, move |output| {
                    let _ = sender.try_send(ParallelEvent::AnalysisOutput {
                        output: output.clone(),
                        iteration,
                    });
                })
                .await
        }
    }
}
/// Runs the parallelization analyzer to partition `changes` into parallel groups.
///
/// With a sender, each analyzer output is forwarded as `ParallelEvent::AnalysisOutput`
/// using `try_send`, so a full or closed channel drops the output rather than blocking.
///
/// # Errors
///
/// Returns whatever error the analyzer reports.
async fn analyze_with_llm_streaming(
    &self,
    changes: &[Change],
    event_tx: Option<&mpsc::Sender<ParallelEvent>>,
    iteration: u32,
) -> Result<Vec<ParallelGroup>> {
    let analyzer = ParallelizationAnalyzer::new(self.ai_runner.clone(), self.config.clone());
    match event_tx.cloned() {
        None => analyzer.analyze_groups(changes).await,
        Some(sender) => {
            analyzer
                .analyze_groups_with_callback(changes, move |output| {
                    let _ = sender.try_send(ParallelEvent::AnalysisOutput {
                        output: output.clone(),
                        iteration,
                    });
                })
                .await
        }
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::openspec::ProposalMetadata;
use tempfile::TempDir;
use tokio::process::Command;
/// Builds a `Change` fixture with the given id and dependencies, 0 of 5 tasks completed,
/// a fixed `last_modified` text and default proposal metadata.
fn create_test_change(id: &str, dependencies: Vec<&str>) -> Change {
    let dependencies = dependencies.into_iter().map(str::to_owned).collect();
    Change {
        id: id.to_owned(),
        completed_tasks: 0,
        total_tasks: 5,
        last_modified: String::from("1m ago"),
        dependencies,
        metadata: ProposalMetadata::default(),
    }
}
async fn init_git_repo(temp_dir: &TempDir) -> bool {
let init_result = Command::new("git")
.args(["init"])
.current_dir(temp_dir.path())
.output()
.await;
let init_ok = init_result
.as_ref()
.map(|output| output.status.success())
.unwrap_or(false);
if !init_ok {
return false;
}
let _ = Command::new("git")
.args(["config", "user.email", "test@example.com"])
.current_dir(temp_dir.path())
.output()
.await;
let _ = Command::new("git")
.args(["config", "user.name", "Test User"])
.current_dir(temp_dir.path())
.output()
.await;
true
}
/// A change directory created after the last commit is skipped, while the committed one
/// is kept.
#[tokio::test]
async fn test_filter_committed_changes_skips_uncommitted() {
    let temp_dir = TempDir::new().expect("tempdir");
    if !init_git_repo(&temp_dir).await {
        // No usable git here; nothing to verify.
        return;
    }
    let repo = temp_dir.path();
    let changes_dir = repo.join("openspec/changes");

    // change-a: written and committed.
    std::fs::create_dir_all(changes_dir.join("change-a")).unwrap();
    std::fs::write(changes_dir.join("change-a/proposal.md"), "test").unwrap();
    for git_args in [&["add", "."][..], &["commit", "-m", "add change-a"][..]] {
        let _ = Command::new("git")
            .args(git_args)
            .current_dir(repo)
            .output()
            .await;
    }

    // change-b: written afterwards and left uncommitted.
    std::fs::create_dir_all(changes_dir.join("change-b")).unwrap();
    std::fs::write(changes_dir.join("change-b/proposal.md"), "test").unwrap();

    let service = ParallelRunService::new(repo.to_path_buf(), OrchestratorConfig::default());
    let candidates = vec![
        create_test_change("change-a", vec![]),
        create_test_change("change-b", vec![]),
    ];
    let (committed, skipped) = service
        .filter_committed_changes(candidates)
        .await
        .expect("filter changes");

    let committed_ids: Vec<&str> = committed.iter().map(|change| change.id.as_str()).collect();
    assert_eq!(committed_ids, ["change-a"]);
    assert_eq!(skipped, ["change-b"]);
}
/// A committed change that gains a new, uncommitted file is skipped, while an untouched
/// committed change is kept.
#[tokio::test]
async fn test_filter_committed_changes_skips_partially_uncommitted() {
    let temp_dir = TempDir::new().expect("tempdir");
    if !init_git_repo(&temp_dir).await {
        // No usable git here; nothing to verify.
        return;
    }
    let repo = temp_dir.path();
    let changes_dir = repo.join("openspec/changes");

    // Both changes are written and committed together.
    std::fs::create_dir_all(changes_dir.join("change-a")).unwrap();
    std::fs::write(changes_dir.join("change-a/proposal.md"), "test").unwrap();
    std::fs::create_dir_all(changes_dir.join("change-b")).unwrap();
    std::fs::write(changes_dir.join("change-b/proposal.md"), "test").unwrap();
    for git_args in [&["add", "."][..], &["commit", "-m", "add changes"][..]] {
        let _ = Command::new("git")
            .args(git_args)
            .current_dir(repo)
            .output()
            .await;
    }

    // change-a then receives an extra file that is never committed.
    std::fs::write(changes_dir.join("change-a/tasks.md"), "new task").unwrap();

    let service = ParallelRunService::new(repo.to_path_buf(), OrchestratorConfig::default());
    let candidates = vec![
        create_test_change("change-a", vec![]),
        create_test_change("change-b", vec![]),
    ];
    let (committed, skipped) = service
        .filter_committed_changes(candidates)
        .await
        .expect("filter changes");

    let committed_ids: Vec<&str> = committed.iter().map(|change| change.id.as_str()).collect();
    assert_eq!(committed_ids, ["change-b"]);
    assert_eq!(skipped, ["change-a"]);
}
/// With one committed and one uncommitted change, preparation keeps the committed change
/// and emits a `ParallelStartRejected` event naming the uncommitted one.
#[tokio::test]
async fn test_prepare_parallel_execution_emits_rejection_event() {
    let temp_dir = TempDir::new().expect("tempdir");
    if !init_git_repo(&temp_dir).await {
        // No usable git here; nothing to verify.
        return;
    }
    let repo = temp_dir.path();
    let changes_dir = repo.join("openspec/changes");

    // change-a: written and committed.
    std::fs::create_dir_all(changes_dir.join("change-a")).unwrap();
    std::fs::write(changes_dir.join("change-a/proposal.md"), "test").unwrap();
    for git_args in [&["add", "."][..], &["commit", "-m", "add change-a"][..]] {
        let _ = Command::new("git")
            .args(git_args)
            .current_dir(repo)
            .output()
            .await;
    }

    // change-b: written afterwards and left uncommitted.
    std::fs::create_dir_all(changes_dir.join("change-b")).unwrap();
    std::fs::write(changes_dir.join("change-b/proposal.md"), "test").unwrap();

    let service = ParallelRunService::new(repo.to_path_buf(), OrchestratorConfig::default());
    let candidates = vec![
        create_test_change("change-a", vec![]),
        create_test_change("change-b", vec![]),
    ];
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<ParallelEvent>(32);
    let prepared = service
        .prepare_parallel_execution(candidates, &event_tx)
        .await
        .expect("prepare_parallel_execution");

    assert!(prepared.is_some(), "change-a should still be eligible");
    let eligible = prepared.unwrap();
    assert_eq!(eligible.len(), 1);
    assert_eq!(eligible[0].id, "change-a");

    // Closing the sender lets the receive loop below run to completion.
    drop(event_tx);
    let mut saw_rejection = false;
    while let Some(event) = event_rx.recv().await {
        if let ParallelEvent::ParallelStartRejected { change_ids, .. } = event {
            assert!(
                change_ids.iter().any(|id| id == "change-b"),
                "rejection event should include change-b"
            );
            saw_rejection = true;
        }
    }
    assert!(
        saw_rejection,
        "expected a ParallelStartRejected event for the uncommitted change"
    );
}
/// When every requested change is uncommitted, preparation returns `None` and still emits
/// a `ParallelStartRejected` event listing all of them.
#[tokio::test]
async fn test_prepare_parallel_execution_all_rejected_emits_rejection_event() {
    let temp_dir = TempDir::new().expect("tempdir");
    if !init_git_repo(&temp_dir).await {
        // No usable git here; nothing to verify.
        return;
    }
    let repo = temp_dir.path();
    let changes_dir = repo.join("openspec/changes");

    // Commit an unrelated placeholder change first.
    let placeholder_dir = changes_dir.join("placeholder");
    std::fs::create_dir_all(&placeholder_dir).unwrap();
    std::fs::write(placeholder_dir.join("proposal.md"), "placeholder").unwrap();
    for git_args in [&["add", "."][..], &["commit", "-m", "initial commit"][..]] {
        let _ = Command::new("git")
            .args(git_args)
            .current_dir(repo)
            .output()
            .await;
    }

    // Both requested changes exist only in the working tree.
    for id in ["change-a", "change-b"] {
        let change_dir = changes_dir.join(id);
        std::fs::create_dir_all(&change_dir).unwrap();
        std::fs::write(change_dir.join("proposal.md"), "test").unwrap();
    }

    let service = ParallelRunService::new(repo.to_path_buf(), OrchestratorConfig::default());
    let candidates = vec![
        create_test_change("change-a", vec![]),
        create_test_change("change-b", vec![]),
    ];
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<ParallelEvent>(32);
    let prepared = service
        .prepare_parallel_execution(candidates, &event_tx)
        .await
        .expect("prepare_parallel_execution");
    assert!(
        prepared.is_none(),
        "all changes were uncommitted so result should be None"
    );

    // Closing the sender lets the receive loop below run to completion.
    drop(event_tx);
    let mut last_rejection: Option<Vec<String>> = None;
    while let Some(event) = event_rx.recv().await {
        if let ParallelEvent::ParallelStartRejected { change_ids, .. } = event {
            last_rejection = Some(change_ids);
        }
    }
    let mut rejected_ids = last_rejection
        .expect("expected a ParallelStartRejected event even when all changes are rejected");
    rejected_ids.sort();
    assert_eq!(rejected_ids, ["change-a", "change-b"]);
}
#[tokio::test]
async fn test_run_parallel_all_rejected_forwards_event_to_callback() {
let temp_dir = TempDir::new().expect("tempdir");
if !init_git_repo(&temp_dir).await {
return;
}
let base_dir = temp_dir.path().join("openspec/changes");
let placeholder = base_dir.join("placeholder");
std::fs::create_dir_all(&placeholder).unwrap();
std::fs::write(placeholder.join("proposal.md"), "placeholder").unwrap();
let _ = Command::new("git")
.args(["add", "."])
.current_dir(temp_dir.path())
.output()
.await;
let _ = Command::new("git")
.args(["commit", "-m", "initial commit"])
.current_dir(temp_dir.path())
.output()
.await;
for id in &["change-a", "change-b"] {
std::fs::create_dir_all(base_dir.join(id)).unwrap();
std::fs::write(base_dir.join(id).join("proposal.md"), "test").unwrap();
}
let service =
ParallelRunService::new(temp_dir.path().to_path_buf(), OrchestratorConfig::default());
let changes = vec![
create_test_change("change-a", vec![]),
create_test_change("change-b", vec![]),
];
let collected_events: std::sync::Arc<std::sync::Mutex<Vec<ParallelEvent>>> =
std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
let collected_events_clone = collected_events.clone();
service
.run_parallel(changes, None, move |event| {
collected_events_clone.lock().unwrap().push(event);
})
.await
.expect("run_parallel should succeed even when all changes are rejected");
let events = collected_events.lock().unwrap();
let got_rejection = events
.iter()
.any(|e| matches!(e, ParallelEvent::ParallelStartRejected { .. }));
assert!(
got_rejection,
"ParallelStartRejected must be forwarded to the callback when all changes are rejected at start time"
);
}
}