use crate::config::OrchestratorConfig;
use crate::events::ExecutionEvent;
use crate::openspec::{Change, ProposalMetadata};
use crate::orchestration::state::{ExecutionMode, OrchestratorState, ReducerCommand, WaitState};
use crate::parallel::cleanup::WorkspaceCleanupGuard;
use crate::parallel::dynamic_queue::ReanalysisReason;
use crate::parallel::queue_state::ReanalysisDispatchContext;
use crate::parallel::{
MergeResult, MergeResultOrigin, MergeTaskOutcome, ParallelExecutor, WorkspaceResult,
};
use crate::vcs::VcsBackend;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::process::Command;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
fn create_test_config() -> OrchestratorConfig {
OrchestratorConfig {
apply_command: Some("echo apply {change_id}".to_string()),
archive_command: Some("echo archive {change_id}".to_string()),
analyze_command: Some("echo analyze".to_string()),
acceptance_command: Some("echo acceptance".to_string()),
resolve_command: Some("echo resolve".to_string()),
..Default::default()
}
}
async fn init_git_repo(repo_root: &std::path::Path) {
Command::new("git")
.args(["init", "-b", "main"])
.current_dir(repo_root)
.output()
.await
.expect("git init should run");
Command::new("git")
.args(["config", "user.email", "test@example.com"])
.current_dir(repo_root)
.output()
.await
.expect("git config email should run");
Command::new("git")
.args(["config", "user.name", "Test User"])
.current_dir(repo_root)
.output()
.await
.expect("git config name should run");
std::fs::write(repo_root.join("README.md"), "initial\n").expect("write initial file");
Command::new("git")
.args(["add", "."])
.current_dir(repo_root)
.output()
.await
.expect("git add should run");
Command::new("git")
.args(["commit", "-m", "initial"])
.current_dir(repo_root)
.output()
.await
.expect("git commit should run");
}
#[tokio::test]
async fn test_auto_resolve_counter_reduces_available_slots() {
let temp_dir = TempDir::new().unwrap();
let repo_root = temp_dir.path().to_path_buf();
let config = create_test_config();
let executor = ParallelExecutor::new(repo_root.clone(), config.clone(), None);
let auto_resolve_counter = executor.get_auto_resolve_counter();
assert_eq!(
auto_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
0,
"Auto resolve counter should start at 0"
);
auto_resolve_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
assert_eq!(
auto_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
1,
"Auto resolve counter should be 1 after increment"
);
auto_resolve_counter.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
assert_eq!(
auto_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
0,
"Auto resolve counter should return to 0 after completion"
);
}
#[tokio::test]
async fn test_multiple_auto_resolves_consume_multiple_slots() {
let temp_dir = TempDir::new().unwrap();
let repo_root = temp_dir.path().to_path_buf();
let config = create_test_config();
let executor = ParallelExecutor::new(repo_root.clone(), config.clone(), None);
let auto_resolve_counter = executor.get_auto_resolve_counter();
auto_resolve_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
auto_resolve_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
assert_eq!(
auto_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
2,
"Auto resolve counter should be 2 for concurrent resolves"
);
auto_resolve_counter.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
assert_eq!(
auto_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
1,
"Auto resolve counter should be 1 after one completes"
);
auto_resolve_counter.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
assert_eq!(
auto_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
0,
"Auto resolve counter should be 0 after all complete"
);
}
#[test]
fn test_auto_resolve_counter_is_thread_safe() {
let temp_dir = TempDir::new().unwrap();
let repo_root = temp_dir.path().to_path_buf();
let config = create_test_config();
let executor = ParallelExecutor::new(repo_root.clone(), config.clone(), None);
let counter = executor.get_auto_resolve_counter();
let handles: Vec<_> = (0..10)
.map(|_| {
let counter_clone = counter.clone();
std::thread::spawn(move || {
for _ in 0..100 {
counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
counter_clone.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
}
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
assert_eq!(
counter.load(std::sync::atomic::Ordering::SeqCst),
0,
"Counter should be 0 after concurrent increment/decrement operations"
);
}
#[tokio::test]
async fn test_combined_manual_and_auto_resolve_slots() {
let temp_dir = TempDir::new().unwrap();
let repo_root = temp_dir.path().to_path_buf();
let config = create_test_config();
let mut executor = ParallelExecutor::new(repo_root.clone(), config.clone(), None);
let manual_resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
executor.set_manual_resolve_counter(manual_resolve_counter.clone());
let auto_resolve_counter = executor.get_auto_resolve_counter();
manual_resolve_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
auto_resolve_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
assert_eq!(
manual_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
1,
"Manual resolve counter should be 1"
);
assert_eq!(
auto_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
1,
"Auto resolve counter should be 1"
);
manual_resolve_counter.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
assert_eq!(
manual_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
0,
"Manual resolve counter should be 0 after completion"
);
assert_eq!(
auto_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
1,
"Auto resolve counter should still be 1"
);
auto_resolve_counter.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
assert_eq!(
auto_resolve_counter.load(std::sync::atomic::Ordering::SeqCst),
0,
"Auto resolve counter should be 0 after completion"
);
}
fn test_change(id: &str) -> Change {
Change {
id: id.to_string(),
completed_tasks: 0,
total_tasks: 1,
last_modified: String::new(),
dependencies: Vec::new(),
metadata: ProposalMetadata::default(),
}
}
fn analysis_result<'a>(
changes: &'a [Change],
_in_flight: &'a [String],
_iteration: u32,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::analyzer::AnalysisResult> + Send + 'a>>
{
let order = changes.iter().map(|change| change.id.clone()).collect();
Box::pin(async move {
crate::analyzer::AnalysisResult {
order,
dependencies: HashMap::new(),
groups: None,
}
})
}
#[tokio::test]
async fn test_auto_resolve_zero_capacity_runs_analysis_but_suppresses_apply_dispatch() {
let temp_dir = TempDir::new().unwrap();
let (tx, mut rx) = tokio::sync::mpsc::channel(16);
let mut executor = ParallelExecutor::new(
temp_dir.path().to_path_buf(),
create_test_config(),
Some(tx),
);
executor
.get_auto_resolve_counter()
.store(1, std::sync::atomic::Ordering::SeqCst);
let mut queued = vec![test_change("queued-auto-apply")];
let mut in_flight = HashSet::new();
let semaphore = Arc::new(Semaphore::new(1));
let mut join_set: JoinSet<WorkspaceResult> = JoinSet::new();
let mut cleanup_guard =
WorkspaceCleanupGuard::new(VcsBackend::Git, temp_dir.path().to_path_buf());
let (should_break, iteration) = executor
.perform_reanalysis_and_dispatch(ReanalysisDispatchContext {
queued: &mut queued,
in_flight: &mut in_flight,
max_parallelism: 1,
iteration: 1,
reanalysis_reason: ReanalysisReason::ResolveCompletion,
analyzer: &analysis_result,
semaphore,
join_set: &mut join_set,
cleanup_guard: &mut cleanup_guard,
})
.await
.expect("re-analysis should not fail");
assert!(!should_break);
assert_eq!(
iteration, 1,
"suppressed dispatch must not advance iteration"
);
assert!(
in_flight.is_empty(),
"zero capacity must not start apply work"
);
assert_eq!(
queued.len(),
1,
"queued change remains pending until capacity recovers"
);
assert!(
join_set.is_empty(),
"no workspace task should be spawned at zero capacity"
);
let mut saw_analysis_started = false;
let mut saw_apply_started = false;
while let Ok(event) = rx.try_recv() {
match event {
ExecutionEvent::AnalysisStarted { .. } => saw_analysis_started = true,
ExecutionEvent::ApplyStarted { .. } => saw_apply_started = true,
_ => {}
}
}
assert!(
saw_analysis_started,
"queued work should enter analysis during active automatic resolve"
);
assert!(
!saw_apply_started,
"ordinary apply must remain capacity-gated during active automatic resolve"
);
}
#[tokio::test]
async fn deferred_retry_repromotes_and_converges_to_merged_without_user_action() {
let temp_dir = TempDir::new().unwrap();
init_git_repo(temp_dir.path()).await;
let workspace_parent = TempDir::new().unwrap();
let workspace_dir = workspace_parent.path().join("ws-change-a");
Command::new("git")
.args([
"worktree",
"add",
"-b",
"ws-change-a",
workspace_dir.to_str().expect("utf-8 temp path"),
"HEAD",
])
.current_dir(temp_dir.path())
.output()
.await
.expect("git worktree add should run");
let mut executor =
ParallelExecutor::new(temp_dir.path().to_path_buf(), create_test_config(), None);
executor.workspace_manager = Box::new(
super::executor::TestWorkspaceManager::new(Arc::new(AtomicUsize::new(1)))
.with_existing_workspace("change-a", workspace_dir.clone()),
);
let (merge_result_tx, mut merge_result_rx) = tokio::sync::mpsc::channel(8);
let shared = Arc::new(tokio::sync::RwLock::new(OrchestratorState::with_mode(
vec!["change-a".to_string()],
3,
ExecutionMode::Parallel,
)));
{
let mut guard = shared.write().await;
guard.apply_observation(
"change-a",
crate::orchestration::state::WorkspaceObservation::WorkspaceArchived,
);
guard.apply_command(ReducerCommand::ResolveMerge("change-a".to_string()));
assert_eq!(
guard.promote_next_base_mutating_lane_waiter(),
Some(("change-a".to_string(), WaitState::ResolveWait))
);
}
executor.set_shared_orchestrator_state(shared.clone());
executor
.pending_merge_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert!(
!executor
.handle_merge_result_with_tx(
MergeResult {
change_id: "change-a".to_string(),
workspace_name: "ws-change-a".to_string(),
origin: MergeResultOrigin::ResolveWaitRetry,
outcome: Ok(MergeTaskOutcome::deferred("Merge lane busy", true)),
},
&merge_result_tx,
)
.await
);
assert!(!shared.read().await.is_base_mutating_lane_occupied());
executor
.pending_merge_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert!(
executor
.handle_merge_result_with_tx(
MergeResult {
change_id: "blocking-merge".to_string(),
workspace_name: "ws-blocking-merge".to_string(),
origin: MergeResultOrigin::PostArchiveMerge,
outcome: Ok(MergeTaskOutcome::Merged),
},
&merge_result_tx,
)
.await
);
let retry_result = tokio::time::timeout(
std::time::Duration::from_millis(500),
merge_result_rx.recv(),
)
.await
.expect("promotion should spawn a retry task result")
.expect("promotion result channel closed");
assert_eq!(retry_result.change_id, "change-a");
assert_eq!(retry_result.origin, MergeResultOrigin::ResolveWaitRetry);
assert!(
matches!(retry_result.outcome, Ok(MergeTaskOutcome::Merged)),
"spawned retry itself must reach a merged outcome without a synthetic replacement: {:?}",
retry_result.outcome
);
assert!(
executor
.handle_merge_result_with_tx(retry_result, &merge_result_tx)
.await,
"scheduler must accept the spawned retry's merged outcome without user action"
);
assert_eq!(
executor
.pending_merge_count
.load(std::sync::atomic::Ordering::Relaxed),
0,
"all spawned retry accounting should be cleared after convergence"
);
assert!(shared.read().await.global_invariants_hold());
}