use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use tracing::info;
use crate::ai_command_runner::{AiCommandRunner, SharedStaggerState};
use crate::command_queue::CommandQueueConfig;
use crate::config::defaults::*;
use crate::config::OrchestratorConfig;
use crate::hooks::HookRunner;
use crate::vcs::{GitWorkspaceManager, VcsBackend, WorkspaceManager};
use super::{
FailedChangeTracker, ParallelEvent, ParallelExecutor, SchedulerLifetime,
DEFAULT_MAX_CONFLICT_RETRIES,
};
impl ParallelExecutor {
pub fn new(
repo_root: PathBuf,
config: OrchestratorConfig,
event_tx: Option<mpsc::Sender<ParallelEvent>>,
) -> Self {
let vcs_backend = config.get_vcs_backend();
Self::with_backend(repo_root, config, event_tx, vcs_backend)
}
pub fn with_backend_and_queue_state(
repo_root: PathBuf,
config: OrchestratorConfig,
event_tx: Option<mpsc::Sender<ParallelEvent>>,
vcs_backend: VcsBackend,
shared_queue_change: Option<Arc<Mutex<Option<std::time::Instant>>>>,
) -> Self {
Self::with_backend_and_queue_and_stagger(
repo_root,
config,
event_tx,
vcs_backend,
shared_queue_change,
None,
)
}
pub fn with_backend_and_queue_and_stagger(
repo_root: PathBuf,
config: OrchestratorConfig,
event_tx: Option<mpsc::Sender<ParallelEvent>>,
vcs_backend: VcsBackend,
shared_queue_change: Option<Arc<Mutex<Option<std::time::Instant>>>>,
shared_stagger_state: Option<SharedStaggerState>,
) -> Self {
let base_dir = if let Some(configured_dir) = config.get_workspace_base_dir() {
PathBuf::from(configured_dir)
} else {
crate::config::defaults::default_workspace_base_dir(Some(&repo_root))
};
info!("Using workspace base directory: {:?}", base_dir);
let max_concurrent = config.get_max_concurrent_workspaces();
let apply_command = config
.get_apply_command()
.expect("apply_command must be configured before creating ParallelExecutor")
.to_string();
let archive_command = config
.get_archive_command()
.expect("archive_command must be configured before creating ParallelExecutor")
.to_string();
let resolved_backend = Self::resolve_backend(vcs_backend, &repo_root);
info!("Using VCS backend: {:?}", resolved_backend);
let workspace_manager: Box<dyn WorkspaceManager> = match resolved_backend {
VcsBackend::Git | VcsBackend::Auto => Box::new(GitWorkspaceManager::new(
base_dir,
repo_root.clone(),
max_concurrent,
config.clone(),
)),
};
let last_queue_change_at =
shared_queue_change.unwrap_or_else(|| Arc::new(Mutex::new(None)));
let shared_stagger_state =
shared_stagger_state.unwrap_or_else(|| Arc::new(Mutex::new(None)));
let queue_config = CommandQueueConfig {
stagger_delay_ms: config
.command_queue_stagger_delay_ms
.unwrap_or(DEFAULT_STAGGER_DELAY_MS),
max_retries: config
.command_queue_max_retries
.unwrap_or(DEFAULT_MAX_RETRIES),
retry_delay_ms: config
.command_queue_retry_delay_ms
.unwrap_or(DEFAULT_RETRY_DELAY_MS),
retry_error_patterns: config
.command_queue_retry_patterns
.clone()
.unwrap_or_else(default_retry_patterns),
retry_if_duration_under_secs: config
.command_queue_retry_if_duration_under_secs
.unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS),
inactivity_timeout_secs: config.get_command_inactivity_timeout_secs(),
inactivity_kill_grace_secs: config.get_command_inactivity_kill_grace_secs(),
inactivity_timeout_max_retries: config.get_command_inactivity_timeout_max_retries(),
strict_process_cleanup: config.get_command_strict_process_cleanup(),
};
let ai_runner = AiCommandRunner::new(queue_config, shared_stagger_state.clone());
Self {
workspace_manager,
config,
apply_command,
archive_command,
event_tx,
max_conflict_retries: DEFAULT_MAX_CONFLICT_RETRIES,
repo_root,
no_resume: false,
failed_tracker: FailedChangeTracker::new(),
change_dependencies: HashMap::new(),
resolve_wait_changes: HashSet::new(),
reject_wait_changes: HashSet::new(),
merge_wait_changes: HashSet::new(),
previously_blocked_changes: HashSet::new(),
force_recreate_worktree: HashSet::new(),
hooks: None,
cancel_token: None,
last_queue_change_at,
last_available_slots: None,
dynamic_queue: None,
ai_runner,
shared_stagger_state,
apply_history: Arc::new(Mutex::new(crate::history::ApplyHistory::new())),
archive_history: Arc::new(Mutex::new(crate::history::ArchiveHistory::new())),
acceptance_history: Arc::new(Mutex::new(crate::history::AcceptanceHistory::new())),
acceptance_tail_injected: Arc::new(Mutex::new(std::collections::HashMap::new())),
manual_resolve_count: None,
auto_resolve_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
pending_merge_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
scheduler_lifetime: SchedulerLifetime::Finite,
shared_orchestrator_state: None,
last_dispatched_resolve_wait_changes: HashSet::new(),
last_dispatched_reject_wait_changes: HashSet::new(),
resolve_wait_retry_triggered: false,
queue_reconciliation_diagnostics_seen: HashSet::new(),
no_analysis_diagnostics_seen: HashSet::new(),
}
}
pub fn with_backend(
repo_root: PathBuf,
config: OrchestratorConfig,
event_tx: Option<mpsc::Sender<ParallelEvent>>,
vcs_backend: VcsBackend,
) -> Self {
Self::with_backend_and_queue_state(repo_root, config, event_tx, vcs_backend, None)
}
#[allow(dead_code)] pub fn set_hooks(&mut self, hooks: HookRunner) {
self.hooks = Some(Arc::new(hooks));
}
pub fn set_no_resume(&mut self, no_resume: bool) {
self.no_resume = no_resume;
}
pub fn set_shared_orchestrator_state(
&mut self,
shared_state: Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
) {
self.shared_orchestrator_state = Some(shared_state);
}
pub fn set_cancel_token(&mut self, cancel_token: CancellationToken) {
self.cancel_token = Some(cancel_token);
}
pub fn set_dynamic_queue(&mut self, dynamic_queue: Arc<crate::tui::queue::DynamicQueue>) {
self.dynamic_queue = Some(dynamic_queue);
}
pub fn set_scheduler_lifetime(&mut self, lifetime: SchedulerLifetime) {
self.scheduler_lifetime = lifetime;
}
pub fn set_persistent_lifetime(&mut self) {
self.set_scheduler_lifetime(SchedulerLifetime::Persistent);
}
pub fn set_manual_resolve_counter(&mut self, counter: Arc<std::sync::atomic::AtomicUsize>) {
self.manual_resolve_count = Some(counter);
}
#[cfg(test)]
pub fn get_auto_resolve_counter(&self) -> Arc<std::sync::atomic::AtomicUsize> {
self.auto_resolve_count.clone()
}
#[allow(dead_code)] pub fn backend_type(&self) -> VcsBackend {
self.workspace_manager.backend_type()
}
#[allow(dead_code)] pub async fn check_vcs_available(&self) -> crate::error::Result<bool> {
self.workspace_manager
.check_available()
.await
.map_err(Into::into)
}
pub(super) fn resolve_backend(backend: VcsBackend, _repo_root: &Path) -> VcsBackend {
match backend {
VcsBackend::Auto => VcsBackend::Git,
other => other,
}
}
}