1use std::collections::{BTreeMap, HashSet};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Condvar, Mutex};
5use std::thread;
6use std::time::Instant;
7
8use harn_lexer::Lexer;
9use harn_parser::{Attribute, Node, Parser, SNode};
10
11use crate::env_guard::ScopedEnvVar;
12
13#[derive(Clone, Debug)]
14pub struct TestResult {
15 pub name: String,
16 pub file: String,
17 pub passed: bool,
18 pub error: Option<String>,
19 pub duration_ms: u64,
20 pub phases: PhaseTimings,
24}
25
26#[derive(Clone, Debug)]
27pub struct TestSummary {
28 pub results: Vec<TestResult>,
29 pub passed: usize,
30 pub failed: usize,
31 pub total: usize,
32 pub duration_ms: u64,
33 pub aggregate: AggregateTimings,
35}
36
/// Per-test breakdown of where time was spent, in milliseconds.
#[derive(Clone, Copy, Debug, Default)]
pub struct PhaseTimings {
    /// Time spent preparing the VM before the pipeline starts executing.
    pub setup_ms: u64,
    /// Time spent compiling the pipeline.
    pub compile_ms: u64,
    /// Time spent executing the compiled pipeline.
    pub execute_ms: u64,
    /// Time spent resetting state after the run.
    pub teardown_ms: u64,
}
54
55#[derive(Debug, Default, Clone, Copy)]
58pub struct AggregateTimings {
59 pub collection_ms: u64,
60 pub setup_ms: u64,
61 pub compile_ms: u64,
62 pub execute_ms: u64,
63 pub teardown_ms: u64,
64}
65
66impl AggregateTimings {
67 fn from_results(collection_ms: u64, results: &[TestResult]) -> Self {
68 results.iter().map(|r| r.phases).fold(
69 Self {
70 collection_ms,
71 ..Self::default()
72 },
73 |acc, p| Self {
74 collection_ms: acc.collection_ms,
75 setup_ms: acc.setup_ms.saturating_add(p.setup_ms),
76 compile_ms: acc.compile_ms.saturating_add(p.compile_ms),
77 execute_ms: acc.execute_ms.saturating_add(p.execute_ms),
78 teardown_ms: acc.teardown_ms.saturating_add(p.teardown_ms),
79 },
80 )
81 }
82}
83
84impl TestResult {
85 fn emit_diagnose(&self) {
89 let outcome = if self.passed { "ok" } else { "FAIL" };
90 eprintln!(
91 "[harn test diag] {} {} setup={}ms compile={}ms execute={}ms teardown={}ms total={}ms",
92 outcome,
93 self.name,
94 self.phases.setup_ms,
95 self.phases.compile_ms,
96 self.phases.execute_ms,
97 self.phases.teardown_ms,
98 self.duration_ms,
99 );
100 }
101}
102
103#[derive(Clone, Debug)]
104pub enum TestRunEvent {
105 SuiteDiscovered {
106 total_tests: usize,
107 total_files: usize,
108 parallel: bool,
109 workers: usize,
110 },
111 LargeSequentialSuite {
112 total_tests: usize,
113 total_files: usize,
114 },
115 TestStarted {
116 name: String,
117 file: String,
118 test_index: usize,
119 total_tests: usize,
120 },
121 TestFinished(TestResult),
122}
123
124pub type TestRunProgress = Arc<dyn Fn(TestRunEvent) + Send + Sync>;
125
/// Test count at or above which a single-worker run is reported as large.
const LARGE_SEQUENTIAL_TEST_THRESHOLD: usize = 50;
/// File count at or above which a single-worker run is reported as large.
const LARGE_SEQUENTIAL_FILE_THRESHOLD: usize = 10;
/// Upper bound on the auto-detected worker count.
const DEFAULT_PARALLEL_JOBS_CAP: usize = 8;
/// Location of the timings cache, relative to the resolved root directory.
const TIMINGS_CACHE_RELATIVE_PATH: &str = ".harn/test-timings.json";
/// Environment variable that overrides the parallel worker count.
const HARN_TEST_JOBS_ENV: &str = "HARN_TEST_JOBS";
131
132#[derive(Clone, Default)]
138pub struct RunOptions {
139 pub filter: Option<String>,
140 pub timeout_ms: u64,
141 pub parallel: bool,
145 pub jobs: Option<usize>,
149 pub cli_skill_dirs: Vec<PathBuf>,
150 pub progress: Option<TestRunProgress>,
153 pub diagnose: bool,
157}
158
159impl RunOptions {
160 pub fn new(timeout_ms: u64) -> Self {
161 Self {
162 timeout_ms,
163 ..Default::default()
164 }
165 }
166}
167
168#[derive(Clone)]
172struct TestCase {
173 file: PathBuf,
174 name: String,
175 source: Arc<String>,
176 program: Arc<Vec<SNode>>,
177 serial_group: Option<String>,
181 weight: usize,
184}
185
/// Returns the canonical form of `path`, or `path` unchanged when it cannot
/// be canonicalized (for example because it does not exist).
fn canonicalize_existing_path(path: &Path) -> PathBuf {
    match path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
189
/// Returns the process's current directory, falling back to `"."` when it
/// cannot be determined.
fn test_execution_cwd() -> PathBuf {
    match std::env::current_dir() {
        Ok(dir) => dir,
        Err(_) => PathBuf::from("."),
    }
}
193
194fn emit_progress(progress: &Option<TestRunProgress>, event: TestRunEvent) {
195 if let Some(callback) = progress {
196 callback(event);
197 }
198}
199
200fn should_warn_large_sequential_suite(total_tests: usize, total_files: usize) -> bool {
201 total_tests >= LARGE_SEQUENTIAL_TEST_THRESHOLD || total_files >= LARGE_SEQUENTIAL_FILE_THRESHOLD
202}
203
204pub async fn run_tests(
206 path: &Path,
207 filter: Option<&str>,
208 timeout_ms: u64,
209 parallel: bool,
210 cli_skill_dirs: &[PathBuf],
211) -> TestSummary {
212 let options = RunOptions {
213 filter: filter.map(str::to_owned),
214 timeout_ms,
215 parallel,
216 jobs: None,
217 cli_skill_dirs: cli_skill_dirs.to_vec(),
218 progress: None,
219 diagnose: diagnose_enabled_via_env(),
220 };
221 run_tests_with_options(path, &options).await
222}
223
224pub async fn run_tests_with_progress(
226 path: &Path,
227 filter: Option<&str>,
228 timeout_ms: u64,
229 parallel: bool,
230 cli_skill_dirs: &[PathBuf],
231 progress: Option<TestRunProgress>,
232) -> TestSummary {
233 let options = RunOptions {
234 filter: filter.map(str::to_owned),
235 timeout_ms,
236 parallel,
237 jobs: None,
238 cli_skill_dirs: cli_skill_dirs.to_vec(),
239 progress,
240 diagnose: diagnose_enabled_via_env(),
241 };
242 run_tests_with_options(path, &options).await
243}
244
/// Reports whether `HARN_TEST_DIAGNOSE` requests per-test timing output.
///
/// The variable counts as enabled when its value, ignoring surrounding
/// whitespace and ASCII case, is one of `1`, `true`, `yes` or `on`. An unset
/// or non-Unicode variable, or any other value, counts as disabled.
fn diagnose_enabled_via_env() -> bool {
    let Ok(raw) = std::env::var("HARN_TEST_DIAGNOSE") else {
        return false;
    };
    // Trim like the HARN_TEST_JOBS handling does, so values such as "1 "
    // (easy to produce from shell scripts or CI config) are not silently
    // ignored.
    let value = raw.trim();
    ["1", "true", "yes", "on"]
        .iter()
        .any(|accepted| value.eq_ignore_ascii_case(accepted))
}
254
255pub async fn run_tests_with_options(path: &Path, options: &RunOptions) -> TestSummary {
260 let _default_llm_provider = ScopedEnvVar::set_if_unset("HARN_LLM_PROVIDER", "mock");
262 let _disable_llm_calls = ScopedEnvVar::set(harn_vm::llm::LLM_CALLS_DISABLED_ENV, "1");
263
264 let start = Instant::now();
265
266 let collection_start = Instant::now();
267 let canonical_target = canonicalize_existing_path(path);
268 let files = if canonical_target.is_dir() {
269 discover_test_files(&canonical_target)
270 } else {
271 vec![canonical_target.clone()]
272 };
273
274 let workers = resolve_workers(options);
275 let timings_path = timings_cache_path(&canonical_target);
276 let timings = timings_path
277 .as_deref()
278 .map(load_timings_cache)
279 .unwrap_or_default();
280
281 let discovery = discover_test_cases(&files, options.filter.as_deref(), workers);
282 let collection_ms = collection_start.elapsed().as_millis() as u64;
283
284 emit_progress(
285 &options.progress,
286 TestRunEvent::SuiteDiscovered {
287 total_tests: discovery.cases.len(),
288 total_files: discovery.files_with_tests,
289 parallel: options.parallel,
290 workers,
291 },
292 );
293 if workers == 1
294 && should_warn_large_sequential_suite(discovery.cases.len(), discovery.files_with_tests)
295 {
296 emit_progress(
297 &options.progress,
298 TestRunEvent::LargeSequentialSuite {
299 total_tests: discovery.cases.len(),
300 total_files: discovery.files_with_tests,
301 },
302 );
303 }
304
305 let mut cases = discovery.cases;
306 sort_cases_longest_first(&mut cases, &timings);
307
308 let mut all_results = discovery.discovery_errors;
309 let total_tests = cases.len();
310 all_results.extend(execute_cases(cases, workers, options, total_tests).await);
311
312 let total = all_results.len();
313 let passed = all_results.iter().filter(|r| r.passed).count();
314 let failed = total - passed;
315 let aggregate = AggregateTimings::from_results(collection_ms, &all_results);
316
317 if let Some(path) = timings_path.as_deref() {
318 update_timings_cache(path, timings, &all_results);
319 }
320
321 TestSummary {
322 results: all_results,
323 passed,
324 failed,
325 total,
326 duration_ms: start.elapsed().as_millis() as u64,
327 aggregate,
328 }
329}
330
331pub async fn run_test_file(
339 path: &Path,
340 filter: Option<&str>,
341 timeout_ms: u64,
342 execution_cwd: Option<&Path>,
343 cli_skill_dirs: &[PathBuf],
344) -> Result<Vec<TestResult>, String> {
345 let source =
346 fs::read_to_string(path).map_err(|e| format!("Failed to read {}: {e}", path.display()))?;
347 let program = parse_program(&source)?;
348 let source = Arc::new(source);
349 let program = Arc::new(program);
350
351 let cases = extract_cases_from_program(path, &source, &program, filter, usize::MAX);
352
353 let mut results = Vec::with_capacity(cases.len());
354 let execution_cwd = execution_cwd
355 .map(Path::to_path_buf)
356 .unwrap_or_else(test_execution_cwd);
357 for case in cases {
358 results.push(execute_case(&case, &execution_cwd, timeout_ms, cli_skill_dirs).await);
359 }
360 Ok(results)
361}
362
363fn resolve_workers(options: &RunOptions) -> usize {
364 if !options.parallel {
365 return 1;
366 }
367 if let Some(jobs) = options.jobs {
368 return jobs.max(1);
369 }
370 if let Ok(raw) = std::env::var(HARN_TEST_JOBS_ENV) {
371 if let Ok(parsed) = raw.trim().parse::<usize>() {
372 if parsed >= 1 {
373 return parsed;
374 }
375 }
376 }
377 let detected = thread::available_parallelism()
378 .map(|n| n.get())
379 .unwrap_or(1);
380 detected.clamp(1, DEFAULT_PARALLEL_JOBS_CAP)
381}
382
383struct Discovery {
384 cases: Vec<TestCase>,
385 files_with_tests: usize,
386 discovery_errors: Vec<TestResult>,
387}
388
389fn discover_test_cases(files: &[PathBuf], filter: Option<&str>, workers: usize) -> Discovery {
390 let mut cases = Vec::new();
391 let mut files_with_tests = 0usize;
392 let mut discovery_errors = Vec::new();
393
394 for file in files {
395 let source = match fs::read_to_string(file) {
396 Ok(s) => s,
397 Err(e) => {
398 discovery_errors.push(TestResult {
399 name: "<file error>".to_string(),
400 file: file.display().to_string(),
401 passed: false,
402 error: Some(format!("Failed to read {}: {e}", file.display())),
403 duration_ms: 0,
404 phases: PhaseTimings::default(),
405 });
406 continue;
407 }
408 };
409
410 let program = match parse_program(&source) {
411 Ok(p) => p,
412 Err(e) => {
413 discovery_errors.push(TestResult {
414 name: "<file error>".to_string(),
415 file: file.display().to_string(),
416 passed: false,
417 error: Some(e),
418 duration_ms: 0,
419 phases: PhaseTimings::default(),
420 });
421 continue;
422 }
423 };
424
425 let source = Arc::new(source);
426 let program = Arc::new(program);
427 let file_cases = extract_cases_from_program(file, &source, &program, filter, workers);
428 if !file_cases.is_empty() {
429 files_with_tests += 1;
430 cases.extend(file_cases);
431 }
432 }
433
434 Discovery {
435 cases,
436 files_with_tests,
437 discovery_errors,
438 }
439}
440
441fn parse_program(source: &str) -> Result<Vec<SNode>, String> {
442 let mut lexer = Lexer::new(source);
443 let tokens = lexer.tokenize().map_err(|e| format!("{e}"))?;
444 let mut parser = Parser::new(tokens);
445 parser.parse().map_err(|e| format!("{e}"))
446}
447
448fn extract_cases_from_program(
449 file: &Path,
450 source: &Arc<String>,
451 program: &Arc<Vec<SNode>>,
452 filter: Option<&str>,
453 workers: usize,
454) -> Vec<TestCase> {
455 let mut cases = Vec::new();
456 for snode in program.iter() {
457 let Some(meta) = inspect_test_pipeline(snode) else {
458 continue;
459 };
460 if let Some(pattern) = filter {
461 if !meta.name.contains(pattern) {
462 continue;
463 }
464 }
465 let weight = meta.weight.min(workers).max(1);
468 cases.push(TestCase {
469 file: file.to_path_buf(),
470 name: meta.name,
471 source: Arc::clone(source),
472 program: Arc::clone(program),
473 serial_group: meta.serial_group,
474 weight,
475 });
476 }
477 cases
478}
479
/// Scheduling-relevant facts extracted from one test pipeline declaration.
struct PipelineMeta {
    /// Name of the pipeline.
    name: String,
    /// Group from a `serial` attribute, if the pipeline has one.
    serial_group: Option<String>,
    /// Weight from a `heavy` attribute; 1 when absent or invalid.
    weight: usize,
}
485
486fn inspect_test_pipeline(snode: &SNode) -> Option<PipelineMeta> {
487 let (attributes, inner) = match &snode.node {
491 Node::AttributedDecl { attributes, inner } => (attributes.as_slice(), inner.as_ref()),
492 _ => (&[][..], snode),
493 };
494 let name = match &inner.node {
495 Node::Pipeline { name, .. } => name.clone(),
496 _ => return None,
497 };
498 let has_test_attr = attributes.iter().any(|a| a.name == "test");
499 if !has_test_attr && !name.starts_with("test_") {
500 return None;
501 }
502 let serial_group = attributes
503 .iter()
504 .find(|a| a.name == "serial")
505 .map(serial_group_for);
506 let weight = attributes
507 .iter()
508 .find(|a| a.name == "heavy")
509 .and_then(heavy_weight_for)
510 .unwrap_or(1);
511 Some(PipelineMeta {
512 name,
513 serial_group,
514 weight,
515 })
516}
517
518fn serial_group_for(attr: &Attribute) -> String {
519 attr.string_arg("group")
520 .unwrap_or_else(|| "__default__".to_string())
521}
522
523fn heavy_weight_for(attr: &Attribute) -> Option<usize> {
524 attr.args
525 .iter()
526 .find(|a| a.name.as_deref() == Some("threads"))
527 .and_then(|a| match &a.value.node {
528 Node::IntLiteral(n) if *n >= 1 => Some(*n as usize),
529 _ => None,
530 })
531}
532
533fn sort_cases_longest_first(cases: &mut [TestCase], timings: &BTreeMap<String, u64>) {
534 cases.sort_by(|a, b| {
540 let key_a = timings_key(&a.file, &a.name);
541 let key_b = timings_key(&b.file, &b.name);
542 let dur_a = timings.get(&key_a).copied().unwrap_or(0);
543 let dur_b = timings.get(&key_b).copied().unwrap_or(0);
544 dur_a
545 .cmp(&dur_b)
546 .then_with(|| a.file.cmp(&b.file))
547 .then_with(|| a.name.cmp(&b.name))
548 });
549}
550
/// Builds the timings-cache key for a test: `<file display>::<name>`.
fn timings_key(file: &Path, name: &str) -> String {
    let mut key = file.display().to_string();
    key.push_str("::");
    key.push_str(name);
    key
}
554
555fn timings_cache_path(target: &Path) -> Option<PathBuf> {
556 let probe_root = if target.is_dir() {
561 target.to_path_buf()
562 } else {
563 target.parent()?.to_path_buf()
564 };
565 let root = harn_vm::stdlib::process::find_project_root(&probe_root)
566 .unwrap_or_else(|| probe_root.clone());
567 Some(root.join(TIMINGS_CACHE_RELATIVE_PATH))
568}
569
570fn load_timings_cache(path: &Path) -> BTreeMap<String, u64> {
571 let Ok(contents) = fs::read_to_string(path) else {
572 return BTreeMap::new();
573 };
574 serde_json::from_str::<BTreeMap<String, u64>>(&contents).unwrap_or_default()
575}
576
577fn update_timings_cache(path: &Path, mut existing: BTreeMap<String, u64>, results: &[TestResult]) {
578 for result in results {
579 if result.name == "<file error>" || result.name == "<join error>" {
580 continue;
581 }
582 existing.insert(
583 timings_key(Path::new(&result.file), &result.name),
584 result.duration_ms,
585 );
586 }
587 if let Some(parent) = path.parent() {
588 let _ = fs::create_dir_all(parent);
589 }
590 if let Ok(serialized) = serde_json::to_string(&existing) {
591 let _ = fs::write(path, serialized);
592 }
593}
594
595async fn execute_cases(
596 cases: Vec<TestCase>,
597 workers: usize,
598 options: &RunOptions,
599 total_tests: usize,
600) -> Vec<TestResult> {
601 if cases.is_empty() {
602 return Vec::new();
603 }
604 let completed = Arc::new(Mutex::new(0usize));
605 if workers <= 1 {
606 let mut results = Vec::with_capacity(cases.len());
607 for case in cases {
608 let cwd = case_execution_cwd(&case);
609 let test_index = next_test_index(&completed);
610 emit_progress(
611 &options.progress,
612 TestRunEvent::TestStarted {
613 name: case.name.clone(),
614 file: case.file.display().to_string(),
615 test_index,
616 total_tests,
617 },
618 );
619 let result =
620 execute_case(&case, &cwd, options.timeout_ms, &options.cli_skill_dirs).await;
621 if options.diagnose {
622 result.emit_diagnose();
623 }
624 emit_progress(
625 &options.progress,
626 TestRunEvent::TestFinished(result.clone()),
627 );
628 results.push(result);
629 }
630 return results;
631 }
632
633 let queue = Arc::new(Mutex::new(cases));
634 let gate = Arc::new(ResourceGate::new(workers));
635 let results: Arc<Mutex<Vec<TestResult>>> = Arc::new(Mutex::new(Vec::new()));
636
637 let mut handles = Vec::with_capacity(workers);
638 for worker_idx in 0..workers {
639 let queue = Arc::clone(&queue);
640 let gate = Arc::clone(&gate);
641 let results = Arc::clone(&results);
642 let completed = Arc::clone(&completed);
643 let timeout_ms = options.timeout_ms;
644 let cli_skill_dirs = options.cli_skill_dirs.clone();
645 let progress = options.progress.clone();
646 let diagnose = options.diagnose;
647 let handle = thread::Builder::new()
648 .name(format!("harn-test-worker-{worker_idx}"))
649 .spawn(move || {
650 let runtime = match tokio::runtime::Builder::new_current_thread()
651 .enable_all()
652 .build()
653 {
654 Ok(rt) => rt,
655 Err(error) => {
656 results.lock().unwrap().push(TestResult {
657 name: "<worker error>".to_string(),
658 file: String::new(),
659 passed: false,
660 error: Some(format!("failed to start test runtime: {error}")),
661 duration_ms: 0,
662 phases: PhaseTimings::default(),
663 });
664 return;
665 }
666 };
667 while let Some(case) = queue.lock().unwrap().pop() {
672 let _guard = gate.acquire(case.weight, case.serial_group.as_deref());
673 let cwd = case_execution_cwd(&case);
674 let test_index = next_test_index(&completed);
675 emit_progress(
676 &progress,
677 TestRunEvent::TestStarted {
678 name: case.name.clone(),
679 file: case.file.display().to_string(),
680 test_index,
681 total_tests,
682 },
683 );
684 let result =
685 runtime.block_on(execute_case(&case, &cwd, timeout_ms, &cli_skill_dirs));
686 if diagnose {
687 result.emit_diagnose();
688 }
689 emit_progress(&progress, TestRunEvent::TestFinished(result.clone()));
690 results.lock().unwrap().push(result);
691 }
692 })
693 .expect("spawning a harn-test worker thread should succeed");
694 handles.push(handle);
695 }
696
697 for handle in handles {
698 let _ = handle.join();
699 }
700
701 Arc::try_unwrap(results)
705 .map(|m| m.into_inner().unwrap_or_default())
706 .unwrap_or_else(|arc| arc.lock().unwrap().clone())
707}
708
/// Increments the shared counter and returns the new value, so the first
/// caller receives 1.
///
/// # Panics
///
/// Panics if the counter's mutex is poisoned.
fn next_test_index(counter: &Mutex<usize>) -> usize {
    let mut slot = counter.lock().unwrap();
    let next = *slot + 1;
    *slot = next;
    next
}
714
715fn case_execution_cwd(case: &TestCase) -> PathBuf {
716 case.file
717 .parent()
718 .filter(|p| !p.as_os_str().is_empty())
719 .map(Path::to_path_buf)
720 .unwrap_or_else(test_execution_cwd)
721}
722
/// Admission control for parallel test execution: limits the total weight
/// of running tests and keeps tests of the same serial group apart.
struct ResourceGate {
    /// Free slots and the set of serial groups currently running.
    state: Mutex<GateState>,
    /// Signalled whenever a guard is released.
    cond: Condvar,
    /// Total number of slots; also the largest weight a caller can hold.
    capacity: usize,
}

/// Mutable bookkeeping protected by the gate's mutex.
struct GateState {
    /// Slots not currently held by any guard.
    available: usize,
    /// Serial groups that currently have a running test.
    busy_groups: HashSet<String>,
}

impl GateState {
    /// Whether a request for `weight` slots in `group` can be granted now.
    fn admits(&self, weight: usize, group: Option<&str>) -> bool {
        if self.available < weight {
            return false;
        }
        match group {
            Some(name) => !self.busy_groups.contains(name),
            None => true,
        }
    }
}

/// Proof of admission; gives the slots and the serial group back on drop.
struct GateGuard<'a> {
    gate: &'a ResourceGate,
    weight: usize,
    group: Option<String>,
}

impl ResourceGate {
    /// Creates a gate with `capacity` free slots and no busy groups.
    fn new(capacity: usize) -> Self {
        let state = GateState {
            available: capacity,
            busy_groups: HashSet::new(),
        };
        Self {
            state: Mutex::new(state),
            cond: Condvar::new(),
            capacity,
        }
    }

    /// Blocks until `weight` slots are free and `group` (if any) is idle,
    /// then reserves both and returns a guard that releases them.
    ///
    /// `weight` is limited to the gate's capacity and raised to at least 1,
    /// so an oversized request waits for the whole gate instead of forever.
    fn acquire(&self, weight: usize, group: Option<&str>) -> GateGuard<'_> {
        let weight = weight.min(self.capacity).max(1);
        let mut state = self.state.lock().unwrap();
        while !state.admits(weight, group) {
            state = self.cond.wait(state).unwrap();
        }

        state.available -= weight;
        let owned_group = group.map(str::to_owned);
        if let Some(name) = &owned_group {
            state.busy_groups.insert(name.clone());
        }
        GateGuard {
            gate: self,
            weight,
            group: owned_group,
        }
    }
}

impl Drop for GateGuard<'_> {
    /// Returns the reserved slots, frees the serial group, and wakes every
    /// waiter so each can re-check its own admission condition.
    fn drop(&mut self) {
        let mut state = self.gate.state.lock().unwrap();
        state.available += self.weight;
        if let Some(name) = &self.group {
            state.busy_groups.remove(name.as_str());
        }
        self.gate.cond.notify_all();
    }
}
786
787async fn execute_case(
788 case: &TestCase,
789 execution_cwd: &Path,
790 timeout_ms: u64,
791 cli_skill_dirs: &[PathBuf],
792) -> TestResult {
793 harn_vm::reset_thread_local_state();
794
795 let mut phases = PhaseTimings::default();
796 let total_start = Instant::now();
797
798 let compile_start = Instant::now();
799 let chunk = match harn_vm::Compiler::new().compile_named(&case.program, &case.name) {
800 Ok(c) => c,
801 Err(e) => {
802 phases.compile_ms = compile_start.elapsed().as_millis() as u64;
803 return TestResult {
804 name: case.name.clone(),
805 file: case.file.display().to_string(),
806 passed: false,
807 error: Some(format!("Compile error: {e}")),
808 duration_ms: total_start.elapsed().as_millis() as u64,
809 phases,
810 };
811 }
812 };
813 phases.compile_ms = compile_start.elapsed().as_millis() as u64;
814
815 let local = tokio::task::LocalSet::new();
816 let timeout = std::time::Duration::from_millis(timeout_ms);
817 let file_display = case.file.display().to_string();
818 let setup_start = Instant::now();
819 let result = tokio::time::timeout(
820 timeout,
821 local.run_until(async {
822 let mut vm = harn_vm::Vm::new();
823 harn_vm::register_vm_stdlib(&mut vm);
824 crate::install_default_hostlib(&mut vm);
825 let source_parent = case.file.parent().unwrap_or(Path::new("."));
826 let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
827 let store_base = project_root.as_deref().unwrap_or(source_parent);
828 let source_dir = source_parent.to_string_lossy().into_owned();
829 harn_vm::register_store_builtins(&mut vm, store_base);
830 harn_vm::register_metadata_builtins(&mut vm, store_base);
831 let pipeline_name = case
832 .file
833 .file_stem()
834 .and_then(|s| s.to_str())
835 .unwrap_or("test");
836 harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
837 vm.set_source_info(&file_display, &case.source);
838 harn_vm::stdlib::process::set_thread_execution_context(Some(
839 harn_vm::orchestration::RunExecutionRecord {
840 cwd: Some(execution_cwd.to_string_lossy().into_owned()),
841 source_dir: Some(source_dir),
842 env: BTreeMap::new(),
843 adapter: None,
844 repo_path: None,
845 worktree_path: None,
846 branch: None,
847 base_ref: None,
848 cleanup: None,
849 },
850 ));
851 if let Some(ref root) = project_root {
852 vm.set_project_root(root);
853 }
854 if let Some(parent) = case.file.parent() {
855 if !parent.as_os_str().is_empty() {
856 vm.set_source_dir(parent);
857 }
858 }
859 let loaded =
860 crate::skill_loader::load_skills(&crate::skill_loader::SkillLoaderInputs {
861 cli_dirs: cli_skill_dirs.to_vec(),
862 source_path: Some(case.file.clone()),
863 });
864 crate::skill_loader::emit_loader_warnings(&loaded.loader_warnings);
865 crate::skill_loader::install_skills_global(&mut vm, &loaded);
866 let extensions = crate::package::load_runtime_extensions(&case.file);
867 crate::package::install_runtime_extensions(&extensions);
868 crate::package::install_manifest_triggers(&mut vm, &extensions)
869 .await
870 .map_err(|error| format!("failed to install manifest triggers: {error}"))?;
871 crate::package::install_manifest_hooks(&mut vm, &extensions)
872 .await
873 .map_err(|error| format!("failed to install manifest hooks: {error}"))?;
874 vm.set_harness(harn_vm::Harness::real());
875 let setup_ms = setup_start.elapsed().as_millis() as u64;
876 let exec_start = Instant::now();
877 let outcome = match vm.execute(&chunk).await {
878 Ok(val) => Ok(val),
879 Err(e) => Err(vm.format_runtime_error(&e)),
880 };
881 let execute_ms = exec_start.elapsed().as_millis() as u64;
882 harn_vm::egress::reset_egress_policy_for_host();
883 Ok::<_, String>((outcome, setup_ms, execute_ms))
884 }),
885 )
886 .await;
887
888 let teardown_start = Instant::now();
889 harn_vm::reset_thread_local_state();
893 phases.teardown_ms = teardown_start.elapsed().as_millis() as u64;
894
895 let elapsed_ms = total_start.elapsed().as_millis() as u64;
896 let (passed, error, duration_ms) = match result {
897 Ok(Ok((outcome, setup_ms, execute_ms))) => {
898 phases.setup_ms = setup_ms;
899 phases.execute_ms = execute_ms;
900 match outcome {
901 Ok(_) => (true, None, elapsed_ms),
902 Err(message) => (false, Some(message), elapsed_ms),
903 }
904 }
905 Ok(Err(setup_error)) => (false, Some(setup_error), elapsed_ms),
906 Err(_) => (
910 false,
911 Some(format!("timed out after {timeout_ms}ms")),
912 timeout_ms,
913 ),
914 };
915
916 TestResult {
917 name: case.name.clone(),
918 file: file_display,
919 passed,
920 error,
921 duration_ms,
922 phases,
923 }
924}
925
926fn discover_test_files(dir: &Path) -> Vec<PathBuf> {
927 let mut files = Vec::new();
928 if let Ok(entries) = fs::read_dir(dir) {
929 for entry in entries.flatten() {
930 let path = entry.path();
931 if path.is_dir() {
932 files.extend(discover_test_files(&path));
933 } else if path.extension().is_some_and(|e| e == "harn") {
934 if let Ok(content) = fs::read_to_string(&path) {
935 if content.contains("test_") || content.contains("@test") {
936 files.push(canonicalize_existing_path(&path));
937 }
938 }
939 }
940 }
941 }
942 files.sort();
943 files
944}
945
946#[cfg(test)]
947mod tests {
948 use super::*;
949
950 use std::sync::Arc;
951 use std::time::Duration;
952
953 struct TempTestDir {
954 inner: tempfile::TempDir,
955 }
956
957 impl TempTestDir {
958 fn new() -> Self {
959 Self {
960 inner: tempfile::tempdir().unwrap(),
961 }
962 }
963
964 fn write(&self, relative: &str, contents: &str) {
965 let path = self.path().join(relative);
966 if let Some(parent) = path.parent() {
967 fs::create_dir_all(parent).unwrap();
968 }
969 fs::write(path, contents).unwrap();
970 }
971
972 fn path(&self) -> &Path {
973 self.inner.path()
974 }
975 }
976
977 #[test]
978 fn discover_test_files_returns_canonical_absolute_paths() {
979 let temp = TempTestDir::new();
980 temp.write("suite/test_alpha.harn", "pipeline test_alpha(task) {}");
981 temp.write("suite/nested/test_beta.harn", "pipeline test_beta(task) {}");
982 temp.write("suite/annotated.harn", "@test\npipeline annotated(task) {}");
983 temp.write("suite/ignore.harn", "pipeline build(task) {}");
984
985 let files = discover_test_files(&temp.path().join("suite"));
986
987 assert_eq!(files.len(), 3);
988 assert!(files.iter().all(|path| path.is_absolute()));
989 assert!(files
990 .iter()
991 .any(|path| path.ends_with("suite/test_alpha.harn")));
992 assert!(files
993 .iter()
994 .any(|path| path.ends_with("suite/nested/test_beta.harn")));
995 assert!(files
996 .iter()
997 .any(|path| path.ends_with("suite/annotated.harn")));
998 }
999
1000 #[tokio::test]
1001 async fn run_tests_uses_file_parent_as_execution_cwd_and_restores_shell_cwd() {
1002 let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
1003 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1004 let temp = TempTestDir::new();
1005 temp.write(
1006 "suite/test_cwd.harn",
1007 r#"
1008pipeline test_current_dir(task) {
1009 assert_eq(cwd(), source_dir())
1010}
1011"#,
1012 );
1013
1014 let original_cwd = std::env::current_dir().unwrap();
1015 let summary = run_tests(&temp.path().join("suite"), None, 1_000, false, &[]).await;
1016 let restored_cwd = std::env::current_dir().unwrap();
1017
1018 assert_eq!(summary.failed, 0);
1019 assert_eq!(summary.passed, 1);
1020 assert_eq!(
1021 fs::canonicalize(restored_cwd).unwrap(),
1022 fs::canonicalize(original_cwd).unwrap()
1023 );
1024 }
1025
1026 #[tokio::test]
1027 async fn parallel_run_tests_uses_each_file_parent_as_execution_cwd() {
1028 let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
1029 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1030 let temp = TempTestDir::new();
1031 temp.write(
1032 "suite/a/test_one.harn",
1033 r#"
1034pipeline test_one(task) {
1035 assert_eq(cwd(), source_dir())
1036}
1037"#,
1038 );
1039 temp.write(
1040 "suite/b/test_two.harn",
1041 r#"
1042pipeline test_two(task) {
1043 assert_eq(cwd(), source_dir())
1044}
1045"#,
1046 );
1047
1048 let summary = run_tests(&temp.path().join("suite"), None, 1_000, true, &[]).await;
1049 assert_eq!(summary.failed, 0);
1050 assert_eq!(summary.passed, 2);
1051 }
1052
1053 #[tokio::test]
1054 async fn run_tests_loads_cli_skill_dirs() {
1055 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1056 let temp = TempTestDir::new();
1057 temp.write(
1058 "skills/review/SKILL.md",
1059 r#"---
1060name: review
1061short: Review PRs
1062description: Review pull requests
1063---
1064
1065Review instructions.
1066"#,
1067 );
1068 temp.write(
1069 "suite/test_skills.harn",
1070 r#"
1071pipeline test_cli_skills(task) {
1072 assert_eq(skill_count(skills), 1)
1073 let found = skill_find(skills, "review")
1074 assert_eq(found.name, "review")
1075}
1076"#,
1077 );
1078
1079 let summary = run_tests(
1080 &temp.path().join("suite"),
1081 None,
1082 1_000,
1083 false,
1084 &[temp.path().join("skills")],
1085 )
1086 .await;
1087
1088 assert_eq!(summary.failed, 0, "{:?}", summary.results[0].error);
1089 assert_eq!(summary.passed, 1);
1090 }
1091
1092 #[test]
1093 fn resolve_workers_honors_explicit_jobs() {
1094 let mut opts = RunOptions::new(1_000);
1095 opts.parallel = true;
1096 opts.jobs = Some(3);
1097 assert_eq!(resolve_workers(&opts), 3);
1098 }
1099
1100 #[test]
1101 fn resolve_workers_returns_one_when_not_parallel() {
1102 let mut opts = RunOptions::new(1_000);
1103 opts.parallel = false;
1104 opts.jobs = Some(8);
1105 assert_eq!(resolve_workers(&opts), 1);
1106 }
1107
1108 #[test]
1109 fn sort_cases_longest_first_uses_historical_durations() {
1110 let source = Arc::new(String::new());
1111 let program = Arc::new(Vec::new());
1112 let mk = |name: &str| TestCase {
1113 file: PathBuf::from("tests/a.harn"),
1114 name: name.to_string(),
1115 source: Arc::clone(&source),
1116 program: Arc::clone(&program),
1117 serial_group: None,
1118 weight: 1,
1119 };
1120 let mut cases = vec![mk("test_quick"), mk("test_slow"), mk("test_medium")];
1121 let mut timings = BTreeMap::new();
1122 timings.insert("tests/a.harn::test_slow".to_string(), 5_000);
1123 timings.insert("tests/a.harn::test_medium".to_string(), 1_000);
1124
1125 sort_cases_longest_first(&mut cases, &timings);
1126
1127 let order: Vec<&str> = cases.iter().map(|c| c.name.as_str()).collect();
1129 assert_eq!(order, vec!["test_quick", "test_medium", "test_slow"]);
1130 }
1131
1132 #[test]
1133 fn resource_gate_serializes_same_group() {
1134 let gate = Arc::new(ResourceGate::new(4));
1135 let trace = Arc::new(Mutex::new(Vec::<&'static str>::new()));
1136
1137 let g_a = {
1138 let trace = Arc::clone(&trace);
1139 let gate = Arc::clone(&gate);
1140 thread::spawn(move || {
1141 let _guard = gate.acquire(1, Some("login"));
1142 trace.lock().unwrap().push("a-start");
1143 thread::sleep(Duration::from_millis(50));
1144 trace.lock().unwrap().push("a-end");
1145 })
1146 };
1147 thread::sleep(Duration::from_millis(10));
1149 let g_b = {
1150 let trace = Arc::clone(&trace);
1151 let gate = Arc::clone(&gate);
1152 thread::spawn(move || {
1153 let _guard = gate.acquire(1, Some("login"));
1154 trace.lock().unwrap().push("b-start");
1155 trace.lock().unwrap().push("b-end");
1156 })
1157 };
1158 g_a.join().unwrap();
1159 g_b.join().unwrap();
1160
1161 let trace = trace.lock().unwrap();
1162 let a_end = trace.iter().position(|t| *t == "a-end").unwrap();
1164 let b_start = trace.iter().position(|t| *t == "b-start").unwrap();
1165 assert!(a_end < b_start, "B started before A finished: {trace:?}");
1166 }
1167
1168 #[test]
1169 fn resource_gate_allows_independent_groups_in_parallel() {
1170 let gate = Arc::new(ResourceGate::new(4));
1171 let _guard_a = gate.acquire(1, Some("alpha"));
1172 let acquired = std::sync::Mutex::new(false);
1174 thread::scope(|s| {
1175 s.spawn(|| {
1176 let _ = gate.acquire(1, Some("beta"));
1177 *acquired.lock().unwrap() = true;
1178 });
1179 });
1180 assert!(*acquired.lock().unwrap());
1181 }
1182
1183 #[test]
1184 fn resource_gate_caps_heavy_weight_at_capacity() {
1185 let gate = Arc::new(ResourceGate::new(2));
1188 let _g = gate.acquire(99, None);
1189 let started = Arc::new(Mutex::new(false));
1191 let inner = Arc::clone(&started);
1192 let gate2 = Arc::clone(&gate);
1193 let handle = thread::spawn(move || {
1194 let _guard = gate2.acquire(1, None);
1195 *inner.lock().unwrap() = true;
1196 });
1197 thread::sleep(Duration::from_millis(20));
1198 assert!(!*started.lock().unwrap(), "should still be waiting");
1199 drop(_g);
1200 handle.join().unwrap();
1201 assert!(*started.lock().unwrap());
1202 }
1203
1204 #[tokio::test]
1205 async fn parallel_scheduler_runs_heavy_tests_without_oversubscribing() {
1206 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1209 let temp = TempTestDir::new();
1210 temp.write(
1211 "suite/test_heavy.harn",
1212 r#"
1213@test
1214@heavy(threads: 2)
1215pipeline test_heavy_one(task) {}
1216
1217@test
1218pipeline test_light(task) {}
1219"#,
1220 );
1221
1222 let opts = RunOptions {
1223 parallel: true,
1224 jobs: Some(2),
1225 ..RunOptions::new(5_000)
1226 };
1227 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1228 assert_eq!(summary.failed, 0, "{:?}", summary.results);
1229 assert_eq!(summary.total, 2);
1230 }
1231
1232 #[tokio::test]
1233 async fn parallel_scheduler_handles_serial_group_annotation() {
1234 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1235 let temp = TempTestDir::new();
1236 temp.write(
1237 "suite/test_serial.harn",
1238 r#"
1239@test
1240@serial(group: "fixture")
1241pipeline test_serial_one(task) {}
1242
1243@test
1244@serial(group: "fixture")
1245pipeline test_serial_two(task) {}
1246"#,
1247 );
1248
1249 let opts = RunOptions {
1250 parallel: true,
1251 jobs: Some(4),
1252 ..RunOptions::new(5_000)
1253 };
1254 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1255 assert_eq!(summary.failed, 0, "{:?}", summary.results);
1256 assert_eq!(summary.passed, 2);
1257 }
1258
1259 #[tokio::test]
1260 async fn parallel_scheduler_persists_timings_cache() {
1261 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1262 let temp = TempTestDir::new();
1263 temp.write(
1264 "suite/test_timed.harn",
1265 r#"
1266@test
1267pipeline test_first(task) {}
1268
1269@test
1270pipeline test_second(task) {}
1271"#,
1272 );
1273
1274 let opts = RunOptions {
1275 parallel: true,
1276 jobs: Some(2),
1277 ..RunOptions::new(5_000)
1278 };
1279 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1280 assert_eq!(summary.passed, 2);
1281 let cache = temp.path().join("suite/.harn/test-timings.json");
1282 assert!(cache.exists(), "expected timings cache at {cache:?}");
1283 let stored: BTreeMap<String, u64> =
1284 serde_json::from_str(&fs::read_to_string(&cache).unwrap()).unwrap();
1285 assert!(
1286 stored.keys().any(|key| key.contains("test_first")),
1287 "expected timings for test_first in {stored:?}"
1288 );
1289 assert!(
1290 stored.keys().any(|key| key.contains("test_second")),
1291 "expected timings for test_second in {stored:?}"
1292 );
1293 }
1294
1295 #[tokio::test]
1302 async fn worker_resets_thread_local_state_between_cases() {
1303 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1304 let temp = TempTestDir::new();
1305 temp.write(
1306 "suite/test_isolation.harn",
1307 r#"
1308// The leak probe pins the clock to a future-but-i64-safe value
1309// (year ~2128) so a leaked mock is observable. Larger values overflow
1310// the nanosecond conversion inside the mock clock.
1311pipeline test_a_pins_clock(task) {
1312 mock_time(5000000000000)
1313 assert_eq(now_ms(), 5000000000000)
1314}
1315
1316pipeline test_b_clock_is_fresh(task) {
1317 let ms = now_ms()
1318 assert(ms < 5000000000000, "clock mock leaked from previous test")
1319}
1320"#,
1321 );
1322
1323 let opts = RunOptions::new(5_000);
1324 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1325 assert_eq!(
1326 summary.failed,
1327 0,
1328 "state leaked between tests: {:?}",
1329 summary
1330 .results
1331 .iter()
1332 .filter(|r| !r.passed)
1333 .map(|r| (r.name.clone(), r.error.clone()))
1334 .collect::<Vec<_>>()
1335 );
1336 assert_eq!(summary.passed, 2);
1337 }
1338
1339 #[tokio::test]
1340 async fn summary_aggregate_timings_sum_phases_across_results() {
1341 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1342 let temp = TempTestDir::new();
1343 temp.write(
1344 "suite/test_phases.harn",
1345 r#"
1346pipeline test_one(task) { assert_eq(1, 1) }
1347pipeline test_two(task) { assert_eq(2, 2) }
1348"#,
1349 );
1350
1351 let summary = run_tests(&temp.path().join("suite"), None, 5_000, false, &[]).await;
1352 assert_eq!(summary.passed, 2);
1353 let per_test_sum: u64 = summary
1354 .results
1355 .iter()
1356 .map(|r| r.phases.setup_ms.saturating_add(r.phases.compile_ms))
1357 .sum();
1358 let agg_sum = summary
1359 .aggregate
1360 .setup_ms
1361 .saturating_add(summary.aggregate.compile_ms);
1362 assert_eq!(
1363 per_test_sum, agg_sum,
1364 "aggregate setup+compile must equal sum of per-test setup+compile"
1365 );
1366 }
1367
1368 #[tokio::test]
1369 async fn parallel_scheduler_emits_progress_events() {
1370 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1371 let temp = TempTestDir::new();
1372 temp.write(
1373 "suite/test_events.harn",
1374 r#"
1375@test
1376pipeline test_a(task) {}
1377
1378@test
1379pipeline test_b(task) {}
1380"#,
1381 );
1382
1383 let events: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
1384 let events_for_progress = Arc::clone(&events);
1385 let progress: TestRunProgress = Arc::new(move |event| {
1386 events_for_progress.lock().unwrap().push(match event {
1387 TestRunEvent::SuiteDiscovered { .. } => "suite",
1388 TestRunEvent::LargeSequentialSuite { .. } => "large-seq",
1389 TestRunEvent::TestStarted { .. } => "started",
1390 TestRunEvent::TestFinished(_) => "finished",
1391 });
1392 });
1393 let opts = RunOptions {
1394 parallel: true,
1395 jobs: Some(2),
1396 progress: Some(progress),
1397 ..RunOptions::new(5_000)
1398 };
1399 let _ = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1400 let events = events.lock().unwrap();
1401 assert_eq!(events.first().copied(), Some("suite"));
1402 assert_eq!(events.iter().filter(|e| **e == "started").count(), 2);
1403 assert_eq!(events.iter().filter(|e| **e == "finished").count(), 2);
1404 }
1405}