Skip to main content

harn_cli/
test_runner.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Condvar, Mutex};
5use std::thread;
6use std::time::Instant;
7
8use harn_lexer::Lexer;
9use harn_parser::{Attribute, Node, Parser, SNode};
10
11use crate::env_guard::ScopedEnvVar;
12
/// Clear `harn-hostlib`'s process-global fs-snapshot sessions between test
/// cases.
///
/// Snapshots are keyed by session id and are normally drained one session at
/// a time by `drop_session_snapshots`. A transient test workflow never
/// reaches that call, so a reused test worker would otherwise pile up one
/// snapshot bundle per case. Compiles to a no-op when the `hostlib` feature
/// is disabled. Kept next to its only callers rather than in `lib.rs` so the
/// ported-handler dispatch path stays off the LOC ratchet.
fn reset_hostlib_state() {
    #[cfg(feature = "hostlib")]
    harn_hostlib::fs_snapshot::reset_all_sessions();
}
27
/// Outcome of one executed test, or a synthetic failing row describing a
/// problem that happened outside any test body (for example a file that
/// could not be read or parsed, or a worker that failed to start).
#[derive(Clone, Debug)]
pub struct TestResult {
    /// Test (pipeline) name, or a placeholder such as `<file error>` /
    /// `<worker error>` on synthetic rows.
    pub name: String,
    /// Display form of the path of the file the test came from. Empty on
    /// worker-error rows, which are not tied to a file.
    pub file: String,
    /// Whether the test succeeded. Synthetic rows are always `false`.
    pub passed: bool,
    /// Failure message, when one is available.
    pub error: Option<String>,
    /// Time attributed to this row, in milliseconds; `0` on synthetic rows.
    pub duration_ms: u64,
    /// Per-phase timings. Populated by `execute_case`; default-zero on
    /// discovery/worker-error rows. Used by `--diagnose` and the
    /// `--timing` aggregate.
    pub phases: PhaseTimings,
}
40
/// Final report for one run of the suite.
#[derive(Clone, Debug)]
pub struct TestSummary {
    /// Every row produced by the run: discovery-error rows first, followed
    /// by the rows returned from test execution.
    pub results: Vec<TestResult>,
    /// Number of rows in `results` whose `passed` flag is set.
    pub passed: usize,
    /// `total - passed`; includes synthetic error rows.
    pub failed: usize,
    /// `results.len()`.
    pub total: usize,
    /// Wall-clock time of the whole run, in milliseconds.
    pub duration_ms: u64,
    /// Aggregated phase costs across the entire run.
    pub aggregate: AggregateTimings,
}
51
/// Wall-clock cost of each phase of a single test execution, in
/// milliseconds.
///
/// Sums to the test's `duration_ms` modulo measurement overhead. Surfaced
/// so consumers can attribute cold-start vs assertion cost without
/// having to instrument the runner externally. The `Default` value is
/// all-zero, which is what discovery-error and worker-error rows carry.
#[derive(Debug, Default, Clone, Copy)]
pub struct PhaseTimings {
    /// VM construction + stdlib/hostlib registration + skill install +
    /// runtime extension install + manifest hooks/triggers install.
    pub setup_ms: u64,
    /// `Compiler::compile_named` time for this test's chunk.
    pub compile_ms: u64,
    /// `vm.execute(chunk)` wall time, i.e. the actual user-test body.
    pub execute_ms: u64,
    /// `reset_thread_local_state` between tests.
    pub teardown_ms: u64,
}
69
/// Aggregated cost across the run, in milliseconds. Mirrors
/// [`PhaseTimings`] plus the suite-level collection cost (discover + parse).
#[derive(Debug, Default, Clone, Copy)]
pub struct AggregateTimings {
    /// Time spent collecting the suite before any test is executed.
    pub collection_ms: u64,
    /// Saturating sum of `setup_ms` over all result rows.
    pub setup_ms: u64,
    /// Saturating sum of `compile_ms` over all result rows.
    pub compile_ms: u64,
    /// Saturating sum of `execute_ms` over all result rows.
    pub execute_ms: u64,
    /// Saturating sum of `teardown_ms` over all result rows.
    pub teardown_ms: u64,
}
80
81impl AggregateTimings {
82    fn from_results(collection_ms: u64, results: &[TestResult]) -> Self {
83        results.iter().map(|r| r.phases).fold(
84            Self {
85                collection_ms,
86                ..Self::default()
87            },
88            |acc, p| Self {
89                collection_ms: acc.collection_ms,
90                setup_ms: acc.setup_ms.saturating_add(p.setup_ms),
91                compile_ms: acc.compile_ms.saturating_add(p.compile_ms),
92                execute_ms: acc.execute_ms.saturating_add(p.execute_ms),
93                teardown_ms: acc.teardown_ms.saturating_add(p.teardown_ms),
94            },
95        )
96    }
97}
98
99impl TestResult {
100    /// Emit a one-line phase breakdown to stderr. Driven by `--diagnose`
101    /// / `HARN_TEST_DIAGNOSE=1`. The format is intentionally
102    /// machine-readable so downstream eval pipelines can grep it.
103    fn emit_diagnose(&self) {
104        let outcome = if self.passed { "ok" } else { "FAIL" };
105        eprintln!(
106            "[harn test diag] {} {} setup={}ms compile={}ms execute={}ms teardown={}ms total={}ms",
107            outcome,
108            self.name,
109            self.phases.setup_ms,
110            self.phases.compile_ms,
111            self.phases.execute_ms,
112            self.phases.teardown_ms,
113            self.duration_ms,
114        );
115    }
116}
117
/// Progress notifications handed to the optional [`TestRunProgress`]
/// callback while a suite runs.
#[derive(Clone, Debug)]
pub enum TestRunEvent {
    /// Emitted once, after discovery and before any test starts.
    SuiteDiscovered {
        /// Number of runnable tests that matched the filter.
        total_tests: usize,
        /// Number of files that contributed at least one test.
        total_files: usize,
        /// The caller's `parallel` option, as requested.
        parallel: bool,
        /// Worker count the scheduler resolved for this run.
        workers: usize,
    },
    /// Emitted right after `SuiteDiscovered` when the run uses a single
    /// worker and the suite crosses the large-suite test or file threshold.
    LargeSequentialSuite {
        total_tests: usize,
        total_files: usize,
    },
    /// Emitted immediately before a test is executed.
    TestStarted {
        name: String,
        file: String,
        /// 1-based start order of this test within the run.
        test_index: usize,
        total_tests: usize,
    },
    /// Emitted when a test has finished, carrying its result.
    TestFinished(TestResult),
}
138
/// Shared progress callback. It is `Send + Sync` behind an `Arc` because the
/// parallel scheduler clones it into every worker thread, so it may be
/// invoked concurrently.
pub type TestRunProgress = Arc<dyn Fn(TestRunEvent) + Send + Sync>;
140
/// A single-worker run with at least this many tests triggers the
/// `LargeSequentialSuite` event.
const LARGE_SEQUENTIAL_TEST_THRESHOLD: usize = 50;
/// A single-worker run spanning at least this many test files triggers the
/// `LargeSequentialSuite` event.
const LARGE_SEQUENTIAL_FILE_THRESHOLD: usize = 10;
/// Upper bound on the auto-detected worker count; explicit job counts are
/// not subject to it.
const DEFAULT_PARALLEL_JOBS_CAP: usize = 8;
/// Location of the per-test timings cache, relative to the project root (or
/// to the probed directory when no project root is found).
const TIMINGS_CACHE_RELATIVE_PATH: &str = ".harn/test-timings.json";
/// Environment variable that supplies the worker count when `jobs` is unset.
const HARN_TEST_JOBS_ENV: &str = "HARN_TEST_JOBS";

/// Per-worker memory budget (MiB) used to cap *auto-detected* parallelism on
/// memory-constrained or oversubscribed hosts. Overridable via
/// `HARN_TEST_WORKER_MEMORY_MB`. A worker runs a full VM and may drive nested
/// agent loops, so this is a deliberately conservative estimate. The cap only
/// ever *lowers* the core-based default — it never raises it, and an explicit
/// `--jobs` / `HARN_TEST_JOBS` always wins.
const DEFAULT_WORKER_MEMORY_MB: u64 = 1024;
/// Environment variable that overrides [`DEFAULT_WORKER_MEMORY_MB`].
const HARN_TEST_WORKER_MEMORY_MB_ENV: &str = "HARN_TEST_WORKER_MEMORY_MB";

/// Memory (MiB) held back for the OS, the CI runner agent, and any co-tenant
/// job, so an auto-sized suite cannot consume the last scrap of RAM and starve
/// the runner's heartbeat. This is the failure mode behind the self-hosted
/// "The operation was canceled" runner-loss cancellations: two runner agents
/// share one box, two heavy jobs overcommit RAM + swap, and the kernel never
/// fires the OOM-killer — instead a starved runner agent stops phoning home
/// and the control plane declares the job lost.
const RESERVED_SYSTEM_MEMORY_MB: u64 = 1024;
164
/// Options that shape how a user-test suite is discovered and executed.
///
/// Held separately from the positional path so call sites (one-shot run,
/// `--watch`, persona doctor) can share the same scheduler without
/// keyword-argument explosion at the call sites.
#[derive(Clone, Default)]
pub struct RunOptions {
    /// When set, only tests whose name contains this substring are run.
    pub filter: Option<String>,
    /// Forwarded unchanged to every test execution. Presumably a per-test
    /// timeout in milliseconds; the enforcing code lives outside this
    /// section — TODO confirm.
    pub timeout_ms: u64,
    /// When false, the scheduler runs with a single worker, preserving the
    /// historical "everything sequential" semantics that `harn test`
    /// defaulted to before `--parallel` was introduced.
    pub parallel: bool,
    /// Explicit worker limit (`-j`/`--jobs`); values below 1 are raised to 1.
    /// `None` falls back to a valid `HARN_TEST_JOBS` value, and otherwise to
    /// the detected parallelism capped by a small constant and by available
    /// memory. Ignored when `parallel = false`.
    pub jobs: Option<usize>,
    /// Skill directories given on the command line; forwarded unchanged to
    /// every test execution.
    pub cli_skill_dirs: Vec<PathBuf>,
    /// Optional progress callback. When set, the runner emits events as
    /// the suite progresses; consumers (CLI, dev mode) render output.
    pub progress: Option<TestRunProgress>,
    /// Emit per-test phase timings (setup / compile / execute /
    /// teardown) to stderr. Also honored via `HARN_TEST_DIAGNOSE=1` so
    /// users can flip the flag without restarting their shell.
    pub diagnose: bool,
}
191
192impl RunOptions {
193    pub fn new(timeout_ms: u64) -> Self {
194        Self {
195            timeout_ms,
196            ..Default::default()
197        }
198    }
199}
200
/// A single executable test discovered during scan. Workers compile and
/// run each case in isolation; the parsed program is shared by `Arc` so
/// large suites parse exactly once.
#[derive(Clone)]
struct TestCase {
    /// File the test was discovered in. Its parent directory is used as the
    /// working directory when the case runs under the scheduler.
    file: PathBuf,
    /// Name of the test pipeline.
    name: String,
    /// Full text of `file`, shared by every case from that file.
    source: Arc<String>,
    /// Parsed top-level nodes of `file`, shared by every case from that file.
    program: Arc<Vec<SNode>>,
    /// Optional serial group — tests with the same group never run
    /// concurrently with each other, even if workers are idle. Used for
    /// shared fixtures.
    serial_group: Option<String>,
    /// Number of workers this test reserves while running. Capped at the
    /// pool size during discovery so heavy tests still get scheduled.
    weight: usize,
}
218
/// Resolve `path` to its canonical form, or return it unchanged when
/// canonicalization fails (for example because the path does not exist).
fn canonicalize_existing_path(path: &Path) -> PathBuf {
    match path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
222
/// Working directory for test execution: the process's current directory,
/// or `"."` when that cannot be determined.
fn test_execution_cwd() -> PathBuf {
    match std::env::current_dir() {
        Ok(dir) => dir,
        Err(_) => PathBuf::from("."),
    }
}
226
227fn emit_progress(progress: &Option<TestRunProgress>, event: TestRunEvent) {
228    if let Some(callback) = progress {
229        callback(event);
230    }
231}
232
233fn should_warn_large_sequential_suite(total_tests: usize, total_files: usize) -> bool {
234    total_tests >= LARGE_SEQUENTIAL_TEST_THRESHOLD || total_files >= LARGE_SEQUENTIAL_FILE_THRESHOLD
235}
236
237/// Discover and run tests in a file or directory.
238pub async fn run_tests(
239    path: &Path,
240    filter: Option<&str>,
241    timeout_ms: u64,
242    parallel: bool,
243    cli_skill_dirs: &[PathBuf],
244) -> TestSummary {
245    let options = RunOptions {
246        filter: filter.map(str::to_owned),
247        timeout_ms,
248        parallel,
249        jobs: None,
250        cli_skill_dirs: cli_skill_dirs.to_vec(),
251        progress: None,
252        diagnose: diagnose_enabled_via_env(),
253    };
254    run_tests_with_options(path, &options).await
255}
256
257/// Backwards-compatible progress-emitting entry point.
258pub async fn run_tests_with_progress(
259    path: &Path,
260    filter: Option<&str>,
261    timeout_ms: u64,
262    parallel: bool,
263    cli_skill_dirs: &[PathBuf],
264    progress: Option<TestRunProgress>,
265) -> TestSummary {
266    let options = RunOptions {
267        filter: filter.map(str::to_owned),
268        timeout_ms,
269        parallel,
270        jobs: None,
271        cli_skill_dirs: cli_skill_dirs.to_vec(),
272        progress,
273        diagnose: diagnose_enabled_via_env(),
274    };
275    run_tests_with_options(path, &options).await
276}
277
/// Whether `HARN_TEST_DIAGNOSE` asks for per-test phase diagnostics.
///
/// Returns `true` when the variable is set to `1`, `true`, `yes` or `on`
/// (ASCII case-insensitive). Surrounding whitespace is ignored, matching how
/// the other `HARN_TEST_*` variables are parsed, so a value such as `"1 "`
/// (what `set VAR=1 && ...` produces on cmd.exe) still enables the flag.
/// Unset, non-Unicode, or any other value yields `false`.
fn diagnose_enabled_via_env() -> bool {
    let Ok(raw) = std::env::var("HARN_TEST_DIAGNOSE") else {
        return false;
    };
    let value = raw.trim();
    ["1", "true", "yes", "on"]
        .iter()
        .any(|truthy| value.eq_ignore_ascii_case(truthy))
}
287
288/// Run tests with full control over scheduling, worker count, and
289/// progress reporting. Workers and scheduling mode are reported via
290/// `TestRunEvent::SuiteDiscovered` so consumers can render their own
291/// banner instead of the runner printing to stdout directly.
292pub async fn run_tests_with_options(path: &Path, options: &RunOptions) -> TestSummary {
293    // Default LLM provider to "mock" in test mode unless caller overrides.
294    let _default_llm_provider = ScopedEnvVar::set_if_unset("HARN_LLM_PROVIDER", "mock");
295    let _disable_llm_calls = ScopedEnvVar::set(harn_vm::llm::LLM_CALLS_DISABLED_ENV, "1");
296
297    let start = Instant::now();
298
299    let collection_start = Instant::now();
300    let canonical_target = canonicalize_existing_path(path);
301    let files = if canonical_target.is_dir() {
302        discover_test_files(&canonical_target)
303    } else {
304        vec![canonical_target.clone()]
305    };
306
307    let workers = resolve_workers(options);
308    let timings_path = timings_cache_path(&canonical_target);
309    let timings = timings_path
310        .as_deref()
311        .map(load_timings_cache)
312        .unwrap_or_default();
313
314    let discovery = discover_test_cases(&files, options.filter.as_deref(), workers);
315    let collection_ms = collection_start.elapsed().as_millis() as u64;
316
317    emit_progress(
318        &options.progress,
319        TestRunEvent::SuiteDiscovered {
320            total_tests: discovery.cases.len(),
321            total_files: discovery.files_with_tests,
322            parallel: options.parallel,
323            workers,
324        },
325    );
326    if workers == 1
327        && should_warn_large_sequential_suite(discovery.cases.len(), discovery.files_with_tests)
328    {
329        emit_progress(
330            &options.progress,
331            TestRunEvent::LargeSequentialSuite {
332                total_tests: discovery.cases.len(),
333                total_files: discovery.files_with_tests,
334            },
335        );
336    }
337
338    let mut cases = discovery.cases;
339    sort_cases_longest_first(&mut cases, &timings);
340
341    let mut all_results = discovery.discovery_errors;
342    let total_tests = cases.len();
343    all_results.extend(execute_cases(cases, workers, options, total_tests).await);
344
345    let total = all_results.len();
346    let passed = all_results.iter().filter(|r| r.passed).count();
347    let failed = total - passed;
348    let aggregate = AggregateTimings::from_results(collection_ms, &all_results);
349
350    if let Some(path) = timings_path.as_deref() {
351        update_timings_cache(path, timings, &all_results);
352    }
353
354    TestSummary {
355        results: all_results,
356        passed,
357        failed,
358        total,
359        duration_ms: start.elapsed().as_millis() as u64,
360        aggregate,
361    }
362}
363
364/// Backwards-compatible single-file API used by `harn dev`.
365///
366/// Runs every test in one file on the current thread. The new scheduler
367/// uses per-test worker threads, but `harn dev` re-runs a single module
368/// in the foreground after each rebuild — the queueing machinery would
369/// add latency without parallelism to gain back, so we keep this path
370/// minimal.
371pub async fn run_test_file(
372    path: &Path,
373    filter: Option<&str>,
374    timeout_ms: u64,
375    execution_cwd: Option<&Path>,
376    cli_skill_dirs: &[PathBuf],
377) -> Result<Vec<TestResult>, String> {
378    let source =
379        fs::read_to_string(path).map_err(|e| format!("Failed to read {}: {e}", path.display()))?;
380    let program = parse_program(&source)?;
381    let source = Arc::new(source);
382    let program = Arc::new(program);
383
384    let cases = extract_cases_from_program(path, &source, &program, filter, usize::MAX);
385
386    let mut results = Vec::with_capacity(cases.len());
387    let execution_cwd = execution_cwd
388        .map(Path::to_path_buf)
389        .unwrap_or_else(test_execution_cwd);
390    for case in cases {
391        results.push(execute_case(&case, &execution_cwd, timeout_ms, cli_skill_dirs).await);
392    }
393    Ok(results)
394}
395
396fn resolve_workers(options: &RunOptions) -> usize {
397    if !options.parallel {
398        return 1;
399    }
400    if let Some(jobs) = options.jobs {
401        return jobs.max(1);
402    }
403    if let Ok(raw) = std::env::var(HARN_TEST_JOBS_ENV) {
404        if let Ok(parsed) = raw.trim().parse::<usize>() {
405            if parsed >= 1 {
406                return parsed;
407            }
408        }
409    }
410    let detected = thread::available_parallelism()
411        .map(|n| n.get())
412        .unwrap_or(1);
413    let core_cap = detected.clamp(1, DEFAULT_PARALLEL_JOBS_CAP);
414    apply_memory_cap(core_cap)
415}
416
417/// Lower `core_cap` to what currently-available system memory can hold, so an
418/// auto-sized parallel suite backs off on a loaded or small host instead of
419/// overcommitting RAM. Returns `core_cap` unchanged when memory is plentiful
420/// or cannot be measured. Emits a one-line notice when the cap bites so CI
421/// logs explain the reduced parallelism.
422fn apply_memory_cap(core_cap: usize) -> usize {
423    let Some(available_mb) = available_memory_mb() else {
424        return core_cap;
425    };
426    let budget = per_worker_memory_mb();
427    let mem_cap = memory_worker_cap(available_mb, budget, RESERVED_SYSTEM_MEMORY_MB);
428    if mem_cap < core_cap {
429        eprintln!(
430            "harn test: capping workers {core_cap} -> {mem_cap} \
431             (~{available_mb} MiB available, {budget} MiB/worker; \
432             override with --jobs / HARN_TEST_JOBS)"
433        );
434        return mem_cap;
435    }
436    core_cap
437}
438
/// Worker count that fits in memory, as pure arithmetic so it can be unit
/// tested without probing the host.
///
/// `reserved_mb` is subtracted from `available_mb` (saturating at zero) and
/// the remainder is divided by the per-worker budget; a budget of `0` is
/// treated as `1`. The result is never below one worker.
fn memory_worker_cap(available_mb: u64, per_worker_mb: u64, reserved_mb: u64) -> usize {
    let budget_mb = if per_worker_mb == 0 { 1 } else { per_worker_mb };
    let spare_mb = available_mb.saturating_sub(reserved_mb);
    match spare_mb / budget_mb {
        0 => 1,
        fits => fits as usize,
    }
}
446
447/// Per-worker memory budget, honoring the `HARN_TEST_WORKER_MEMORY_MB`
448/// override (values `>= 1`), else [`DEFAULT_WORKER_MEMORY_MB`].
449fn per_worker_memory_mb() -> u64 {
450    std::env::var(HARN_TEST_WORKER_MEMORY_MB_ENV)
451        .ok()
452        .and_then(|raw| raw.trim().parse::<u64>().ok())
453        .filter(|&n| n >= 1)
454        .unwrap_or(DEFAULT_WORKER_MEMORY_MB)
455}
456
457/// Best-effort "memory available for new work" in MiB: the lesser of the
458/// host's available memory and (on Linux) this process's cgroup-v2 headroom.
459///
460/// Host memory comes from `sysinfo`, so it is correct on Linux, macOS, and
461/// Windows. The cgroup min means a container or a systemd-sliced CI runner
462/// sizes to its *slice* rather than the whole host — the key to stopping two
463/// runner agents on one box from each sizing to ~100% and collectively
464/// overcommitting RAM (the "thundering herd" behind the self-hosted
465/// runner-loss cancellations). Returns `None` when nothing can be measured,
466/// leaving the core-based cap in force.
467fn available_memory_mb() -> Option<u64> {
468    let mut sys = sysinfo::System::new();
469    sys.refresh_memory();
470    let host_mb = match sys.available_memory() {
471        0 => None, // unsupported / detection failed — don't over-throttle
472        bytes => Some(bytes / (1024 * 1024)),
473    };
474    match (host_mb, cgroup_v2_headroom_mb()) {
475        (Some(h), Some(c)) => Some(h.min(c)),
476        (Some(h), None) => Some(h),
477        (None, c) => c,
478    }
479}
480
481/// cgroup-v2 memory headroom (MiB) for this process's own cgroup, or `None`
482/// when not on cgroup v2, no limit is set, or the files cannot be read.
483#[cfg(target_os = "linux")]
484fn cgroup_v2_headroom_mb() -> Option<u64> {
485    let dir = own_cgroup_v2_dir()?;
486    let max_raw = fs::read_to_string(dir.join("memory.max")).ok()?;
487    let current_raw = fs::read_to_string(dir.join("memory.current")).ok()?;
488    cgroup_headroom_mb(&max_raw, &current_raw)
489}
490
491#[cfg(not(target_os = "linux"))]
492fn cgroup_v2_headroom_mb() -> Option<u64> {
493    None
494}
495
/// Locate this process's own cgroup-v2 directory below `/sys/fs/cgroup`,
/// using the unified-hierarchy entry (`0::<path>`) of `/proc/self/cgroup`.
///
/// A limit set directly on a systemd service slice, or on a container's
/// namespaced root, is found here. Limits that exist only on an ancestor are
/// not chased; the host-memory minimum still backstops those. Returns `None`
/// on cgroup v1 / hybrid setups (no `0::` line) or when the file cannot be
/// read.
#[cfg(target_os = "linux")]
fn own_cgroup_v2_dir() -> Option<PathBuf> {
    let content = fs::read_to_string("/proc/self/cgroup").ok()?;
    for line in content.lines() {
        let Some(entry) = line.strip_prefix("0::") else {
            continue;
        };
        let entry = entry.trim();
        // Drop the leading slash: pushing an absolute path would replace
        // the `/sys/fs/cgroup` base instead of extending it.
        let relative = match entry.strip_prefix('/') {
            Some(stripped) => stripped,
            None => entry,
        };
        let mut dir = PathBuf::from("/sys/fs/cgroup");
        dir.push(relative);
        return Some(dir);
    }
    None
}
511
/// Headroom in MiB computed purely from the raw text of `memory.max` and
/// `memory.current` (both byte counts; `memory.max` may instead be the
/// literal `"max"`, meaning unlimited).
///
/// Returns `None` for an unlimited cgroup or when either value is not a
/// valid integer. Usage above the limit clamps to zero. `memory.current`
/// includes reclaimable page cache, so the figure under-estimates real
/// headroom, which is the safe direction for OOM avoidance.
#[cfg(any(target_os = "linux", test))]
fn cgroup_headroom_mb(memory_max: &str, memory_current: &str) -> Option<u64> {
    const BYTES_PER_MIB: u64 = 1024 * 1024;
    let limit = match memory_max.trim() {
        "max" => return None,
        text => text.parse::<u64>().ok()?,
    };
    let used = memory_current.trim().parse::<u64>().ok()?;
    Some(limit.saturating_sub(used) / BYTES_PER_MIB)
}
527
/// Everything the discovery pass produced for a set of files.
struct Discovery {
    /// Runnable tests that matched the filter, in file order.
    cases: Vec<TestCase>,
    /// Number of files that contributed at least one case.
    files_with_tests: usize,
    /// One failing `<file error>` row per file that could not be read or
    /// parsed.
    discovery_errors: Vec<TestResult>,
}
533
534fn discover_test_cases(files: &[PathBuf], filter: Option<&str>, workers: usize) -> Discovery {
535    let mut cases = Vec::new();
536    let mut files_with_tests = 0usize;
537    let mut discovery_errors = Vec::new();
538
539    for file in files {
540        let source = match fs::read_to_string(file) {
541            Ok(s) => s,
542            Err(e) => {
543                discovery_errors.push(TestResult {
544                    name: "<file error>".to_string(),
545                    file: file.display().to_string(),
546                    passed: false,
547                    error: Some(format!("Failed to read {}: {e}", file.display())),
548                    duration_ms: 0,
549                    phases: PhaseTimings::default(),
550                });
551                continue;
552            }
553        };
554
555        let program = match parse_program(&source) {
556            Ok(p) => p,
557            Err(e) => {
558                discovery_errors.push(TestResult {
559                    name: "<file error>".to_string(),
560                    file: file.display().to_string(),
561                    passed: false,
562                    error: Some(e),
563                    duration_ms: 0,
564                    phases: PhaseTimings::default(),
565                });
566                continue;
567            }
568        };
569
570        let source = Arc::new(source);
571        let program = Arc::new(program);
572        let file_cases = extract_cases_from_program(file, &source, &program, filter, workers);
573        if !file_cases.is_empty() {
574            files_with_tests += 1;
575            cases.extend(file_cases);
576        }
577    }
578
579    Discovery {
580        cases,
581        files_with_tests,
582        discovery_errors,
583    }
584}
585
586fn parse_program(source: &str) -> Result<Vec<SNode>, String> {
587    let mut lexer = Lexer::new(source);
588    let tokens = lexer.tokenize().map_err(|e| format!("{e}"))?;
589    let mut parser = Parser::new(tokens);
590    parser.parse().map_err(|e| format!("{e}"))
591}
592
593fn extract_cases_from_program(
594    file: &Path,
595    source: &Arc<String>,
596    program: &Arc<Vec<SNode>>,
597    filter: Option<&str>,
598    workers: usize,
599) -> Vec<TestCase> {
600    let mut cases = Vec::new();
601    for snode in program.iter() {
602        let Some(meta) = inspect_test_pipeline(snode) else {
603            continue;
604        };
605        if let Some(pattern) = filter {
606            if !meta.name.contains(pattern) {
607                continue;
608            }
609        }
610        // Cap heavy weight so a single annotated test never deadlocks
611        // when the pool is smaller than the requested concurrency.
612        let weight = meta.weight.min(workers).max(1);
613        cases.push(TestCase {
614            file: file.to_path_buf(),
615            name: meta.name,
616            source: Arc::clone(source),
617            program: Arc::clone(program),
618            serial_group: meta.serial_group,
619            weight,
620        });
621    }
622    cases
623}
624
/// Scheduling-relevant facts extracted from one test pipeline declaration.
struct PipelineMeta {
    /// Name of the pipeline.
    name: String,
    /// Group taken from a `@serial` attribute, if the pipeline has one.
    serial_group: Option<String>,
    /// Worker count requested by `@heavy(threads: N)`; `1` when absent or
    /// invalid. Not yet capped to the pool size.
    weight: usize,
}
630
631fn inspect_test_pipeline(snode: &SNode) -> Option<PipelineMeta> {
632    // Pipelines marked `@test`, or named `test_*`, are user tests. The
633    // companion attributes `@serial` and `@heavy` only tune the scheduler
634    // and never make a non-test pipeline discoverable on their own.
635    let (attributes, inner) = match &snode.node {
636        Node::AttributedDecl { attributes, inner } => (attributes.as_slice(), inner.as_ref()),
637        _ => (&[][..], snode),
638    };
639    let name = match &inner.node {
640        Node::Pipeline { name, .. } => name.clone(),
641        _ => return None,
642    };
643    let has_test_attr = attributes.iter().any(|a| a.name == "test");
644    if !has_test_attr && !name.starts_with("test_") {
645        return None;
646    }
647    let serial_group = attributes
648        .iter()
649        .find(|a| a.name == "serial")
650        .map(serial_group_for);
651    let weight = attributes
652        .iter()
653        .find(|a| a.name == "heavy")
654        .and_then(heavy_weight_for)
655        .unwrap_or(1);
656    Some(PipelineMeta {
657        name,
658        serial_group,
659        weight,
660    })
661}
662
663fn serial_group_for(attr: &Attribute) -> String {
664    attr.string_arg("group")
665        .unwrap_or_else(|| "__default__".to_string())
666}
667
668fn heavy_weight_for(attr: &Attribute) -> Option<usize> {
669    attr.args
670        .iter()
671        .find(|a| a.name.as_deref() == Some("threads"))
672        .and_then(|a| match &a.value.node {
673            Node::IntLiteral(n) if *n >= 1 => Some(*n as usize),
674            _ => None,
675        })
676}
677
678fn sort_cases_longest_first(cases: &mut [TestCase], timings: &BTreeMap<String, u64>) {
679    // Sort ascending so the slowest tests sit at the tail and get popped
680    // first by workers. New (unmeasured) tests share the bottom of the
681    // queue alongside the fastest known ones — they'll appear in stable
682    // file/name order, and once they get their first timing they'll
683    // float up to where they belong.
684    cases.sort_by(|a, b| {
685        let key_a = timings_key(&a.file, &a.name);
686        let key_b = timings_key(&b.file, &b.name);
687        let dur_a = timings.get(&key_a).copied().unwrap_or(0);
688        let dur_b = timings.get(&key_b).copied().unwrap_or(0);
689        dur_a
690            .cmp(&dur_b)
691            .then_with(|| a.file.cmp(&b.file))
692            .then_with(|| a.name.cmp(&b.name))
693    });
694}
695
/// Key under which a test's duration is stored in the timings cache:
/// `<file display>::<test name>`.
fn timings_key(file: &Path, name: &str) -> String {
    let mut key = file.display().to_string();
    key.push_str("::");
    key.push_str(name);
    key
}
699
700fn timings_cache_path(target: &Path) -> Option<PathBuf> {
701    // Anchor the cache at the project root if discoverable, otherwise at
702    // the directory the suite was launched from. The cache is shared
703    // across runs in the same workspace, so a per-suite cache would
704    // fragment timings whenever a user runs a subset.
705    let probe_root = if target.is_dir() {
706        target.to_path_buf()
707    } else {
708        target.parent()?.to_path_buf()
709    };
710    let root = harn_vm::stdlib::process::find_project_root(&probe_root)
711        .unwrap_or_else(|| probe_root.clone());
712    Some(root.join(TIMINGS_CACHE_RELATIVE_PATH))
713}
714
715fn load_timings_cache(path: &Path) -> BTreeMap<String, u64> {
716    let Ok(contents) = fs::read_to_string(path) else {
717        return BTreeMap::new();
718    };
719    serde_json::from_str::<BTreeMap<String, u64>>(&contents).unwrap_or_default()
720}
721
722fn update_timings_cache(path: &Path, mut existing: BTreeMap<String, u64>, results: &[TestResult]) {
723    for result in results {
724        if result.name == "<file error>" || result.name == "<join error>" {
725            continue;
726        }
727        existing.insert(
728            timings_key(Path::new(&result.file), &result.name),
729            result.duration_ms,
730        );
731    }
732    if let Some(parent) = path.parent() {
733        let _ = fs::create_dir_all(parent);
734    }
735    if let Ok(serialized) = serde_json::to_string(&existing) {
736        let _ = fs::write(path, serialized);
737    }
738}
739
740async fn execute_cases(
741    cases: Vec<TestCase>,
742    workers: usize,
743    options: &RunOptions,
744    total_tests: usize,
745) -> Vec<TestResult> {
746    if cases.is_empty() {
747        return Vec::new();
748    }
749    let completed = Arc::new(Mutex::new(0usize));
750    if workers <= 1 {
751        let mut results = Vec::with_capacity(cases.len());
752        for case in cases {
753            let cwd = case_execution_cwd(&case);
754            let test_index = next_test_index(&completed);
755            emit_progress(
756                &options.progress,
757                TestRunEvent::TestStarted {
758                    name: case.name.clone(),
759                    file: case.file.display().to_string(),
760                    test_index,
761                    total_tests,
762                },
763            );
764            let result =
765                execute_case(&case, &cwd, options.timeout_ms, &options.cli_skill_dirs).await;
766            if options.diagnose {
767                result.emit_diagnose();
768            }
769            emit_progress(
770                &options.progress,
771                TestRunEvent::TestFinished(result.clone()),
772            );
773            results.push(result);
774        }
775        return results;
776    }
777
778    let queue = Arc::new(Mutex::new(cases));
779    let gate = Arc::new(ResourceGate::new(workers));
780    let results: Arc<Mutex<Vec<TestResult>>> = Arc::new(Mutex::new(Vec::new()));
781
782    let mut handles = Vec::with_capacity(workers);
783    for worker_idx in 0..workers {
784        let queue = Arc::clone(&queue);
785        let gate = Arc::clone(&gate);
786        let results = Arc::clone(&results);
787        let completed = Arc::clone(&completed);
788        let timeout_ms = options.timeout_ms;
789        let cli_skill_dirs = options.cli_skill_dirs.clone();
790        let progress = options.progress.clone();
791        let diagnose = options.diagnose;
792        let handle = thread::Builder::new()
793            .name(format!("harn-test-worker-{worker_idx}"))
794            .spawn(move || {
795                let runtime = match tokio::runtime::Builder::new_current_thread()
796                    .enable_all()
797                    .build()
798                {
799                    Ok(rt) => rt,
800                    Err(error) => {
801                        results.lock().unwrap().push(TestResult {
802                            name: "<worker error>".to_string(),
803                            file: String::new(),
804                            passed: false,
805                            error: Some(format!("failed to start test runtime: {error}")),
806                            duration_ms: 0,
807                            phases: PhaseTimings::default(),
808                        });
809                        return;
810                    }
811                };
812                // Cases are sorted ascending by historical duration; popping
813                // from the tail gives this worker the slowest unclaimed
814                // test, which front-loads long poles and prevents workers
815                // from stranding on quick tests at the end of the run.
816                while let Some(case) = queue.lock().unwrap().pop() {
817                    let _guard = gate.acquire(case.weight, case.serial_group.as_deref());
818                    let cwd = case_execution_cwd(&case);
819                    let test_index = next_test_index(&completed);
820                    emit_progress(
821                        &progress,
822                        TestRunEvent::TestStarted {
823                            name: case.name.clone(),
824                            file: case.file.display().to_string(),
825                            test_index,
826                            total_tests,
827                        },
828                    );
829                    let result =
830                        runtime.block_on(execute_case(&case, &cwd, timeout_ms, &cli_skill_dirs));
831                    if diagnose {
832                        result.emit_diagnose();
833                    }
834                    emit_progress(&progress, TestRunEvent::TestFinished(result.clone()));
835                    results.lock().unwrap().push(result);
836                }
837            })
838            .expect("spawning a harn-test worker thread should succeed");
839        handles.push(handle);
840    }
841
842    for handle in handles {
843        let _ = handle.join();
844    }
845
846    // All workers have joined, so this Arc holds the only remaining
847    // reference. The lock-and-clone fallback survives the unlikely case
848    // where a panic-unwind kept an extra reference alive.
849    Arc::try_unwrap(results)
850        .map(|m| m.into_inner().unwrap_or_default())
851        .unwrap_or_else(|arc| arc.lock().unwrap().clone())
852}
853
/// Advances the shared test counter by one and returns the new value, so
/// the first caller receives `1`.
///
/// # Panics
///
/// Panics if the counter's mutex has been poisoned.
fn next_test_index(counter: &Mutex<usize>) -> usize {
    let mut slot = counter.lock().unwrap();
    let bumped = *slot + 1;
    *slot = bumped;
    bumped
}
859
860fn case_execution_cwd(case: &TestCase) -> PathBuf {
861    case.file
862        .parent()
863        .filter(|p| !p.as_os_str().is_empty())
864        .map(Path::to_path_buf)
865        .unwrap_or_else(test_execution_cwd)
866}
867
/// Coordinates worker permits and serial-group exclusivity without
/// requiring an async lock — workers are dedicated OS threads, so a
/// classic Mutex+Condvar gate keeps everything off the tokio scheduler.
struct ResourceGate {
    /// Permit pool and busy-group set, guarded together so a waiter checks
    /// both under a single lock.
    state: Mutex<GateState>,
    /// Signalled every time a guard is dropped.
    cond: Condvar,
    /// Total number of permits in the pool; `new` keeps this at 1 or more.
    capacity: usize,
}

/// Bookkeeping protected by `ResourceGate::state`.
struct GateState {
    /// Permits not currently held by any guard.
    available: usize,
    /// Serial groups that currently have a holder.
    busy_groups: HashSet<String>,
}

/// Token returned by [`ResourceGate::acquire`]. Dropping it returns the
/// permits, frees the serial group (if any), and wakes waiting threads.
struct GateGuard<'a> {
    gate: &'a ResourceGate,
    /// Number of permits this guard holds (already clamped by `acquire`).
    weight: usize,
    /// Serial group this guard occupies, if any.
    group: Option<String>,
}

impl ResourceGate {
    /// Creates a gate with `capacity` permits.
    ///
    /// A `capacity` of zero is treated as one: `acquire` rounds every
    /// request up to at least one permit, so an empty pool could never
    /// satisfy any caller and every `acquire` would block forever.
    fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self {
            state: Mutex::new(GateState {
                available: capacity,
                busy_groups: HashSet::new(),
            }),
            cond: Condvar::new(),
            capacity,
        }
    }

    /// Blocks the calling thread until `weight` permits are free and, when
    /// `group` is given, no other guard currently holds that group.
    ///
    /// `weight` is clamped into `1..=capacity`, so a request larger than the
    /// pool takes the whole pool instead of waiting forever.
    ///
    /// # Panics
    ///
    /// Panics if the gate's mutex has been poisoned.
    fn acquire(&self, weight: usize, group: Option<&str>) -> GateGuard<'_> {
        let weight = weight.min(self.capacity).max(1);
        let idle = self.state.lock().unwrap();
        let mut state = self
            .cond
            .wait_while(idle, |s| {
                s.available < weight || group.is_some_and(|g| s.busy_groups.contains(g))
            })
            .unwrap();

        state.available -= weight;
        let group = group.map(str::to_owned);
        if let Some(name) = &group {
            state.busy_groups.insert(name.clone());
        }
        GateGuard {
            gate: self,
            weight,
            group,
        }
    }
}

impl Drop for GateGuard<'_> {
    fn drop(&mut self) {
        let mut state = self.gate.state.lock().unwrap();
        state.available += self.weight;
        if let Some(group) = self.group.as_deref() {
            state.busy_groups.remove(group);
        }
        // Waiters differ in weight and group, so wake all of them and let
        // each re-check its own condition.
        self.gate.cond.notify_all();
    }
}
931
932async fn execute_case(
933    case: &TestCase,
934    execution_cwd: &Path,
935    timeout_ms: u64,
936    cli_skill_dirs: &[PathBuf],
937) -> TestResult {
938    harn_vm::reset_thread_local_state();
939    reset_hostlib_state();
940
941    let mut phases = PhaseTimings::default();
942    let total_start = Instant::now();
943
944    let compile_start = Instant::now();
945    let chunk = match harn_vm::Compiler::new().compile_named(&case.program, &case.name) {
946        Ok(c) => c,
947        Err(e) => {
948            phases.compile_ms = compile_start.elapsed().as_millis() as u64;
949            return TestResult {
950                name: case.name.clone(),
951                file: case.file.display().to_string(),
952                passed: false,
953                error: Some(format!("Compile error: {e}")),
954                duration_ms: total_start.elapsed().as_millis() as u64,
955                phases,
956            };
957        }
958    };
959    phases.compile_ms = compile_start.elapsed().as_millis() as u64;
960
961    let local = tokio::task::LocalSet::new();
962    let timeout = std::time::Duration::from_millis(timeout_ms);
963    let file_display = case.file.display().to_string();
964    let setup_start = Instant::now();
965    let result = tokio::time::timeout(
966        timeout,
967        local.run_until(async {
968            let mut vm = harn_vm::Vm::new();
969            harn_vm::register_vm_stdlib(&mut vm);
970            crate::install_default_hostlib(&mut vm);
971            let source_parent = case.file.parent().unwrap_or(Path::new("."));
972            let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
973            let store_base = project_root.as_deref().unwrap_or(source_parent);
974            let source_dir = source_parent.to_string_lossy().into_owned();
975            harn_vm::register_store_builtins(&mut vm, store_base);
976            harn_vm::register_metadata_builtins(&mut vm, store_base);
977            let pipeline_name = case
978                .file
979                .file_stem()
980                .and_then(|s| s.to_str())
981                .unwrap_or("test");
982            harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
983            vm.set_source_info(&file_display, &case.source);
984            harn_vm::stdlib::process::set_thread_execution_context(Some(
985                harn_vm::orchestration::RunExecutionRecord {
986                    cwd: Some(execution_cwd.to_string_lossy().into_owned()),
987                    source_dir: Some(source_dir),
988                    env: BTreeMap::new(),
989                    adapter: None,
990                    repo_path: None,
991                    worktree_path: None,
992                    branch: None,
993                    base_ref: None,
994                    cleanup: None,
995                },
996            ));
997            if let Some(ref root) = project_root {
998                vm.set_project_root(root);
999            }
1000            if let Some(parent) = case.file.parent() {
1001                if !parent.as_os_str().is_empty() {
1002                    vm.set_source_dir(parent);
1003                }
1004            }
1005            let loaded =
1006                crate::skill_loader::load_skills(&crate::skill_loader::SkillLoaderInputs {
1007                    cli_dirs: cli_skill_dirs.to_vec(),
1008                    source_path: Some(case.file.clone()),
1009                });
1010            crate::skill_loader::emit_loader_warnings(&loaded.loader_warnings);
1011            crate::skill_loader::install_skills_global(&mut vm, &loaded);
1012            let extensions = crate::package::load_runtime_extensions(&case.file);
1013            crate::package::install_runtime_extensions(&extensions);
1014            crate::package::install_manifest_triggers(&mut vm, &extensions)
1015                .await
1016                .map_err(|error| format!("failed to install manifest triggers: {error}"))?;
1017            crate::package::install_manifest_hooks(&mut vm, &extensions)
1018                .await
1019                .map_err(|error| format!("failed to install manifest hooks: {error}"))?;
1020            vm.set_harness(harn_vm::Harness::real());
1021            let setup_ms = setup_start.elapsed().as_millis() as u64;
1022            let exec_start = Instant::now();
1023            let outcome = match vm.execute(&chunk).await {
1024                Ok(val) => Ok(val),
1025                Err(e) => Err(vm.format_runtime_error(&e)),
1026            };
1027            let execute_ms = exec_start.elapsed().as_millis() as u64;
1028            harn_vm::egress::reset_egress_policy_for_host();
1029            Ok::<_, String>((outcome, setup_ms, execute_ms))
1030        }),
1031    )
1032    .await;
1033
1034    let teardown_start = Instant::now();
1035    // Clear thread-locals so the next case scheduled onto this worker
1036    // sees a clean slate. Wall clock for this work lands in the
1037    // teardown bucket so the phase breakdown sums to wall time.
1038    harn_vm::reset_thread_local_state();
1039    reset_hostlib_state();
1040    phases.teardown_ms = teardown_start.elapsed().as_millis() as u64;
1041
1042    let elapsed_ms = total_start.elapsed().as_millis() as u64;
1043    let (passed, error, duration_ms) = match result {
1044        Ok(Ok((outcome, setup_ms, execute_ms))) => {
1045            phases.setup_ms = setup_ms;
1046            phases.execute_ms = execute_ms;
1047            match outcome {
1048                Ok(_) => (true, None, elapsed_ms),
1049                Err(message) => (false, Some(message), elapsed_ms),
1050            }
1051        }
1052        Ok(Err(setup_error)) => (false, Some(setup_error), elapsed_ms),
1053        // Report `timeout_ms` rather than `elapsed_ms` for timeouts so a
1054        // suite-wide aggregate still reflects the configured budget that
1055        // was hit, not the slightly-earlier moment we tore down at.
1056        Err(_) => (
1057            false,
1058            Some(format!("timed out after {timeout_ms}ms")),
1059            timeout_ms,
1060        ),
1061    };
1062
1063    TestResult {
1064        name: case.name.clone(),
1065        file: file_display,
1066        passed,
1067        error,
1068        duration_ms,
1069        phases,
1070    }
1071}
1072
1073fn discover_test_files(dir: &Path) -> Vec<PathBuf> {
1074    let mut files = Vec::new();
1075    if let Ok(entries) = fs::read_dir(dir) {
1076        for entry in entries.flatten() {
1077            let path = entry.path();
1078            if path.is_dir() {
1079                files.extend(discover_test_files(&path));
1080            } else if path.extension().is_some_and(|e| e == "harn") {
1081                if let Ok(content) = fs::read_to_string(&path) {
1082                    if content.contains("test_") || content.contains("@test") {
1083                        files.push(canonicalize_existing_path(&path));
1084                    }
1085                }
1086            }
1087        }
1088    }
1089    files.sort();
1090    files
1091}
1092
1093#[cfg(test)]
1094mod tests {
1095    use super::*;
1096
1097    use std::sync::Arc;
1098    use std::time::Duration;
1099
1100    struct TempTestDir {
1101        inner: tempfile::TempDir,
1102    }
1103
1104    impl TempTestDir {
1105        fn new() -> Self {
1106            Self {
1107                inner: tempfile::tempdir().unwrap(),
1108            }
1109        }
1110
1111        fn write(&self, relative: &str, contents: &str) {
1112            let path = self.path().join(relative);
1113            if let Some(parent) = path.parent() {
1114                fs::create_dir_all(parent).unwrap();
1115            }
1116            fs::write(path, contents).unwrap();
1117        }
1118
1119        fn path(&self) -> &Path {
1120            self.inner.path()
1121        }
1122    }
1123
1124    #[test]
1125    fn discover_test_files_returns_canonical_absolute_paths() {
1126        let temp = TempTestDir::new();
1127        temp.write("suite/test_alpha.harn", "pipeline test_alpha(task) {}");
1128        temp.write("suite/nested/test_beta.harn", "pipeline test_beta(task) {}");
1129        temp.write("suite/annotated.harn", "@test\npipeline annotated(task) {}");
1130        temp.write("suite/ignore.harn", "pipeline build(task) {}");
1131
1132        let files = discover_test_files(&temp.path().join("suite"));
1133
1134        assert_eq!(files.len(), 3);
1135        assert!(files.iter().all(|path| path.is_absolute()));
1136        assert!(files
1137            .iter()
1138            .any(|path| path.ends_with("suite/test_alpha.harn")));
1139        assert!(files
1140            .iter()
1141            .any(|path| path.ends_with("suite/nested/test_beta.harn")));
1142        assert!(files
1143            .iter()
1144            .any(|path| path.ends_with("suite/annotated.harn")));
1145    }
1146
1147    #[tokio::test]
1148    async fn run_tests_uses_file_parent_as_execution_cwd_and_restores_shell_cwd() {
1149        let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
1150        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1151        let temp = TempTestDir::new();
1152        temp.write(
1153            "suite/test_cwd.harn",
1154            r"
1155pipeline test_current_dir(task) {
1156  assert_eq(cwd(), source_dir())
1157}
1158",
1159        );
1160
1161        let original_cwd = std::env::current_dir().unwrap();
1162        let summary = run_tests(&temp.path().join("suite"), None, 1_000, false, &[]).await;
1163        let restored_cwd = std::env::current_dir().unwrap();
1164
1165        assert_eq!(summary.failed, 0);
1166        assert_eq!(summary.passed, 1);
1167        assert_eq!(
1168            fs::canonicalize(restored_cwd).unwrap(),
1169            fs::canonicalize(original_cwd).unwrap()
1170        );
1171    }
1172
1173    #[tokio::test]
1174    async fn parallel_run_tests_uses_each_file_parent_as_execution_cwd() {
1175        let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
1176        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1177        let temp = TempTestDir::new();
1178        temp.write(
1179            "suite/a/test_one.harn",
1180            r"
1181pipeline test_one(task) {
1182  assert_eq(cwd(), source_dir())
1183}
1184",
1185        );
1186        temp.write(
1187            "suite/b/test_two.harn",
1188            r"
1189pipeline test_two(task) {
1190  assert_eq(cwd(), source_dir())
1191}
1192",
1193        );
1194
1195        let summary = run_tests(&temp.path().join("suite"), None, 1_000, true, &[]).await;
1196        assert_eq!(summary.failed, 0);
1197        assert_eq!(summary.passed, 2);
1198    }
1199
1200    #[tokio::test]
1201    async fn run_tests_loads_cli_skill_dirs() {
1202        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1203        let temp = TempTestDir::new();
1204        temp.write(
1205            "skills/review/SKILL.md",
1206            r"---
1207name: review
1208short: Review PRs
1209description: Review pull requests
1210---
1211
1212Review instructions.
1213",
1214        );
1215        temp.write(
1216            "suite/test_skills.harn",
1217            r#"
1218pipeline test_cli_skills(task) {
1219  assert_eq(skill_count(skills), 1)
1220  let found = skill_find(skills, "review")
1221  assert_eq(found.name, "review")
1222}
1223"#,
1224        );
1225
1226        let summary = run_tests(
1227            &temp.path().join("suite"),
1228            None,
1229            1_000,
1230            false,
1231            &[temp.path().join("skills")],
1232        )
1233        .await;
1234
1235        assert_eq!(summary.failed, 0, "{:?}", summary.results[0].error);
1236        assert_eq!(summary.passed, 1);
1237    }
1238
1239    #[test]
1240    fn resolve_workers_honors_explicit_jobs() {
1241        let mut opts = RunOptions::new(1_000);
1242        opts.parallel = true;
1243        opts.jobs = Some(3);
1244        assert_eq!(resolve_workers(&opts), 3);
1245    }
1246
1247    #[test]
1248    fn resolve_workers_returns_one_when_not_parallel() {
1249        let mut opts = RunOptions::new(1_000);
1250        opts.parallel = false;
1251        opts.jobs = Some(8);
1252        assert_eq!(resolve_workers(&opts), 1);
1253    }
1254
1255    #[test]
1256    fn memory_worker_cap_backs_off_under_pressure() {
1257        // ~4 GiB available, 1 GiB reserved, 1 GiB/worker -> 3 workers.
1258        assert_eq!(memory_worker_cap(4096, 1024, 1024), 3);
1259    }
1260
1261    #[test]
1262    fn memory_worker_cap_is_generous_when_memory_is_plentiful() {
1263        // A roomy box dwarfs the core cap; resolve_workers then min()s this
1264        // against DEFAULT_PARALLEL_JOBS_CAP, so the core cap stays in force.
1265        assert!(memory_worker_cap(32_768, 1024, 1024) >= DEFAULT_PARALLEL_JOBS_CAP);
1266    }
1267
1268    #[test]
1269    fn memory_worker_cap_never_starves_to_zero() {
1270        // Even when reserved >= available, at least one worker must run.
1271        assert_eq!(memory_worker_cap(512, 1024, 1024), 1);
1272    }
1273
1274    #[test]
1275    fn cgroup_headroom_unlimited_is_none() {
1276        // The `max` sentinel means no cgroup limit -> defer to host memory.
1277        assert_eq!(cgroup_headroom_mb("max\n", "1048576\n"), None);
1278    }
1279
1280    #[test]
1281    fn cgroup_headroom_computes_slice_remainder() {
1282        // 4 GiB limit, 1 GiB in use -> 3 GiB (3072 MiB) headroom.
1283        let four_gib = (4_u64 * 1024 * 1024 * 1024).to_string();
1284        let one_gib = (1024_u64 * 1024 * 1024).to_string();
1285        assert_eq!(cgroup_headroom_mb(&four_gib, &one_gib), Some(3072));
1286    }
1287
1288    #[test]
1289    fn cgroup_headroom_saturates_when_over_limit() {
1290        // A transient current > max must yield 0, not an underflow panic.
1291        assert_eq!(cgroup_headroom_mb("1024", "999999999"), Some(0));
1292    }
1293
1294    #[test]
1295    fn cgroup_headroom_rejects_garbage() {
1296        assert_eq!(cgroup_headroom_mb("not-a-number", "123"), None);
1297    }
1298
1299    #[test]
1300    fn sort_cases_longest_first_uses_historical_durations() {
1301        let source = Arc::new(String::new());
1302        let program = Arc::new(Vec::new());
1303        let mk = |name: &str| TestCase {
1304            file: PathBuf::from("tests/a.harn"),
1305            name: name.to_string(),
1306            source: Arc::clone(&source),
1307            program: Arc::clone(&program),
1308            serial_group: None,
1309            weight: 1,
1310        };
1311        let mut cases = vec![mk("test_quick"), mk("test_slow"), mk("test_medium")];
1312        let mut timings = BTreeMap::new();
1313        timings.insert("tests/a.harn::test_slow".to_string(), 5_000);
1314        timings.insert("tests/a.harn::test_medium".to_string(), 1_000);
1315
1316        sort_cases_longest_first(&mut cases, &timings);
1317
1318        // Slowest tests live at the tail so workers pop them first.
1319        let order: Vec<&str> = cases.iter().map(|c| c.name.as_str()).collect();
1320        assert_eq!(order, vec!["test_quick", "test_medium", "test_slow"]);
1321    }
1322
1323    #[test]
1324    fn resource_gate_serializes_same_group() {
1325        let gate = Arc::new(ResourceGate::new(4));
1326        let trace = Arc::new(Mutex::new(Vec::<&'static str>::new()));
1327
1328        let g_a = {
1329            let trace = Arc::clone(&trace);
1330            let gate = Arc::clone(&gate);
1331            thread::spawn(move || {
1332                let _guard = gate.acquire(1, Some("login"));
1333                trace.lock().unwrap().push("a-start");
1334                thread::sleep(Duration::from_millis(50));
1335                trace.lock().unwrap().push("a-end");
1336            })
1337        };
1338        // Give thread A time to grab the lock first.
1339        thread::sleep(Duration::from_millis(10));
1340        let g_b = {
1341            let trace = Arc::clone(&trace);
1342            let gate = Arc::clone(&gate);
1343            thread::spawn(move || {
1344                let _guard = gate.acquire(1, Some("login"));
1345                trace.lock().unwrap().push("b-start");
1346                trace.lock().unwrap().push("b-end");
1347            })
1348        };
1349        g_a.join().unwrap();
1350        g_b.join().unwrap();
1351
1352        let trace = trace.lock().unwrap();
1353        // B must not start until A ends.
1354        let a_end = trace.iter().position(|t| *t == "a-end").unwrap();
1355        let b_start = trace.iter().position(|t| *t == "b-start").unwrap();
1356        assert!(a_end < b_start, "B started before A finished: {trace:?}");
1357    }
1358
1359    #[test]
1360    fn resource_gate_allows_independent_groups_in_parallel() {
1361        let gate = Arc::new(ResourceGate::new(4));
1362        let _guard_a = gate.acquire(1, Some("alpha"));
1363        // Acquiring beta should not block — the other group is unrelated.
1364        let acquired = std::sync::Mutex::new(false);
1365        thread::scope(|s| {
1366            s.spawn(|| {
1367                let _ = gate.acquire(1, Some("beta"));
1368                *acquired.lock().unwrap() = true;
1369            });
1370        });
1371        assert!(*acquired.lock().unwrap());
1372    }
1373
1374    #[test]
1375    fn resource_gate_caps_heavy_weight_at_capacity() {
1376        // A test that asks for more than the pool size must still be
1377        // schedulable rather than deadlocking.
1378        let gate = Arc::new(ResourceGate::new(2));
1379        let _g = gate.acquire(99, None);
1380        // Available is now 0; another single-weight task must still wait.
1381        let started = Arc::new(Mutex::new(false));
1382        let inner = Arc::clone(&started);
1383        let gate2 = Arc::clone(&gate);
1384        let handle = thread::spawn(move || {
1385            let _guard = gate2.acquire(1, None);
1386            *inner.lock().unwrap() = true;
1387        });
1388        thread::sleep(Duration::from_millis(20));
1389        assert!(!*started.lock().unwrap(), "should still be waiting");
1390        drop(_g);
1391        handle.join().unwrap();
1392        assert!(*started.lock().unwrap());
1393    }
1394
1395    #[tokio::test]
1396    async fn parallel_scheduler_runs_heavy_tests_without_oversubscribing() {
1397        // Heavy(2) should never run concurrently with another test when
1398        // the pool only has two workers — there are no spare permits.
1399        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1400        let temp = TempTestDir::new();
1401        temp.write(
1402            "suite/test_heavy.harn",
1403            r"
1404@test
1405@heavy(threads: 2)
1406pipeline test_heavy_one(task) {}
1407
1408@test
1409pipeline test_light(task) {}
1410",
1411        );
1412
1413        let opts = RunOptions {
1414            parallel: true,
1415            jobs: Some(2),
1416            ..RunOptions::new(5_000)
1417        };
1418        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1419        assert_eq!(summary.failed, 0, "{:?}", summary.results);
1420        assert_eq!(summary.total, 2);
1421    }
1422
1423    #[tokio::test]
1424    async fn parallel_scheduler_handles_serial_group_annotation() {
1425        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1426        let temp = TempTestDir::new();
1427        temp.write(
1428            "suite/test_serial.harn",
1429            r#"
1430@test
1431@serial(group: "fixture")
1432pipeline test_serial_one(task) {}
1433
1434@test
1435@serial(group: "fixture")
1436pipeline test_serial_two(task) {}
1437"#,
1438        );
1439
1440        let opts = RunOptions {
1441            parallel: true,
1442            jobs: Some(4),
1443            ..RunOptions::new(5_000)
1444        };
1445        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1446        assert_eq!(summary.failed, 0, "{:?}", summary.results);
1447        assert_eq!(summary.passed, 2);
1448    }
1449
1450    #[tokio::test]
1451    async fn parallel_scheduler_persists_timings_cache() {
1452        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1453        let temp = TempTestDir::new();
1454        temp.write(
1455            "suite/test_timed.harn",
1456            r"
1457@test
1458pipeline test_first(task) {}
1459
1460@test
1461pipeline test_second(task) {}
1462",
1463        );
1464
1465        let opts = RunOptions {
1466            parallel: true,
1467            jobs: Some(2),
1468            ..RunOptions::new(5_000)
1469        };
1470        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1471        assert_eq!(summary.passed, 2);
1472        let cache = temp.path().join("suite/.harn/test-timings.json");
1473        assert!(cache.exists(), "expected timings cache at {cache:?}");
1474        let stored: BTreeMap<String, u64> =
1475            serde_json::from_str(&fs::read_to_string(&cache).unwrap()).unwrap();
1476        assert!(
1477            stored.keys().any(|key| key.contains("test_first")),
1478            "expected timings for test_first in {stored:?}"
1479        );
1480        assert!(
1481            stored.keys().any(|key| key.contains("test_second")),
1482            "expected timings for test_second in {stored:?}"
1483        );
1484    }
1485
1486    /// Regression fixture: a worker thread that runs multiple cases must
1487    /// reset thread-local state between them. Test A pins the clock
1488    /// mock to a future timestamp; test B asserts the clock is fresh.
1489    /// Fails if the per-case `reset_thread_local_state()` in
1490    /// `execute_case` ever regresses. Pins workers to 1 so both tests
1491    /// land on the same scheduler thread.
1492    #[tokio::test]
1493    async fn worker_resets_thread_local_state_between_cases() {
1494        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1495        let temp = TempTestDir::new();
1496        temp.write(
1497            "suite/test_isolation.harn",
1498            r#"
1499// The leak probe pins the clock to a future-but-i64-safe value
1500// (year ~2128) so a leaked mock is observable. Larger values overflow
1501// the nanosecond conversion inside the mock clock.
1502pipeline test_a_pins_clock(task) {
1503  mock_time(5000000000000)
1504  assert_eq(now_ms(), 5000000000000)
1505}
1506
1507pipeline test_b_clock_is_fresh(task) {
1508  let ms = now_ms()
1509  assert(ms < 5000000000000, "clock mock leaked from previous test")
1510}
1511"#,
1512        );
1513
1514        let opts = RunOptions::new(5_000);
1515        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1516        assert_eq!(
1517            summary.failed,
1518            0,
1519            "state leaked between tests: {:?}",
1520            summary
1521                .results
1522                .iter()
1523                .filter(|r| !r.passed)
1524                .map(|r| (r.name.clone(), r.error.clone()))
1525                .collect::<Vec<_>>()
1526        );
1527        assert_eq!(summary.passed, 2);
1528    }
1529
1530    #[tokio::test]
1531    async fn summary_aggregate_timings_sum_phases_across_results() {
1532        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1533        let temp = TempTestDir::new();
1534        temp.write(
1535            "suite/test_phases.harn",
1536            r"
1537pipeline test_one(task) { assert_eq(1, 1) }
1538pipeline test_two(task) { assert_eq(2, 2) }
1539",
1540        );
1541
1542        let summary = run_tests(&temp.path().join("suite"), None, 5_000, false, &[]).await;
1543        assert_eq!(summary.passed, 2);
1544        let per_test_sum: u64 = summary
1545            .results
1546            .iter()
1547            .map(|r| r.phases.setup_ms.saturating_add(r.phases.compile_ms))
1548            .sum();
1549        let agg_sum = summary
1550            .aggregate
1551            .setup_ms
1552            .saturating_add(summary.aggregate.compile_ms);
1553        assert_eq!(
1554            per_test_sum, agg_sum,
1555            "aggregate setup+compile must equal sum of per-test setup+compile"
1556        );
1557    }
1558
1559    #[tokio::test]
1560    async fn parallel_scheduler_emits_progress_events() {
1561        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1562        let temp = TempTestDir::new();
1563        temp.write(
1564            "suite/test_events.harn",
1565            r"
1566@test
1567pipeline test_a(task) {}
1568
1569@test
1570pipeline test_b(task) {}
1571",
1572        );
1573
1574        let events: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
1575        let events_for_progress = Arc::clone(&events);
1576        let progress: TestRunProgress = Arc::new(move |event| {
1577            events_for_progress.lock().unwrap().push(match event {
1578                TestRunEvent::SuiteDiscovered { .. } => "suite",
1579                TestRunEvent::LargeSequentialSuite { .. } => "large-seq",
1580                TestRunEvent::TestStarted { .. } => "started",
1581                TestRunEvent::TestFinished(_) => "finished",
1582            });
1583        });
1584        let opts = RunOptions {
1585            parallel: true,
1586            jobs: Some(2),
1587            progress: Some(progress),
1588            ..RunOptions::new(5_000)
1589        };
1590        let _ = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1591        let events = events.lock().unwrap();
1592        assert_eq!(events.first().copied(), Some("suite"));
1593        assert_eq!(events.iter().filter(|e| **e == "started").count(), 2);
1594        assert_eq!(events.iter().filter(|e| **e == "finished").count(), 2);
1595    }
1596}