Skip to main content

harn_cli/
test_runner.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Condvar, Mutex};
5use std::thread;
6use std::time::Instant;
7
8use harn_lexer::Lexer;
9use harn_parser::{Attribute, Node, Parser, SNode};
10
11use crate::env_guard::ScopedEnvVar;
12
/// Outcome of one test case, or a synthetic failure row produced by
/// discovery or by the worker pool.
#[derive(Debug, Clone)]
pub struct TestResult {
    /// Pipeline name, or a `<…>` placeholder on synthetic rows.
    pub name: String,
    /// Display form of the test file's path (empty on worker-error rows).
    pub file: String,
    pub passed: bool,
    /// Failure message, set on failing rows.
    pub error: Option<String>,
    /// Total wall-clock time for the case, in milliseconds.
    pub duration_ms: u64,
    /// Per-phase timings. Populated by `execute_case`; left zeroed on
    /// discovery/worker-error rows. Feeds `--diagnose` and the `--timing`
    /// aggregate.
    pub phases: PhaseTimings,
}

/// Totals for a whole test run.
#[derive(Debug, Clone)]
pub struct TestSummary {
    /// Every row produced by the run.
    pub results: Vec<TestResult>,
    pub passed: usize,
    pub failed: usize,
    pub total: usize,
    /// Wall-clock time for the whole run, in milliseconds.
    pub duration_ms: u64,
    /// Phase costs summed across the entire run.
    pub aggregate: AggregateTimings,
}

/// Wall-clock cost of each phase of a single test execution.
///
/// The phases add up to the test's `duration_ms` up to measurement
/// overhead, so consumers can separate cold-start cost from assertion cost
/// without instrumenting the runner themselves.
#[derive(Clone, Copy, Debug, Default)]
pub struct PhaseTimings {
    /// VM construction, stdlib/hostlib registration, skill install,
    /// runtime-extension install, and manifest hook/trigger install.
    pub setup_ms: u64,
    /// `Compiler::compile_named` time for this test's chunk.
    pub compile_ms: u64,
    /// `vm.execute(chunk)` wall time — the user's test body itself.
    pub execute_ms: u64,
    /// `reset_thread_local_state` run after the test.
    pub teardown_ms: u64,
}

/// Phase costs summed over a whole run plus the suite-level collection
/// (discover + parse) cost. Mirrors [`PhaseTimings`].
#[derive(Clone, Copy, Debug, Default)]
pub struct AggregateTimings {
    pub collection_ms: u64,
    pub setup_ms: u64,
    pub compile_ms: u64,
    pub execute_ms: u64,
    pub teardown_ms: u64,
}

impl AggregateTimings {
    /// Sum the phase timings of every row in `results` (saturating instead
    /// of overflowing) and attach the given `collection_ms`.
    fn from_results(collection_ms: u64, results: &[TestResult]) -> Self {
        let mut totals = Self {
            collection_ms,
            ..Self::default()
        };
        for PhaseTimings {
            setup_ms,
            compile_ms,
            execute_ms,
            teardown_ms,
        } in results.iter().map(|row| row.phases)
        {
            totals.setup_ms = totals.setup_ms.saturating_add(setup_ms);
            totals.compile_ms = totals.compile_ms.saturating_add(compile_ms);
            totals.execute_ms = totals.execute_ms.saturating_add(execute_ms);
            totals.teardown_ms = totals.teardown_ms.saturating_add(teardown_ms);
        }
        totals
    }
}

impl TestResult {
    /// Print a one-line phase breakdown to stderr.
    ///
    /// Driven by `--diagnose` / `HARN_TEST_DIAGNOSE=1`. The fixed
    /// `[harn test diag]` prefix and `key=<n>ms` fields keep the line easy
    /// for downstream eval pipelines to grep.
    fn emit_diagnose(&self) {
        let PhaseTimings {
            setup_ms,
            compile_ms,
            execute_ms,
            teardown_ms,
        } = self.phases;
        let verdict = if self.passed { "ok" } else { "FAIL" };
        eprintln!(
            "[harn test diag] {} {} setup={}ms compile={}ms execute={}ms teardown={}ms total={}ms",
            verdict, self.name, setup_ms, compile_ms, execute_ms, teardown_ms, self.duration_ms,
        );
    }
}
102
103#[derive(Clone, Debug)]
104pub enum TestRunEvent {
105    SuiteDiscovered {
106        total_tests: usize,
107        total_files: usize,
108        parallel: bool,
109        workers: usize,
110    },
111    LargeSequentialSuite {
112        total_tests: usize,
113        total_files: usize,
114    },
115    TestStarted {
116        name: String,
117        file: String,
118        test_index: usize,
119        total_tests: usize,
120    },
121    TestFinished(TestResult),
122}
123
124pub type TestRunProgress = Arc<dyn Fn(TestRunEvent) + Send + Sync>;
125
/// A single-worker run with at least this many tests triggers
/// `TestRunEvent::LargeSequentialSuite`.
const LARGE_SEQUENTIAL_TEST_THRESHOLD: usize = 50;
/// A single-worker run whose tests span at least this many files also
/// triggers `TestRunEvent::LargeSequentialSuite`.
const LARGE_SEQUENTIAL_FILE_THRESHOLD: usize = 10;
/// Upper bound on the auto-detected worker count in parallel mode.
const DEFAULT_PARALLEL_JOBS_CAP: usize = 8;
/// Timings cache location, relative to the project root (or the launch
/// directory when no project root is found).
const TIMINGS_CACHE_RELATIVE_PATH: &str = ".harn/test-timings.json";
/// Environment variable overriding the parallel worker count.
const HARN_TEST_JOBS_ENV: &str = "HARN_TEST_JOBS";
131
132/// Options that shape how a user-test suite is discovered and executed.
133///
134/// Held separately from the positional path so call sites (one-shot run,
135/// `--watch`, persona doctor) can share the same scheduler without
136/// keyword-argument explosion at the call sites.
137#[derive(Clone, Default)]
138pub struct RunOptions {
139    pub filter: Option<String>,
140    pub timeout_ms: u64,
141    /// When false, the scheduler runs with a single worker, preserving the
142    /// historical "everything sequential" semantics that `harn test`
143    /// defaulted to before `--parallel` was introduced.
144    pub parallel: bool,
145    /// Explicit worker limit (`-j`/`--jobs`). `None` defaults to the
146    /// available parallelism, capped by a small constant when running in
147    /// parallel mode. Ignored when `parallel = false`.
148    pub jobs: Option<usize>,
149    pub cli_skill_dirs: Vec<PathBuf>,
150    /// Optional progress callback. When set, the runner emits events as
151    /// the suite progresses; consumers (CLI, dev mode) render output.
152    pub progress: Option<TestRunProgress>,
153    /// Emit per-test phase timings (setup / compile / execute /
154    /// teardown) to stderr. Also honored via `HARN_TEST_DIAGNOSE=1` so
155    /// users can flip the flag without restarting their shell.
156    pub diagnose: bool,
157}
158
159impl RunOptions {
160    pub fn new(timeout_ms: u64) -> Self {
161        Self {
162            timeout_ms,
163            ..Default::default()
164        }
165    }
166}
167
168/// A single executable test discovered during scan. Workers compile and
169/// run each case in isolation; the parsed program is shared by `Arc` so
170/// large suites parse exactly once.
171#[derive(Clone)]
172struct TestCase {
173    file: PathBuf,
174    name: String,
175    source: Arc<String>,
176    program: Arc<Vec<SNode>>,
177    /// Optional serial group — tests with the same group never run
178    /// concurrently with each other, even if workers are idle. Used for
179    /// shared fixtures.
180    serial_group: Option<String>,
181    /// Number of workers this test reserves while running. Capped at the
182    /// pool size during discovery so heavy tests still get scheduled.
183    weight: usize,
184}
185
/// Canonicalize `path`. If that fails (for example because the path does
/// not exist), return the path unchanged.
fn canonicalize_existing_path(path: &Path) -> PathBuf {
    match path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
189
/// The process working directory, or `.` when it cannot be determined.
fn test_execution_cwd() -> PathBuf {
    match std::env::current_dir() {
        Ok(cwd) => cwd,
        Err(_) => PathBuf::from("."),
    }
}
193
194fn emit_progress(progress: &Option<TestRunProgress>, event: TestRunEvent) {
195    if let Some(callback) = progress {
196        callback(event);
197    }
198}
199
200fn should_warn_large_sequential_suite(total_tests: usize, total_files: usize) -> bool {
201    total_tests >= LARGE_SEQUENTIAL_TEST_THRESHOLD || total_files >= LARGE_SEQUENTIAL_FILE_THRESHOLD
202}
203
204/// Discover and run tests in a file or directory.
205pub async fn run_tests(
206    path: &Path,
207    filter: Option<&str>,
208    timeout_ms: u64,
209    parallel: bool,
210    cli_skill_dirs: &[PathBuf],
211) -> TestSummary {
212    let options = RunOptions {
213        filter: filter.map(str::to_owned),
214        timeout_ms,
215        parallel,
216        jobs: None,
217        cli_skill_dirs: cli_skill_dirs.to_vec(),
218        progress: None,
219        diagnose: diagnose_enabled_via_env(),
220    };
221    run_tests_with_options(path, &options).await
222}
223
224/// Backwards-compatible progress-emitting entry point.
225pub async fn run_tests_with_progress(
226    path: &Path,
227    filter: Option<&str>,
228    timeout_ms: u64,
229    parallel: bool,
230    cli_skill_dirs: &[PathBuf],
231    progress: Option<TestRunProgress>,
232) -> TestSummary {
233    let options = RunOptions {
234        filter: filter.map(str::to_owned),
235        timeout_ms,
236        parallel,
237        jobs: None,
238        cli_skill_dirs: cli_skill_dirs.to_vec(),
239        progress,
240        diagnose: diagnose_enabled_via_env(),
241    };
242    run_tests_with_options(path, &options).await
243}
244
/// Whether `HARN_TEST_DIAGNOSE` asks for per-test phase diagnostics.
///
/// Accepts `1`, `true`, `yes`, or `on`, case-insensitively. Surrounding
/// whitespace is ignored, matching how `HARN_TEST_JOBS` is trimmed in
/// `resolve_workers`, so values like `" 1"` or `"yes\n"` from shell
/// scripts still work. Unset, non-Unicode, or any other value disables
/// diagnostics.
fn diagnose_enabled_via_env() -> bool {
    std::env::var("HARN_TEST_DIAGNOSE").is_ok_and(|raw| {
        matches!(
            raw.trim().to_ascii_lowercase().as_str(),
            "1" | "true" | "yes" | "on"
        )
    })
}
254
255/// Run tests with full control over scheduling, worker count, and
256/// progress reporting. Workers and scheduling mode are reported via
257/// `TestRunEvent::SuiteDiscovered` so consumers can render their own
258/// banner instead of the runner printing to stdout directly.
259pub async fn run_tests_with_options(path: &Path, options: &RunOptions) -> TestSummary {
260    // Default LLM provider to "mock" in test mode unless caller overrides.
261    let _default_llm_provider = ScopedEnvVar::set_if_unset("HARN_LLM_PROVIDER", "mock");
262    let _disable_llm_calls = ScopedEnvVar::set(harn_vm::llm::LLM_CALLS_DISABLED_ENV, "1");
263
264    let start = Instant::now();
265
266    let collection_start = Instant::now();
267    let canonical_target = canonicalize_existing_path(path);
268    let files = if canonical_target.is_dir() {
269        discover_test_files(&canonical_target)
270    } else {
271        vec![canonical_target.clone()]
272    };
273
274    let workers = resolve_workers(options);
275    let timings_path = timings_cache_path(&canonical_target);
276    let timings = timings_path
277        .as_deref()
278        .map(load_timings_cache)
279        .unwrap_or_default();
280
281    let discovery = discover_test_cases(&files, options.filter.as_deref(), workers);
282    let collection_ms = collection_start.elapsed().as_millis() as u64;
283
284    emit_progress(
285        &options.progress,
286        TestRunEvent::SuiteDiscovered {
287            total_tests: discovery.cases.len(),
288            total_files: discovery.files_with_tests,
289            parallel: options.parallel,
290            workers,
291        },
292    );
293    if workers == 1
294        && should_warn_large_sequential_suite(discovery.cases.len(), discovery.files_with_tests)
295    {
296        emit_progress(
297            &options.progress,
298            TestRunEvent::LargeSequentialSuite {
299                total_tests: discovery.cases.len(),
300                total_files: discovery.files_with_tests,
301            },
302        );
303    }
304
305    let mut cases = discovery.cases;
306    sort_cases_longest_first(&mut cases, &timings);
307
308    let mut all_results = discovery.discovery_errors;
309    let total_tests = cases.len();
310    all_results.extend(execute_cases(cases, workers, options, total_tests).await);
311
312    let total = all_results.len();
313    let passed = all_results.iter().filter(|r| r.passed).count();
314    let failed = total - passed;
315    let aggregate = AggregateTimings::from_results(collection_ms, &all_results);
316
317    if let Some(path) = timings_path.as_deref() {
318        update_timings_cache(path, timings, &all_results);
319    }
320
321    TestSummary {
322        results: all_results,
323        passed,
324        failed,
325        total,
326        duration_ms: start.elapsed().as_millis() as u64,
327        aggregate,
328    }
329}
330
331/// Backwards-compatible single-file API used by `harn dev`.
332///
333/// Runs every test in one file on the current thread. The new scheduler
334/// uses per-test worker threads, but `harn dev` re-runs a single module
335/// in the foreground after each rebuild — the queueing machinery would
336/// add latency without parallelism to gain back, so we keep this path
337/// minimal.
338pub async fn run_test_file(
339    path: &Path,
340    filter: Option<&str>,
341    timeout_ms: u64,
342    execution_cwd: Option<&Path>,
343    cli_skill_dirs: &[PathBuf],
344) -> Result<Vec<TestResult>, String> {
345    let source =
346        fs::read_to_string(path).map_err(|e| format!("Failed to read {}: {e}", path.display()))?;
347    let program = parse_program(&source)?;
348    let source = Arc::new(source);
349    let program = Arc::new(program);
350
351    let cases = extract_cases_from_program(path, &source, &program, filter, usize::MAX);
352
353    let mut results = Vec::with_capacity(cases.len());
354    let execution_cwd = execution_cwd
355        .map(Path::to_path_buf)
356        .unwrap_or_else(test_execution_cwd);
357    for case in cases {
358        results.push(execute_case(&case, &execution_cwd, timeout_ms, cli_skill_dirs).await);
359    }
360    Ok(results)
361}
362
363fn resolve_workers(options: &RunOptions) -> usize {
364    if !options.parallel {
365        return 1;
366    }
367    if let Some(jobs) = options.jobs {
368        return jobs.max(1);
369    }
370    if let Ok(raw) = std::env::var(HARN_TEST_JOBS_ENV) {
371        if let Ok(parsed) = raw.trim().parse::<usize>() {
372            if parsed >= 1 {
373                return parsed;
374            }
375        }
376    }
377    let detected = thread::available_parallelism()
378        .map(|n| n.get())
379        .unwrap_or(1);
380    detected.clamp(1, DEFAULT_PARALLEL_JOBS_CAP)
381}
382
383struct Discovery {
384    cases: Vec<TestCase>,
385    files_with_tests: usize,
386    discovery_errors: Vec<TestResult>,
387}
388
389fn discover_test_cases(files: &[PathBuf], filter: Option<&str>, workers: usize) -> Discovery {
390    let mut cases = Vec::new();
391    let mut files_with_tests = 0usize;
392    let mut discovery_errors = Vec::new();
393
394    for file in files {
395        let source = match fs::read_to_string(file) {
396            Ok(s) => s,
397            Err(e) => {
398                discovery_errors.push(TestResult {
399                    name: "<file error>".to_string(),
400                    file: file.display().to_string(),
401                    passed: false,
402                    error: Some(format!("Failed to read {}: {e}", file.display())),
403                    duration_ms: 0,
404                    phases: PhaseTimings::default(),
405                });
406                continue;
407            }
408        };
409
410        let program = match parse_program(&source) {
411            Ok(p) => p,
412            Err(e) => {
413                discovery_errors.push(TestResult {
414                    name: "<file error>".to_string(),
415                    file: file.display().to_string(),
416                    passed: false,
417                    error: Some(e),
418                    duration_ms: 0,
419                    phases: PhaseTimings::default(),
420                });
421                continue;
422            }
423        };
424
425        let source = Arc::new(source);
426        let program = Arc::new(program);
427        let file_cases = extract_cases_from_program(file, &source, &program, filter, workers);
428        if !file_cases.is_empty() {
429            files_with_tests += 1;
430            cases.extend(file_cases);
431        }
432    }
433
434    Discovery {
435        cases,
436        files_with_tests,
437        discovery_errors,
438    }
439}
440
441fn parse_program(source: &str) -> Result<Vec<SNode>, String> {
442    let mut lexer = Lexer::new(source);
443    let tokens = lexer.tokenize().map_err(|e| format!("{e}"))?;
444    let mut parser = Parser::new(tokens);
445    parser.parse().map_err(|e| format!("{e}"))
446}
447
448fn extract_cases_from_program(
449    file: &Path,
450    source: &Arc<String>,
451    program: &Arc<Vec<SNode>>,
452    filter: Option<&str>,
453    workers: usize,
454) -> Vec<TestCase> {
455    let mut cases = Vec::new();
456    for snode in program.iter() {
457        let Some(meta) = inspect_test_pipeline(snode) else {
458            continue;
459        };
460        if let Some(pattern) = filter {
461            if !meta.name.contains(pattern) {
462                continue;
463            }
464        }
465        // Cap heavy weight so a single annotated test never deadlocks
466        // when the pool is smaller than the requested concurrency.
467        let weight = meta.weight.min(workers).max(1);
468        cases.push(TestCase {
469            file: file.to_path_buf(),
470            name: meta.name,
471            source: Arc::clone(source),
472            program: Arc::clone(program),
473            serial_group: meta.serial_group,
474            weight,
475        });
476    }
477    cases
478}
479
/// Scheduling facts about one test pipeline, read from its declaration.
struct PipelineMeta {
    /// Pipeline name.
    name: String,
    /// Group from a `@serial` attribute, if present.
    serial_group: Option<String>,
    /// Permits requested by `@heavy`'s `threads` argument; 1 otherwise.
    weight: usize,
}
485
486fn inspect_test_pipeline(snode: &SNode) -> Option<PipelineMeta> {
487    // Pipelines marked `@test`, or named `test_*`, are user tests. The
488    // companion attributes `@serial` and `@heavy` only tune the scheduler
489    // and never make a non-test pipeline discoverable on their own.
490    let (attributes, inner) = match &snode.node {
491        Node::AttributedDecl { attributes, inner } => (attributes.as_slice(), inner.as_ref()),
492        _ => (&[][..], snode),
493    };
494    let name = match &inner.node {
495        Node::Pipeline { name, .. } => name.clone(),
496        _ => return None,
497    };
498    let has_test_attr = attributes.iter().any(|a| a.name == "test");
499    if !has_test_attr && !name.starts_with("test_") {
500        return None;
501    }
502    let serial_group = attributes
503        .iter()
504        .find(|a| a.name == "serial")
505        .map(serial_group_for);
506    let weight = attributes
507        .iter()
508        .find(|a| a.name == "heavy")
509        .and_then(heavy_weight_for)
510        .unwrap_or(1);
511    Some(PipelineMeta {
512        name,
513        serial_group,
514        weight,
515    })
516}
517
518fn serial_group_for(attr: &Attribute) -> String {
519    attr.string_arg("group")
520        .unwrap_or_else(|| "__default__".to_string())
521}
522
523fn heavy_weight_for(attr: &Attribute) -> Option<usize> {
524    attr.args
525        .iter()
526        .find(|a| a.name.as_deref() == Some("threads"))
527        .and_then(|a| match &a.value.node {
528            Node::IntLiteral(n) if *n >= 1 => Some(*n as usize),
529            _ => None,
530        })
531}
532
533fn sort_cases_longest_first(cases: &mut [TestCase], timings: &BTreeMap<String, u64>) {
534    // Sort ascending so the slowest tests sit at the tail and get popped
535    // first by workers. New (unmeasured) tests share the bottom of the
536    // queue alongside the fastest known ones — they'll appear in stable
537    // file/name order, and once they get their first timing they'll
538    // float up to where they belong.
539    cases.sort_by(|a, b| {
540        let key_a = timings_key(&a.file, &a.name);
541        let key_b = timings_key(&b.file, &b.name);
542        let dur_a = timings.get(&key_a).copied().unwrap_or(0);
543        let dur_b = timings.get(&key_b).copied().unwrap_or(0);
544        dur_a
545            .cmp(&dur_b)
546            .then_with(|| a.file.cmp(&b.file))
547            .then_with(|| a.name.cmp(&b.name))
548    });
549}
550
/// Timings-cache key for a test: `"<file display>::<name>"`.
fn timings_key(file: &Path, name: &str) -> String {
    let mut key = file.display().to_string();
    key.push_str("::");
    key.push_str(name);
    key
}
554
555fn timings_cache_path(target: &Path) -> Option<PathBuf> {
556    // Anchor the cache at the project root if discoverable, otherwise at
557    // the directory the suite was launched from. The cache is shared
558    // across runs in the same workspace, so a per-suite cache would
559    // fragment timings whenever a user runs a subset.
560    let probe_root = if target.is_dir() {
561        target.to_path_buf()
562    } else {
563        target.parent()?.to_path_buf()
564    };
565    let root = harn_vm::stdlib::process::find_project_root(&probe_root)
566        .unwrap_or_else(|| probe_root.clone());
567    Some(root.join(TIMINGS_CACHE_RELATIVE_PATH))
568}
569
570fn load_timings_cache(path: &Path) -> BTreeMap<String, u64> {
571    let Ok(contents) = fs::read_to_string(path) else {
572        return BTreeMap::new();
573    };
574    serde_json::from_str::<BTreeMap<String, u64>>(&contents).unwrap_or_default()
575}
576
577fn update_timings_cache(path: &Path, mut existing: BTreeMap<String, u64>, results: &[TestResult]) {
578    for result in results {
579        if result.name == "<file error>" || result.name == "<join error>" {
580            continue;
581        }
582        existing.insert(
583            timings_key(Path::new(&result.file), &result.name),
584            result.duration_ms,
585        );
586    }
587    if let Some(parent) = path.parent() {
588        let _ = fs::create_dir_all(parent);
589    }
590    if let Ok(serialized) = serde_json::to_string(&existing) {
591        let _ = fs::write(path, serialized);
592    }
593}
594
595async fn execute_cases(
596    cases: Vec<TestCase>,
597    workers: usize,
598    options: &RunOptions,
599    total_tests: usize,
600) -> Vec<TestResult> {
601    if cases.is_empty() {
602        return Vec::new();
603    }
604    let completed = Arc::new(Mutex::new(0usize));
605    if workers <= 1 {
606        let mut results = Vec::with_capacity(cases.len());
607        for case in cases {
608            let cwd = case_execution_cwd(&case);
609            let test_index = next_test_index(&completed);
610            emit_progress(
611                &options.progress,
612                TestRunEvent::TestStarted {
613                    name: case.name.clone(),
614                    file: case.file.display().to_string(),
615                    test_index,
616                    total_tests,
617                },
618            );
619            let result =
620                execute_case(&case, &cwd, options.timeout_ms, &options.cli_skill_dirs).await;
621            if options.diagnose {
622                result.emit_diagnose();
623            }
624            emit_progress(
625                &options.progress,
626                TestRunEvent::TestFinished(result.clone()),
627            );
628            results.push(result);
629        }
630        return results;
631    }
632
633    let queue = Arc::new(Mutex::new(cases));
634    let gate = Arc::new(ResourceGate::new(workers));
635    let results: Arc<Mutex<Vec<TestResult>>> = Arc::new(Mutex::new(Vec::new()));
636
637    let mut handles = Vec::with_capacity(workers);
638    for worker_idx in 0..workers {
639        let queue = Arc::clone(&queue);
640        let gate = Arc::clone(&gate);
641        let results = Arc::clone(&results);
642        let completed = Arc::clone(&completed);
643        let timeout_ms = options.timeout_ms;
644        let cli_skill_dirs = options.cli_skill_dirs.clone();
645        let progress = options.progress.clone();
646        let diagnose = options.diagnose;
647        let handle = thread::Builder::new()
648            .name(format!("harn-test-worker-{worker_idx}"))
649            .spawn(move || {
650                let runtime = match tokio::runtime::Builder::new_current_thread()
651                    .enable_all()
652                    .build()
653                {
654                    Ok(rt) => rt,
655                    Err(error) => {
656                        results.lock().unwrap().push(TestResult {
657                            name: "<worker error>".to_string(),
658                            file: String::new(),
659                            passed: false,
660                            error: Some(format!("failed to start test runtime: {error}")),
661                            duration_ms: 0,
662                            phases: PhaseTimings::default(),
663                        });
664                        return;
665                    }
666                };
667                // Cases are sorted ascending by historical duration; popping
668                // from the tail gives this worker the slowest unclaimed
669                // test, which front-loads long poles and prevents workers
670                // from stranding on quick tests at the end of the run.
671                while let Some(case) = queue.lock().unwrap().pop() {
672                    let _guard = gate.acquire(case.weight, case.serial_group.as_deref());
673                    let cwd = case_execution_cwd(&case);
674                    let test_index = next_test_index(&completed);
675                    emit_progress(
676                        &progress,
677                        TestRunEvent::TestStarted {
678                            name: case.name.clone(),
679                            file: case.file.display().to_string(),
680                            test_index,
681                            total_tests,
682                        },
683                    );
684                    let result =
685                        runtime.block_on(execute_case(&case, &cwd, timeout_ms, &cli_skill_dirs));
686                    if diagnose {
687                        result.emit_diagnose();
688                    }
689                    emit_progress(&progress, TestRunEvent::TestFinished(result.clone()));
690                    results.lock().unwrap().push(result);
691                }
692            })
693            .expect("spawning a harn-test worker thread should succeed");
694        handles.push(handle);
695    }
696
697    for handle in handles {
698        let _ = handle.join();
699    }
700
701    // All workers have joined, so this Arc holds the only remaining
702    // reference. The lock-and-clone fallback survives the unlikely case
703    // where a panic-unwind kept an extra reference alive.
704    Arc::try_unwrap(results)
705        .map(|m| m.into_inner().unwrap_or_default())
706        .unwrap_or_else(|arc| arc.lock().unwrap().clone())
707}
708
/// Bump the shared counter and return its new value, so the first call
/// yields 1.
fn next_test_index(counter: &Mutex<usize>) -> usize {
    let mut slot = counter.lock().unwrap();
    let next = *slot + 1;
    *slot = next;
    next
}
714
715fn case_execution_cwd(case: &TestCase) -> PathBuf {
716    case.file
717        .parent()
718        .filter(|p| !p.as_os_str().is_empty())
719        .map(Path::to_path_buf)
720        .unwrap_or_else(test_execution_cwd)
721}
722
/// Coordinates worker permits and serial-group exclusivity.
///
/// Workers are dedicated OS threads, so a plain `Mutex` + `Condvar` gate
/// is used instead of an async lock, keeping all waiting off the tokio
/// scheduler.
struct ResourceGate {
    state: Mutex<GateState>,
    cond: Condvar,
    capacity: usize,
}

/// Bookkeeping guarded by `ResourceGate::state`.
struct GateState {
    /// Permits not currently held by a running test.
    available: usize,
    /// Serial groups that currently have a test running.
    busy_groups: HashSet<String>,
}

/// Permits (and optionally a serial group) held by one running test.
/// Dropping the guard returns them to the gate.
struct GateGuard<'a> {
    gate: &'a ResourceGate,
    weight: usize,
    group: Option<String>,
}

impl ResourceGate {
    /// A gate with `capacity` permits and no busy groups.
    fn new(capacity: usize) -> Self {
        let initial = GateState {
            available: capacity,
            busy_groups: HashSet::new(),
        };
        Self {
            state: Mutex::new(initial),
            cond: Condvar::new(),
            capacity,
        }
    }

    /// Block until `weight` permits are free and `group` (if any) has no
    /// running test, then claim both.
    ///
    /// `weight` is capped at `capacity` and floored at 1, so a test that
    /// asks for more permits than exist still runs, alone.
    fn acquire(&self, weight: usize, group: Option<&str>) -> GateGuard<'_> {
        let weight = weight.min(self.capacity).max(1);
        let locked = self.state.lock().unwrap();
        let mut state = self
            .cond
            .wait_while(locked, |current| {
                let group_busy = group.is_some_and(|g| current.busy_groups.contains(g));
                current.available < weight || group_busy
            })
            .unwrap();
        state.available -= weight;
        if let Some(g) = group {
            state.busy_groups.insert(g.to_owned());
        }
        GateGuard {
            gate: self,
            weight,
            group: group.map(str::to_owned),
        }
    }
}

impl Drop for GateGuard<'_> {
    /// Return the permits and the serial group, then wake every waiter so
    /// each one can re-check whether it may proceed.
    fn drop(&mut self) {
        let mut state = self.gate.state.lock().unwrap();
        state.available += self.weight;
        if let Some(group) = &self.group {
            state.busy_groups.remove(group);
        }
        self.gate.cond.notify_all();
    }
}
786
787async fn execute_case(
788    case: &TestCase,
789    execution_cwd: &Path,
790    timeout_ms: u64,
791    cli_skill_dirs: &[PathBuf],
792) -> TestResult {
793    harn_vm::reset_thread_local_state();
794
795    let mut phases = PhaseTimings::default();
796    let total_start = Instant::now();
797
798    let compile_start = Instant::now();
799    let chunk = match harn_vm::Compiler::new().compile_named(&case.program, &case.name) {
800        Ok(c) => c,
801        Err(e) => {
802            phases.compile_ms = compile_start.elapsed().as_millis() as u64;
803            return TestResult {
804                name: case.name.clone(),
805                file: case.file.display().to_string(),
806                passed: false,
807                error: Some(format!("Compile error: {e}")),
808                duration_ms: total_start.elapsed().as_millis() as u64,
809                phases,
810            };
811        }
812    };
813    phases.compile_ms = compile_start.elapsed().as_millis() as u64;
814
815    let local = tokio::task::LocalSet::new();
816    let timeout = std::time::Duration::from_millis(timeout_ms);
817    let file_display = case.file.display().to_string();
818    let setup_start = Instant::now();
819    let result = tokio::time::timeout(
820        timeout,
821        local.run_until(async {
822            let mut vm = harn_vm::Vm::new();
823            harn_vm::register_vm_stdlib(&mut vm);
824            crate::install_default_hostlib(&mut vm);
825            let source_parent = case.file.parent().unwrap_or(Path::new("."));
826            let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
827            let store_base = project_root.as_deref().unwrap_or(source_parent);
828            let source_dir = source_parent.to_string_lossy().into_owned();
829            harn_vm::register_store_builtins(&mut vm, store_base);
830            harn_vm::register_metadata_builtins(&mut vm, store_base);
831            let pipeline_name = case
832                .file
833                .file_stem()
834                .and_then(|s| s.to_str())
835                .unwrap_or("test");
836            harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
837            vm.set_source_info(&file_display, &case.source);
838            harn_vm::stdlib::process::set_thread_execution_context(Some(
839                harn_vm::orchestration::RunExecutionRecord {
840                    cwd: Some(execution_cwd.to_string_lossy().into_owned()),
841                    source_dir: Some(source_dir),
842                    env: BTreeMap::new(),
843                    adapter: None,
844                    repo_path: None,
845                    worktree_path: None,
846                    branch: None,
847                    base_ref: None,
848                    cleanup: None,
849                },
850            ));
851            if let Some(ref root) = project_root {
852                vm.set_project_root(root);
853            }
854            if let Some(parent) = case.file.parent() {
855                if !parent.as_os_str().is_empty() {
856                    vm.set_source_dir(parent);
857                }
858            }
859            let loaded =
860                crate::skill_loader::load_skills(&crate::skill_loader::SkillLoaderInputs {
861                    cli_dirs: cli_skill_dirs.to_vec(),
862                    source_path: Some(case.file.clone()),
863                });
864            crate::skill_loader::emit_loader_warnings(&loaded.loader_warnings);
865            crate::skill_loader::install_skills_global(&mut vm, &loaded);
866            let extensions = crate::package::load_runtime_extensions(&case.file);
867            crate::package::install_runtime_extensions(&extensions);
868            crate::package::install_manifest_triggers(&mut vm, &extensions)
869                .await
870                .map_err(|error| format!("failed to install manifest triggers: {error}"))?;
871            crate::package::install_manifest_hooks(&mut vm, &extensions)
872                .await
873                .map_err(|error| format!("failed to install manifest hooks: {error}"))?;
874            vm.set_harness(harn_vm::Harness::real());
875            let setup_ms = setup_start.elapsed().as_millis() as u64;
876            let exec_start = Instant::now();
877            let outcome = match vm.execute(&chunk).await {
878                Ok(val) => Ok(val),
879                Err(e) => Err(vm.format_runtime_error(&e)),
880            };
881            let execute_ms = exec_start.elapsed().as_millis() as u64;
882            harn_vm::egress::reset_egress_policy_for_host();
883            Ok::<_, String>((outcome, setup_ms, execute_ms))
884        }),
885    )
886    .await;
887
888    let teardown_start = Instant::now();
889    // Clear thread-locals so the next case scheduled onto this worker
890    // sees a clean slate. Wall clock for this work lands in the
891    // teardown bucket so the phase breakdown sums to wall time.
892    harn_vm::reset_thread_local_state();
893    phases.teardown_ms = teardown_start.elapsed().as_millis() as u64;
894
895    let elapsed_ms = total_start.elapsed().as_millis() as u64;
896    let (passed, error, duration_ms) = match result {
897        Ok(Ok((outcome, setup_ms, execute_ms))) => {
898            phases.setup_ms = setup_ms;
899            phases.execute_ms = execute_ms;
900            match outcome {
901                Ok(_) => (true, None, elapsed_ms),
902                Err(message) => (false, Some(message), elapsed_ms),
903            }
904        }
905        Ok(Err(setup_error)) => (false, Some(setup_error), elapsed_ms),
906        // Report `timeout_ms` rather than `elapsed_ms` for timeouts so a
907        // suite-wide aggregate still reflects the configured budget that
908        // was hit, not the slightly-earlier moment we tore down at.
909        Err(_) => (
910            false,
911            Some(format!("timed out after {timeout_ms}ms")),
912            timeout_ms,
913        ),
914    };
915
916    TestResult {
917        name: case.name.clone(),
918        file: file_display,
919        passed,
920        error,
921        duration_ms,
922        phases,
923    }
924}
925
926fn discover_test_files(dir: &Path) -> Vec<PathBuf> {
927    let mut files = Vec::new();
928    if let Ok(entries) = fs::read_dir(dir) {
929        for entry in entries.flatten() {
930            let path = entry.path();
931            if path.is_dir() {
932                files.extend(discover_test_files(&path));
933            } else if path.extension().is_some_and(|e| e == "harn") {
934                if let Ok(content) = fs::read_to_string(&path) {
935                    if content.contains("test_") || content.contains("@test") {
936                        files.push(canonicalize_existing_path(&path));
937                    }
938                }
939            }
940        }
941    }
942    files.sort();
943    files
944}
945
// Tests for test-file discovery, the sequential and parallel runners, the
// `ResourceGate` used by the parallel scheduler, and phase-timing
// aggregation.
#[cfg(test)]
mod tests {
    use super::*;

    use std::time::Duration;

    /// Test fixture wrapping a `tempfile::TempDir`, with a helper for writing
    /// files at paths relative to its root.
    struct TempTestDir {
        inner: tempfile::TempDir,
    }

    impl TempTestDir {
        fn new() -> Self {
            Self {
                inner: tempfile::tempdir().unwrap(),
            }
        }

        /// Writes `contents` to `relative` under the temp root, creating any
        /// missing parent directories. Panics on I/O failure.
        fn write(&self, relative: &str, contents: &str) {
            let path = self.path().join(relative);
            if let Some(parent) = path.parent() {
                fs::create_dir_all(parent).unwrap();
            }
            fs::write(path, contents).unwrap();
        }

        fn path(&self) -> &Path {
            self.inner.path()
        }
    }

    #[test]
    fn discover_test_files_returns_canonical_absolute_paths() {
        let temp = TempTestDir::new();
        temp.write("suite/test_alpha.harn", "pipeline test_alpha(task) {}");
        temp.write("suite/nested/test_beta.harn", "pipeline test_beta(task) {}");
        temp.write("suite/annotated.harn", "@test\npipeline annotated(task) {}");
        // Mentions neither `test_` nor `@test`, so discovery must skip it.
        temp.write("suite/ignore.harn", "pipeline build(task) {}");

        let files = discover_test_files(&temp.path().join("suite"));

        assert_eq!(files.len(), 3);
        assert!(files.iter().all(|path| path.is_absolute()));
        assert!(files
            .iter()
            .any(|path| path.ends_with("suite/test_alpha.harn")));
        assert!(files
            .iter()
            .any(|path| path.ends_with("suite/nested/test_beta.harn")));
        assert!(files
            .iter()
            .any(|path| path.ends_with("suite/annotated.harn")));
    }

    #[tokio::test]
    async fn run_tests_uses_file_parent_as_execution_cwd_and_restores_shell_cwd() {
        // Serialize against other tests that touch the process-wide cwd/env.
        let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
        let temp = TempTestDir::new();
        // The harn body passes only if the case runs with its file's
        // parent directory as the working directory.
        temp.write(
            "suite/test_cwd.harn",
            r#"
pipeline test_current_dir(task) {
  assert_eq(cwd(), source_dir())
}
"#,
        );

        let original_cwd = std::env::current_dir().unwrap();
        // NOTE(review): positional args are presumably
        // (filter, timeout_ms, parallel, cli_skill_dirs); confirm against `run_tests`.
        let summary = run_tests(&temp.path().join("suite"), None, 1_000, false, &[]).await;
        let restored_cwd = std::env::current_dir().unwrap();

        assert_eq!(summary.failed, 0);
        assert_eq!(summary.passed, 1);
        // Compare canonical forms so a symlinked temp root does not cause a
        // spurious mismatch.
        assert_eq!(
            fs::canonicalize(restored_cwd).unwrap(),
            fs::canonicalize(original_cwd).unwrap()
        );
    }

    #[tokio::test]
    async fn parallel_run_tests_uses_each_file_parent_as_execution_cwd() {
        let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
        let temp = TempTestDir::new();
        // Two files in different directories: each case must see its own
        // file's parent as cwd even when run in parallel.
        temp.write(
            "suite/a/test_one.harn",
            r#"
pipeline test_one(task) {
  assert_eq(cwd(), source_dir())
}
"#,
        );
        temp.write(
            "suite/b/test_two.harn",
            r#"
pipeline test_two(task) {
  assert_eq(cwd(), source_dir())
}
"#,
        );

        let summary = run_tests(&temp.path().join("suite"), None, 1_000, true, &[]).await;
        assert_eq!(summary.failed, 0);
        assert_eq!(summary.passed, 2);
    }

    #[tokio::test]
    async fn run_tests_loads_cli_skill_dirs() {
        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
        let temp = TempTestDir::new();
        // A single skill placed outside the suite; it is only reachable
        // through the CLI skill-dir argument passed below.
        temp.write(
            "skills/review/SKILL.md",
            r#"---
name: review
short: Review PRs
description: Review pull requests
---

Review instructions.
"#,
        );
        temp.write(
            "suite/test_skills.harn",
            r#"
pipeline test_cli_skills(task) {
  assert_eq(skill_count(skills), 1)
  let found = skill_find(skills, "review")
  assert_eq(found.name, "review")
}
"#,
        );

        let summary = run_tests(
            &temp.path().join("suite"),
            None,
            1_000,
            false,
            &[temp.path().join("skills")],
        )
        .await;

        assert_eq!(summary.failed, 0, "{:?}", summary.results[0].error);
        assert_eq!(summary.passed, 1);
    }

    #[test]
    fn resolve_workers_honors_explicit_jobs() {
        let mut opts = RunOptions::new(1_000);
        opts.parallel = true;
        opts.jobs = Some(3);
        assert_eq!(resolve_workers(&opts), 3);
    }

    #[test]
    fn resolve_workers_returns_one_when_not_parallel() {
        // `jobs` must be ignored when parallel execution is off.
        let mut opts = RunOptions::new(1_000);
        opts.parallel = false;
        opts.jobs = Some(8);
        assert_eq!(resolve_workers(&opts), 1);
    }

    #[test]
    fn sort_cases_longest_first_uses_historical_durations() {
        let source = Arc::new(String::new());
        let program = Arc::new(Vec::new());
        let mk = |name: &str| TestCase {
            file: PathBuf::from("tests/a.harn"),
            name: name.to_string(),
            source: Arc::clone(&source),
            program: Arc::clone(&program),
            serial_group: None,
            weight: 1,
        };
        let mut cases = vec![mk("test_quick"), mk("test_slow"), mk("test_medium")];
        // Timing keys are `<file>::<name>`; `test_quick` has no history.
        let mut timings = BTreeMap::new();
        timings.insert("tests/a.harn::test_slow".to_string(), 5_000);
        timings.insert("tests/a.harn::test_medium".to_string(), 1_000);

        sort_cases_longest_first(&mut cases, &timings);

        // Slowest tests live at the tail so workers pop them first.
        let order: Vec<&str> = cases.iter().map(|c| c.name.as_str()).collect();
        assert_eq!(order, vec!["test_quick", "test_medium", "test_slow"]);
    }

    #[test]
    fn resource_gate_serializes_same_group() {
        let gate = Arc::new(ResourceGate::new(4));
        let trace = Arc::new(Mutex::new(Vec::<&'static str>::new()));
        // A signals once it actually holds the gate, so B is only spawned
        // after A has won. A fixed sleep here would leave the ordering to
        // chance under load and could fail spuriously.
        let (acquired_tx, acquired_rx) = std::sync::mpsc::channel::<()>();

        let g_a = {
            let trace = Arc::clone(&trace);
            let gate = Arc::clone(&gate);
            thread::spawn(move || {
                let _guard = gate.acquire(1, Some("login"));
                trace.lock().unwrap().push("a-start");
                acquired_tx.send(()).unwrap();
                // Hold the group long enough that a broken gate would let
                // B start before "a-end" is recorded.
                thread::sleep(Duration::from_millis(50));
                trace.lock().unwrap().push("a-end");
            })
        };
        // Block until A holds the "login" group.
        acquired_rx.recv().unwrap();
        let g_b = {
            let trace = Arc::clone(&trace);
            let gate = Arc::clone(&gate);
            thread::spawn(move || {
                let _guard = gate.acquire(1, Some("login"));
                trace.lock().unwrap().push("b-start");
                trace.lock().unwrap().push("b-end");
            })
        };
        g_a.join().unwrap();
        g_b.join().unwrap();

        let trace = trace.lock().unwrap();
        // B must not start until A ends.
        let a_end = trace.iter().position(|t| *t == "a-end").unwrap();
        let b_start = trace.iter().position(|t| *t == "b-start").unwrap();
        assert!(a_end < b_start, "B started before A finished: {trace:?}");
    }

    #[test]
    fn resource_gate_allows_independent_groups_in_parallel() {
        let gate = Arc::new(ResourceGate::new(4));
        // Held for the whole test, so "alpha" stays occupied while "beta"
        // is requested.
        let _guard_a = gate.acquire(1, Some("alpha"));
        // Acquiring beta should not block — the other group is unrelated.
        // If it did block, the scoped thread would never finish and this
        // test would hang rather than fail.
        let acquired = Mutex::new(false);
        thread::scope(|s| {
            s.spawn(|| {
                let _ = gate.acquire(1, Some("beta"));
                *acquired.lock().unwrap() = true;
            });
        });
        assert!(*acquired.lock().unwrap());
    }

    #[test]
    fn resource_gate_caps_heavy_weight_at_capacity() {
        // A test that asks for more than the pool size must still be
        // schedulable rather than deadlocking.
        let gate = Arc::new(ResourceGate::new(2));
        let heavy_guard = gate.acquire(99, None);
        // Available is now 0; another single-weight task must still wait.
        let started = Arc::new(Mutex::new(false));
        let inner = Arc::clone(&started);
        let gate2 = Arc::clone(&gate);
        let handle = thread::spawn(move || {
            let _guard = gate2.acquire(1, None);
            *inner.lock().unwrap() = true;
        });
        // Give the waiter a chance to (wrongly) acquire. A slow scheduler
        // can only make this check pass trivially, never fail spuriously.
        thread::sleep(Duration::from_millis(20));
        assert!(!*started.lock().unwrap(), "should still be waiting");
        drop(heavy_guard);
        handle.join().unwrap();
        assert!(*started.lock().unwrap());
    }

    #[tokio::test]
    async fn parallel_scheduler_runs_heavy_tests_without_oversubscribing() {
        // Heavy(2) should never run concurrently with another test when
        // the pool only has two workers — there are no spare permits.
        // NOTE(review): this asserts only that both cases complete and
        // pass (no deadlock on the weight-2 request); it does not observe
        // overlap between the two cases directly.
        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
        let temp = TempTestDir::new();
        temp.write(
            "suite/test_heavy.harn",
            r#"
@test
@heavy(threads: 2)
pipeline test_heavy_one(task) {}

@test
pipeline test_light(task) {}
"#,
        );

        let opts = RunOptions {
            parallel: true,
            jobs: Some(2),
            ..RunOptions::new(5_000)
        };
        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
        assert_eq!(summary.failed, 0, "{:?}", summary.results);
        assert_eq!(summary.total, 2);
    }

    #[tokio::test]
    async fn parallel_scheduler_handles_serial_group_annotation() {
        // Two cases share one `@serial` group while four workers are
        // available; both must still run and pass.
        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
        let temp = TempTestDir::new();
        temp.write(
            "suite/test_serial.harn",
            r#"
@test
@serial(group: "fixture")
pipeline test_serial_one(task) {}

@test
@serial(group: "fixture")
pipeline test_serial_two(task) {}
"#,
        );

        let opts = RunOptions {
            parallel: true,
            jobs: Some(4),
            ..RunOptions::new(5_000)
        };
        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
        assert_eq!(summary.failed, 0, "{:?}", summary.results);
        assert_eq!(summary.passed, 2);
    }

    #[tokio::test]
    async fn parallel_scheduler_persists_timings_cache() {
        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
        let temp = TempTestDir::new();
        temp.write(
            "suite/test_timed.harn",
            r#"
@test
pipeline test_first(task) {}

@test
pipeline test_second(task) {}
"#,
        );

        let opts = RunOptions {
            parallel: true,
            jobs: Some(2),
            ..RunOptions::new(5_000)
        };
        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
        assert_eq!(summary.passed, 2);
        // The cache is a JSON object mapping a per-test key (containing the
        // test name) to a duration.
        let cache = temp.path().join("suite/.harn/test-timings.json");
        assert!(cache.exists(), "expected timings cache at {cache:?}");
        let stored: BTreeMap<String, u64> =
            serde_json::from_str(&fs::read_to_string(&cache).unwrap()).unwrap();
        assert!(
            stored.keys().any(|key| key.contains("test_first")),
            "expected timings for test_first in {stored:?}"
        );
        assert!(
            stored.keys().any(|key| key.contains("test_second")),
            "expected timings for test_second in {stored:?}"
        );
    }

    /// Regression fixture: a worker thread that runs multiple cases must
    /// reset thread-local state between them. Test A pins the clock
    /// mock to a future timestamp; test B asserts the clock is fresh.
    /// Fails if the per-case `reset_thread_local_state()` in
    /// `execute_case` ever regresses. Pins workers to 1 so both tests
    /// land on the same scheduler thread.
    // NOTE(review): the single-worker guarantee relies on the defaults of
    // `RunOptions::new`; confirm it is non-parallel (or set `jobs`).
    #[tokio::test]
    async fn worker_resets_thread_local_state_between_cases() {
        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
        let temp = TempTestDir::new();
        temp.write(
            "suite/test_isolation.harn",
            r#"
// The leak probe pins the clock to a future-but-i64-safe value
// (year ~2128) so a leaked mock is observable. Larger values overflow
// the nanosecond conversion inside the mock clock.
pipeline test_a_pins_clock(task) {
  mock_time(5000000000000)
  assert_eq(now_ms(), 5000000000000)
}

pipeline test_b_clock_is_fresh(task) {
  let ms = now_ms()
  assert(ms < 5000000000000, "clock mock leaked from previous test")
}
"#,
        );

        let opts = RunOptions::new(5_000);
        let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
        assert_eq!(
            summary.failed,
            0,
            "state leaked between tests: {:?}",
            summary
                .results
                .iter()
                .filter(|r| !r.passed)
                .map(|r| (r.name.clone(), r.error.clone()))
                .collect::<Vec<_>>()
        );
        assert_eq!(summary.passed, 2);
    }

    #[tokio::test]
    async fn summary_aggregate_timings_sum_phases_across_results() {
        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
        let temp = TempTestDir::new();
        temp.write(
            "suite/test_phases.harn",
            r#"
pipeline test_one(task) { assert_eq(1, 1) }
pipeline test_two(task) { assert_eq(2, 2) }
"#,
        );

        let summary = run_tests(&temp.path().join("suite"), None, 5_000, false, &[]).await;
        assert_eq!(summary.passed, 2);
        // Saturating adds mirror the aggregate's own arithmetic and keep an
        // overflow from masking a mismatch with a panic.
        let per_test_sum: u64 = summary
            .results
            .iter()
            .map(|r| r.phases.setup_ms.saturating_add(r.phases.compile_ms))
            .sum();
        let agg_sum = summary
            .aggregate
            .setup_ms
            .saturating_add(summary.aggregate.compile_ms);
        assert_eq!(
            per_test_sum, agg_sum,
            "aggregate setup+compile must equal sum of per-test setup+compile"
        );
    }

    #[tokio::test]
    async fn parallel_scheduler_emits_progress_events() {
        let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
        let temp = TempTestDir::new();
        temp.write(
            "suite/test_events.harn",
            r#"
@test
pipeline test_a(task) {}

@test
pipeline test_b(task) {}
"#,
        );

        let events: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
        let events_for_progress = Arc::clone(&events);
        // Exhaustive match (no `_` arm), so adding a `TestRunEvent` variant
        // forces this test to be revisited.
        let progress: TestRunProgress = Arc::new(move |event| {
            events_for_progress.lock().unwrap().push(match event {
                TestRunEvent::SuiteDiscovered { .. } => "suite",
                TestRunEvent::LargeSequentialSuite { .. } => "large-seq",
                TestRunEvent::TestStarted { .. } => "started",
                TestRunEvent::TestFinished(_) => "finished",
            });
        });
        let opts = RunOptions {
            parallel: true,
            jobs: Some(2),
            progress: Some(progress),
            ..RunOptions::new(5_000)
        };
        let _ = run_tests_with_options(&temp.path().join("suite"), &opts).await;
        let events = events.lock().unwrap();
        // Discovery is reported before any per-test event, and every case
        // gets exactly one start and one finish.
        assert_eq!(events.first().copied(), Some("suite"));
        assert_eq!(events.iter().filter(|e| **e == "started").count(), 2);
        assert_eq!(events.iter().filter(|e| **e == "finished").count(), 2);
    }
}