Skip to main content

harn_cli/
test_runner.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Condvar, Mutex};
5use std::thread;
6use std::time::Instant;
7
8use harn_lexer::Lexer;
9use harn_parser::{Attribute, Node, Parser, SNode};
10
11use crate::env_guard::ScopedEnvVar;
12
/// One row of suite output: either the outcome of an executed test or a
/// synthetic failure row recorded by the runner itself (the `<file error>`
/// rows from discovery and the `<worker error>` rows from worker start-up).
#[derive(Clone, Debug)]
pub struct TestResult {
    /// Test pipeline name, or a bracketed placeholder on synthetic rows.
    pub name: String,
    /// Display form of the source file path; empty on `<worker error>` rows.
    pub file: String,
    /// Whether the row counts as a pass; synthetic rows are always `false`.
    pub passed: bool,
    /// Error message for the row, when there is one. Synthetic rows store the
    /// read / parse / runtime-start failure here.
    pub error: Option<String>,
    /// Duration of the row in milliseconds; synthetic rows record `0`.
    pub duration_ms: u64,
    /// Per-phase timings. Populated by `execute_case`; default-zero on
    /// discovery/worker-error rows. Used by `--diagnose` and the
    /// `--timing` aggregate.
    pub phases: PhaseTimings,
}
25
/// Final report for one suite run, as returned by `run_tests_with_options`.
#[derive(Clone, Debug)]
pub struct TestSummary {
    /// Every result row: discovery errors first, then the executed cases.
    pub results: Vec<TestResult>,
    /// Number of rows in `results` whose `passed` flag is set.
    pub passed: usize,
    /// `total - passed`; includes synthetic discovery/worker error rows.
    pub failed: usize,
    /// Length of `results`.
    pub total: usize,
    /// Wall-clock time of the whole run in milliseconds, measured from before
    /// discovery starts until the summary is built.
    pub duration_ms: u64,
    /// Aggregated phase costs across the entire run.
    pub aggregate: AggregateTimings,
}
36
/// Wall-clock cost of each phase of a single test execution.
///
/// Sums to the test's `duration_ms` modulo measurement overhead. Surfaced
/// so consumers can attribute cold-start vs assertion cost without
/// having to instrument the runner externally.
///
/// All fields are in milliseconds. The `Default` value (all zeros) is what
/// the runner stores on synthetic rows that never executed a test.
#[derive(Debug, Default, Clone, Copy)]
pub struct PhaseTimings {
    /// VM construction + stdlib/hostlib registration + skill install +
    /// runtime extension install + manifest hooks/triggers install.
    pub setup_ms: u64,
    /// `Compiler::compile_named` time for this test's chunk.
    pub compile_ms: u64,
    /// `vm.execute(chunk)` wall time, i.e. the actual user-test body.
    pub execute_ms: u64,
    /// `reset_thread_local_state` between tests.
    pub teardown_ms: u64,
}
54
/// Aggregated cost across the run. Mirrors [`PhaseTimings`] plus the
/// suite-level collection cost (discover + parse).
///
/// All fields are in milliseconds.
#[derive(Debug, Default, Clone, Copy)]
pub struct AggregateTimings {
    /// Time spent before any test runs: file discovery, worker resolution,
    /// timings-cache load, and parsing/extracting the test cases.
    pub collection_ms: u64,
    /// Saturating sum of every result row's `phases.setup_ms`.
    pub setup_ms: u64,
    /// Saturating sum of every result row's `phases.compile_ms`.
    pub compile_ms: u64,
    /// Saturating sum of every result row's `phases.execute_ms`.
    pub execute_ms: u64,
    /// Saturating sum of every result row's `phases.teardown_ms`.
    pub teardown_ms: u64,
}
65
66impl AggregateTimings {
67    fn from_results(collection_ms: u64, results: &[TestResult]) -> Self {
68        results.iter().map(|r| r.phases).fold(
69            Self {
70                collection_ms,
71                ..Self::default()
72            },
73            |acc, p| Self {
74                collection_ms: acc.collection_ms,
75                setup_ms: acc.setup_ms.saturating_add(p.setup_ms),
76                compile_ms: acc.compile_ms.saturating_add(p.compile_ms),
77                execute_ms: acc.execute_ms.saturating_add(p.execute_ms),
78                teardown_ms: acc.teardown_ms.saturating_add(p.teardown_ms),
79            },
80        )
81    }
82}
83
84impl TestResult {
85    /// Emit a one-line phase breakdown to stderr. Driven by `--diagnose`
86    /// / `HARN_TEST_DIAGNOSE=1`. The format is intentionally
87    /// machine-readable so downstream eval pipelines can grep it.
88    fn emit_diagnose(&self) {
89        let outcome = if self.passed { "ok" } else { "FAIL" };
90        eprintln!(
91            "[harn test diag] {} {} setup={}ms compile={}ms execute={}ms teardown={}ms total={}ms",
92            outcome,
93            self.name,
94            self.phases.setup_ms,
95            self.phases.compile_ms,
96            self.phases.execute_ms,
97            self.phases.teardown_ms,
98            self.duration_ms,
99        );
100    }
101}
102
/// Progress notifications delivered to the optional [`TestRunProgress`]
/// callback while a suite runs.
#[derive(Clone, Debug)]
pub enum TestRunEvent {
    /// Emitted once, after discovery and before any test starts.
    SuiteDiscovered {
        /// Number of test cases that will be executed.
        total_tests: usize,
        /// Number of files that contributed at least one test case.
        total_files: usize,
        /// The caller's `RunOptions::parallel` flag, echoed back.
        parallel: bool,
        /// Resolved worker count for this run.
        workers: usize,
    },
    /// Emitted right after `SuiteDiscovered` when the run uses a single
    /// worker and the suite reaches the large-suite thresholds.
    LargeSequentialSuite {
        total_tests: usize,
        total_files: usize,
    },
    /// Emitted immediately before a test case executes.
    TestStarted {
        name: String,
        file: String,
        /// 1-based position in the order tests were started.
        test_index: usize,
        total_tests: usize,
    },
    /// Emitted after a test case finishes, carrying its result.
    TestFinished(TestResult),
}
123
/// Shared progress callback receiving every [`TestRunEvent`]. It is
/// `Send + Sync` because parallel runs invoke it from the worker threads.
pub type TestRunProgress = Arc<dyn Fn(TestRunEvent) + Send + Sync>;
125
/// Test count (inclusive) from which a single-worker run is reported as a
/// large sequential suite.
const LARGE_SEQUENTIAL_TEST_THRESHOLD: usize = 50;
/// File count (inclusive) that triggers the same large-sequential report.
const LARGE_SEQUENTIAL_FILE_THRESHOLD: usize = 10;
/// Upper bound applied to the auto-detected worker count. Explicit job
/// counts (`RunOptions::jobs` or `HARN_TEST_JOBS`) are not clamped by it.
const DEFAULT_PARALLEL_JOBS_CAP: usize = 8;
/// Location of the per-test timings cache, relative to the resolved root
/// directory of the suite.
const TIMINGS_CACHE_RELATIVE_PATH: &str = ".harn/test-timings.json";
/// Environment variable consulted for the worker count when
/// `RunOptions::jobs` is unset.
const HARN_TEST_JOBS_ENV: &str = "HARN_TEST_JOBS";

/// Per-worker memory budget (MiB) used to cap *auto-detected* parallelism on
/// memory-constrained or oversubscribed hosts. Overridable via
/// `HARN_TEST_WORKER_MEMORY_MB`. A worker runs a full VM and may drive nested
/// agent loops, so this is a deliberately conservative estimate. The cap only
/// ever *lowers* the core-based default — it never raises it, and an explicit
/// `--jobs` / `HARN_TEST_JOBS` always wins.
const DEFAULT_WORKER_MEMORY_MB: u64 = 1024;
/// Environment variable that overrides [`DEFAULT_WORKER_MEMORY_MB`]; only
/// integer values `>= 1` are honored.
const HARN_TEST_WORKER_MEMORY_MB_ENV: &str = "HARN_TEST_WORKER_MEMORY_MB";

/// Memory (MiB) held back for the OS, the CI runner agent, and any co-tenant
/// job, so an auto-sized suite cannot consume the last scrap of RAM and starve
/// the runner's heartbeat. This is the failure mode behind the self-hosted
/// "The operation was canceled" runner-loss cancellations: two runner agents
/// share one box, two heavy jobs overcommit RAM + swap, and the kernel never
/// fires the OOM-killer — instead a starved runner agent stops phoning home
/// and the control plane declares the job lost.
const RESERVED_SYSTEM_MEMORY_MB: u64 = 1024;
149
/// Options that shape how a user-test suite is discovered and executed.
///
/// Held separately from the positional path so call sites (one-shot run,
/// `--watch`, persona doctor) can share the same scheduler without
/// keyword-argument explosion at the call sites.
#[derive(Clone, Default)]
pub struct RunOptions {
    /// When set, only tests whose name contains this substring are run.
    pub filter: Option<String>,
    /// Timeout in milliseconds, forwarded unchanged to every test execution.
    pub timeout_ms: u64,
    /// When false, the scheduler runs with a single worker, preserving the
    /// historical "everything sequential" semantics that `harn test`
    /// defaulted to before `--parallel` was introduced.
    pub parallel: bool,
    /// Explicit worker limit (`-j`/`--jobs`); values below 1 are raised to 1.
    /// `None` falls back to `HARN_TEST_JOBS`, and otherwise to the detected
    /// parallelism capped by a small constant and by available memory.
    /// Ignored when `parallel = false`.
    pub jobs: Option<usize>,
    /// Skill directories supplied on the command line; forwarded unchanged to
    /// every test execution.
    pub cli_skill_dirs: Vec<PathBuf>,
    /// Optional progress callback. When set, the runner emits events as
    /// the suite progresses; consumers (CLI, dev mode) render output.
    pub progress: Option<TestRunProgress>,
    /// Emit per-test phase timings (setup / compile / execute /
    /// teardown) to stderr. Also honored via `HARN_TEST_DIAGNOSE=1` so
    /// users can flip the flag without restarting their shell.
    pub diagnose: bool,
}
176
177impl RunOptions {
178    pub fn new(timeout_ms: u64) -> Self {
179        Self {
180            timeout_ms,
181            ..Default::default()
182        }
183    }
184}
185
/// A single executable test discovered during scan. Workers compile and
/// run each case in isolation; the parsed program is shared by `Arc` so
/// large suites parse exactly once.
#[derive(Clone)]
struct TestCase {
    /// Source file the test was discovered in.
    file: PathBuf,
    /// Name of the test pipeline.
    name: String,
    /// Full text of `file`, shared by every case from that file.
    source: Arc<String>,
    /// Parsed top-level nodes of `file`, shared by every case from that file.
    program: Arc<Vec<SNode>>,
    /// Optional serial group — tests with the same group never run
    /// concurrently with each other, even if workers are idle. Used for
    /// shared fixtures.
    serial_group: Option<String>,
    /// Number of workers this test reserves while running. Capped at the
    /// pool size during discovery so heavy tests still get scheduled.
    weight: usize,
}
203
/// Resolve `path` to its canonical form, falling back to the path exactly as
/// given when canonicalization fails (for example because it does not exist).
fn canonicalize_existing_path(path: &Path) -> PathBuf {
    match path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
207
/// Current working directory of the process, or `"."` when it cannot be read.
fn test_execution_cwd() -> PathBuf {
    match std::env::current_dir() {
        Ok(dir) => dir,
        Err(_) => PathBuf::from("."),
    }
}
211
212fn emit_progress(progress: &Option<TestRunProgress>, event: TestRunEvent) {
213    if let Some(callback) = progress {
214        callback(event);
215    }
216}
217
218fn should_warn_large_sequential_suite(total_tests: usize, total_files: usize) -> bool {
219    total_tests >= LARGE_SEQUENTIAL_TEST_THRESHOLD || total_files >= LARGE_SEQUENTIAL_FILE_THRESHOLD
220}
221
222/// Discover and run tests in a file or directory.
223pub async fn run_tests(
224    path: &Path,
225    filter: Option<&str>,
226    timeout_ms: u64,
227    parallel: bool,
228    cli_skill_dirs: &[PathBuf],
229) -> TestSummary {
230    let options = RunOptions {
231        filter: filter.map(str::to_owned),
232        timeout_ms,
233        parallel,
234        jobs: None,
235        cli_skill_dirs: cli_skill_dirs.to_vec(),
236        progress: None,
237        diagnose: diagnose_enabled_via_env(),
238    };
239    run_tests_with_options(path, &options).await
240}
241
242/// Backwards-compatible progress-emitting entry point.
243pub async fn run_tests_with_progress(
244    path: &Path,
245    filter: Option<&str>,
246    timeout_ms: u64,
247    parallel: bool,
248    cli_skill_dirs: &[PathBuf],
249    progress: Option<TestRunProgress>,
250) -> TestSummary {
251    let options = RunOptions {
252        filter: filter.map(str::to_owned),
253        timeout_ms,
254        parallel,
255        jobs: None,
256        cli_skill_dirs: cli_skill_dirs.to_vec(),
257        progress,
258        diagnose: diagnose_enabled_via_env(),
259    };
260    run_tests_with_options(path, &options).await
261}
262
/// Whether `HARN_TEST_DIAGNOSE` requests per-test phase diagnostics.
///
/// Accepts `1`, `true`, `yes` and `on`, compared case-insensitively after
/// trimming surrounding whitespace, so values such as `"1\n"` coming from
/// env files or CI configuration are honored the same way the runner's other
/// environment overrides are. An unset, non-UTF-8 or unrecognized value
/// yields `false`.
fn diagnose_enabled_via_env() -> bool {
    std::env::var("HARN_TEST_DIAGNOSE")
        .map(|raw| {
            matches!(
                raw.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
272
273/// Run tests with full control over scheduling, worker count, and
274/// progress reporting. Workers and scheduling mode are reported via
275/// `TestRunEvent::SuiteDiscovered` so consumers can render their own
276/// banner instead of the runner printing to stdout directly.
277pub async fn run_tests_with_options(path: &Path, options: &RunOptions) -> TestSummary {
278    // Default LLM provider to "mock" in test mode unless caller overrides.
279    let _default_llm_provider = ScopedEnvVar::set_if_unset("HARN_LLM_PROVIDER", "mock");
280    let _disable_llm_calls = ScopedEnvVar::set(harn_vm::llm::LLM_CALLS_DISABLED_ENV, "1");
281
282    let start = Instant::now();
283
284    let collection_start = Instant::now();
285    let canonical_target = canonicalize_existing_path(path);
286    let files = if canonical_target.is_dir() {
287        discover_test_files(&canonical_target)
288    } else {
289        vec![canonical_target.clone()]
290    };
291
292    let workers = resolve_workers(options);
293    let timings_path = timings_cache_path(&canonical_target);
294    let timings = timings_path
295        .as_deref()
296        .map(load_timings_cache)
297        .unwrap_or_default();
298
299    let discovery = discover_test_cases(&files, options.filter.as_deref(), workers);
300    let collection_ms = collection_start.elapsed().as_millis() as u64;
301
302    emit_progress(
303        &options.progress,
304        TestRunEvent::SuiteDiscovered {
305            total_tests: discovery.cases.len(),
306            total_files: discovery.files_with_tests,
307            parallel: options.parallel,
308            workers,
309        },
310    );
311    if workers == 1
312        && should_warn_large_sequential_suite(discovery.cases.len(), discovery.files_with_tests)
313    {
314        emit_progress(
315            &options.progress,
316            TestRunEvent::LargeSequentialSuite {
317                total_tests: discovery.cases.len(),
318                total_files: discovery.files_with_tests,
319            },
320        );
321    }
322
323    let mut cases = discovery.cases;
324    sort_cases_longest_first(&mut cases, &timings);
325
326    let mut all_results = discovery.discovery_errors;
327    let total_tests = cases.len();
328    all_results.extend(execute_cases(cases, workers, options, total_tests).await);
329
330    let total = all_results.len();
331    let passed = all_results.iter().filter(|r| r.passed).count();
332    let failed = total - passed;
333    let aggregate = AggregateTimings::from_results(collection_ms, &all_results);
334
335    if let Some(path) = timings_path.as_deref() {
336        update_timings_cache(path, timings, &all_results);
337    }
338
339    TestSummary {
340        results: all_results,
341        passed,
342        failed,
343        total,
344        duration_ms: start.elapsed().as_millis() as u64,
345        aggregate,
346    }
347}
348
349/// Backwards-compatible single-file API used by `harn dev`.
350///
351/// Runs every test in one file on the current thread. The new scheduler
352/// uses per-test worker threads, but `harn dev` re-runs a single module
353/// in the foreground after each rebuild — the queueing machinery would
354/// add latency without parallelism to gain back, so we keep this path
355/// minimal.
356pub async fn run_test_file(
357    path: &Path,
358    filter: Option<&str>,
359    timeout_ms: u64,
360    execution_cwd: Option<&Path>,
361    cli_skill_dirs: &[PathBuf],
362) -> Result<Vec<TestResult>, String> {
363    let source =
364        fs::read_to_string(path).map_err(|e| format!("Failed to read {}: {e}", path.display()))?;
365    let program = parse_program(&source)?;
366    let source = Arc::new(source);
367    let program = Arc::new(program);
368
369    let cases = extract_cases_from_program(path, &source, &program, filter, usize::MAX);
370
371    let mut results = Vec::with_capacity(cases.len());
372    let execution_cwd = execution_cwd
373        .map(Path::to_path_buf)
374        .unwrap_or_else(test_execution_cwd);
375    for case in cases {
376        results.push(execute_case(&case, &execution_cwd, timeout_ms, cli_skill_dirs).await);
377    }
378    Ok(results)
379}
380
381fn resolve_workers(options: &RunOptions) -> usize {
382    if !options.parallel {
383        return 1;
384    }
385    if let Some(jobs) = options.jobs {
386        return jobs.max(1);
387    }
388    if let Ok(raw) = std::env::var(HARN_TEST_JOBS_ENV) {
389        if let Ok(parsed) = raw.trim().parse::<usize>() {
390            if parsed >= 1 {
391                return parsed;
392            }
393        }
394    }
395    let detected = thread::available_parallelism()
396        .map(|n| n.get())
397        .unwrap_or(1);
398    let core_cap = detected.clamp(1, DEFAULT_PARALLEL_JOBS_CAP);
399    apply_memory_cap(core_cap)
400}
401
402/// Lower `core_cap` to what currently-available system memory can hold, so an
403/// auto-sized parallel suite backs off on a loaded or small host instead of
404/// overcommitting RAM. Returns `core_cap` unchanged when memory is plentiful
405/// or cannot be measured. Emits a one-line notice when the cap bites so CI
406/// logs explain the reduced parallelism.
407fn apply_memory_cap(core_cap: usize) -> usize {
408    let Some(available_mb) = available_memory_mb() else {
409        return core_cap;
410    };
411    let budget = per_worker_memory_mb();
412    let mem_cap = memory_worker_cap(available_mb, budget, RESERVED_SYSTEM_MEMORY_MB);
413    if mem_cap < core_cap {
414        eprintln!(
415            "harn test: capping workers {core_cap} -> {mem_cap} \
416             (~{available_mb} MiB available, {budget} MiB/worker; \
417             override with --jobs / HARN_TEST_JOBS)"
418        );
419        return mem_cap;
420    }
421    core_cap
422}
423
/// Pure worker-count-from-memory math, factored out so it is unit-testable
/// without touching the host. Always yields at least one worker.
///
/// * `available_mb` — memory currently available, in MiB.
/// * `per_worker_mb` — budget per worker in MiB; `0` is treated as `1` so the
///   division cannot fault.
/// * `reserved_mb` — MiB held back before any worker is budgeted; reserving
///   more than is available leaves zero usable memory (saturating).
///
/// The quotient is converted with a saturating `usize` conversion rather than
/// an `as` cast, so a huge quotient on a target with a narrow `usize` cannot
/// wrap around to a small (or zero) worker count.
fn memory_worker_cap(available_mb: u64, per_worker_mb: u64, reserved_mb: u64) -> usize {
    let usable_mb = available_mb.saturating_sub(reserved_mb);
    let fitting = (usable_mb / per_worker_mb.max(1)).max(1);
    usize::try_from(fitting).unwrap_or(usize::MAX)
}
431
432/// Per-worker memory budget, honoring the `HARN_TEST_WORKER_MEMORY_MB`
433/// override (values `>= 1`), else [`DEFAULT_WORKER_MEMORY_MB`].
434fn per_worker_memory_mb() -> u64 {
435    std::env::var(HARN_TEST_WORKER_MEMORY_MB_ENV)
436        .ok()
437        .and_then(|raw| raw.trim().parse::<u64>().ok())
438        .filter(|&n| n >= 1)
439        .unwrap_or(DEFAULT_WORKER_MEMORY_MB)
440}
441
442/// Best-effort "memory available for new work" in MiB: the lesser of the
443/// host's available memory and (on Linux) this process's cgroup-v2 headroom.
444///
445/// Host memory comes from `sysinfo`, so it is correct on Linux, macOS, and
446/// Windows. The cgroup min means a container or a systemd-sliced CI runner
447/// sizes to its *slice* rather than the whole host — the key to stopping two
448/// runner agents on one box from each sizing to ~100% and collectively
449/// overcommitting RAM (the "thundering herd" behind the self-hosted
450/// runner-loss cancellations). Returns `None` when nothing can be measured,
451/// leaving the core-based cap in force.
452fn available_memory_mb() -> Option<u64> {
453    let mut sys = sysinfo::System::new();
454    sys.refresh_memory();
455    let host_mb = match sys.available_memory() {
456        0 => None, // unsupported / detection failed — don't over-throttle
457        bytes => Some(bytes / (1024 * 1024)),
458    };
459    match (host_mb, cgroup_v2_headroom_mb()) {
460        (Some(h), Some(c)) => Some(h.min(c)),
461        (Some(h), None) => Some(h),
462        (None, c) => c,
463    }
464}
465
466/// cgroup-v2 memory headroom (MiB) for this process's own cgroup, or `None`
467/// when not on cgroup v2, no limit is set, or the files cannot be read.
468#[cfg(target_os = "linux")]
469fn cgroup_v2_headroom_mb() -> Option<u64> {
470    let dir = own_cgroup_v2_dir()?;
471    let max_raw = fs::read_to_string(dir.join("memory.max")).ok()?;
472    let current_raw = fs::read_to_string(dir.join("memory.current")).ok()?;
473    cgroup_headroom_mb(&max_raw, &current_raw)
474}
475
/// Non-Linux stand-in for the cgroup-v2 headroom probe: there is no cgroup
/// figure to report on these targets, so it always returns `None`.
#[cfg(not(target_os = "linux"))]
fn cgroup_v2_headroom_mb() -> Option<u64> {
    None
}
480
/// Locate this process's own cgroup-v2 directory under `/sys/fs/cgroup`,
/// using the unified-hierarchy entry (`0::<path>`) of `/proc/self/cgroup`.
/// Limits placed directly on a systemd service slice or on a container's
/// namespaced root are found here; limits that exist only on an ancestor are
/// not followed (the host-memory minimum still backstops those). Returns
/// `None` on cgroup v1 / hybrid setups, which have no `0::` line, and when
/// `/proc/self/cgroup` cannot be read.
#[cfg(target_os = "linux")]
fn own_cgroup_v2_dir() -> Option<PathBuf> {
    let content = fs::read_to_string("/proc/self/cgroup").ok()?;
    let mut unified = None;
    for line in content.lines() {
        if let Some(rest) = line.strip_prefix("0::") {
            unified = Some(rest.trim());
            break;
        }
    }
    let listed = unified?;
    // Drop a single leading slash so `join` appends instead of replacing.
    let relative = match listed.strip_prefix('/') {
        Some(stripped) => stripped,
        None => listed,
    };
    Some(Path::new("/sys/fs/cgroup").join(relative))
}
496
/// Headroom in MiB computed from the raw text of `memory.max` and
/// `memory.current` (both byte counts; `memory.max` may instead hold the
/// literal `"max"` sentinel meaning "unlimited").
///
/// Returns `None` for the unlimited sentinel or when either value fails to
/// parse. Usage above the limit yields `Some(0)` rather than underflowing.
/// `memory.current` includes reclaimable page cache, so the figure
/// under-estimates true headroom — the safe direction for OOM avoidance.
#[cfg(any(target_os = "linux", test))]
fn cgroup_headroom_mb(memory_max: &str, memory_current: &str) -> Option<u64> {
    const BYTES_PER_MIB: u64 = 1024 * 1024;
    let limit_bytes = match memory_max.trim() {
        "max" => return None,
        text => text.parse::<u64>().ok()?,
    };
    let used_bytes = memory_current.trim().parse::<u64>().ok()?;
    Some(limit_bytes.saturating_sub(used_bytes) / BYTES_PER_MIB)
}
512
/// Everything the discovery pass produced for one suite.
struct Discovery {
    /// Runnable test cases, in file order and then declaration order.
    cases: Vec<TestCase>,
    /// Number of files that contributed at least one case (after filtering).
    files_with_tests: usize,
    /// One failed `<file error>` row per file that could not be read or parsed.
    discovery_errors: Vec<TestResult>,
}
518
519fn discover_test_cases(files: &[PathBuf], filter: Option<&str>, workers: usize) -> Discovery {
520    let mut cases = Vec::new();
521    let mut files_with_tests = 0usize;
522    let mut discovery_errors = Vec::new();
523
524    for file in files {
525        let source = match fs::read_to_string(file) {
526            Ok(s) => s,
527            Err(e) => {
528                discovery_errors.push(TestResult {
529                    name: "<file error>".to_string(),
530                    file: file.display().to_string(),
531                    passed: false,
532                    error: Some(format!("Failed to read {}: {e}", file.display())),
533                    duration_ms: 0,
534                    phases: PhaseTimings::default(),
535                });
536                continue;
537            }
538        };
539
540        let program = match parse_program(&source) {
541            Ok(p) => p,
542            Err(e) => {
543                discovery_errors.push(TestResult {
544                    name: "<file error>".to_string(),
545                    file: file.display().to_string(),
546                    passed: false,
547                    error: Some(e),
548                    duration_ms: 0,
549                    phases: PhaseTimings::default(),
550                });
551                continue;
552            }
553        };
554
555        let source = Arc::new(source);
556        let program = Arc::new(program);
557        let file_cases = extract_cases_from_program(file, &source, &program, filter, workers);
558        if !file_cases.is_empty() {
559            files_with_tests += 1;
560            cases.extend(file_cases);
561        }
562    }
563
564    Discovery {
565        cases,
566        files_with_tests,
567        discovery_errors,
568    }
569}
570
571fn parse_program(source: &str) -> Result<Vec<SNode>, String> {
572    let mut lexer = Lexer::new(source);
573    let tokens = lexer.tokenize().map_err(|e| format!("{e}"))?;
574    let mut parser = Parser::new(tokens);
575    parser.parse().map_err(|e| format!("{e}"))
576}
577
578fn extract_cases_from_program(
579    file: &Path,
580    source: &Arc<String>,
581    program: &Arc<Vec<SNode>>,
582    filter: Option<&str>,
583    workers: usize,
584) -> Vec<TestCase> {
585    let mut cases = Vec::new();
586    for snode in program.iter() {
587        let Some(meta) = inspect_test_pipeline(snode) else {
588            continue;
589        };
590        if let Some(pattern) = filter {
591            if !meta.name.contains(pattern) {
592                continue;
593            }
594        }
595        // Cap heavy weight so a single annotated test never deadlocks
596        // when the pool is smaller than the requested concurrency.
597        let weight = meta.weight.min(workers).max(1);
598        cases.push(TestCase {
599            file: file.to_path_buf(),
600            name: meta.name,
601            source: Arc::clone(source),
602            program: Arc::clone(program),
603            serial_group: meta.serial_group,
604            weight,
605        });
606    }
607    cases
608}
609
/// Scheduling-relevant facts extracted from one test pipeline declaration.
struct PipelineMeta {
    /// Pipeline name.
    name: String,
    /// Group from a `@serial` attribute (`"__default__"` when the attribute
    /// has no `group` argument); `None` without the attribute.
    serial_group: Option<String>,
    /// Worker permits requested via `@heavy(threads: N)`; `1` when absent or
    /// not a positive integer literal. Not yet capped at the pool size.
    weight: usize,
}
615
616fn inspect_test_pipeline(snode: &SNode) -> Option<PipelineMeta> {
617    // Pipelines marked `@test`, or named `test_*`, are user tests. The
618    // companion attributes `@serial` and `@heavy` only tune the scheduler
619    // and never make a non-test pipeline discoverable on their own.
620    let (attributes, inner) = match &snode.node {
621        Node::AttributedDecl { attributes, inner } => (attributes.as_slice(), inner.as_ref()),
622        _ => (&[][..], snode),
623    };
624    let name = match &inner.node {
625        Node::Pipeline { name, .. } => name.clone(),
626        _ => return None,
627    };
628    let has_test_attr = attributes.iter().any(|a| a.name == "test");
629    if !has_test_attr && !name.starts_with("test_") {
630        return None;
631    }
632    let serial_group = attributes
633        .iter()
634        .find(|a| a.name == "serial")
635        .map(serial_group_for);
636    let weight = attributes
637        .iter()
638        .find(|a| a.name == "heavy")
639        .and_then(heavy_weight_for)
640        .unwrap_or(1);
641    Some(PipelineMeta {
642        name,
643        serial_group,
644        weight,
645    })
646}
647
648fn serial_group_for(attr: &Attribute) -> String {
649    attr.string_arg("group")
650        .unwrap_or_else(|| "__default__".to_string())
651}
652
653fn heavy_weight_for(attr: &Attribute) -> Option<usize> {
654    attr.args
655        .iter()
656        .find(|a| a.name.as_deref() == Some("threads"))
657        .and_then(|a| match &a.value.node {
658            Node::IntLiteral(n) if *n >= 1 => Some(*n as usize),
659            _ => None,
660        })
661}
662
663fn sort_cases_longest_first(cases: &mut [TestCase], timings: &BTreeMap<String, u64>) {
664    // Sort ascending so the slowest tests sit at the tail and get popped
665    // first by workers. New (unmeasured) tests share the bottom of the
666    // queue alongside the fastest known ones — they'll appear in stable
667    // file/name order, and once they get their first timing they'll
668    // float up to where they belong.
669    cases.sort_by(|a, b| {
670        let key_a = timings_key(&a.file, &a.name);
671        let key_b = timings_key(&b.file, &b.name);
672        let dur_a = timings.get(&key_a).copied().unwrap_or(0);
673        let dur_b = timings.get(&key_b).copied().unwrap_or(0);
674        dur_a
675            .cmp(&dur_b)
676            .then_with(|| a.file.cmp(&b.file))
677            .then_with(|| a.name.cmp(&b.name))
678    });
679}
680
/// Cache key for one test: the file's display form, `::`, then the test name.
fn timings_key(file: &Path, name: &str) -> String {
    let mut key = file.display().to_string();
    key.push_str("::");
    key.push_str(name);
    key
}
684
685fn timings_cache_path(target: &Path) -> Option<PathBuf> {
686    // Anchor the cache at the project root if discoverable, otherwise at
687    // the directory the suite was launched from. The cache is shared
688    // across runs in the same workspace, so a per-suite cache would
689    // fragment timings whenever a user runs a subset.
690    let probe_root = if target.is_dir() {
691        target.to_path_buf()
692    } else {
693        target.parent()?.to_path_buf()
694    };
695    let root = harn_vm::stdlib::process::find_project_root(&probe_root)
696        .unwrap_or_else(|| probe_root.clone());
697    Some(root.join(TIMINGS_CACHE_RELATIVE_PATH))
698}
699
700fn load_timings_cache(path: &Path) -> BTreeMap<String, u64> {
701    let Ok(contents) = fs::read_to_string(path) else {
702        return BTreeMap::new();
703    };
704    serde_json::from_str::<BTreeMap<String, u64>>(&contents).unwrap_or_default()
705}
706
707fn update_timings_cache(path: &Path, mut existing: BTreeMap<String, u64>, results: &[TestResult]) {
708    for result in results {
709        if result.name == "<file error>" || result.name == "<join error>" {
710            continue;
711        }
712        existing.insert(
713            timings_key(Path::new(&result.file), &result.name),
714            result.duration_ms,
715        );
716    }
717    if let Some(parent) = path.parent() {
718        let _ = fs::create_dir_all(parent);
719    }
720    if let Ok(serialized) = serde_json::to_string(&existing) {
721        let _ = fs::write(path, serialized);
722    }
723}
724
725async fn execute_cases(
726    cases: Vec<TestCase>,
727    workers: usize,
728    options: &RunOptions,
729    total_tests: usize,
730) -> Vec<TestResult> {
731    if cases.is_empty() {
732        return Vec::new();
733    }
734    let completed = Arc::new(Mutex::new(0usize));
735    if workers <= 1 {
736        let mut results = Vec::with_capacity(cases.len());
737        for case in cases {
738            let cwd = case_execution_cwd(&case);
739            let test_index = next_test_index(&completed);
740            emit_progress(
741                &options.progress,
742                TestRunEvent::TestStarted {
743                    name: case.name.clone(),
744                    file: case.file.display().to_string(),
745                    test_index,
746                    total_tests,
747                },
748            );
749            let result =
750                execute_case(&case, &cwd, options.timeout_ms, &options.cli_skill_dirs).await;
751            if options.diagnose {
752                result.emit_diagnose();
753            }
754            emit_progress(
755                &options.progress,
756                TestRunEvent::TestFinished(result.clone()),
757            );
758            results.push(result);
759        }
760        return results;
761    }
762
763    let queue = Arc::new(Mutex::new(cases));
764    let gate = Arc::new(ResourceGate::new(workers));
765    let results: Arc<Mutex<Vec<TestResult>>> = Arc::new(Mutex::new(Vec::new()));
766
767    let mut handles = Vec::with_capacity(workers);
768    for worker_idx in 0..workers {
769        let queue = Arc::clone(&queue);
770        let gate = Arc::clone(&gate);
771        let results = Arc::clone(&results);
772        let completed = Arc::clone(&completed);
773        let timeout_ms = options.timeout_ms;
774        let cli_skill_dirs = options.cli_skill_dirs.clone();
775        let progress = options.progress.clone();
776        let diagnose = options.diagnose;
777        let handle = thread::Builder::new()
778            .name(format!("harn-test-worker-{worker_idx}"))
779            .spawn(move || {
780                let runtime = match tokio::runtime::Builder::new_current_thread()
781                    .enable_all()
782                    .build()
783                {
784                    Ok(rt) => rt,
785                    Err(error) => {
786                        results.lock().unwrap().push(TestResult {
787                            name: "<worker error>".to_string(),
788                            file: String::new(),
789                            passed: false,
790                            error: Some(format!("failed to start test runtime: {error}")),
791                            duration_ms: 0,
792                            phases: PhaseTimings::default(),
793                        });
794                        return;
795                    }
796                };
797                // Cases are sorted ascending by historical duration; popping
798                // from the tail gives this worker the slowest unclaimed
799                // test, which front-loads long poles and prevents workers
800                // from stranding on quick tests at the end of the run.
801                while let Some(case) = queue.lock().unwrap().pop() {
802                    let _guard = gate.acquire(case.weight, case.serial_group.as_deref());
803                    let cwd = case_execution_cwd(&case);
804                    let test_index = next_test_index(&completed);
805                    emit_progress(
806                        &progress,
807                        TestRunEvent::TestStarted {
808                            name: case.name.clone(),
809                            file: case.file.display().to_string(),
810                            test_index,
811                            total_tests,
812                        },
813                    );
814                    let result =
815                        runtime.block_on(execute_case(&case, &cwd, timeout_ms, &cli_skill_dirs));
816                    if diagnose {
817                        result.emit_diagnose();
818                    }
819                    emit_progress(&progress, TestRunEvent::TestFinished(result.clone()));
820                    results.lock().unwrap().push(result);
821                }
822            })
823            .expect("spawning a harn-test worker thread should succeed");
824        handles.push(handle);
825    }
826
827    for handle in handles {
828        let _ = handle.join();
829    }
830
831    // All workers have joined, so this Arc holds the only remaining
832    // reference. The lock-and-clone fallback survives the unlikely case
833    // where a panic-unwind kept an extra reference alive.
834    Arc::try_unwrap(results)
835        .map(|m| m.into_inner().unwrap_or_default())
836        .unwrap_or_else(|arc| arc.lock().unwrap().clone())
837}
838
/// Advances the shared counter by one and returns the new value.
///
/// Starting from a zeroed counter, the first call yields `1`.
///
/// # Panics
///
/// Panics if the counter's mutex is poisoned.
fn next_test_index(counter: &Mutex<usize>) -> usize {
    let mut slot = counter.lock().unwrap();
    let next = *slot + 1;
    *slot = next;
    next
}
844
845fn case_execution_cwd(case: &TestCase) -> PathBuf {
846    case.file
847        .parent()
848        .filter(|p| !p.as_os_str().is_empty())
849        .map(Path::to_path_buf)
850        .unwrap_or_else(test_execution_cwd)
851}
852
/// Coordinates worker permits and serial-group exclusivity without
/// requiring an async lock — workers are dedicated OS threads, so a
/// classic Mutex+Condvar gate keeps everything off the tokio scheduler.
struct ResourceGate {
    /// Permit count and busy-group set, guarded together so admission
    /// checks and updates happen under one lock.
    state: Mutex<GateState>,
    /// Notified every time a guard is dropped so waiters re-check admission.
    cond: Condvar,
    /// Total number of permits in the pool; never less than one.
    capacity: usize,
}

/// Bookkeeping protected by `ResourceGate::state`.
struct GateState {
    /// Permits not currently held by any `GateGuard`.
    available: usize,
    /// Serial groups that currently have a holder.
    busy_groups: HashSet<String>,
}

/// Token returned by `ResourceGate::acquire`. Dropping it hands the
/// permits back and frees the serial group, then wakes all waiters.
struct GateGuard<'a> {
    gate: &'a ResourceGate,
    /// Number of permits this guard holds (already clamped by `acquire`).
    weight: usize,
    /// Serial group held by this guard, if any.
    group: Option<String>,
}

impl ResourceGate {
    /// Creates a gate with `capacity` permits and no busy groups.
    ///
    /// A `capacity` of zero is raised to one. `acquire` always requests at
    /// least one permit, so an empty pool would block every caller forever.
    fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self {
            state: Mutex::new(GateState {
                available: capacity,
                busy_groups: HashSet::new(),
            }),
            cond: Condvar::new(),
            capacity,
        }
    }

    /// Blocks the calling thread until `weight` permits are free and
    /// `group` (when given) has no current holder, then claims both.
    ///
    /// `weight` is clamped to `1..=capacity`, so a request larger than the
    /// pool is admitted once the pool is fully idle instead of waiting
    /// forever.
    ///
    /// # Panics
    ///
    /// Panics if the gate's mutex is poisoned.
    fn acquire(&self, weight: usize, group: Option<&str>) -> GateGuard<'_> {
        let weight = weight.min(self.capacity).max(1);
        let mut state = self.state.lock().unwrap();
        loop {
            let group_free = group.is_none_or(|g| !state.busy_groups.contains(g));
            if state.available >= weight && group_free {
                state.available -= weight;
                if let Some(g) = group {
                    state.busy_groups.insert(g.to_string());
                }
                return GateGuard {
                    gate: self,
                    weight,
                    group: group.map(str::to_owned),
                };
            }
            // Not admissible yet: sleep until some guard is released.
            state = self.cond.wait(state).unwrap();
        }
    }
}

impl Drop for GateGuard<'_> {
    fn drop(&mut self) {
        // A destructor must not panic: it may run while a worker is already
        // unwinding, where a second panic aborts the process. Recover the
        // guard from a poisoned mutex so the permits are always returned.
        let mut state = self
            .gate
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        state.available += self.weight;
        if let Some(group) = self.group.as_deref() {
            state.busy_groups.remove(group);
        }
        // Wake every waiter: which one can proceed depends on its own
        // weight and group, so each must re-check for itself.
        self.gate.cond.notify_all();
    }
}
916
917async fn execute_case(
918    case: &TestCase,
919    execution_cwd: &Path,
920    timeout_ms: u64,
921    cli_skill_dirs: &[PathBuf],
922) -> TestResult {
923    harn_vm::reset_thread_local_state();
924
925    let mut phases = PhaseTimings::default();
926    let total_start = Instant::now();
927
928    let compile_start = Instant::now();
929    let chunk = match harn_vm::Compiler::new().compile_named(&case.program, &case.name) {
930        Ok(c) => c,
931        Err(e) => {
932            phases.compile_ms = compile_start.elapsed().as_millis() as u64;
933            return TestResult {
934                name: case.name.clone(),
935                file: case.file.display().to_string(),
936                passed: false,
937                error: Some(format!("Compile error: {e}")),
938                duration_ms: total_start.elapsed().as_millis() as u64,
939                phases,
940            };
941        }
942    };
943    phases.compile_ms = compile_start.elapsed().as_millis() as u64;
944
945    let local = tokio::task::LocalSet::new();
946    let timeout = std::time::Duration::from_millis(timeout_ms);
947    let file_display = case.file.display().to_string();
948    let setup_start = Instant::now();
949    let result = tokio::time::timeout(
950        timeout,
951        local.run_until(async {
952            let mut vm = harn_vm::Vm::new();
953            harn_vm::register_vm_stdlib(&mut vm);
954            crate::install_default_hostlib(&mut vm);
955            let source_parent = case.file.parent().unwrap_or(Path::new("."));
956            let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
957            let store_base = project_root.as_deref().unwrap_or(source_parent);
958            let source_dir = source_parent.to_string_lossy().into_owned();
959            harn_vm::register_store_builtins(&mut vm, store_base);
960            harn_vm::register_metadata_builtins(&mut vm, store_base);
961            let pipeline_name = case
962                .file
963                .file_stem()
964                .and_then(|s| s.to_str())
965                .unwrap_or("test");
966            harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
967            vm.set_source_info(&file_display, &case.source);
968            harn_vm::stdlib::process::set_thread_execution_context(Some(
969                harn_vm::orchestration::RunExecutionRecord {
970                    cwd: Some(execution_cwd.to_string_lossy().into_owned()),
971                    source_dir: Some(source_dir),
972                    env: BTreeMap::new(),
973                    adapter: None,
974                    repo_path: None,
975                    worktree_path: None,
976                    branch: None,
977                    base_ref: None,
978                    cleanup: None,
979                },
980            ));
981            if let Some(ref root) = project_root {
982                vm.set_project_root(root);
983            }
984            if let Some(parent) = case.file.parent() {
985                if !parent.as_os_str().is_empty() {
986                    vm.set_source_dir(parent);
987                }
988            }
989            let loaded =
990                crate::skill_loader::load_skills(&crate::skill_loader::SkillLoaderInputs {
991                    cli_dirs: cli_skill_dirs.to_vec(),
992                    source_path: Some(case.file.clone()),
993                });
994            crate::skill_loader::emit_loader_warnings(&loaded.loader_warnings);
995            crate::skill_loader::install_skills_global(&mut vm, &loaded);
996            let extensions = crate::package::load_runtime_extensions(&case.file);
997            crate::package::install_runtime_extensions(&extensions);
998            crate::package::install_manifest_triggers(&mut vm, &extensions)
999                .await
1000                .map_err(|error| format!("failed to install manifest triggers: {error}"))?;
1001            crate::package::install_manifest_hooks(&mut vm, &extensions)
1002                .await
1003                .map_err(|error| format!("failed to install manifest hooks: {error}"))?;
1004            vm.set_harness(harn_vm::Harness::real());
1005            let setup_ms = setup_start.elapsed().as_millis() as u64;
1006            let exec_start = Instant::now();
1007            let outcome = match vm.execute(&chunk).await {
1008                Ok(val) => Ok(val),
1009                Err(e) => Err(vm.format_runtime_error(&e)),
1010            };
1011            let execute_ms = exec_start.elapsed().as_millis() as u64;
1012            harn_vm::egress::reset_egress_policy_for_host();
1013            Ok::<_, String>((outcome, setup_ms, execute_ms))
1014        }),
1015    )
1016    .await;
1017
1018    let teardown_start = Instant::now();
1019    // Clear thread-locals so the next case scheduled onto this worker
1020    // sees a clean slate. Wall clock for this work lands in the
1021    // teardown bucket so the phase breakdown sums to wall time.
1022    harn_vm::reset_thread_local_state();
1023    phases.teardown_ms = teardown_start.elapsed().as_millis() as u64;
1024
1025    let elapsed_ms = total_start.elapsed().as_millis() as u64;
1026    let (passed, error, duration_ms) = match result {
1027        Ok(Ok((outcome, setup_ms, execute_ms))) => {
1028            phases.setup_ms = setup_ms;
1029            phases.execute_ms = execute_ms;
1030            match outcome {
1031                Ok(_) => (true, None, elapsed_ms),
1032                Err(message) => (false, Some(message), elapsed_ms),
1033            }
1034        }
1035        Ok(Err(setup_error)) => (false, Some(setup_error), elapsed_ms),
1036        // Report `timeout_ms` rather than `elapsed_ms` for timeouts so a
1037        // suite-wide aggregate still reflects the configured budget that
1038        // was hit, not the slightly-earlier moment we tore down at.
1039        Err(_) => (
1040            false,
1041            Some(format!("timed out after {timeout_ms}ms")),
1042            timeout_ms,
1043        ),
1044    };
1045
1046    TestResult {
1047        name: case.name.clone(),
1048        file: file_display,
1049        passed,
1050        error,
1051        duration_ms,
1052        phases,
1053    }
1054}
1055
1056fn discover_test_files(dir: &Path) -> Vec<PathBuf> {
1057    let mut files = Vec::new();
1058    if let Ok(entries) = fs::read_dir(dir) {
1059        for entry in entries.flatten() {
1060            let path = entry.path();
1061            if path.is_dir() {
1062                files.extend(discover_test_files(&path));
1063            } else if path.extension().is_some_and(|e| e == "harn") {
1064                if let Ok(content) = fs::read_to_string(&path) {
1065                    if content.contains("test_") || content.contains("@test") {
1066                        files.push(canonicalize_existing_path(&path));
1067                    }
1068                }
1069            }
1070        }
1071    }
1072    files.sort();
1073    files
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078    use super::*;
1079
1080    use std::sync::Arc;
1081    use std::time::Duration;
1082
1083    struct TempTestDir {
1084        inner: tempfile::TempDir,
1085    }
1086
1087    impl TempTestDir {
1088        fn new() -> Self {
1089            Self {
1090                inner: tempfile::tempdir().unwrap(),
1091            }
1092        }
1093
1094        fn write(&self, relative: &str, contents: &str) {
1095            let path = self.path().join(relative);
1096            if let Some(parent) = path.parent() {
1097                fs::create_dir_all(parent).unwrap();
1098            }
1099            fs::write(path, contents).unwrap();
1100        }
1101
1102        fn path(&self) -> &Path {
1103            self.inner.path()
1104        }
1105    }
1106
1107    #[test]
1108    fn discover_test_files_returns_canonical_absolute_paths() {
1109        let temp = TempTestDir::new();
1110        temp.write("suite/test_alpha.harn", "pipeline test_alpha(task) {}");
1111        temp.write("suite/nested/test_beta.harn", "pipeline test_beta(task) {}");
1112        temp.write("suite/annotated.harn", "@test\npipeline annotated(task) {}");
1113        temp.write("suite/ignore.harn", "pipeline build(task) {}");
1114
1115        let files = discover_test_files(&temp.path().join("suite"));
1116
1117        assert_eq!(files.len(), 3);
1118        assert!(files.iter().all(|path| path.is_absolute()));
1119        assert!(files
1120            .iter()
1121            .any(|path| path.ends_with("suite/test_alpha.harn")));
1122        assert!(files
1123            .iter()
1124            .any(|path| path.ends_with("suite/nested/test_beta.harn")));
1125        assert!(files
1126            .iter()
1127            .any(|path| path.ends_with("suite/annotated.harn")));
1128    }
1129
1130    #[tokio::test]
1131    async fn run_tests_uses_file_parent_as_execution_cwd_and_restores_shell_cwd() {
1132        let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
1133        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1134        let temp = TempTestDir::new();
1135        temp.write(
1136            "suite/test_cwd.harn",
1137            r"
1138pipeline test_current_dir(task) {
1139  assert_eq(cwd(), source_dir())
1140}
1141",
1142        );
1143
1144        let original_cwd = std::env::current_dir().unwrap();
1145        let summary = run_tests(&temp.path().join("suite"), None, 1_000, false, &[]).await;
1146        let restored_cwd = std::env::current_dir().unwrap();
1147
1148        assert_eq!(summary.failed, 0);
1149        assert_eq!(summary.passed, 1);
1150        assert_eq!(
1151            fs::canonicalize(restored_cwd).unwrap(),
1152            fs::canonicalize(original_cwd).unwrap()
1153        );
1154    }
1155
1156    #[tokio::test]
1157    async fn parallel_run_tests_uses_each_file_parent_as_execution_cwd() {
1158        let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
1159        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1160        let temp = TempTestDir::new();
1161        temp.write(
1162            "suite/a/test_one.harn",
1163            r"
1164pipeline test_one(task) {
1165  assert_eq(cwd(), source_dir())
1166}
1167",
1168        );
1169        temp.write(
1170            "suite/b/test_two.harn",
1171            r"
1172pipeline test_two(task) {
1173  assert_eq(cwd(), source_dir())
1174}
1175",
1176        );
1177
1178        let summary = run_tests(&temp.path().join("suite"), None, 1_000, true, &[]).await;
1179        assert_eq!(summary.failed, 0);
1180        assert_eq!(summary.passed, 2);
1181    }
1182
1183    #[tokio::test]
1184    async fn run_tests_loads_cli_skill_dirs() {
1185        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1186        let temp = TempTestDir::new();
1187        temp.write(
1188            "skills/review/SKILL.md",
1189            r"---
1190name: review
1191short: Review PRs
1192description: Review pull requests
1193---
1194
1195Review instructions.
1196",
1197        );
1198        temp.write(
1199            "suite/test_skills.harn",
1200            r#"
1201pipeline test_cli_skills(task) {
1202  assert_eq(skill_count(skills), 1)
1203  let found = skill_find(skills, "review")
1204  assert_eq(found.name, "review")
1205}
1206"#,
1207        );
1208
1209        let summary = run_tests(
1210            &temp.path().join("suite"),
1211            None,
1212            1_000,
1213            false,
1214            &[temp.path().join("skills")],
1215        )
1216        .await;
1217
1218        assert_eq!(summary.failed, 0, "{:?}", summary.results[0].error);
1219        assert_eq!(summary.passed, 1);
1220    }
1221
1222    #[test]
1223    fn resolve_workers_honors_explicit_jobs() {
1224        let mut opts = RunOptions::new(1_000);
1225        opts.parallel = true;
1226        opts.jobs = Some(3);
1227        assert_eq!(resolve_workers(&opts), 3);
1228    }
1229
1230    #[test]
1231    fn resolve_workers_returns_one_when_not_parallel() {
1232        let mut opts = RunOptions::new(1_000);
1233        opts.parallel = false;
1234        opts.jobs = Some(8);
1235        assert_eq!(resolve_workers(&opts), 1);
1236    }
1237
1238    #[test]
1239    fn memory_worker_cap_backs_off_under_pressure() {
1240        // ~4 GiB available, 1 GiB reserved, 1 GiB/worker -> 3 workers.
1241        assert_eq!(memory_worker_cap(4096, 1024, 1024), 3);
1242    }
1243
1244    #[test]
1245    fn memory_worker_cap_is_generous_when_memory_is_plentiful() {
1246        // A roomy box dwarfs the core cap; resolve_workers then min()s this
1247        // against DEFAULT_PARALLEL_JOBS_CAP, so the core cap stays in force.
1248        assert!(memory_worker_cap(32_768, 1024, 1024) >= DEFAULT_PARALLEL_JOBS_CAP);
1249    }
1250
1251    #[test]
1252    fn memory_worker_cap_never_starves_to_zero() {
1253        // Even when reserved >= available, at least one worker must run.
1254        assert_eq!(memory_worker_cap(512, 1024, 1024), 1);
1255    }
1256
1257    #[test]
1258    fn cgroup_headroom_unlimited_is_none() {
1259        // The `max` sentinel means no cgroup limit -> defer to host memory.
1260        assert_eq!(cgroup_headroom_mb("max\n", "1048576\n"), None);
1261    }
1262
1263    #[test]
1264    fn cgroup_headroom_computes_slice_remainder() {
1265        // 4 GiB limit, 1 GiB in use -> 3 GiB (3072 MiB) headroom.
1266        let four_gib = (4_u64 * 1024 * 1024 * 1024).to_string();
1267        let one_gib = (1024_u64 * 1024 * 1024).to_string();
1268        assert_eq!(cgroup_headroom_mb(&four_gib, &one_gib), Some(3072));
1269    }
1270
1271    #[test]
1272    fn cgroup_headroom_saturates_when_over_limit() {
1273        // A transient current > max must yield 0, not an underflow panic.
1274        assert_eq!(cgroup_headroom_mb("1024", "999999999"), Some(0));
1275    }
1276
1277    #[test]
1278    fn cgroup_headroom_rejects_garbage() {
1279        assert_eq!(cgroup_headroom_mb("not-a-number", "123"), None);
1280    }
1281
1282    #[test]
1283    fn sort_cases_longest_first_uses_historical_durations() {
1284        let source = Arc::new(String::new());
1285        let program = Arc::new(Vec::new());
1286        let mk = |name: &str| TestCase {
1287            file: PathBuf::from("tests/a.harn"),
1288            name: name.to_string(),
1289            source: Arc::clone(&source),
1290            program: Arc::clone(&program),
1291            serial_group: None,
1292            weight: 1,
1293        };
1294        let mut cases = vec![mk("test_quick"), mk("test_slow"), mk("test_medium")];
1295        let mut timings = BTreeMap::new();
1296        timings.insert("tests/a.harn::test_slow".to_string(), 5_000);
1297        timings.insert("tests/a.harn::test_medium".to_string(), 1_000);
1298
1299        sort_cases_longest_first(&mut cases, &timings);
1300
1301        // Slowest tests live at the tail so workers pop them first.
1302        let order: Vec<&str> = cases.iter().map(|c| c.name.as_str()).collect();
1303        assert_eq!(order, vec!["test_quick", "test_medium", "test_slow"]);
1304    }
1305
1306    #[test]
1307    fn resource_gate_serializes_same_group() {
1308        let gate = Arc::new(ResourceGate::new(4));
1309        let trace = Arc::new(Mutex::new(Vec::<&'static str>::new()));
1310
1311        let g_a = {
1312            let trace = Arc::clone(&trace);
1313            let gate = Arc::clone(&gate);
1314            thread::spawn(move || {
1315                let _guard = gate.acquire(1, Some("login"));
1316                trace.lock().unwrap().push("a-start");
1317                thread::sleep(Duration::from_millis(50));
1318                trace.lock().unwrap().push("a-end");
1319            })
1320        };
1321        // Give thread A time to grab the lock first.
1322        thread::sleep(Duration::from_millis(10));
1323        let g_b = {
1324            let trace = Arc::clone(&trace);
1325            let gate = Arc::clone(&gate);
1326            thread::spawn(move || {
1327                let _guard = gate.acquire(1, Some("login"));
1328                trace.lock().unwrap().push("b-start");
1329                trace.lock().unwrap().push("b-end");
1330            })
1331        };
1332        g_a.join().unwrap();
1333        g_b.join().unwrap();
1334
1335        let trace = trace.lock().unwrap();
1336        // B must not start until A ends.
1337        let a_end = trace.iter().position(|t| *t == "a-end").unwrap();
1338        let b_start = trace.iter().position(|t| *t == "b-start").unwrap();
1339        assert!(a_end < b_start, "B started before A finished: {trace:?}");
1340    }
1341
1342    #[test]
1343    fn resource_gate_allows_independent_groups_in_parallel() {
1344        let gate = Arc::new(ResourceGate::new(4));
1345        let _guard_a = gate.acquire(1, Some("alpha"));
1346        // Acquiring beta should not block — the other group is unrelated.
1347        let acquired = std::sync::Mutex::new(false);
1348        thread::scope(|s| {
1349            s.spawn(|| {
1350                let _ = gate.acquire(1, Some("beta"));
1351                *acquired.lock().unwrap() = true;
1352            });
1353        });
1354        assert!(*acquired.lock().unwrap());
1355    }
1356
1357    #[test]
1358    fn resource_gate_caps_heavy_weight_at_capacity() {
1359        // A test that asks for more than the pool size must still be
1360        // schedulable rather than deadlocking.
1361        let gate = Arc::new(ResourceGate::new(2));
1362        let _g = gate.acquire(99, None);
1363        // Available is now 0; another single-weight task must still wait.
1364        let started = Arc::new(Mutex::new(false));
1365        let inner = Arc::clone(&started);
1366        let gate2 = Arc::clone(&gate);
1367        let handle = thread::spawn(move || {
1368            let _guard = gate2.acquire(1, None);
1369            *inner.lock().unwrap() = true;
1370        });
1371        thread::sleep(Duration::from_millis(20));
1372        assert!(!*started.lock().unwrap(), "should still be waiting");
1373        drop(_g);
1374        handle.join().unwrap();
1375        assert!(*started.lock().unwrap());
1376    }
1377
1378    #[tokio::test]
1379    async fn parallel_scheduler_runs_heavy_tests_without_oversubscribing() {
1380        // Heavy(2) should never run concurrently with another test when
1381        // the pool only has two workers — there are no spare permits.
1382        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1383        let temp = TempTestDir::new();
1384        temp.write(
1385            "suite/test_heavy.harn",
1386            r"
1387@test
1388@heavy(threads: 2)
1389pipeline test_heavy_one(task) {}
1390
1391@test
1392pipeline test_light(task) {}
1393",
1394        );
1395
1396        let opts = RunOptions {
1397            parallel: true,
1398            jobs: Some(2),
1399            ..RunOptions::new(5_000)
1400        };
1401        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1402        assert_eq!(summary.failed, 0, "{:?}", summary.results);
1403        assert_eq!(summary.total, 2);
1404    }
1405
1406    #[tokio::test]
1407    async fn parallel_scheduler_handles_serial_group_annotation() {
1408        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1409        let temp = TempTestDir::new();
1410        temp.write(
1411            "suite/test_serial.harn",
1412            r#"
1413@test
1414@serial(group: "fixture")
1415pipeline test_serial_one(task) {}
1416
1417@test
1418@serial(group: "fixture")
1419pipeline test_serial_two(task) {}
1420"#,
1421        );
1422
1423        let opts = RunOptions {
1424            parallel: true,
1425            jobs: Some(4),
1426            ..RunOptions::new(5_000)
1427        };
1428        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1429        assert_eq!(summary.failed, 0, "{:?}", summary.results);
1430        assert_eq!(summary.passed, 2);
1431    }
1432
1433    #[tokio::test]
1434    async fn parallel_scheduler_persists_timings_cache() {
1435        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1436        let temp = TempTestDir::new();
1437        temp.write(
1438            "suite/test_timed.harn",
1439            r"
1440@test
1441pipeline test_first(task) {}
1442
1443@test
1444pipeline test_second(task) {}
1445",
1446        );
1447
1448        let opts = RunOptions {
1449            parallel: true,
1450            jobs: Some(2),
1451            ..RunOptions::new(5_000)
1452        };
1453        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1454        assert_eq!(summary.passed, 2);
1455        let cache = temp.path().join("suite/.harn/test-timings.json");
1456        assert!(cache.exists(), "expected timings cache at {cache:?}");
1457        let stored: BTreeMap<String, u64> =
1458            serde_json::from_str(&fs::read_to_string(&cache).unwrap()).unwrap();
1459        assert!(
1460            stored.keys().any(|key| key.contains("test_first")),
1461            "expected timings for test_first in {stored:?}"
1462        );
1463        assert!(
1464            stored.keys().any(|key| key.contains("test_second")),
1465            "expected timings for test_second in {stored:?}"
1466        );
1467    }
1468
1469    /// Regression fixture: a worker thread that runs multiple cases must
1470    /// reset thread-local state between them. Test A pins the clock
1471    /// mock to a future timestamp; test B asserts the clock is fresh.
1472    /// Fails if the per-case `reset_thread_local_state()` in
1473    /// `execute_case` ever regresses. Pins workers to 1 so both tests
1474    /// land on the same scheduler thread.
1475    #[tokio::test]
1476    async fn worker_resets_thread_local_state_between_cases() {
1477        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1478        let temp = TempTestDir::new();
1479        temp.write(
1480            "suite/test_isolation.harn",
1481            r#"
1482// The leak probe pins the clock to a future-but-i64-safe value
1483// (year ~2128) so a leaked mock is observable. Larger values overflow
1484// the nanosecond conversion inside the mock clock.
1485pipeline test_a_pins_clock(task) {
1486  mock_time(5000000000000)
1487  assert_eq(now_ms(), 5000000000000)
1488}
1489
1490pipeline test_b_clock_is_fresh(task) {
1491  let ms = now_ms()
1492  assert(ms < 5000000000000, "clock mock leaked from previous test")
1493}
1494"#,
1495        );
1496
1497        let opts = RunOptions::new(5_000);
1498        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1499        assert_eq!(
1500            summary.failed,
1501            0,
1502            "state leaked between tests: {:?}",
1503            summary
1504                .results
1505                .iter()
1506                .filter(|r| !r.passed)
1507                .map(|r| (r.name.clone(), r.error.clone()))
1508                .collect::<Vec<_>>()
1509        );
1510        assert_eq!(summary.passed, 2);
1511    }
1512
1513    #[tokio::test]
1514    async fn summary_aggregate_timings_sum_phases_across_results() {
1515        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1516        let temp = TempTestDir::new();
1517        temp.write(
1518            "suite/test_phases.harn",
1519            r"
1520pipeline test_one(task) { assert_eq(1, 1) }
1521pipeline test_two(task) { assert_eq(2, 2) }
1522",
1523        );
1524
1525        let summary = run_tests(&temp.path().join("suite"), None, 5_000, false, &[]).await;
1526        assert_eq!(summary.passed, 2);
1527        let per_test_sum: u64 = summary
1528            .results
1529            .iter()
1530            .map(|r| r.phases.setup_ms.saturating_add(r.phases.compile_ms))
1531            .sum();
1532        let agg_sum = summary
1533            .aggregate
1534            .setup_ms
1535            .saturating_add(summary.aggregate.compile_ms);
1536        assert_eq!(
1537            per_test_sum, agg_sum,
1538            "aggregate setup+compile must equal sum of per-test setup+compile"
1539        );
1540    }
1541
1542    #[tokio::test]
1543    async fn parallel_scheduler_emits_progress_events() {
1544        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1545        let temp = TempTestDir::new();
1546        temp.write(
1547            "suite/test_events.harn",
1548            r"
1549@test
1550pipeline test_a(task) {}
1551
1552@test
1553pipeline test_b(task) {}
1554",
1555        );
1556
1557        let events: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
1558        let events_for_progress = Arc::clone(&events);
1559        let progress: TestRunProgress = Arc::new(move |event| {
1560            events_for_progress.lock().unwrap().push(match event {
1561                TestRunEvent::SuiteDiscovered { .. } => "suite",
1562                TestRunEvent::LargeSequentialSuite { .. } => "large-seq",
1563                TestRunEvent::TestStarted { .. } => "started",
1564                TestRunEvent::TestFinished(_) => "finished",
1565            });
1566        });
1567        let opts = RunOptions {
1568            parallel: true,
1569            jobs: Some(2),
1570            progress: Some(progress),
1571            ..RunOptions::new(5_000)
1572        };
1573        let _ = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1574        let events = events.lock().unwrap();
1575        assert_eq!(events.first().copied(), Some("suite"));
1576        assert_eq!(events.iter().filter(|e| **e == "started").count(), 2);
1577        assert_eq!(events.iter().filter(|e| **e == "finished").count(), 2);
1578    }
1579}