1use std::collections::{BTreeMap, HashSet};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Condvar, Mutex};
5use std::thread;
6use std::time::Instant;
7
8use harn_lexer::Lexer;
9use harn_parser::{Attribute, Node, Parser, SNode};
10
11use crate::env_guard::ScopedEnvVar;
12
/// Outcome of a single test pipeline, or of a synthetic failure entry such as
/// `<file error>` / `<worker error>` that the runner records when a file or
/// worker could not be processed.
#[derive(Clone, Debug)]
pub struct TestResult {
    /// Pipeline name, or a synthetic `<...>` marker for runner-level failures.
    pub name: String,
    /// Display form of the path of the file the test came from (may be empty
    /// for worker-level failures).
    pub file: String,
    /// `true` when the pipeline executed without error.
    pub passed: bool,
    /// Failure message (compile error, runtime error, setup error, timeout).
    pub error: Option<String>,
    /// Wall-clock duration in milliseconds; for timed-out tests the runner
    /// stores the configured timeout here instead of the measured time.
    pub duration_ms: u64,
    /// Per-phase breakdown of where the time was spent.
    pub phases: PhaseTimings,
}
25
/// Aggregated outcome of a whole test run.
#[derive(Clone, Debug)]
pub struct TestSummary {
    /// Every recorded result: discovery errors first, then executed tests.
    pub results: Vec<TestResult>,
    /// Number of results with `passed == true`.
    pub passed: usize,
    /// Number of results that did not pass (`total - passed`).
    pub failed: usize,
    /// Total number of results.
    pub total: usize,
    /// Wall-clock duration of the run in milliseconds, discovery included.
    pub duration_ms: u64,
    /// Phase timings summed over all results, plus collection time.
    pub aggregate: AggregateTimings,
}
36
/// Per-test timing breakdown, in milliseconds. Phases that were never reached
/// (for example after a compile error or a timeout) stay at `0`.
#[derive(Debug, Default, Clone, Copy)]
pub struct PhaseTimings {
    /// Time spent building and configuring the VM before execution starts.
    pub setup_ms: u64,
    /// Time spent compiling the pipeline.
    pub compile_ms: u64,
    /// Time spent executing the compiled chunk.
    pub execute_ms: u64,
    /// Time spent resetting thread-local state after the run.
    pub teardown_ms: u64,
}
54
/// Suite-wide timing totals, in milliseconds.
#[derive(Debug, Default, Clone, Copy)]
pub struct AggregateTimings {
    /// Time spent discovering files and test cases before anything ran.
    pub collection_ms: u64,
    /// Sum of every result's `setup_ms`.
    pub setup_ms: u64,
    /// Sum of every result's `compile_ms`.
    pub compile_ms: u64,
    /// Sum of every result's `execute_ms`.
    pub execute_ms: u64,
    /// Sum of every result's `teardown_ms`.
    pub teardown_ms: u64,
}
65
66impl AggregateTimings {
67 fn from_results(collection_ms: u64, results: &[TestResult]) -> Self {
68 results.iter().map(|r| r.phases).fold(
69 Self {
70 collection_ms,
71 ..Self::default()
72 },
73 |acc, p| Self {
74 collection_ms: acc.collection_ms,
75 setup_ms: acc.setup_ms.saturating_add(p.setup_ms),
76 compile_ms: acc.compile_ms.saturating_add(p.compile_ms),
77 execute_ms: acc.execute_ms.saturating_add(p.execute_ms),
78 teardown_ms: acc.teardown_ms.saturating_add(p.teardown_ms),
79 },
80 )
81 }
82}
83
84impl TestResult {
85 fn emit_diagnose(&self) {
89 let outcome = if self.passed { "ok" } else { "FAIL" };
90 eprintln!(
91 "[harn test diag] {} {} setup={}ms compile={}ms execute={}ms teardown={}ms total={}ms",
92 outcome,
93 self.name,
94 self.phases.setup_ms,
95 self.phases.compile_ms,
96 self.phases.execute_ms,
97 self.phases.teardown_ms,
98 self.duration_ms,
99 );
100 }
101}
102
/// Progress notifications delivered to a [`TestRunProgress`] callback while a
/// suite runs.
#[derive(Clone, Debug)]
pub enum TestRunEvent {
    /// Emitted once after discovery, before any test starts.
    SuiteDiscovered {
        /// Number of test cases that will be executed.
        total_tests: usize,
        /// Number of files that contributed at least one test case.
        total_files: usize,
        /// Whether parallel execution was requested.
        parallel: bool,
        /// Resolved worker count (1 for sequential runs).
        workers: usize,
    },
    /// Emitted when a single-worker run crosses the "large suite" thresholds.
    LargeSequentialSuite {
        /// Number of test cases that will be executed.
        total_tests: usize,
        /// Number of files that contributed at least one test case.
        total_files: usize,
    },
    /// Emitted right before a test case begins executing.
    TestStarted {
        /// Pipeline name of the test.
        name: String,
        /// Display form of the test's file path.
        file: String,
        /// 1-based position of this test in start order.
        test_index: usize,
        /// Number of test cases in the run.
        total_tests: usize,
    },
    /// Emitted after a test case finished, carrying its result.
    TestFinished(TestResult),
}
123
/// Shareable callback invoked with every [`TestRunEvent`]; it must be
/// `Send + Sync` because parallel workers call it from their own threads.
pub type TestRunProgress = Arc<dyn Fn(TestRunEvent) + Send + Sync>;

/// Test count at or above which a single-worker run is reported as "large".
const LARGE_SEQUENTIAL_TEST_THRESHOLD: usize = 50;
/// File count at or above which a single-worker run is reported as "large".
const LARGE_SEQUENTIAL_FILE_THRESHOLD: usize = 10;
/// Upper bound on the auto-detected worker count (explicit overrides bypass it).
const DEFAULT_PARALLEL_JOBS_CAP: usize = 8;
/// Location of the historical timings cache, relative to the project root.
const TIMINGS_CACHE_RELATIVE_PATH: &str = ".harn/test-timings.json";
/// Environment variable that overrides the worker count for parallel runs.
const HARN_TEST_JOBS_ENV: &str = "HARN_TEST_JOBS";

/// Memory budgeted per worker (MiB) when the environment does not override it.
const DEFAULT_WORKER_MEMORY_MB: u64 = 1024;
/// Environment variable that overrides the per-worker memory budget (MiB).
const HARN_TEST_WORKER_MEMORY_MB_ENV: &str = "HARN_TEST_WORKER_MEMORY_MB";

/// Memory (MiB) left out of the worker budget when capping workers by memory.
const RESERVED_SYSTEM_MEMORY_MB: u64 = 1024;
149
/// Knobs controlling a test run; see [`run_tests_with_options`].
#[derive(Clone, Default)]
pub struct RunOptions {
    /// When set, only pipelines whose name contains this substring are run.
    pub filter: Option<String>,
    /// Per-test timeout in milliseconds.
    pub timeout_ms: u64,
    /// Run tests on multiple worker threads when `true`.
    pub parallel: bool,
    /// Explicit worker count for parallel runs (values below 1 are raised to
    /// 1). Ignored when `parallel` is `false`.
    pub jobs: Option<usize>,
    /// Extra skill directories handed to the skill loader for every test.
    pub cli_skill_dirs: Vec<PathBuf>,
    /// Optional progress callback.
    pub progress: Option<TestRunProgress>,
    /// Print a per-test timing breakdown to stderr when `true`.
    pub diagnose: bool,
}
176
177impl RunOptions {
178 pub fn new(timeout_ms: u64) -> Self {
179 Self {
180 timeout_ms,
181 ..Default::default()
182 }
183 }
184}
185
/// One runnable test pipeline together with the shared parse of its file.
#[derive(Clone)]
struct TestCase {
    /// Path of the file that declares the pipeline.
    file: PathBuf,
    /// Pipeline name.
    name: String,
    /// Source text of the file, shared between all cases from that file.
    source: Arc<String>,
    /// Parsed program of the file, shared between all cases from that file.
    program: Arc<Vec<SNode>>,
    /// Serial group from a `serial` attribute; cases in the same group are
    /// never run at the same time by the resource gate.
    serial_group: Option<String>,
    /// Number of worker slots the case occupies while running (at least 1).
    weight: usize,
}
203
/// Returns the canonical form of `path`, or `path` unchanged when it cannot be
/// canonicalized (for example because it does not exist).
fn canonicalize_existing_path(path: &Path) -> PathBuf {
    match path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
207
/// Returns the process working directory, falling back to `"."` when it cannot
/// be determined.
fn test_execution_cwd() -> PathBuf {
    match std::env::current_dir() {
        Ok(dir) => dir,
        Err(_) => PathBuf::from("."),
    }
}
211
212fn emit_progress(progress: &Option<TestRunProgress>, event: TestRunEvent) {
213 if let Some(callback) = progress {
214 callback(event);
215 }
216}
217
218fn should_warn_large_sequential_suite(total_tests: usize, total_files: usize) -> bool {
219 total_tests >= LARGE_SEQUENTIAL_TEST_THRESHOLD || total_files >= LARGE_SEQUENTIAL_FILE_THRESHOLD
220}
221
222pub async fn run_tests(
224 path: &Path,
225 filter: Option<&str>,
226 timeout_ms: u64,
227 parallel: bool,
228 cli_skill_dirs: &[PathBuf],
229) -> TestSummary {
230 let options = RunOptions {
231 filter: filter.map(str::to_owned),
232 timeout_ms,
233 parallel,
234 jobs: None,
235 cli_skill_dirs: cli_skill_dirs.to_vec(),
236 progress: None,
237 diagnose: diagnose_enabled_via_env(),
238 };
239 run_tests_with_options(path, &options).await
240}
241
242pub async fn run_tests_with_progress(
244 path: &Path,
245 filter: Option<&str>,
246 timeout_ms: u64,
247 parallel: bool,
248 cli_skill_dirs: &[PathBuf],
249 progress: Option<TestRunProgress>,
250) -> TestSummary {
251 let options = RunOptions {
252 filter: filter.map(str::to_owned),
253 timeout_ms,
254 parallel,
255 jobs: None,
256 cli_skill_dirs: cli_skill_dirs.to_vec(),
257 progress,
258 diagnose: diagnose_enabled_via_env(),
259 };
260 run_tests_with_options(path, &options).await
261}
262
/// Reports whether `HARN_TEST_DIAGNOSE` is set to a truthy value.
///
/// Accepted values are `1`, `true`, `yes` and `on`, compared ASCII
/// case-insensitively. Surrounding whitespace is ignored, matching how the
/// other `HARN_TEST_*` variables are parsed. An unset or non-Unicode variable
/// counts as disabled.
fn diagnose_enabled_via_env() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    std::env::var("HARN_TEST_DIAGNOSE").is_ok_and(|raw| {
        let value = raw.trim();
        TRUTHY
            .iter()
            .any(|candidate| value.eq_ignore_ascii_case(candidate))
    })
}
272
/// Discovers and runs the tests under `path` according to `options`.
///
/// `path` may be a directory (searched recursively for test files) or a single
/// file. Results for files that could not be read or parsed come first in the
/// summary, followed by the executed tests. After the run the historical
/// timings cache is refreshed on a best-effort basis.
pub async fn run_tests_with_options(path: &Path, options: &RunOptions) -> TestSummary {
    // Scoped environment overrides that stay in place for the whole run:
    // default the LLM provider to "mock" unless one is already configured, and
    // set the flag that disables LLM calls.
    let _default_llm_provider = ScopedEnvVar::set_if_unset("HARN_LLM_PROVIDER", "mock");
    let _disable_llm_calls = ScopedEnvVar::set(harn_vm::llm::LLM_CALLS_DISABLED_ENV, "1");

    let start = Instant::now();

    // Collection phase: file discovery, worker resolution, timings load and
    // per-file parsing are all counted as `collection_ms`.
    let collection_start = Instant::now();
    let canonical_target = canonicalize_existing_path(path);
    let files = if canonical_target.is_dir() {
        discover_test_files(&canonical_target)
    } else {
        vec![canonical_target.clone()]
    };

    let workers = resolve_workers(options);
    let timings_path = timings_cache_path(&canonical_target);
    let timings = timings_path
        .as_deref()
        .map(load_timings_cache)
        .unwrap_or_default();

    let discovery = discover_test_cases(&files, options.filter.as_deref(), workers);
    let collection_ms = collection_start.elapsed().as_millis() as u64;

    emit_progress(
        &options.progress,
        TestRunEvent::SuiteDiscovered {
            total_tests: discovery.cases.len(),
            total_files: discovery.files_with_tests,
            parallel: options.parallel,
            workers,
        },
    );
    // Only a single-worker run can trigger the "large sequential suite" event.
    if workers == 1
        && should_warn_large_sequential_suite(discovery.cases.len(), discovery.files_with_tests)
    {
        emit_progress(
            &options.progress,
            TestRunEvent::LargeSequentialSuite {
                total_tests: discovery.cases.len(),
                total_files: discovery.files_with_tests,
            },
        );
    }

    // Order cases by historical duration before handing them to the workers.
    let mut cases = discovery.cases;
    sort_cases_longest_first(&mut cases, &timings);

    // Discovery failures are reported ahead of the executed tests.
    let mut all_results = discovery.discovery_errors;
    let total_tests = cases.len();
    all_results.extend(execute_cases(cases, workers, options, total_tests).await);

    let total = all_results.len();
    let passed = all_results.iter().filter(|r| r.passed).count();
    let failed = total - passed;
    let aggregate = AggregateTimings::from_results(collection_ms, &all_results);

    // Merge this run's durations into the cache loaded above.
    if let Some(path) = timings_path.as_deref() {
        update_timings_cache(path, timings, &all_results);
    }

    TestSummary {
        results: all_results,
        passed,
        failed,
        total,
        duration_ms: start.elapsed().as_millis() as u64,
        aggregate,
    }
}
348
349pub async fn run_test_file(
357 path: &Path,
358 filter: Option<&str>,
359 timeout_ms: u64,
360 execution_cwd: Option<&Path>,
361 cli_skill_dirs: &[PathBuf],
362) -> Result<Vec<TestResult>, String> {
363 let source =
364 fs::read_to_string(path).map_err(|e| format!("Failed to read {}: {e}", path.display()))?;
365 let program = parse_program(&source)?;
366 let source = Arc::new(source);
367 let program = Arc::new(program);
368
369 let cases = extract_cases_from_program(path, &source, &program, filter, usize::MAX);
370
371 let mut results = Vec::with_capacity(cases.len());
372 let execution_cwd = execution_cwd
373 .map(Path::to_path_buf)
374 .unwrap_or_else(test_execution_cwd);
375 for case in cases {
376 results.push(execute_case(&case, &execution_cwd, timeout_ms, cli_skill_dirs).await);
377 }
378 Ok(results)
379}
380
381fn resolve_workers(options: &RunOptions) -> usize {
382 if !options.parallel {
383 return 1;
384 }
385 if let Some(jobs) = options.jobs {
386 return jobs.max(1);
387 }
388 if let Ok(raw) = std::env::var(HARN_TEST_JOBS_ENV) {
389 if let Ok(parsed) = raw.trim().parse::<usize>() {
390 if parsed >= 1 {
391 return parsed;
392 }
393 }
394 }
395 let detected = thread::available_parallelism()
396 .map(|n| n.get())
397 .unwrap_or(1);
398 let core_cap = detected.clamp(1, DEFAULT_PARALLEL_JOBS_CAP);
399 apply_memory_cap(core_cap)
400}
401
402fn apply_memory_cap(core_cap: usize) -> usize {
408 let Some(available_mb) = available_memory_mb() else {
409 return core_cap;
410 };
411 let budget = per_worker_memory_mb();
412 let mem_cap = memory_worker_cap(available_mb, budget, RESERVED_SYSTEM_MEMORY_MB);
413 if mem_cap < core_cap {
414 eprintln!(
415 "harn test: capping workers {core_cap} -> {mem_cap} \
416 (~{available_mb} MiB available, {budget} MiB/worker; \
417 override with --jobs / HARN_TEST_JOBS)"
418 );
419 return mem_cap;
420 }
421 core_cap
422}
423
/// Computes how many workers fit in memory.
///
/// `reserved_mb` is subtracted from `available_mb` (saturating at 0) and the
/// remainder is divided by `per_worker_mb` (treated as at least 1 to avoid a
/// division by zero). The result is never below 1. The conversion to `usize`
/// saturates instead of truncating, so narrow targets cannot wrap the count
/// around to a small value or to 0.
fn memory_worker_cap(available_mb: u64, per_worker_mb: u64, reserved_mb: u64) -> usize {
    let usable = available_mb.saturating_sub(reserved_mb);
    let per_worker = per_worker_mb.max(1);
    let workers = (usable / per_worker).max(1);
    usize::try_from(workers).unwrap_or(usize::MAX)
}
431
432fn per_worker_memory_mb() -> u64 {
435 std::env::var(HARN_TEST_WORKER_MEMORY_MB_ENV)
436 .ok()
437 .and_then(|raw| raw.trim().parse::<u64>().ok())
438 .filter(|&n| n >= 1)
439 .unwrap_or(DEFAULT_WORKER_MEMORY_MB)
440}
441
442fn available_memory_mb() -> Option<u64> {
453 let mut sys = sysinfo::System::new();
454 sys.refresh_memory();
455 let host_mb = match sys.available_memory() {
456 0 => None, bytes => Some(bytes / (1024 * 1024)),
458 };
459 match (host_mb, cgroup_v2_headroom_mb()) {
460 (Some(h), Some(c)) => Some(h.min(c)),
461 (Some(h), None) => Some(h),
462 (None, c) => c,
463 }
464}
465
466#[cfg(target_os = "linux")]
469fn cgroup_v2_headroom_mb() -> Option<u64> {
470 let dir = own_cgroup_v2_dir()?;
471 let max_raw = fs::read_to_string(dir.join("memory.max")).ok()?;
472 let current_raw = fs::read_to_string(dir.join("memory.current")).ok()?;
473 cgroup_headroom_mb(&max_raw, ¤t_raw)
474}
475
/// Non-Linux variant: no cgroup limit is consulted, so this always reports
/// `None`.
#[cfg(not(target_os = "linux"))]
fn cgroup_v2_headroom_mb() -> Option<u64> {
    None
}
480
/// Locates this process's cgroup directory under `/sys/fs/cgroup`.
///
/// Uses the first `0::<path>` line of `/proc/self/cgroup`; returns `None` when
/// the file cannot be read or has no such line.
#[cfg(target_os = "linux")]
fn own_cgroup_v2_dir() -> Option<PathBuf> {
    let membership = fs::read_to_string("/proc/self/cgroup").ok()?;
    for line in membership.lines() {
        let Some(rest) = line.strip_prefix("0::") else {
            continue;
        };
        let trimmed = rest.trim();
        // Drop the leading slash so `join` appends instead of replacing the base.
        let relative = trimmed.strip_prefix('/').unwrap_or(trimmed);
        return Some(Path::new("/sys/fs/cgroup").join(relative));
    }
    None
}
496
/// Computes cgroup memory headroom in MiB from the raw contents of
/// `memory.max` and `memory.current` (byte counts as decimal text).
///
/// Returns `None` for an unlimited slice (`max`) or when either value is not a
/// valid unsigned integer. Usage above the limit yields `Some(0)`.
#[cfg(any(target_os = "linux", test))]
fn cgroup_headroom_mb(memory_max: &str, memory_current: &str) -> Option<u64> {
    const BYTES_PER_MIB: u64 = 1024 * 1024;
    let parse_bytes = |raw: &str| raw.trim().parse::<u64>().ok();

    match memory_max.trim() {
        "max" => None,
        _ => {
            let limit = parse_bytes(memory_max)?;
            let used = parse_bytes(memory_current)?;
            Some(limit.saturating_sub(used) / BYTES_PER_MIB)
        }
    }
}
512
/// Result of scanning the candidate files for test cases.
struct Discovery {
    /// Every test case found, in file order.
    cases: Vec<TestCase>,
    /// Number of files that yielded at least one case.
    files_with_tests: usize,
    /// Failing `<file error>` results for files that could not be read or parsed.
    discovery_errors: Vec<TestResult>,
}
518
519fn discover_test_cases(files: &[PathBuf], filter: Option<&str>, workers: usize) -> Discovery {
520 let mut cases = Vec::new();
521 let mut files_with_tests = 0usize;
522 let mut discovery_errors = Vec::new();
523
524 for file in files {
525 let source = match fs::read_to_string(file) {
526 Ok(s) => s,
527 Err(e) => {
528 discovery_errors.push(TestResult {
529 name: "<file error>".to_string(),
530 file: file.display().to_string(),
531 passed: false,
532 error: Some(format!("Failed to read {}: {e}", file.display())),
533 duration_ms: 0,
534 phases: PhaseTimings::default(),
535 });
536 continue;
537 }
538 };
539
540 let program = match parse_program(&source) {
541 Ok(p) => p,
542 Err(e) => {
543 discovery_errors.push(TestResult {
544 name: "<file error>".to_string(),
545 file: file.display().to_string(),
546 passed: false,
547 error: Some(e),
548 duration_ms: 0,
549 phases: PhaseTimings::default(),
550 });
551 continue;
552 }
553 };
554
555 let source = Arc::new(source);
556 let program = Arc::new(program);
557 let file_cases = extract_cases_from_program(file, &source, &program, filter, workers);
558 if !file_cases.is_empty() {
559 files_with_tests += 1;
560 cases.extend(file_cases);
561 }
562 }
563
564 Discovery {
565 cases,
566 files_with_tests,
567 discovery_errors,
568 }
569}
570
571fn parse_program(source: &str) -> Result<Vec<SNode>, String> {
572 let mut lexer = Lexer::new(source);
573 let tokens = lexer.tokenize().map_err(|e| format!("{e}"))?;
574 let mut parser = Parser::new(tokens);
575 parser.parse().map_err(|e| format!("{e}"))
576}
577
578fn extract_cases_from_program(
579 file: &Path,
580 source: &Arc<String>,
581 program: &Arc<Vec<SNode>>,
582 filter: Option<&str>,
583 workers: usize,
584) -> Vec<TestCase> {
585 let mut cases = Vec::new();
586 for snode in program.iter() {
587 let Some(meta) = inspect_test_pipeline(snode) else {
588 continue;
589 };
590 if let Some(pattern) = filter {
591 if !meta.name.contains(pattern) {
592 continue;
593 }
594 }
595 let weight = meta.weight.min(workers).max(1);
598 cases.push(TestCase {
599 file: file.to_path_buf(),
600 name: meta.name,
601 source: Arc::clone(source),
602 program: Arc::clone(program),
603 serial_group: meta.serial_group,
604 weight,
605 });
606 }
607 cases
608}
609
/// Test-relevant facts extracted from one pipeline declaration.
struct PipelineMeta {
    /// Pipeline name.
    name: String,
    /// Group named by a `serial` attribute, if the pipeline has one.
    serial_group: Option<String>,
    /// Weight requested through a `heavy` attribute; 1 when absent or invalid.
    weight: usize,
}
615
616fn inspect_test_pipeline(snode: &SNode) -> Option<PipelineMeta> {
617 let (attributes, inner) = match &snode.node {
621 Node::AttributedDecl { attributes, inner } => (attributes.as_slice(), inner.as_ref()),
622 _ => (&[][..], snode),
623 };
624 let name = match &inner.node {
625 Node::Pipeline { name, .. } => name.clone(),
626 _ => return None,
627 };
628 let has_test_attr = attributes.iter().any(|a| a.name == "test");
629 if !has_test_attr && !name.starts_with("test_") {
630 return None;
631 }
632 let serial_group = attributes
633 .iter()
634 .find(|a| a.name == "serial")
635 .map(serial_group_for);
636 let weight = attributes
637 .iter()
638 .find(|a| a.name == "heavy")
639 .and_then(heavy_weight_for)
640 .unwrap_or(1);
641 Some(PipelineMeta {
642 name,
643 serial_group,
644 weight,
645 })
646}
647
648fn serial_group_for(attr: &Attribute) -> String {
649 attr.string_arg("group")
650 .unwrap_or_else(|| "__default__".to_string())
651}
652
653fn heavy_weight_for(attr: &Attribute) -> Option<usize> {
654 attr.args
655 .iter()
656 .find(|a| a.name.as_deref() == Some("threads"))
657 .and_then(|a| match &a.value.node {
658 Node::IntLiteral(n) if *n >= 1 => Some(*n as usize),
659 _ => None,
660 })
661}
662
663fn sort_cases_longest_first(cases: &mut [TestCase], timings: &BTreeMap<String, u64>) {
664 cases.sort_by(|a, b| {
670 let key_a = timings_key(&a.file, &a.name);
671 let key_b = timings_key(&b.file, &b.name);
672 let dur_a = timings.get(&key_a).copied().unwrap_or(0);
673 let dur_b = timings.get(&key_b).copied().unwrap_or(0);
674 dur_a
675 .cmp(&dur_b)
676 .then_with(|| a.file.cmp(&b.file))
677 .then_with(|| a.name.cmp(&b.name))
678 });
679}
680
/// Builds the timings-cache key for a test: `<file display>::<test name>`.
fn timings_key(file: &Path, name: &str) -> String {
    let mut key = file.display().to_string();
    key.push_str("::");
    key.push_str(name);
    key
}
684
685fn timings_cache_path(target: &Path) -> Option<PathBuf> {
686 let probe_root = if target.is_dir() {
691 target.to_path_buf()
692 } else {
693 target.parent()?.to_path_buf()
694 };
695 let root = harn_vm::stdlib::process::find_project_root(&probe_root)
696 .unwrap_or_else(|| probe_root.clone());
697 Some(root.join(TIMINGS_CACHE_RELATIVE_PATH))
698}
699
700fn load_timings_cache(path: &Path) -> BTreeMap<String, u64> {
701 let Ok(contents) = fs::read_to_string(path) else {
702 return BTreeMap::new();
703 };
704 serde_json::from_str::<BTreeMap<String, u64>>(&contents).unwrap_or_default()
705}
706
707fn update_timings_cache(path: &Path, mut existing: BTreeMap<String, u64>, results: &[TestResult]) {
708 for result in results {
709 if result.name == "<file error>" || result.name == "<join error>" {
710 continue;
711 }
712 existing.insert(
713 timings_key(Path::new(&result.file), &result.name),
714 result.duration_ms,
715 );
716 }
717 if let Some(parent) = path.parent() {
718 let _ = fs::create_dir_all(parent);
719 }
720 if let Ok(serialized) = serde_json::to_string(&existing) {
721 let _ = fs::write(path, serialized);
722 }
723}
724
725async fn execute_cases(
726 cases: Vec<TestCase>,
727 workers: usize,
728 options: &RunOptions,
729 total_tests: usize,
730) -> Vec<TestResult> {
731 if cases.is_empty() {
732 return Vec::new();
733 }
734 let completed = Arc::new(Mutex::new(0usize));
735 if workers <= 1 {
736 let mut results = Vec::with_capacity(cases.len());
737 for case in cases {
738 let cwd = case_execution_cwd(&case);
739 let test_index = next_test_index(&completed);
740 emit_progress(
741 &options.progress,
742 TestRunEvent::TestStarted {
743 name: case.name.clone(),
744 file: case.file.display().to_string(),
745 test_index,
746 total_tests,
747 },
748 );
749 let result =
750 execute_case(&case, &cwd, options.timeout_ms, &options.cli_skill_dirs).await;
751 if options.diagnose {
752 result.emit_diagnose();
753 }
754 emit_progress(
755 &options.progress,
756 TestRunEvent::TestFinished(result.clone()),
757 );
758 results.push(result);
759 }
760 return results;
761 }
762
763 let queue = Arc::new(Mutex::new(cases));
764 let gate = Arc::new(ResourceGate::new(workers));
765 let results: Arc<Mutex<Vec<TestResult>>> = Arc::new(Mutex::new(Vec::new()));
766
767 let mut handles = Vec::with_capacity(workers);
768 for worker_idx in 0..workers {
769 let queue = Arc::clone(&queue);
770 let gate = Arc::clone(&gate);
771 let results = Arc::clone(&results);
772 let completed = Arc::clone(&completed);
773 let timeout_ms = options.timeout_ms;
774 let cli_skill_dirs = options.cli_skill_dirs.clone();
775 let progress = options.progress.clone();
776 let diagnose = options.diagnose;
777 let handle = thread::Builder::new()
778 .name(format!("harn-test-worker-{worker_idx}"))
779 .spawn(move || {
780 let runtime = match tokio::runtime::Builder::new_current_thread()
781 .enable_all()
782 .build()
783 {
784 Ok(rt) => rt,
785 Err(error) => {
786 results.lock().unwrap().push(TestResult {
787 name: "<worker error>".to_string(),
788 file: String::new(),
789 passed: false,
790 error: Some(format!("failed to start test runtime: {error}")),
791 duration_ms: 0,
792 phases: PhaseTimings::default(),
793 });
794 return;
795 }
796 };
797 while let Some(case) = queue.lock().unwrap().pop() {
802 let _guard = gate.acquire(case.weight, case.serial_group.as_deref());
803 let cwd = case_execution_cwd(&case);
804 let test_index = next_test_index(&completed);
805 emit_progress(
806 &progress,
807 TestRunEvent::TestStarted {
808 name: case.name.clone(),
809 file: case.file.display().to_string(),
810 test_index,
811 total_tests,
812 },
813 );
814 let result =
815 runtime.block_on(execute_case(&case, &cwd, timeout_ms, &cli_skill_dirs));
816 if diagnose {
817 result.emit_diagnose();
818 }
819 emit_progress(&progress, TestRunEvent::TestFinished(result.clone()));
820 results.lock().unwrap().push(result);
821 }
822 })
823 .expect("spawning a harn-test worker thread should succeed");
824 handles.push(handle);
825 }
826
827 for handle in handles {
828 let _ = handle.join();
829 }
830
831 Arc::try_unwrap(results)
835 .map(|m| m.into_inner().unwrap_or_default())
836 .unwrap_or_else(|arc| arc.lock().unwrap().clone())
837}
838
/// Increments the shared counter and returns the new value, so the first call
/// yields 1.
///
/// # Panics
///
/// Panics if the counter's mutex is poisoned.
fn next_test_index(counter: &Mutex<usize>) -> usize {
    let mut count = counter.lock().unwrap();
    let next = *count + 1;
    *count = next;
    next
}
844
845fn case_execution_cwd(case: &TestCase) -> PathBuf {
846 case.file
847 .parent()
848 .filter(|p| !p.as_os_str().is_empty())
849 .map(Path::to_path_buf)
850 .unwrap_or_else(test_execution_cwd)
851}
852
/// Admission control for parallel test execution: limits the total weight of
/// the running cases and keeps cases of the same serial group from overlapping.
struct ResourceGate {
    /// Free slots and busy groups, guarded together.
    state: Mutex<GateState>,
    /// Signalled every time a guard is released.
    cond: Condvar,
    /// Total number of slots; requested weights are capped at this value.
    capacity: usize,
}

/// Mutable bookkeeping of a [`ResourceGate`].
struct GateState {
    /// Slots not currently held by any guard.
    available: usize,
    /// Serial groups that currently have a running case.
    busy_groups: HashSet<String>,
}

/// Reservation handed out by [`ResourceGate::acquire`]; dropping it returns
/// the slots and frees the serial group.
struct GateGuard<'a> {
    gate: &'a ResourceGate,
    /// Number of slots held (already clamped).
    weight: usize,
    /// Serial group held, if any.
    group: Option<String>,
}

impl ResourceGate {
    /// Creates a gate with `capacity` free slots and no busy groups.
    fn new(capacity: usize) -> Self {
        let state = GateState {
            available: capacity,
            busy_groups: HashSet::new(),
        };
        Self {
            state: Mutex::new(state),
            cond: Condvar::new(),
            capacity,
        }
    }

    /// Blocks until `weight` slots are free and `group` (if given) is idle,
    /// then reserves both.
    ///
    /// `weight` is capped at the gate's capacity and raised to at least 1, so
    /// an oversized request can still be satisfied once the gate is empty.
    fn acquire(&self, weight: usize, group: Option<&str>) -> GateGuard<'_> {
        let weight = weight.min(self.capacity).max(1);
        let mut state = self
            .cond
            .wait_while(self.state.lock().unwrap(), |state| {
                let group_busy = group.is_some_and(|name| state.busy_groups.contains(name));
                state.available < weight || group_busy
            })
            .unwrap();

        state.available -= weight;
        let group = group.map(str::to_owned);
        if let Some(name) = &group {
            state.busy_groups.insert(name.clone());
        }
        GateGuard {
            gate: self,
            weight,
            group,
        }
    }
}

impl Drop for GateGuard<'_> {
    /// Returns the reserved slots, frees the serial group, and wakes every
    /// waiter so each can re-check its own admission condition.
    fn drop(&mut self) {
        let mut state = self.gate.state.lock().unwrap();
        state.available += self.weight;
        if let Some(name) = self.group.take() {
            state.busy_groups.remove(name.as_str());
        }
        self.gate.cond.notify_all();
    }
}
916
/// Compiles and runs a single test case, returning its result with per-phase
/// timings.
///
/// * `case` - the test to run.
/// * `execution_cwd` - working directory recorded in the thread's execution
///   context for the test.
/// * `timeout_ms` - budget for VM setup plus execution; compilation happens
///   before the timer starts.
/// * `cli_skill_dirs` - extra skill directories passed to the skill loader.
///
/// Failures are reported through the returned [`TestResult`] (compile error,
/// setup error, runtime error or timeout); this function does not return an
/// error itself.
async fn execute_case(
    case: &TestCase,
    execution_cwd: &Path,
    timeout_ms: u64,
    cli_skill_dirs: &[PathBuf],
) -> TestResult {
    // Reset thread-local VM state before the test; it is reset again afterwards.
    harn_vm::reset_thread_local_state();

    let mut phases = PhaseTimings::default();
    let total_start = Instant::now();

    let compile_start = Instant::now();
    let chunk = match harn_vm::Compiler::new().compile_named(&case.program, &case.name) {
        Ok(c) => c,
        Err(e) => {
            // A compile failure ends the test early; only `compile_ms` is set.
            phases.compile_ms = compile_start.elapsed().as_millis() as u64;
            return TestResult {
                name: case.name.clone(),
                file: case.file.display().to_string(),
                passed: false,
                error: Some(format!("Compile error: {e}")),
                duration_ms: total_start.elapsed().as_millis() as u64,
                phases,
            };
        }
    };
    phases.compile_ms = compile_start.elapsed().as_millis() as u64;

    let local = tokio::task::LocalSet::new();
    let timeout = std::time::Duration::from_millis(timeout_ms);
    let file_display = case.file.display().to_string();
    let setup_start = Instant::now();
    // Setup and execution share one timeout and run inside a `LocalSet`.
    let result = tokio::time::timeout(
        timeout,
        local.run_until(async {
            let mut vm = harn_vm::Vm::new();
            harn_vm::register_vm_stdlib(&mut vm);
            crate::install_default_hostlib(&mut vm);
            // Store/metadata/checkpoint builtins are rooted at the project
            // root when one is found, otherwise at the file's directory.
            let source_parent = case.file.parent().unwrap_or(Path::new("."));
            let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
            let store_base = project_root.as_deref().unwrap_or(source_parent);
            let source_dir = source_parent.to_string_lossy().into_owned();
            harn_vm::register_store_builtins(&mut vm, store_base);
            harn_vm::register_metadata_builtins(&mut vm, store_base);
            // Checkpoints are registered under the file stem ("test" when the
            // stem is unavailable), not under the pipeline's own name.
            let pipeline_name = case
                .file
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("test");
            harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
            vm.set_source_info(&file_display, &case.source);
            // Record the cwd/source dir for this thread's execution context.
            harn_vm::stdlib::process::set_thread_execution_context(Some(
                harn_vm::orchestration::RunExecutionRecord {
                    cwd: Some(execution_cwd.to_string_lossy().into_owned()),
                    source_dir: Some(source_dir),
                    env: BTreeMap::new(),
                    adapter: None,
                    repo_path: None,
                    worktree_path: None,
                    branch: None,
                    base_ref: None,
                    cleanup: None,
                },
            ));
            if let Some(ref root) = project_root {
                vm.set_project_root(root);
            }
            if let Some(parent) = case.file.parent() {
                if !parent.as_os_str().is_empty() {
                    vm.set_source_dir(parent);
                }
            }
            let loaded =
                crate::skill_loader::load_skills(&crate::skill_loader::SkillLoaderInputs {
                    cli_dirs: cli_skill_dirs.to_vec(),
                    source_path: Some(case.file.clone()),
                });
            crate::skill_loader::emit_loader_warnings(&loaded.loader_warnings);
            crate::skill_loader::install_skills_global(&mut vm, &loaded);
            let extensions = crate::package::load_runtime_extensions(&case.file);
            crate::package::install_runtime_extensions(&extensions);
            // Trigger/hook installation failures abort setup via `?` and are
            // reported as the test's error.
            crate::package::install_manifest_triggers(&mut vm, &extensions)
                .await
                .map_err(|error| format!("failed to install manifest triggers: {error}"))?;
            crate::package::install_manifest_hooks(&mut vm, &extensions)
                .await
                .map_err(|error| format!("failed to install manifest hooks: {error}"))?;
            vm.set_harness(harn_vm::Harness::real());
            let setup_ms = setup_start.elapsed().as_millis() as u64;
            let exec_start = Instant::now();
            let outcome = match vm.execute(&chunk).await {
                Ok(val) => Ok(val),
                Err(e) => Err(vm.format_runtime_error(&e)),
            };
            let execute_ms = exec_start.elapsed().as_millis() as u64;
            // NOTE(review): this reset is skipped when setup fails via `?` or
            // the timeout fires — confirm `reset_thread_local_state` below
            // also covers the egress policy.
            harn_vm::egress::reset_egress_policy_for_host();
            Ok::<_, String>((outcome, setup_ms, execute_ms))
        }),
    )
    .await;

    let teardown_start = Instant::now();
    harn_vm::reset_thread_local_state();
    phases.teardown_ms = teardown_start.elapsed().as_millis() as u64;

    let elapsed_ms = total_start.elapsed().as_millis() as u64;
    let (passed, error, duration_ms) = match result {
        // Setup and execution both completed; `outcome` is the test verdict.
        Ok(Ok((outcome, setup_ms, execute_ms))) => {
            phases.setup_ms = setup_ms;
            phases.execute_ms = execute_ms;
            match outcome {
                Ok(_) => (true, None, elapsed_ms),
                Err(message) => (false, Some(message), elapsed_ms),
            }
        }
        // Setup failed before execution; setup/execute timings stay at 0.
        Ok(Err(setup_error)) => (false, Some(setup_error), elapsed_ms),
        // Timed out: the reported duration is the configured timeout rather
        // than the measured elapsed time.
        Err(_) => (
            false,
            Some(format!("timed out after {timeout_ms}ms")),
            timeout_ms,
        ),
    };

    TestResult {
        name: case.name.clone(),
        file: file_display,
        passed,
        error,
        duration_ms,
        phases,
    }
}
1055
1056fn discover_test_files(dir: &Path) -> Vec<PathBuf> {
1057 let mut files = Vec::new();
1058 if let Ok(entries) = fs::read_dir(dir) {
1059 for entry in entries.flatten() {
1060 let path = entry.path();
1061 if path.is_dir() {
1062 files.extend(discover_test_files(&path));
1063 } else if path.extension().is_some_and(|e| e == "harn") {
1064 if let Ok(content) = fs::read_to_string(&path) {
1065 if content.contains("test_") || content.contains("@test") {
1066 files.push(canonicalize_existing_path(&path));
1067 }
1068 }
1069 }
1070 }
1071 }
1072 files.sort();
1073 files
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078 use super::*;
1079
1080 use std::sync::Arc;
1081 use std::time::Duration;
1082
1083 struct TempTestDir {
1084 inner: tempfile::TempDir,
1085 }
1086
1087 impl TempTestDir {
1088 fn new() -> Self {
1089 Self {
1090 inner: tempfile::tempdir().unwrap(),
1091 }
1092 }
1093
1094 fn write(&self, relative: &str, contents: &str) {
1095 let path = self.path().join(relative);
1096 if let Some(parent) = path.parent() {
1097 fs::create_dir_all(parent).unwrap();
1098 }
1099 fs::write(path, contents).unwrap();
1100 }
1101
1102 fn path(&self) -> &Path {
1103 self.inner.path()
1104 }
1105 }
1106
1107 #[test]
1108 fn discover_test_files_returns_canonical_absolute_paths() {
1109 let temp = TempTestDir::new();
1110 temp.write("suite/test_alpha.harn", "pipeline test_alpha(task) {}");
1111 temp.write("suite/nested/test_beta.harn", "pipeline test_beta(task) {}");
1112 temp.write("suite/annotated.harn", "@test\npipeline annotated(task) {}");
1113 temp.write("suite/ignore.harn", "pipeline build(task) {}");
1114
1115 let files = discover_test_files(&temp.path().join("suite"));
1116
1117 assert_eq!(files.len(), 3);
1118 assert!(files.iter().all(|path| path.is_absolute()));
1119 assert!(files
1120 .iter()
1121 .any(|path| path.ends_with("suite/test_alpha.harn")));
1122 assert!(files
1123 .iter()
1124 .any(|path| path.ends_with("suite/nested/test_beta.harn")));
1125 assert!(files
1126 .iter()
1127 .any(|path| path.ends_with("suite/annotated.harn")));
1128 }
1129
1130 #[tokio::test]
1131 async fn run_tests_uses_file_parent_as_execution_cwd_and_restores_shell_cwd() {
1132 let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
1133 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1134 let temp = TempTestDir::new();
1135 temp.write(
1136 "suite/test_cwd.harn",
1137 r"
1138pipeline test_current_dir(task) {
1139 assert_eq(cwd(), source_dir())
1140}
1141",
1142 );
1143
1144 let original_cwd = std::env::current_dir().unwrap();
1145 let summary = run_tests(&temp.path().join("suite"), None, 1_000, false, &[]).await;
1146 let restored_cwd = std::env::current_dir().unwrap();
1147
1148 assert_eq!(summary.failed, 0);
1149 assert_eq!(summary.passed, 1);
1150 assert_eq!(
1151 fs::canonicalize(restored_cwd).unwrap(),
1152 fs::canonicalize(original_cwd).unwrap()
1153 );
1154 }
1155
1156 #[tokio::test]
1157 async fn parallel_run_tests_uses_each_file_parent_as_execution_cwd() {
1158 let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
1159 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1160 let temp = TempTestDir::new();
1161 temp.write(
1162 "suite/a/test_one.harn",
1163 r"
1164pipeline test_one(task) {
1165 assert_eq(cwd(), source_dir())
1166}
1167",
1168 );
1169 temp.write(
1170 "suite/b/test_two.harn",
1171 r"
1172pipeline test_two(task) {
1173 assert_eq(cwd(), source_dir())
1174}
1175",
1176 );
1177
1178 let summary = run_tests(&temp.path().join("suite"), None, 1_000, true, &[]).await;
1179 assert_eq!(summary.failed, 0);
1180 assert_eq!(summary.passed, 2);
1181 }
1182
1183 #[tokio::test]
1184 async fn run_tests_loads_cli_skill_dirs() {
1185 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1186 let temp = TempTestDir::new();
1187 temp.write(
1188 "skills/review/SKILL.md",
1189 r"---
1190name: review
1191short: Review PRs
1192description: Review pull requests
1193---
1194
1195Review instructions.
1196",
1197 );
1198 temp.write(
1199 "suite/test_skills.harn",
1200 r#"
1201pipeline test_cli_skills(task) {
1202 assert_eq(skill_count(skills), 1)
1203 let found = skill_find(skills, "review")
1204 assert_eq(found.name, "review")
1205}
1206"#,
1207 );
1208
1209 let summary = run_tests(
1210 &temp.path().join("suite"),
1211 None,
1212 1_000,
1213 false,
1214 &[temp.path().join("skills")],
1215 )
1216 .await;
1217
1218 assert_eq!(summary.failed, 0, "{:?}", summary.results[0].error);
1219 assert_eq!(summary.passed, 1);
1220 }
1221
1222 #[test]
1223 fn resolve_workers_honors_explicit_jobs() {
1224 let mut opts = RunOptions::new(1_000);
1225 opts.parallel = true;
1226 opts.jobs = Some(3);
1227 assert_eq!(resolve_workers(&opts), 3);
1228 }
1229
1230 #[test]
1231 fn resolve_workers_returns_one_when_not_parallel() {
1232 let mut opts = RunOptions::new(1_000);
1233 opts.parallel = false;
1234 opts.jobs = Some(8);
1235 assert_eq!(resolve_workers(&opts), 1);
1236 }
1237
1238 #[test]
1239 fn memory_worker_cap_backs_off_under_pressure() {
1240 assert_eq!(memory_worker_cap(4096, 1024, 1024), 3);
1242 }
1243
1244 #[test]
1245 fn memory_worker_cap_is_generous_when_memory_is_plentiful() {
1246 assert!(memory_worker_cap(32_768, 1024, 1024) >= DEFAULT_PARALLEL_JOBS_CAP);
1249 }
1250
1251 #[test]
1252 fn memory_worker_cap_never_starves_to_zero() {
1253 assert_eq!(memory_worker_cap(512, 1024, 1024), 1);
1255 }
1256
1257 #[test]
1258 fn cgroup_headroom_unlimited_is_none() {
1259 assert_eq!(cgroup_headroom_mb("max\n", "1048576\n"), None);
1261 }
1262
1263 #[test]
1264 fn cgroup_headroom_computes_slice_remainder() {
1265 let four_gib = (4_u64 * 1024 * 1024 * 1024).to_string();
1267 let one_gib = (1024_u64 * 1024 * 1024).to_string();
1268 assert_eq!(cgroup_headroom_mb(&four_gib, &one_gib), Some(3072));
1269 }
1270
1271 #[test]
1272 fn cgroup_headroom_saturates_when_over_limit() {
1273 assert_eq!(cgroup_headroom_mb("1024", "999999999"), Some(0));
1275 }
1276
1277 #[test]
1278 fn cgroup_headroom_rejects_garbage() {
1279 assert_eq!(cgroup_headroom_mb("not-a-number", "123"), None);
1280 }
1281
1282 #[test]
1283 fn sort_cases_longest_first_uses_historical_durations() {
1284 let source = Arc::new(String::new());
1285 let program = Arc::new(Vec::new());
1286 let mk = |name: &str| TestCase {
1287 file: PathBuf::from("tests/a.harn"),
1288 name: name.to_string(),
1289 source: Arc::clone(&source),
1290 program: Arc::clone(&program),
1291 serial_group: None,
1292 weight: 1,
1293 };
1294 let mut cases = vec![mk("test_quick"), mk("test_slow"), mk("test_medium")];
1295 let mut timings = BTreeMap::new();
1296 timings.insert("tests/a.harn::test_slow".to_string(), 5_000);
1297 timings.insert("tests/a.harn::test_medium".to_string(), 1_000);
1298
1299 sort_cases_longest_first(&mut cases, &timings);
1300
1301 let order: Vec<&str> = cases.iter().map(|c| c.name.as_str()).collect();
1303 assert_eq!(order, vec!["test_quick", "test_medium", "test_slow"]);
1304 }
1305
1306 #[test]
1307 fn resource_gate_serializes_same_group() {
1308 let gate = Arc::new(ResourceGate::new(4));
1309 let trace = Arc::new(Mutex::new(Vec::<&'static str>::new()));
1310
1311 let g_a = {
1312 let trace = Arc::clone(&trace);
1313 let gate = Arc::clone(&gate);
1314 thread::spawn(move || {
1315 let _guard = gate.acquire(1, Some("login"));
1316 trace.lock().unwrap().push("a-start");
1317 thread::sleep(Duration::from_millis(50));
1318 trace.lock().unwrap().push("a-end");
1319 })
1320 };
1321 thread::sleep(Duration::from_millis(10));
1323 let g_b = {
1324 let trace = Arc::clone(&trace);
1325 let gate = Arc::clone(&gate);
1326 thread::spawn(move || {
1327 let _guard = gate.acquire(1, Some("login"));
1328 trace.lock().unwrap().push("b-start");
1329 trace.lock().unwrap().push("b-end");
1330 })
1331 };
1332 g_a.join().unwrap();
1333 g_b.join().unwrap();
1334
1335 let trace = trace.lock().unwrap();
1336 let a_end = trace.iter().position(|t| *t == "a-end").unwrap();
1338 let b_start = trace.iter().position(|t| *t == "b-start").unwrap();
1339 assert!(a_end < b_start, "B started before A finished: {trace:?}");
1340 }
1341
1342 #[test]
1343 fn resource_gate_allows_independent_groups_in_parallel() {
1344 let gate = Arc::new(ResourceGate::new(4));
1345 let _guard_a = gate.acquire(1, Some("alpha"));
1346 let acquired = std::sync::Mutex::new(false);
1348 thread::scope(|s| {
1349 s.spawn(|| {
1350 let _ = gate.acquire(1, Some("beta"));
1351 *acquired.lock().unwrap() = true;
1352 });
1353 });
1354 assert!(*acquired.lock().unwrap());
1355 }
1356
1357 #[test]
1358 fn resource_gate_caps_heavy_weight_at_capacity() {
1359 let gate = Arc::new(ResourceGate::new(2));
1362 let _g = gate.acquire(99, None);
1363 let started = Arc::new(Mutex::new(false));
1365 let inner = Arc::clone(&started);
1366 let gate2 = Arc::clone(&gate);
1367 let handle = thread::spawn(move || {
1368 let _guard = gate2.acquire(1, None);
1369 *inner.lock().unwrap() = true;
1370 });
1371 thread::sleep(Duration::from_millis(20));
1372 assert!(!*started.lock().unwrap(), "should still be waiting");
1373 drop(_g);
1374 handle.join().unwrap();
1375 assert!(*started.lock().unwrap());
1376 }
1377
1378 #[tokio::test]
1379 async fn parallel_scheduler_runs_heavy_tests_without_oversubscribing() {
1380 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1383 let temp = TempTestDir::new();
1384 temp.write(
1385 "suite/test_heavy.harn",
1386 r"
1387@test
1388@heavy(threads: 2)
1389pipeline test_heavy_one(task) {}
1390
1391@test
1392pipeline test_light(task) {}
1393",
1394 );
1395
1396 let opts = RunOptions {
1397 parallel: true,
1398 jobs: Some(2),
1399 ..RunOptions::new(5_000)
1400 };
1401 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1402 assert_eq!(summary.failed, 0, "{:?}", summary.results);
1403 assert_eq!(summary.total, 2);
1404 }
1405
1406 #[tokio::test]
1407 async fn parallel_scheduler_handles_serial_group_annotation() {
1408 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1409 let temp = TempTestDir::new();
1410 temp.write(
1411 "suite/test_serial.harn",
1412 r#"
1413@test
1414@serial(group: "fixture")
1415pipeline test_serial_one(task) {}
1416
1417@test
1418@serial(group: "fixture")
1419pipeline test_serial_two(task) {}
1420"#,
1421 );
1422
1423 let opts = RunOptions {
1424 parallel: true,
1425 jobs: Some(4),
1426 ..RunOptions::new(5_000)
1427 };
1428 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1429 assert_eq!(summary.failed, 0, "{:?}", summary.results);
1430 assert_eq!(summary.passed, 2);
1431 }
1432
1433 #[tokio::test]
1434 async fn parallel_scheduler_persists_timings_cache() {
1435 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1436 let temp = TempTestDir::new();
1437 temp.write(
1438 "suite/test_timed.harn",
1439 r"
1440@test
1441pipeline test_first(task) {}
1442
1443@test
1444pipeline test_second(task) {}
1445",
1446 );
1447
1448 let opts = RunOptions {
1449 parallel: true,
1450 jobs: Some(2),
1451 ..RunOptions::new(5_000)
1452 };
1453 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1454 assert_eq!(summary.passed, 2);
1455 let cache = temp.path().join("suite/.harn/test-timings.json");
1456 assert!(cache.exists(), "expected timings cache at {cache:?}");
1457 let stored: BTreeMap<String, u64> =
1458 serde_json::from_str(&fs::read_to_string(&cache).unwrap()).unwrap();
1459 assert!(
1460 stored.keys().any(|key| key.contains("test_first")),
1461 "expected timings for test_first in {stored:?}"
1462 );
1463 assert!(
1464 stored.keys().any(|key| key.contains("test_second")),
1465 "expected timings for test_second in {stored:?}"
1466 );
1467 }
1468
1469 #[tokio::test]
1476 async fn worker_resets_thread_local_state_between_cases() {
1477 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1478 let temp = TempTestDir::new();
1479 temp.write(
1480 "suite/test_isolation.harn",
1481 r#"
1482// The leak probe pins the clock to a future-but-i64-safe value
1483// (year ~2128) so a leaked mock is observable. Larger values overflow
1484// the nanosecond conversion inside the mock clock.
1485pipeline test_a_pins_clock(task) {
1486 mock_time(5000000000000)
1487 assert_eq(now_ms(), 5000000000000)
1488}
1489
1490pipeline test_b_clock_is_fresh(task) {
1491 let ms = now_ms()
1492 assert(ms < 5000000000000, "clock mock leaked from previous test")
1493}
1494"#,
1495 );
1496
1497 let opts = RunOptions::new(5_000);
1498 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1499 assert_eq!(
1500 summary.failed,
1501 0,
1502 "state leaked between tests: {:?}",
1503 summary
1504 .results
1505 .iter()
1506 .filter(|r| !r.passed)
1507 .map(|r| (r.name.clone(), r.error.clone()))
1508 .collect::<Vec<_>>()
1509 );
1510 assert_eq!(summary.passed, 2);
1511 }
1512
1513 #[tokio::test]
1514 async fn summary_aggregate_timings_sum_phases_across_results() {
1515 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1516 let temp = TempTestDir::new();
1517 temp.write(
1518 "suite/test_phases.harn",
1519 r"
1520pipeline test_one(task) { assert_eq(1, 1) }
1521pipeline test_two(task) { assert_eq(2, 2) }
1522",
1523 );
1524
1525 let summary = run_tests(&temp.path().join("suite"), None, 5_000, false, &[]).await;
1526 assert_eq!(summary.passed, 2);
1527 let per_test_sum: u64 = summary
1528 .results
1529 .iter()
1530 .map(|r| r.phases.setup_ms.saturating_add(r.phases.compile_ms))
1531 .sum();
1532 let agg_sum = summary
1533 .aggregate
1534 .setup_ms
1535 .saturating_add(summary.aggregate.compile_ms);
1536 assert_eq!(
1537 per_test_sum, agg_sum,
1538 "aggregate setup+compile must equal sum of per-test setup+compile"
1539 );
1540 }
1541
1542 #[tokio::test]
1543 async fn parallel_scheduler_emits_progress_events() {
1544 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1545 let temp = TempTestDir::new();
1546 temp.write(
1547 "suite/test_events.harn",
1548 r"
1549@test
1550pipeline test_a(task) {}
1551
1552@test
1553pipeline test_b(task) {}
1554",
1555 );
1556
1557 let events: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
1558 let events_for_progress = Arc::clone(&events);
1559 let progress: TestRunProgress = Arc::new(move |event| {
1560 events_for_progress.lock().unwrap().push(match event {
1561 TestRunEvent::SuiteDiscovered { .. } => "suite",
1562 TestRunEvent::LargeSequentialSuite { .. } => "large-seq",
1563 TestRunEvent::TestStarted { .. } => "started",
1564 TestRunEvent::TestFinished(_) => "finished",
1565 });
1566 });
1567 let opts = RunOptions {
1568 parallel: true,
1569 jobs: Some(2),
1570 progress: Some(progress),
1571 ..RunOptions::new(5_000)
1572 };
1573 let _ = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1574 let events = events.lock().unwrap();
1575 assert_eq!(events.first().copied(), Some("suite"));
1576 assert_eq!(events.iter().filter(|e| **e == "started").count(), 2);
1577 assert_eq!(events.iter().filter(|e| **e == "finished").count(), 2);
1578 }
1579}