use crate::config::OrchestratorConfig;
use crate::events::ExecutionEvent;
use crate::openspec::{Change, ProposalMetadata};
use crate::parallel::cleanup::WorkspaceCleanupGuard;
use crate::parallel::dynamic_queue::ReanalysisReason;
use crate::parallel::{ParallelExecutor, WorkspaceResult};
use crate::tui::queue::DynamicQueue;
use crate::vcs::VcsBackend;
use std::collections::{HashMap, HashSet};
use std::sync::{atomic::AtomicUsize, Arc};
use tempfile::TempDir;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
/// Builds an `OrchestratorConfig` for these tests.
///
/// The apply, archive, analyze, acceptance and resolve commands are all set to
/// `echo` invocations; every other field keeps its `Default` value.
fn create_test_config() -> OrchestratorConfig {
    // Small adapter so each command template is written once as a plain literal.
    let command = |template: &str| Some(String::from(template));

    OrchestratorConfig {
        apply_command: command("echo apply {change_id}"),
        archive_command: command("echo archive {change_id}"),
        analyze_command: command("echo analyze"),
        acceptance_command: command("echo acceptance"),
        resolve_command: command("echo resolve"),
        ..Default::default()
    }
}
/// Checks the increment/decrement bookkeeping of the shared manual-resolve counter.
///
/// The executor is handed a clone of the counter through
/// `set_manual_resolve_counter`, but no scheduling is run here: every assertion
/// reads the counter directly.
#[tokio::test]
async fn test_manual_resolve_counter_reduces_available_slots() {
    use std::sync::atomic::Ordering::SeqCst;

    let workspace = TempDir::new().unwrap();
    let active_resolves = Arc::new(AtomicUsize::new(0));

    let mut executor =
        ParallelExecutor::new(workspace.path().to_path_buf(), create_test_config(), None);
    executor.set_manual_resolve_counter(Arc::clone(&active_resolves));

    assert_eq!(
        active_resolves.load(SeqCst),
        0,
        "Manual resolve counter should start at 0"
    );

    // A manual resolve begins.
    active_resolves.fetch_add(1, SeqCst);
    assert_eq!(
        active_resolves.load(SeqCst),
        1,
        "Manual resolve counter should be 1 after increment"
    );

    // The manual resolve finishes.
    active_resolves.fetch_sub(1, SeqCst);
    assert_eq!(
        active_resolves.load(SeqCst),
        0,
        "Manual resolve counter should return to 0 after completion"
    );
}
/// Checks that two overlapping manual resolves are counted separately and that
/// the counter steps back down to zero as each one completes.
///
/// As in the single-resolve test, the executor only receives the counter; the
/// assertions observe the counter itself.
#[tokio::test]
async fn test_multiple_manual_resolves_consume_multiple_slots() {
    use std::sync::atomic::Ordering::SeqCst;

    let workspace = TempDir::new().unwrap();
    let active_resolves = Arc::new(AtomicUsize::new(0));

    let mut executor =
        ParallelExecutor::new(workspace.path().to_path_buf(), create_test_config(), None);
    executor.set_manual_resolve_counter(Arc::clone(&active_resolves));

    // Two manual resolves start back to back.
    for _ in 0..2 {
        active_resolves.fetch_add(1, SeqCst);
    }
    assert_eq!(
        active_resolves.load(SeqCst),
        2,
        "Manual resolve counter should be 2 for concurrent resolves"
    );

    // Each completion removes exactly one from the count.
    let expected_after_each_completion = [
        (1, "Manual resolve counter should be 1 after one completes"),
        (0, "Manual resolve counter should be 0 after all complete"),
    ];
    for (expected, message) in expected_after_each_completion {
        active_resolves.fetch_sub(1, SeqCst);
        assert_eq!(active_resolves.load(SeqCst), expected, "{}", message);
    }
}
/// Checks that `DynamicQueue::notify_scheduler` wakes a waiter obtained from
/// `DynamicQueue::notified`.
///
/// The waiter future is created before the notification is sent, and the test
/// fails if it has not resolved within one second.
#[tokio::test]
async fn test_manual_resolve_completion_notifies_scheduler() {
    use std::time::Duration;

    let scheduler_queue = DynamicQueue::new();

    // Order matters: obtain the waiter first, then fire the notification.
    let wakeup = scheduler_queue.notified();
    scheduler_queue.notify_scheduler();

    let outcome = tokio::time::timeout(Duration::from_secs(1), wakeup).await;
    outcome.expect("scheduler notification should wake waiters");
}
/// Hammers a shared `AtomicUsize` from ten threads, each performing one hundred
/// paired increment/decrement operations, and checks the counter ends at zero.
#[test]
fn test_manual_resolve_counter_is_thread_safe() {
    use std::sync::atomic::Ordering::SeqCst;

    const WORKERS: usize = 10;
    const ROUNDS_PER_WORKER: usize = 100;

    let shared = Arc::new(AtomicUsize::new(0));

    let mut workers = Vec::with_capacity(WORKERS);
    for _ in 0..WORKERS {
        let local = Arc::clone(&shared);
        workers.push(std::thread::spawn(move || {
            for _ in 0..ROUNDS_PER_WORKER {
                // Each round is balanced, so the net contribution per worker is zero.
                local.fetch_add(1, SeqCst);
                local.fetch_sub(1, SeqCst);
            }
        }));
    }

    // Joining also surfaces any panic raised inside a worker.
    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());

    assert_eq!(
        shared.load(SeqCst),
        0,
        "Counter should be 0 after concurrent increment/decrement operations"
    );
}
/// Creates a minimal `Change` fixture with the given id.
///
/// The fixture reports zero of one tasks completed, an empty `last_modified`
/// string, no dependencies, and default proposal metadata.
fn test_change(id: &str) -> Change {
    let id = id.to_owned();
    Change {
        id,
        completed_tasks: 0,
        total_tasks: 1,
        last_modified: String::new(),
        dependencies: Vec::new(),
        metadata: ProposalMetadata::default(),
    }
}
/// Stub analyzer callback used by the re-analysis test.
///
/// Produces a boxed future resolving to an `AnalysisResult` whose `order` lists
/// the ids of `changes` in the order given, with an empty dependency map and no
/// groups. The `_in_flight` and `_iteration` arguments are ignored.
fn analysis_result<'a>(
    changes: &'a [Change],
    _in_flight: &'a [String],
    _iteration: u32,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::analyzer::AnalysisResult> + Send + 'a>>
{
    // Copy the ids out eagerly so the future owns everything it needs.
    let ids_in_input_order = changes.iter().map(|c| c.id.clone()).collect();

    let pending = async move {
        crate::analyzer::AnalysisResult {
            order: ids_in_input_order,
            dependencies: HashMap::new(),
            groups: None,
        }
    };
    Box::pin(pending)
}
/// Runs `perform_reanalysis_and_dispatch` with the manual-resolve counter preset
/// to 1 and a single-permit semaphore, then checks that:
///
/// * the call succeeds without requesting a break and returns iteration 1,
/// * nothing is moved to `in_flight`, the queued change stays queued, and no
///   task is spawned on the `JoinSet`,
/// * the event channel carried an `AnalysisStarted` event and a log entry
///   containing `dispatch_capacity_zero_after_analysis`, but no `ApplyStarted`.
#[tokio::test]
async fn test_manual_resolve_zero_capacity_runs_analysis_but_suppresses_apply_dispatch() {
    let workspace = TempDir::new().unwrap();
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(16);

    let mut executor = ParallelExecutor::new(
        workspace.path().to_path_buf(),
        create_test_config(),
        Some(event_tx),
    );
    // One manual resolve is already active when re-analysis is triggered.
    executor.set_manual_resolve_counter(Arc::new(AtomicUsize::new(1)));

    let mut queued = vec![test_change("queued-apply")];
    let mut in_flight = HashSet::new();
    let mut join_set: JoinSet<WorkspaceResult> = JoinSet::new();
    let mut cleanup_guard =
        WorkspaceCleanupGuard::new(VcsBackend::Git, workspace.path().to_path_buf());

    // NOTE(review): the two literal `1` arguments are presumably the concurrency
    // limit and the current iteration — confirm against the method signature.
    let outcome = executor
        .perform_reanalysis_and_dispatch(
            &mut queued,
            &mut in_flight,
            1,
            1,
            ReanalysisReason::ResolveCompletion,
            &analysis_result,
            Arc::new(Semaphore::new(1)),
            &mut join_set,
            &mut cleanup_guard,
        )
        .await;
    let (should_break, iteration) = outcome.expect("re-analysis should not fail");

    assert!(!should_break);
    assert_eq!(
        iteration, 1,
        "suppressed dispatch must not advance iteration"
    );
    assert!(
        in_flight.is_empty(),
        "zero capacity must not start apply work"
    );
    assert_eq!(
        queued.len(),
        1,
        "queued change remains pending until capacity recovers"
    );
    assert!(
        join_set.is_empty(),
        "no workspace task should be spawned at zero capacity"
    );

    // Drain everything currently buffered on the channel, then classify it.
    let events: Vec<_> = std::iter::from_fn(|| event_rx.try_recv().ok()).collect();

    let saw_analysis_started = events
        .iter()
        .any(|event| matches!(event, ExecutionEvent::AnalysisStarted { .. }));
    let saw_apply_started = events
        .iter()
        .any(|event| matches!(event, ExecutionEvent::ApplyStarted { .. }));
    let saw_capacity_diagnostic = events.iter().any(|event| {
        matches!(
            event,
            ExecutionEvent::Log(entry)
                if entry
                    .message
                    .contains("dispatch_capacity_zero_after_analysis")
        )
    });

    assert!(
        saw_analysis_started,
        "queued work should enter analysis during active manual resolve"
    );
    assert!(
        !saw_apply_started,
        "ordinary apply must remain capacity-gated during active manual resolve"
    );
    assert!(
        saw_capacity_diagnostic,
        "capacity-gated dispatch should emit an operator-visible diagnostic"
    );
}