1use std::collections::{BTreeMap, HashSet};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::{Arc, Condvar, Mutex};
5use std::thread;
6use std::time::Instant;
7
8use harn_lexer::Lexer;
9use harn_parser::{Attribute, Node, Parser, SNode};
10
11use crate::env_guard::ScopedEnvVar;
12
/// Resets process-wide hostlib state so one test case cannot leak into the next.
///
/// With the `hostlib` feature enabled this delegates to
/// `harn_hostlib::fs_snapshot::reset_all_sessions`.
#[cfg(feature = "hostlib")]
fn reset_hostlib_state() {
    harn_hostlib::fs_snapshot::reset_all_sessions();
}

/// No-op stand-in compiled when the `hostlib` feature is disabled, so callers
/// can invoke `reset_hostlib_state` unconditionally.
#[cfg(not(feature = "hostlib"))]
fn reset_hostlib_state() {}
27
/// Outcome of one executed test pipeline, or of a failure that prevented a
/// test from running at all.
#[derive(Clone, Debug)]
pub struct TestResult {
    /// Pipeline name, or a sentinel such as `<file error>` / `<worker error>`
    /// when the failure is not attributable to a single test.
    pub name: String,
    /// Display form of the source file path (empty for worker-level errors).
    pub file: String,
    /// `true` when the pipeline compiled and executed without error.
    pub passed: bool,
    /// Human-readable failure description; `None` when the test passed.
    pub error: Option<String>,
    /// Wall-clock duration in milliseconds. For a timed-out test this is the
    /// configured timeout rather than the measured elapsed time.
    pub duration_ms: u64,
    /// Per-phase breakdown of where the time went.
    pub phases: PhaseTimings,
}
40
/// Aggregated outcome of a whole test run.
#[derive(Clone, Debug)]
pub struct TestSummary {
    /// Every individual result, including discovery (file-level) errors.
    pub results: Vec<TestResult>,
    /// Number of entries in `results` with `passed == true`.
    pub passed: usize,
    /// Number of entries in `results` that did not pass (`total - passed`).
    pub failed: usize,
    /// Total number of entries in `results`.
    pub total: usize,
    /// Wall-clock duration of the entire run in milliseconds.
    pub duration_ms: u64,
    /// Phase timings summed across all results, plus collection time.
    pub aggregate: AggregateTimings,
}
51
/// Per-test timing breakdown, in milliseconds. All fields default to zero and
/// stay zero for phases that were never reached or never reported.
#[derive(Debug, Default, Clone, Copy)]
pub struct PhaseTimings {
    /// Time spent preparing the VM (builtins, skills, extensions) before execution.
    pub setup_ms: u64,
    /// Time spent compiling the test pipeline to a chunk.
    pub compile_ms: u64,
    /// Time spent inside `vm.execute`.
    pub execute_ms: u64,
    /// Time spent resetting thread-local and hostlib state after the test.
    pub teardown_ms: u64,
}
69
/// Run-wide timing totals, in milliseconds.
#[derive(Debug, Default, Clone, Copy)]
pub struct AggregateTimings {
    /// Time spent discovering files and parsing them into test cases.
    pub collection_ms: u64,
    /// Sum of every result's `phases.setup_ms`.
    pub setup_ms: u64,
    /// Sum of every result's `phases.compile_ms`.
    pub compile_ms: u64,
    /// Sum of every result's `phases.execute_ms`.
    pub execute_ms: u64,
    /// Sum of every result's `phases.teardown_ms`.
    pub teardown_ms: u64,
}
80
81impl AggregateTimings {
82 fn from_results(collection_ms: u64, results: &[TestResult]) -> Self {
83 results.iter().map(|r| r.phases).fold(
84 Self {
85 collection_ms,
86 ..Self::default()
87 },
88 |acc, p| Self {
89 collection_ms: acc.collection_ms,
90 setup_ms: acc.setup_ms.saturating_add(p.setup_ms),
91 compile_ms: acc.compile_ms.saturating_add(p.compile_ms),
92 execute_ms: acc.execute_ms.saturating_add(p.execute_ms),
93 teardown_ms: acc.teardown_ms.saturating_add(p.teardown_ms),
94 },
95 )
96 }
97}
98
99impl TestResult {
100 fn emit_diagnose(&self) {
104 let outcome = if self.passed { "ok" } else { "FAIL" };
105 eprintln!(
106 "[harn test diag] {} {} setup={}ms compile={}ms execute={}ms teardown={}ms total={}ms",
107 outcome,
108 self.name,
109 self.phases.setup_ms,
110 self.phases.compile_ms,
111 self.phases.execute_ms,
112 self.phases.teardown_ms,
113 self.duration_ms,
114 );
115 }
116}
117
/// Progress notifications delivered to a [`TestRunProgress`] callback.
#[derive(Clone, Debug)]
pub enum TestRunEvent {
    /// Emitted once after discovery, before any test starts.
    SuiteDiscovered {
        /// Number of test cases that will be executed.
        total_tests: usize,
        /// Number of files that contributed at least one test case.
        total_files: usize,
        /// Whether the caller requested a parallel run.
        parallel: bool,
        /// Resolved number of workers (1 for a sequential run).
        workers: usize,
    },
    /// Emitted when a single-worker run is large enough to be worth a warning.
    LargeSequentialSuite {
        /// Number of test cases in the suite.
        total_tests: usize,
        /// Number of files that contributed at least one test case.
        total_files: usize,
    },
    /// Emitted immediately before a test begins executing.
    TestStarted {
        /// Name of the test pipeline.
        name: String,
        /// Display form of the file containing the test.
        file: String,
        /// 1-based start order of this test within the run.
        test_index: usize,
        /// Number of test cases in the run.
        total_tests: usize,
    },
    /// Emitted after a test finishes, carrying its result.
    TestFinished(TestResult),
}
138
/// Shared, thread-safe callback that receives [`TestRunEvent`]s during a run.
pub type TestRunProgress = Arc<dyn Fn(TestRunEvent) + Send + Sync>;

/// A single-worker run with at least this many tests triggers the
/// large-sequential-suite notice.
const LARGE_SEQUENTIAL_TEST_THRESHOLD: usize = 50;
/// A single-worker run spanning at least this many files triggers the
/// large-sequential-suite notice.
const LARGE_SEQUENTIAL_FILE_THRESHOLD: usize = 10;
/// Upper bound on the auto-detected worker count (explicit job counts bypass it).
const DEFAULT_PARALLEL_JOBS_CAP: usize = 8;
/// Location of the per-test timings cache, relative to the project root.
const TIMINGS_CACHE_RELATIVE_PATH: &str = ".harn/test-timings.json";
/// Environment variable that overrides the auto-detected worker count.
const HARN_TEST_JOBS_ENV: &str = "HARN_TEST_JOBS";

/// Memory budget assumed per worker, in MiB, when no override is set.
const DEFAULT_WORKER_MEMORY_MB: u64 = 1024;
/// Environment variable that overrides the per-worker memory budget (MiB).
const HARN_TEST_WORKER_MEMORY_MB_ENV: &str = "HARN_TEST_WORKER_MEMORY_MB";

/// Memory, in MiB, subtracted from the available total before dividing it
/// among workers.
const RESERVED_SYSTEM_MEMORY_MB: u64 = 1024;
164
/// Configuration for a test run.
#[derive(Clone, Default)]
pub struct RunOptions {
    /// When set, only tests whose name contains this substring are run.
    pub filter: Option<String>,
    /// Per-test timeout in milliseconds.
    pub timeout_ms: u64,
    /// Run tests on multiple worker threads when `true`.
    pub parallel: bool,
    /// Explicit worker count; only consulted when `parallel` is `true`, and
    /// values below 1 are raised to 1.
    pub jobs: Option<usize>,
    /// Extra skill directories handed to the skill loader for every test.
    pub cli_skill_dirs: Vec<PathBuf>,
    /// Optional callback notified of run progress.
    pub progress: Option<TestRunProgress>,
    /// When `true`, a per-test timing line is written to stderr.
    pub diagnose: bool,
}
191
192impl RunOptions {
193 pub fn new(timeout_ms: u64) -> Self {
194 Self {
195 timeout_ms,
196 ..Default::default()
197 }
198 }
199}
200
/// One runnable test pipeline together with the parsed program it lives in.
#[derive(Clone)]
struct TestCase {
    /// Path of the source file that declares the test.
    file: PathBuf,
    /// Name of the test pipeline.
    name: String,
    /// Full source text of `file`, shared by all cases from that file.
    source: Arc<String>,
    /// Parsed program of `file`, shared by all cases from that file.
    program: Arc<Vec<SNode>>,
    /// Tests sharing the same group name never run concurrently.
    serial_group: Option<String>,
    /// Number of worker slots this test occupies while running (at least 1).
    weight: usize,
}
218
/// Returns the canonical form of `path`, or `path` unchanged when it cannot be
/// canonicalized (for example because it does not exist).
fn canonicalize_existing_path(path: &Path) -> PathBuf {
    match path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
222
/// Returns the process's current working directory, falling back to `.` when
/// it cannot be determined.
fn test_execution_cwd() -> PathBuf {
    match std::env::current_dir() {
        Ok(dir) => dir,
        Err(_) => PathBuf::from("."),
    }
}
226
227fn emit_progress(progress: &Option<TestRunProgress>, event: TestRunEvent) {
228 if let Some(callback) = progress {
229 callback(event);
230 }
231}
232
233fn should_warn_large_sequential_suite(total_tests: usize, total_files: usize) -> bool {
234 total_tests >= LARGE_SEQUENTIAL_TEST_THRESHOLD || total_files >= LARGE_SEQUENTIAL_FILE_THRESHOLD
235}
236
237pub async fn run_tests(
239 path: &Path,
240 filter: Option<&str>,
241 timeout_ms: u64,
242 parallel: bool,
243 cli_skill_dirs: &[PathBuf],
244) -> TestSummary {
245 let options = RunOptions {
246 filter: filter.map(str::to_owned),
247 timeout_ms,
248 parallel,
249 jobs: None,
250 cli_skill_dirs: cli_skill_dirs.to_vec(),
251 progress: None,
252 diagnose: diagnose_enabled_via_env(),
253 };
254 run_tests_with_options(path, &options).await
255}
256
257pub async fn run_tests_with_progress(
259 path: &Path,
260 filter: Option<&str>,
261 timeout_ms: u64,
262 parallel: bool,
263 cli_skill_dirs: &[PathBuf],
264 progress: Option<TestRunProgress>,
265) -> TestSummary {
266 let options = RunOptions {
267 filter: filter.map(str::to_owned),
268 timeout_ms,
269 parallel,
270 jobs: None,
271 cli_skill_dirs: cli_skill_dirs.to_vec(),
272 progress,
273 diagnose: diagnose_enabled_via_env(),
274 };
275 run_tests_with_options(path, &options).await
276}
277
/// Returns `true` when `HARN_TEST_DIAGNOSE` is set to `1`, `true`, `yes` or
/// `on` (ASCII case-insensitive). Unset, non-Unicode or any other value,
/// including values with surrounding whitespace, yields `false`.
fn diagnose_enabled_via_env() -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    match std::env::var("HARN_TEST_DIAGNOSE") {
        Ok(raw) => TRUTHY.contains(&raw.to_ascii_lowercase().as_str()),
        Err(_) => false,
    }
}
287
/// Discovers, schedules and executes every test under `path` according to
/// `options`, and returns the aggregated summary.
///
/// `path` may be a single file or a directory that is searched recursively.
/// Files that cannot be read or parsed are reported as failed results rather
/// than aborting the run.
pub async fn run_tests_with_options(path: &Path, options: &RunOptions) -> TestSummary {
    // Both guards are bound to named locals so they stay alive for the whole
    // run. NOTE(review): presumably they restore the previous environment on
    // drop — confirm in `env_guard`.
    let _default_llm_provider = ScopedEnvVar::set_if_unset("HARN_LLM_PROVIDER", "mock");
    let _disable_llm_calls = ScopedEnvVar::set(harn_vm::llm::LLM_CALLS_DISABLED_ENV, "1");

    let start = Instant::now();

    // Collection phase: locate candidate files, then parse them into cases.
    let collection_start = Instant::now();
    let canonical_target = canonicalize_existing_path(path);
    let files = if canonical_target.is_dir() {
        discover_test_files(&canonical_target)
    } else {
        vec![canonical_target.clone()]
    };

    let workers = resolve_workers(options);
    // Historical durations drive scheduling order; a missing or unreadable
    // cache simply yields an empty map.
    let timings_path = timings_cache_path(&canonical_target);
    let timings = timings_path
        .as_deref()
        .map(load_timings_cache)
        .unwrap_or_default();

    let discovery = discover_test_cases(&files, options.filter.as_deref(), workers);
    let collection_ms = collection_start.elapsed().as_millis() as u64;

    emit_progress(
        &options.progress,
        TestRunEvent::SuiteDiscovered {
            total_tests: discovery.cases.len(),
            total_files: discovery.files_with_tests,
            parallel: options.parallel,
            workers,
        },
    );
    // Only a single-worker run gets the large-suite notice.
    if workers == 1
        && should_warn_large_sequential_suite(discovery.cases.len(), discovery.files_with_tests)
    {
        emit_progress(
            &options.progress,
            TestRunEvent::LargeSequentialSuite {
                total_tests: discovery.cases.len(),
                total_files: discovery.files_with_tests,
            },
        );
    }

    let mut cases = discovery.cases;
    sort_cases_longest_first(&mut cases, &timings);

    // Discovery errors come first in the results, followed by executed tests.
    let mut all_results = discovery.discovery_errors;
    let total_tests = cases.len();
    all_results.extend(execute_cases(cases, workers, options, total_tests).await);

    let total = all_results.len();
    let passed = all_results.iter().filter(|r| r.passed).count();
    let failed = total - passed;
    let aggregate = AggregateTimings::from_results(collection_ms, &all_results);

    // Persist this run's durations so the next run can schedule with them.
    if let Some(path) = timings_path.as_deref() {
        update_timings_cache(path, timings, &all_results);
    }

    TestSummary {
        results: all_results,
        passed,
        failed,
        total,
        duration_ms: start.elapsed().as_millis() as u64,
        aggregate,
    }
}
363
364pub async fn run_test_file(
372 path: &Path,
373 filter: Option<&str>,
374 timeout_ms: u64,
375 execution_cwd: Option<&Path>,
376 cli_skill_dirs: &[PathBuf],
377) -> Result<Vec<TestResult>, String> {
378 let source =
379 fs::read_to_string(path).map_err(|e| format!("Failed to read {}: {e}", path.display()))?;
380 let program = parse_program(&source)?;
381 let source = Arc::new(source);
382 let program = Arc::new(program);
383
384 let cases = extract_cases_from_program(path, &source, &program, filter, usize::MAX);
385
386 let mut results = Vec::with_capacity(cases.len());
387 let execution_cwd = execution_cwd
388 .map(Path::to_path_buf)
389 .unwrap_or_else(test_execution_cwd);
390 for case in cases {
391 results.push(execute_case(&case, &execution_cwd, timeout_ms, cli_skill_dirs).await);
392 }
393 Ok(results)
394}
395
396fn resolve_workers(options: &RunOptions) -> usize {
397 if !options.parallel {
398 return 1;
399 }
400 if let Some(jobs) = options.jobs {
401 return jobs.max(1);
402 }
403 if let Ok(raw) = std::env::var(HARN_TEST_JOBS_ENV) {
404 if let Ok(parsed) = raw.trim().parse::<usize>() {
405 if parsed >= 1 {
406 return parsed;
407 }
408 }
409 }
410 let detected = thread::available_parallelism()
411 .map(|n| n.get())
412 .unwrap_or(1);
413 let core_cap = detected.clamp(1, DEFAULT_PARALLEL_JOBS_CAP);
414 apply_memory_cap(core_cap)
415}
416
417fn apply_memory_cap(core_cap: usize) -> usize {
423 let Some(available_mb) = available_memory_mb() else {
424 return core_cap;
425 };
426 let budget = per_worker_memory_mb();
427 let mem_cap = memory_worker_cap(available_mb, budget, RESERVED_SYSTEM_MEMORY_MB);
428 if mem_cap < core_cap {
429 eprintln!(
430 "harn test: capping workers {core_cap} -> {mem_cap} \
431 (~{available_mb} MiB available, {budget} MiB/worker; \
432 override with --jobs / HARN_TEST_JOBS)"
433 );
434 return mem_cap;
435 }
436 core_cap
437}
438
/// Computes how many workers fit in memory.
///
/// `reserved_mb` is subtracted from `available_mb` (saturating at zero) and
/// the remainder is divided by `per_worker_mb` (treated as at least 1 to avoid
/// division by zero). The result is never below 1, so a run always makes
/// progress even under memory pressure.
fn memory_worker_cap(available_mb: u64, per_worker_mb: u64, reserved_mb: u64) -> usize {
    let usable = available_mb.saturating_sub(reserved_mb);
    let per_worker = per_worker_mb.max(1);
    // Clamp into `usize` range before casting: on targets where `usize` is
    // narrower than `u64`, a bare `as usize` could wrap a huge quotient down
    // to 0 and violate the "at least one worker" guarantee.
    let workers = (usable / per_worker).max(1).min(usize::MAX as u64);
    workers as usize
}
446
447fn per_worker_memory_mb() -> u64 {
450 std::env::var(HARN_TEST_WORKER_MEMORY_MB_ENV)
451 .ok()
452 .and_then(|raw| raw.trim().parse::<u64>().ok())
453 .filter(|&n| n >= 1)
454 .unwrap_or(DEFAULT_WORKER_MEMORY_MB)
455}
456
457fn available_memory_mb() -> Option<u64> {
468 let mut sys = sysinfo::System::new();
469 sys.refresh_memory();
470 let host_mb = match sys.available_memory() {
471 0 => None, bytes => Some(bytes / (1024 * 1024)),
473 };
474 match (host_mb, cgroup_v2_headroom_mb()) {
475 (Some(h), Some(c)) => Some(h.min(c)),
476 (Some(h), None) => Some(h),
477 (None, c) => c,
478 }
479}
480
481#[cfg(target_os = "linux")]
484fn cgroup_v2_headroom_mb() -> Option<u64> {
485 let dir = own_cgroup_v2_dir()?;
486 let max_raw = fs::read_to_string(dir.join("memory.max")).ok()?;
487 let current_raw = fs::read_to_string(dir.join("memory.current")).ok()?;
488 cgroup_headroom_mb(&max_raw, ¤t_raw)
489}
490
491#[cfg(not(target_os = "linux"))]
492fn cgroup_v2_headroom_mb() -> Option<u64> {
493 None
494}
495
/// Resolves the `/sys/fs/cgroup/...` directory of this process's cgroup v2
/// membership.
///
/// Reads `/proc/self/cgroup` and uses the first line starting with `0::`.
/// Returns `None` when the file is unreadable or has no such line.
#[cfg(target_os = "linux")]
fn own_cgroup_v2_dir() -> Option<PathBuf> {
    let table = fs::read_to_string("/proc/self/cgroup").ok()?;
    for line in table.lines() {
        let Some(entry) = line.strip_prefix("0::") else {
            continue;
        };
        let entry = entry.trim();
        // Drop one leading slash so `join` appends instead of replacing the root.
        let relative = match entry.strip_prefix('/') {
            Some(rest) => rest,
            None => entry,
        };
        return Some(Path::new("/sys/fs/cgroup").join(relative));
    }
    None
}
511
/// Computes cgroup memory headroom in MiB from the raw contents of
/// `memory.max` and `memory.current`.
///
/// Returns `None` when the limit is the literal `max` (unlimited) or either
/// value is not a valid unsigned integer. Usage above the limit saturates to 0.
#[cfg(any(target_os = "linux", test))]
fn cgroup_headroom_mb(memory_max: &str, memory_current: &str) -> Option<u64> {
    const BYTES_PER_MIB: u64 = 1024 * 1024;
    let limit_bytes = match memory_max.trim() {
        "max" => return None,
        raw => raw.parse::<u64>().ok()?,
    };
    let used_bytes = memory_current.trim().parse::<u64>().ok()?;
    Some(limit_bytes.saturating_sub(used_bytes) / BYTES_PER_MIB)
}
527
/// Result of scanning the candidate files for test cases.
struct Discovery {
    /// Every runnable test case that matched the filter.
    cases: Vec<TestCase>,
    /// Number of files that contributed at least one case.
    files_with_tests: usize,
    /// Failed results for files that could not be read or parsed.
    discovery_errors: Vec<TestResult>,
}
533
534fn discover_test_cases(files: &[PathBuf], filter: Option<&str>, workers: usize) -> Discovery {
535 let mut cases = Vec::new();
536 let mut files_with_tests = 0usize;
537 let mut discovery_errors = Vec::new();
538
539 for file in files {
540 let source = match fs::read_to_string(file) {
541 Ok(s) => s,
542 Err(e) => {
543 discovery_errors.push(TestResult {
544 name: "<file error>".to_string(),
545 file: file.display().to_string(),
546 passed: false,
547 error: Some(format!("Failed to read {}: {e}", file.display())),
548 duration_ms: 0,
549 phases: PhaseTimings::default(),
550 });
551 continue;
552 }
553 };
554
555 let program = match parse_program(&source) {
556 Ok(p) => p,
557 Err(e) => {
558 discovery_errors.push(TestResult {
559 name: "<file error>".to_string(),
560 file: file.display().to_string(),
561 passed: false,
562 error: Some(e),
563 duration_ms: 0,
564 phases: PhaseTimings::default(),
565 });
566 continue;
567 }
568 };
569
570 let source = Arc::new(source);
571 let program = Arc::new(program);
572 let file_cases = extract_cases_from_program(file, &source, &program, filter, workers);
573 if !file_cases.is_empty() {
574 files_with_tests += 1;
575 cases.extend(file_cases);
576 }
577 }
578
579 Discovery {
580 cases,
581 files_with_tests,
582 discovery_errors,
583 }
584}
585
586fn parse_program(source: &str) -> Result<Vec<SNode>, String> {
587 let mut lexer = Lexer::new(source);
588 let tokens = lexer.tokenize().map_err(|e| format!("{e}"))?;
589 let mut parser = Parser::new(tokens);
590 parser.parse().map_err(|e| format!("{e}"))
591}
592
593fn extract_cases_from_program(
594 file: &Path,
595 source: &Arc<String>,
596 program: &Arc<Vec<SNode>>,
597 filter: Option<&str>,
598 workers: usize,
599) -> Vec<TestCase> {
600 let mut cases = Vec::new();
601 for snode in program.iter() {
602 let Some(meta) = inspect_test_pipeline(snode) else {
603 continue;
604 };
605 if let Some(pattern) = filter {
606 if !meta.name.contains(pattern) {
607 continue;
608 }
609 }
610 let weight = meta.weight.min(workers).max(1);
613 cases.push(TestCase {
614 file: file.to_path_buf(),
615 name: meta.name,
616 source: Arc::clone(source),
617 program: Arc::clone(program),
618 serial_group: meta.serial_group,
619 weight,
620 });
621 }
622 cases
623}
624
/// Scheduling-relevant facts extracted from a test pipeline declaration.
struct PipelineMeta {
    /// Name of the pipeline.
    name: String,
    /// Serial group from a `serial` attribute, if present.
    serial_group: Option<String>,
    /// Requested weight from a `heavy` attribute's `threads` argument (default 1).
    weight: usize,
}
630
631fn inspect_test_pipeline(snode: &SNode) -> Option<PipelineMeta> {
632 let (attributes, inner) = match &snode.node {
636 Node::AttributedDecl { attributes, inner } => (attributes.as_slice(), inner.as_ref()),
637 _ => (&[][..], snode),
638 };
639 let name = match &inner.node {
640 Node::Pipeline { name, .. } => name.clone(),
641 _ => return None,
642 };
643 let has_test_attr = attributes.iter().any(|a| a.name == "test");
644 if !has_test_attr && !name.starts_with("test_") {
645 return None;
646 }
647 let serial_group = attributes
648 .iter()
649 .find(|a| a.name == "serial")
650 .map(serial_group_for);
651 let weight = attributes
652 .iter()
653 .find(|a| a.name == "heavy")
654 .and_then(heavy_weight_for)
655 .unwrap_or(1);
656 Some(PipelineMeta {
657 name,
658 serial_group,
659 weight,
660 })
661}
662
663fn serial_group_for(attr: &Attribute) -> String {
664 attr.string_arg("group")
665 .unwrap_or_else(|| "__default__".to_string())
666}
667
668fn heavy_weight_for(attr: &Attribute) -> Option<usize> {
669 attr.args
670 .iter()
671 .find(|a| a.name.as_deref() == Some("threads"))
672 .and_then(|a| match &a.value.node {
673 Node::IntLiteral(n) if *n >= 1 => Some(*n as usize),
674 _ => None,
675 })
676}
677
678fn sort_cases_longest_first(cases: &mut [TestCase], timings: &BTreeMap<String, u64>) {
679 cases.sort_by(|a, b| {
685 let key_a = timings_key(&a.file, &a.name);
686 let key_b = timings_key(&b.file, &b.name);
687 let dur_a = timings.get(&key_a).copied().unwrap_or(0);
688 let dur_b = timings.get(&key_b).copied().unwrap_or(0);
689 dur_a
690 .cmp(&dur_b)
691 .then_with(|| a.file.cmp(&b.file))
692 .then_with(|| a.name.cmp(&b.name))
693 });
694}
695
/// Builds the timings-cache key for a test: `<file display>::<test name>`.
fn timings_key(file: &Path, name: &str) -> String {
    let mut key = file.display().to_string();
    key.push_str("::");
    key.push_str(name);
    key
}
699
700fn timings_cache_path(target: &Path) -> Option<PathBuf> {
701 let probe_root = if target.is_dir() {
706 target.to_path_buf()
707 } else {
708 target.parent()?.to_path_buf()
709 };
710 let root = harn_vm::stdlib::process::find_project_root(&probe_root)
711 .unwrap_or_else(|| probe_root.clone());
712 Some(root.join(TIMINGS_CACHE_RELATIVE_PATH))
713}
714
715fn load_timings_cache(path: &Path) -> BTreeMap<String, u64> {
716 let Ok(contents) = fs::read_to_string(path) else {
717 return BTreeMap::new();
718 };
719 serde_json::from_str::<BTreeMap<String, u64>>(&contents).unwrap_or_default()
720}
721
722fn update_timings_cache(path: &Path, mut existing: BTreeMap<String, u64>, results: &[TestResult]) {
723 for result in results {
724 if result.name == "<file error>" || result.name == "<join error>" {
725 continue;
726 }
727 existing.insert(
728 timings_key(Path::new(&result.file), &result.name),
729 result.duration_ms,
730 );
731 }
732 if let Some(parent) = path.parent() {
733 let _ = fs::create_dir_all(parent);
734 }
735 if let Ok(serialized) = serde_json::to_string(&existing) {
736 let _ = fs::write(path, serialized);
737 }
738}
739
740async fn execute_cases(
741 cases: Vec<TestCase>,
742 workers: usize,
743 options: &RunOptions,
744 total_tests: usize,
745) -> Vec<TestResult> {
746 if cases.is_empty() {
747 return Vec::new();
748 }
749 let completed = Arc::new(Mutex::new(0usize));
750 if workers <= 1 {
751 let mut results = Vec::with_capacity(cases.len());
752 for case in cases {
753 let cwd = case_execution_cwd(&case);
754 let test_index = next_test_index(&completed);
755 emit_progress(
756 &options.progress,
757 TestRunEvent::TestStarted {
758 name: case.name.clone(),
759 file: case.file.display().to_string(),
760 test_index,
761 total_tests,
762 },
763 );
764 let result =
765 execute_case(&case, &cwd, options.timeout_ms, &options.cli_skill_dirs).await;
766 if options.diagnose {
767 result.emit_diagnose();
768 }
769 emit_progress(
770 &options.progress,
771 TestRunEvent::TestFinished(result.clone()),
772 );
773 results.push(result);
774 }
775 return results;
776 }
777
778 let queue = Arc::new(Mutex::new(cases));
779 let gate = Arc::new(ResourceGate::new(workers));
780 let results: Arc<Mutex<Vec<TestResult>>> = Arc::new(Mutex::new(Vec::new()));
781
782 let mut handles = Vec::with_capacity(workers);
783 for worker_idx in 0..workers {
784 let queue = Arc::clone(&queue);
785 let gate = Arc::clone(&gate);
786 let results = Arc::clone(&results);
787 let completed = Arc::clone(&completed);
788 let timeout_ms = options.timeout_ms;
789 let cli_skill_dirs = options.cli_skill_dirs.clone();
790 let progress = options.progress.clone();
791 let diagnose = options.diagnose;
792 let handle = thread::Builder::new()
793 .name(format!("harn-test-worker-{worker_idx}"))
794 .spawn(move || {
795 let runtime = match tokio::runtime::Builder::new_current_thread()
796 .enable_all()
797 .build()
798 {
799 Ok(rt) => rt,
800 Err(error) => {
801 results.lock().unwrap().push(TestResult {
802 name: "<worker error>".to_string(),
803 file: String::new(),
804 passed: false,
805 error: Some(format!("failed to start test runtime: {error}")),
806 duration_ms: 0,
807 phases: PhaseTimings::default(),
808 });
809 return;
810 }
811 };
812 while let Some(case) = queue.lock().unwrap().pop() {
817 let _guard = gate.acquire(case.weight, case.serial_group.as_deref());
818 let cwd = case_execution_cwd(&case);
819 let test_index = next_test_index(&completed);
820 emit_progress(
821 &progress,
822 TestRunEvent::TestStarted {
823 name: case.name.clone(),
824 file: case.file.display().to_string(),
825 test_index,
826 total_tests,
827 },
828 );
829 let result =
830 runtime.block_on(execute_case(&case, &cwd, timeout_ms, &cli_skill_dirs));
831 if diagnose {
832 result.emit_diagnose();
833 }
834 emit_progress(&progress, TestRunEvent::TestFinished(result.clone()));
835 results.lock().unwrap().push(result);
836 }
837 })
838 .expect("spawning a harn-test worker thread should succeed");
839 handles.push(handle);
840 }
841
842 for handle in handles {
843 let _ = handle.join();
844 }
845
846 Arc::try_unwrap(results)
850 .map(|m| m.into_inner().unwrap_or_default())
851 .unwrap_or_else(|arc| arc.lock().unwrap().clone())
852}
853
/// Increments the shared counter and returns the new value, giving each test
/// a 1-based start index.
///
/// # Panics
///
/// Panics if the counter's mutex is poisoned.
fn next_test_index(counter: &Mutex<usize>) -> usize {
    let mut slot = counter.lock().unwrap();
    let next = *slot + 1;
    *slot = next;
    next
}
859
860fn case_execution_cwd(case: &TestCase) -> PathBuf {
861 case.file
862 .parent()
863 .filter(|p| !p.as_os_str().is_empty())
864 .map(Path::to_path_buf)
865 .unwrap_or_else(test_execution_cwd)
866}
867
/// Admission control for parallel test workers: a weighted permit pool plus
/// mutual exclusion between tests that share a serial group.
struct ResourceGate {
    /// Free permits and the set of groups that are currently running.
    state: Mutex<GateState>,
    /// Signalled whenever a guard is dropped and resources are returned.
    cond: Condvar,
    /// Total number of permits; also the upper bound for a single request.
    capacity: usize,
}

/// Mutable bookkeeping protected by the gate's mutex.
struct GateState {
    /// Permits not currently held by any guard.
    available: usize,
    /// Serial groups that currently have a test running.
    busy_groups: HashSet<String>,
}

/// RAII permit: returns its weight and frees its group when dropped.
struct GateGuard<'a> {
    gate: &'a ResourceGate,
    weight: usize,
    group: Option<String>,
}

impl ResourceGate {
    /// Creates a gate with `capacity` permits and no busy groups.
    fn new(capacity: usize) -> Self {
        let state = GateState {
            available: capacity,
            busy_groups: HashSet::new(),
        };
        Self {
            state: Mutex::new(state),
            cond: Condvar::new(),
            capacity,
        }
    }

    /// Blocks until `weight` permits are free and `group` (if any) is idle,
    /// then claims both and returns a guard that releases them on drop.
    ///
    /// `weight` is limited to the gate's capacity and raised to at least 1.
    ///
    /// # Panics
    ///
    /// Panics if the gate's mutex is poisoned.
    fn acquire(&self, weight: usize, group: Option<&str>) -> GateGuard<'_> {
        let weight = weight.min(self.capacity).max(1);
        let locked = self.state.lock().unwrap();
        // Sleep while the request cannot be satisfied; the predicate is
        // checked before the first wait, so an immediately free gate never
        // blocks.
        let mut state = self
            .cond
            .wait_while(locked, |state| {
                let group_busy = group.is_some_and(|g| state.busy_groups.contains(g));
                state.available < weight || group_busy
            })
            .unwrap();

        state.available -= weight;
        let owned_group = group.map(str::to_owned);
        if let Some(name) = &owned_group {
            state.busy_groups.insert(name.clone());
        }
        GateGuard {
            gate: self,
            weight,
            group: owned_group,
        }
    }
}

impl Drop for GateGuard<'_> {
    /// Returns the permits, frees the serial group, and wakes every waiter so
    /// each can re-check whether its request now fits.
    fn drop(&mut self) {
        let ResourceGate { state, cond, .. } = self.gate;
        let mut state = state.lock().unwrap();
        state.available += self.weight;
        if let Some(name) = &self.group {
            state.busy_groups.remove(name.as_str());
        }
        cond.notify_all();
    }
}
931
/// Compiles and runs a single test case, returning its result with per-phase
/// timings.
///
/// * `case` – the test to run, together with its parsed program and source.
/// * `execution_cwd` – working directory recorded in the execution context.
/// * `timeout_ms` – limit applied to setup plus execution (compilation and
///   teardown happen outside the timeout).
/// * `cli_skill_dirs` – extra skill directories passed to the skill loader.
///
/// Never returns an error: compile failures, setup failures, runtime errors
/// and timeouts are all reported as a failed [`TestResult`].
async fn execute_case(
    case: &TestCase,
    execution_cwd: &Path,
    timeout_ms: u64,
    cli_skill_dirs: &[PathBuf],
) -> TestResult {
    // Start from a clean slate so state left by a previous test on this
    // thread cannot influence this one.
    harn_vm::reset_thread_local_state();
    reset_hostlib_state();

    let mut phases = PhaseTimings::default();
    let total_start = Instant::now();

    let compile_start = Instant::now();
    let chunk = match harn_vm::Compiler::new().compile_named(&case.program, &case.name) {
        Ok(c) => c,
        Err(e) => {
            // A compile failure is reported immediately; no other phase ran.
            phases.compile_ms = compile_start.elapsed().as_millis() as u64;
            return TestResult {
                name: case.name.clone(),
                file: case.file.display().to_string(),
                passed: false,
                error: Some(format!("Compile error: {e}")),
                duration_ms: total_start.elapsed().as_millis() as u64,
                phases,
            };
        }
    };
    phases.compile_ms = compile_start.elapsed().as_millis() as u64;

    let local = tokio::task::LocalSet::new();
    let timeout = std::time::Duration::from_millis(timeout_ms);
    let file_display = case.file.display().to_string();
    let setup_start = Instant::now();
    // The timeout wraps both VM setup and execution. The inner future yields
    // `Err` for a setup failure and `Ok((outcome, setup_ms, execute_ms))` once
    // the pipeline has run, whether it succeeded or not.
    let result = tokio::time::timeout(
        timeout,
        local.run_until(async {
            let mut vm = harn_vm::Vm::new();
            harn_vm::register_vm_stdlib(&mut vm);
            crate::install_default_hostlib(&mut vm);
            // Store, metadata and checkpoint builtins are rooted at the
            // project root when one is found, otherwise at the file's parent.
            let source_parent = case.file.parent().unwrap_or(Path::new("."));
            let project_root = harn_vm::stdlib::process::find_project_root(source_parent);
            let store_base = project_root.as_deref().unwrap_or(source_parent);
            let source_dir = source_parent.to_string_lossy().into_owned();
            harn_vm::register_store_builtins(&mut vm, store_base);
            harn_vm::register_metadata_builtins(&mut vm, store_base);
            // The file stem doubles as the pipeline name for checkpoints.
            let pipeline_name = case
                .file
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("test");
            harn_vm::register_checkpoint_builtins(&mut vm, store_base, pipeline_name);
            vm.set_source_info(&file_display, &case.source);
            // Record the working directory and source directory for this
            // thread's execution context.
            harn_vm::stdlib::process::set_thread_execution_context(Some(
                harn_vm::orchestration::RunExecutionRecord {
                    cwd: Some(execution_cwd.to_string_lossy().into_owned()),
                    source_dir: Some(source_dir),
                    env: BTreeMap::new(),
                    adapter: None,
                    repo_path: None,
                    worktree_path: None,
                    branch: None,
                    base_ref: None,
                    cleanup: None,
                },
            ));
            if let Some(ref root) = project_root {
                vm.set_project_root(root);
            }
            if let Some(parent) = case.file.parent() {
                if !parent.as_os_str().is_empty() {
                    vm.set_source_dir(parent);
                }
            }
            // Load skills from the CLI-provided directories and the test file.
            let loaded =
                crate::skill_loader::load_skills(&crate::skill_loader::SkillLoaderInputs {
                    cli_dirs: cli_skill_dirs.to_vec(),
                    source_path: Some(case.file.clone()),
                });
            crate::skill_loader::emit_loader_warnings(&loaded.loader_warnings);
            crate::skill_loader::install_skills_global(&mut vm, &loaded);
            // Install package-provided runtime extensions, triggers and hooks;
            // a failure here ends the test as a setup error.
            let extensions = crate::package::load_runtime_extensions(&case.file);
            crate::package::install_runtime_extensions(&extensions);
            crate::package::install_manifest_triggers(&mut vm, &extensions)
                .await
                .map_err(|error| format!("failed to install manifest triggers: {error}"))?;
            crate::package::install_manifest_hooks(&mut vm, &extensions)
                .await
                .map_err(|error| format!("failed to install manifest hooks: {error}"))?;
            vm.set_harness(harn_vm::Harness::real());
            // Setup ends here; everything after this is execution time.
            let setup_ms = setup_start.elapsed().as_millis() as u64;
            let exec_start = Instant::now();
            let outcome = match vm.execute(&chunk).await {
                Ok(val) => Ok(val),
                Err(e) => Err(vm.format_runtime_error(&e)),
            };
            let execute_ms = exec_start.elapsed().as_millis() as u64;
            // NOTE(review): only reached when setup succeeded and the test did
            // not time out — confirm the thread-local reset below covers the
            // egress policy on the other paths.
            harn_vm::egress::reset_egress_policy_for_host();
            Ok::<_, String>((outcome, setup_ms, execute_ms))
        }),
    )
    .await;

    // Teardown runs on every path, including timeouts and setup failures.
    let teardown_start = Instant::now();
    harn_vm::reset_thread_local_state();
    reset_hostlib_state();
    phases.teardown_ms = teardown_start.elapsed().as_millis() as u64;

    let elapsed_ms = total_start.elapsed().as_millis() as u64;
    let (passed, error, duration_ms) = match result {
        // The pipeline ran to completion (successfully or with a runtime error).
        Ok(Ok((outcome, setup_ms, execute_ms))) => {
            phases.setup_ms = setup_ms;
            phases.execute_ms = execute_ms;
            match outcome {
                Ok(_) => (true, None, elapsed_ms),
                Err(message) => (false, Some(message), elapsed_ms),
            }
        }
        // Setup failed before execution; setup/execute timings stay at zero.
        Ok(Err(setup_error)) => (false, Some(setup_error), elapsed_ms),
        // Timed out: the duration is reported as the configured timeout, and
        // setup/execute timings stay at zero because the inner future never
        // returned them.
        Err(_) => (
            false,
            Some(format!("timed out after {timeout_ms}ms")),
            timeout_ms,
        ),
    };

    TestResult {
        name: case.name.clone(),
        file: file_display,
        passed,
        error,
        duration_ms,
        phases,
    }
}
1072
1073fn discover_test_files(dir: &Path) -> Vec<PathBuf> {
1074 let mut files = Vec::new();
1075 if let Ok(entries) = fs::read_dir(dir) {
1076 for entry in entries.flatten() {
1077 let path = entry.path();
1078 if path.is_dir() {
1079 files.extend(discover_test_files(&path));
1080 } else if path.extension().is_some_and(|e| e == "harn") {
1081 if let Ok(content) = fs::read_to_string(&path) {
1082 if content.contains("test_") || content.contains("@test") {
1083 files.push(canonicalize_existing_path(&path));
1084 }
1085 }
1086 }
1087 }
1088 }
1089 files.sort();
1090 files
1091}
1092
1093#[cfg(test)]
1094mod tests {
1095 use super::*;
1096
1097 use std::sync::Arc;
1098 use std::time::Duration;
1099
1100 struct TempTestDir {
1101 inner: tempfile::TempDir,
1102 }
1103
1104 impl TempTestDir {
1105 fn new() -> Self {
1106 Self {
1107 inner: tempfile::tempdir().unwrap(),
1108 }
1109 }
1110
1111 fn write(&self, relative: &str, contents: &str) {
1112 let path = self.path().join(relative);
1113 if let Some(parent) = path.parent() {
1114 fs::create_dir_all(parent).unwrap();
1115 }
1116 fs::write(path, contents).unwrap();
1117 }
1118
1119 fn path(&self) -> &Path {
1120 self.inner.path()
1121 }
1122 }
1123
1124 #[test]
1125 fn discover_test_files_returns_canonical_absolute_paths() {
1126 let temp = TempTestDir::new();
1127 temp.write("suite/test_alpha.harn", "pipeline test_alpha(task) {}");
1128 temp.write("suite/nested/test_beta.harn", "pipeline test_beta(task) {}");
1129 temp.write("suite/annotated.harn", "@test\npipeline annotated(task) {}");
1130 temp.write("suite/ignore.harn", "pipeline build(task) {}");
1131
1132 let files = discover_test_files(&temp.path().join("suite"));
1133
1134 assert_eq!(files.len(), 3);
1135 assert!(files.iter().all(|path| path.is_absolute()));
1136 assert!(files
1137 .iter()
1138 .any(|path| path.ends_with("suite/test_alpha.harn")));
1139 assert!(files
1140 .iter()
1141 .any(|path| path.ends_with("suite/nested/test_beta.harn")));
1142 assert!(files
1143 .iter()
1144 .any(|path| path.ends_with("suite/annotated.harn")));
1145 }
1146
1147 #[tokio::test]
1148 async fn run_tests_uses_file_parent_as_execution_cwd_and_restores_shell_cwd() {
1149 let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
1150 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1151 let temp = TempTestDir::new();
1152 temp.write(
1153 "suite/test_cwd.harn",
1154 r"
1155pipeline test_current_dir(task) {
1156 assert_eq(cwd(), source_dir())
1157}
1158",
1159 );
1160
1161 let original_cwd = std::env::current_dir().unwrap();
1162 let summary = run_tests(&temp.path().join("suite"), None, 1_000, false, &[]).await;
1163 let restored_cwd = std::env::current_dir().unwrap();
1164
1165 assert_eq!(summary.failed, 0);
1166 assert_eq!(summary.passed, 1);
1167 assert_eq!(
1168 fs::canonicalize(restored_cwd).unwrap(),
1169 fs::canonicalize(original_cwd).unwrap()
1170 );
1171 }
1172
1173 #[tokio::test]
1174 async fn parallel_run_tests_uses_each_file_parent_as_execution_cwd() {
1175 let _cwd_guard = crate::tests::common::cwd_lock::lock_cwd_async().await;
1176 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1177 let temp = TempTestDir::new();
1178 temp.write(
1179 "suite/a/test_one.harn",
1180 r"
1181pipeline test_one(task) {
1182 assert_eq(cwd(), source_dir())
1183}
1184",
1185 );
1186 temp.write(
1187 "suite/b/test_two.harn",
1188 r"
1189pipeline test_two(task) {
1190 assert_eq(cwd(), source_dir())
1191}
1192",
1193 );
1194
1195 let summary = run_tests(&temp.path().join("suite"), None, 1_000, true, &[]).await;
1196 assert_eq!(summary.failed, 0);
1197 assert_eq!(summary.passed, 2);
1198 }
1199
1200 #[tokio::test]
1201 async fn run_tests_loads_cli_skill_dirs() {
1202 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1203 let temp = TempTestDir::new();
1204 temp.write(
1205 "skills/review/SKILL.md",
1206 r"---
1207name: review
1208short: Review PRs
1209description: Review pull requests
1210---
1211
1212Review instructions.
1213",
1214 );
1215 temp.write(
1216 "suite/test_skills.harn",
1217 r#"
1218pipeline test_cli_skills(task) {
1219 assert_eq(skill_count(skills), 1)
1220 let found = skill_find(skills, "review")
1221 assert_eq(found.name, "review")
1222}
1223"#,
1224 );
1225
1226 let summary = run_tests(
1227 &temp.path().join("suite"),
1228 None,
1229 1_000,
1230 false,
1231 &[temp.path().join("skills")],
1232 )
1233 .await;
1234
1235 assert_eq!(summary.failed, 0, "{:?}", summary.results[0].error);
1236 assert_eq!(summary.passed, 1);
1237 }
1238
1239 #[test]
1240 fn resolve_workers_honors_explicit_jobs() {
1241 let mut opts = RunOptions::new(1_000);
1242 opts.parallel = true;
1243 opts.jobs = Some(3);
1244 assert_eq!(resolve_workers(&opts), 3);
1245 }
1246
1247 #[test]
1248 fn resolve_workers_returns_one_when_not_parallel() {
1249 let mut opts = RunOptions::new(1_000);
1250 opts.parallel = false;
1251 opts.jobs = Some(8);
1252 assert_eq!(resolve_workers(&opts), 1);
1253 }
1254
1255 #[test]
1256 fn memory_worker_cap_backs_off_under_pressure() {
1257 assert_eq!(memory_worker_cap(4096, 1024, 1024), 3);
1259 }
1260
1261 #[test]
1262 fn memory_worker_cap_is_generous_when_memory_is_plentiful() {
1263 assert!(memory_worker_cap(32_768, 1024, 1024) >= DEFAULT_PARALLEL_JOBS_CAP);
1266 }
1267
/// Even when the first argument (512) is smaller than the other two
/// (1024 each), the cap must be 1 rather than 0.
#[test]
fn memory_worker_cap_never_starves_to_zero() {
    let cap = memory_worker_cap(512, 1024, 1024);
    assert_eq!(cap, 1);
}
1273
/// A first argument of `"max\n"` must produce no headroom value at all.
///
/// NOTE(review): presumably the first argument is the cgroup memory limit
/// and the second the current usage — confirm against `cgroup_headroom_mb`.
#[test]
fn cgroup_headroom_unlimited_is_none() {
    let headroom = cgroup_headroom_mb("max\n", "1048576\n");
    assert_eq!(headroom, None);
}
1279
/// 4 GiB as the first argument and 1 GiB as the second, both given as
/// decimal byte counts, must yield `Some(3072)`.
#[test]
fn cgroup_headroom_computes_slice_remainder() {
    const GIB: u64 = 1024 * 1024 * 1024;
    let first = (4 * GIB).to_string();
    let second = GIB.to_string();
    let headroom = cgroup_headroom_mb(&first, &second);
    assert_eq!(headroom, Some(3072));
}
1287
/// When the second argument exceeds the first, the result must be
/// `Some(0)` rather than an underflowed value or `None`.
#[test]
fn cgroup_headroom_saturates_when_over_limit() {
    let headroom = cgroup_headroom_mb("1024", "999999999");
    assert_eq!(headroom, Some(0));
}
1293
/// A non-numeric first argument must yield `None`.
#[test]
fn cgroup_headroom_rejects_garbage() {
    let headroom = cgroup_headroom_mb("not-a-number", "123");
    assert_eq!(headroom, None);
}
1298
/// `sort_cases_longest_first` must order cases by their recorded
/// durations, slowest first.
///
/// Three cases from `tests/a.harn` are sorted against a timing table that
/// knows `test_slow` (5_000 ms) and `test_medium` (1_000 ms) but has no
/// entry for `test_quick`.
#[test]
fn sort_cases_longest_first_uses_historical_durations() {
    let source = Arc::new(String::new());
    let program = Arc::new(Vec::new());
    // All cases share the same empty source/program; only the name differs.
    let mk = |name: &str| TestCase {
        file: PathBuf::from("tests/a.harn"),
        name: name.to_string(),
        source: Arc::clone(&source),
        program: Arc::clone(&program),
        serial_group: None,
        weight: 1,
    };
    let mut cases = vec![mk("test_quick"), mk("test_slow"), mk("test_medium")];
    let timings = BTreeMap::from([
        ("tests/a.harn::test_slow".to_string(), 5_000),
        ("tests/a.harn::test_medium".to_string(), 1_000),
    ]);

    sort_cases_longest_first(&mut cases, &timings);

    let order: Vec<&str> = cases.iter().map(|c| c.name.as_str()).collect();
    // "Longest first" means the 5_000 ms case must precede the 1_000 ms
    // one. NOTE(review): `test_quick` has no recorded timing and is kept
    // at the front, presumably because unknown durations are treated as
    // the slowest — confirm against `sort_cases_longest_first`.
    assert_eq!(order, vec!["test_quick", "test_slow", "test_medium"]);
}
1322
/// Two threads acquiring the gate with the same group (`"login"`) must not
/// overlap: the second may only record its start after the first has
/// recorded its end.
#[test]
fn resource_gate_serializes_same_group() {
    let gate = Arc::new(ResourceGate::new(4));
    let log = Arc::new(Mutex::new(Vec::<&'static str>::new()));

    // Thread A holds the group for ~50 ms.
    let holder = thread::spawn({
        let (gate, log) = (Arc::clone(&gate), Arc::clone(&log));
        move || {
            let _guard = gate.acquire(1, Some("login"));
            log.lock().unwrap().push("a-start");
            thread::sleep(Duration::from_millis(50));
            log.lock().unwrap().push("a-end");
        }
    });
    // Head start so that A is expected to own the group before B asks.
    thread::sleep(Duration::from_millis(10));
    // Thread B wants the same group and records start/end back to back.
    let contender = thread::spawn({
        let (gate, log) = (Arc::clone(&gate), Arc::clone(&log));
        move || {
            let _guard = gate.acquire(1, Some("login"));
            log.lock().unwrap().push("b-start");
            log.lock().unwrap().push("b-end");
        }
    });
    holder.join().unwrap();
    contender.join().unwrap();

    let trace = log.lock().unwrap();
    let index_of = |label: &str| trace.iter().position(|entry| *entry == label).unwrap();
    let a_end = index_of("a-end");
    let b_start = index_of("b-start");
    assert!(a_end < b_start, "B started before A finished: {trace:?}");
}
1358
/// While a guard for group `"alpha"` is held, another thread must still be
/// able to acquire group `"beta"`. If it could not, the scoped thread
/// would never finish and the test would hang instead of passing.
#[test]
fn resource_gate_allows_independent_groups_in_parallel() {
    let gate = Arc::new(ResourceGate::new(4));
    let _alpha_guard = gate.acquire(1, Some("alpha"));
    let beta_acquired = std::sync::Mutex::new(false);

    thread::scope(|scope| {
        scope.spawn(|| {
            // Acquire and release immediately; only success matters.
            drop(gate.acquire(1, Some("beta")));
            let mut flag = beta_acquired.lock().unwrap();
            *flag = true;
        });
    });

    let observed = *beta_acquired.lock().unwrap();
    assert!(observed);
}
1373
/// A weight-99 acquisition on a gate built with capacity 2 must succeed,
/// and while it is held a weight-1 acquisition on another thread must
/// wait. Once the heavy guard is dropped the waiter proceeds.
#[test]
fn resource_gate_caps_heavy_weight_at_capacity() {
    let gate = Arc::new(ResourceGate::new(2));
    let heavy_guard = gate.acquire(99, None);

    let started = Arc::new(Mutex::new(false));
    let waiter = thread::spawn({
        let gate = Arc::clone(&gate);
        let started = Arc::clone(&started);
        move || {
            let _guard = gate.acquire(1, None);
            *started.lock().unwrap() = true;
        }
    });

    // Give the waiter time to run; it must still be blocked on the gate.
    thread::sleep(Duration::from_millis(20));
    let began_early = *started.lock().unwrap();
    assert!(!began_early, "should still be waiting");

    // Releasing the heavy guard lets the waiter through.
    drop(heavy_guard);
    waiter.join().unwrap();
    assert!(*started.lock().unwrap());
}
1394
1395 #[tokio::test]
1396 async fn parallel_scheduler_runs_heavy_tests_without_oversubscribing() {
1397 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1400 let temp = TempTestDir::new();
1401 temp.write(
1402 "suite/test_heavy.harn",
1403 r"
1404@test
1405@heavy(threads: 2)
1406pipeline test_heavy_one(task) {}
1407
1408@test
1409pipeline test_light(task) {}
1410",
1411 );
1412
1413 let opts = RunOptions {
1414 parallel: true,
1415 jobs: Some(2),
1416 ..RunOptions::new(5_000)
1417 };
1418 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1419 assert_eq!(summary.failed, 0, "{:?}", summary.results);
1420 assert_eq!(summary.total, 2);
1421 }
1422
1423 #[tokio::test]
1424 async fn parallel_scheduler_handles_serial_group_annotation() {
1425 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1426 let temp = TempTestDir::new();
1427 temp.write(
1428 "suite/test_serial.harn",
1429 r#"
1430@test
1431@serial(group: "fixture")
1432pipeline test_serial_one(task) {}
1433
1434@test
1435@serial(group: "fixture")
1436pipeline test_serial_two(task) {}
1437"#,
1438 );
1439
1440 let opts = RunOptions {
1441 parallel: true,
1442 jobs: Some(4),
1443 ..RunOptions::new(5_000)
1444 };
1445 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1446 assert_eq!(summary.failed, 0, "{:?}", summary.results);
1447 assert_eq!(summary.passed, 2);
1448 }
1449
1450 #[tokio::test]
1451 async fn parallel_scheduler_persists_timings_cache() {
1452 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1453 let temp = TempTestDir::new();
1454 temp.write(
1455 "suite/test_timed.harn",
1456 r"
1457@test
1458pipeline test_first(task) {}
1459
1460@test
1461pipeline test_second(task) {}
1462",
1463 );
1464
1465 let opts = RunOptions {
1466 parallel: true,
1467 jobs: Some(2),
1468 ..RunOptions::new(5_000)
1469 };
1470 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1471 assert_eq!(summary.passed, 2);
1472 let cache = temp.path().join("suite/.harn/test-timings.json");
1473 assert!(cache.exists(), "expected timings cache at {cache:?}");
1474 let stored: BTreeMap<String, u64> =
1475 serde_json::from_str(&fs::read_to_string(&cache).unwrap()).unwrap();
1476 assert!(
1477 stored.keys().any(|key| key.contains("test_first")),
1478 "expected timings for test_first in {stored:?}"
1479 );
1480 assert!(
1481 stored.keys().any(|key| key.contains("test_second")),
1482 "expected timings for test_second in {stored:?}"
1483 );
1484 }
1485
1486 #[tokio::test]
1493 async fn worker_resets_thread_local_state_between_cases() {
1494 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1495 let temp = TempTestDir::new();
1496 temp.write(
1497 "suite/test_isolation.harn",
1498 r#"
1499// The leak probe pins the clock to a future-but-i64-safe value
1500// (year ~2128) so a leaked mock is observable. Larger values overflow
1501// the nanosecond conversion inside the mock clock.
1502pipeline test_a_pins_clock(task) {
1503 mock_time(5000000000000)
1504 assert_eq(now_ms(), 5000000000000)
1505}
1506
1507pipeline test_b_clock_is_fresh(task) {
1508 let ms = now_ms()
1509 assert(ms < 5000000000000, "clock mock leaked from previous test")
1510}
1511"#,
1512 );
1513
1514 let opts = RunOptions::new(5_000);
1515 let summary = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1516 assert_eq!(
1517 summary.failed,
1518 0,
1519 "state leaked between tests: {:?}",
1520 summary
1521 .results
1522 .iter()
1523 .filter(|r| !r.passed)
1524 .map(|r| (r.name.clone(), r.error.clone()))
1525 .collect::<Vec<_>>()
1526 );
1527 assert_eq!(summary.passed, 2);
1528 }
1529
1530 #[tokio::test]
1531 async fn summary_aggregate_timings_sum_phases_across_results() {
1532 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1533 let temp = TempTestDir::new();
1534 temp.write(
1535 "suite/test_phases.harn",
1536 r"
1537pipeline test_one(task) { assert_eq(1, 1) }
1538pipeline test_two(task) { assert_eq(2, 2) }
1539",
1540 );
1541
1542 let summary = run_tests(&temp.path().join("suite"), None, 5_000, false, &[]).await;
1543 assert_eq!(summary.passed, 2);
1544 let per_test_sum: u64 = summary
1545 .results
1546 .iter()
1547 .map(|r| r.phases.setup_ms.saturating_add(r.phases.compile_ms))
1548 .sum();
1549 let agg_sum = summary
1550 .aggregate
1551 .setup_ms
1552 .saturating_add(summary.aggregate.compile_ms);
1553 assert_eq!(
1554 per_test_sum, agg_sum,
1555 "aggregate setup+compile must equal sum of per-test setup+compile"
1556 );
1557 }
1558
1559 #[tokio::test]
1560 async fn parallel_scheduler_emits_progress_events() {
1561 let _env_guard = crate::tests::common::env_lock::lock_env().lock().await;
1562 let temp = TempTestDir::new();
1563 temp.write(
1564 "suite/test_events.harn",
1565 r"
1566@test
1567pipeline test_a(task) {}
1568
1569@test
1570pipeline test_b(task) {}
1571",
1572 );
1573
1574 let events: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
1575 let events_for_progress = Arc::clone(&events);
1576 let progress: TestRunProgress = Arc::new(move |event| {
1577 events_for_progress.lock().unwrap().push(match event {
1578 TestRunEvent::SuiteDiscovered { .. } => "suite",
1579 TestRunEvent::LargeSequentialSuite { .. } => "large-seq",
1580 TestRunEvent::TestStarted { .. } => "started",
1581 TestRunEvent::TestFinished(_) => "finished",
1582 });
1583 });
1584 let opts = RunOptions {
1585 parallel: true,
1586 jobs: Some(2),
1587 progress: Some(progress),
1588 ..RunOptions::new(5_000)
1589 };
1590 let _ = run_tests_with_options(&temp.path().join("suite"), &opts).await;
1591 let events = events.lock().unwrap();
1592 assert_eq!(events.first().copied(), Some("suite"));
1593 assert_eq!(events.iter().filter(|e| **e == "started").count(), 2);
1594 assert_eq!(events.iter().filter(|e| **e == "finished").count(), 2);
1595 }
1596}