1use std::collections::HashMap;
10use std::ffi::OsStr;
11use std::io::Read;
12use std::path::{Path, PathBuf};
13use std::process::{Child, Command, ExitStatus, Stdio};
14use std::sync::{Arc, Mutex};
15use std::thread;
16use std::time::{Duration, Instant};
17
18use serde::{Deserialize, Serialize};
19
20use super::logging::{
21 LogLevel, LogSource, RELIABILITY_EVENT_SCHEMA_VERSION, ReliabilityContext,
22 ReliabilityEventInput, ReliabilityPhase, TestLogger, TestLoggerBuilder,
23};
24
/// Errors produced by the e2e test harness.
///
/// Display strings come from the `#[error(...)]` attributes (thiserror).
#[derive(Debug, thiserror::Error)]
pub enum HarnessError {
    /// A child process could not be spawned; the payload carries the program and OS error text.
    #[error("Process failed to start: {0}")]
    ProcessStartFailed(String),

    /// A process exited with the given non-zero exit code.
    #[error("Process exited with non-zero status: {0}")]
    ProcessFailed(i32),

    /// A process did not finish within the given duration.
    #[error("Process timed out after {0:?}")]
    Timeout(Duration),

    /// A managed process was looked up by a name that is not registered.
    #[error("Process not found: {0}")]
    ProcessNotFound(String),

    /// Wrapped I/O failure; `#[from]` lets `?` convert `std::io::Error` into this variant.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// A harness-level assertion failed (e.g. a required lifecycle command exited non-zero).
    #[error("Assertion failed: {0}")]
    AssertionFailed(String),

    /// Scenario setup could not be completed (e.g. a failure hook was requested but not allowed).
    #[error("Setup failed: {0}")]
    SetupFailed(String),

    /// Tearing down harness state failed.
    #[error("Cleanup failed: {0}")]
    CleanupFailed(String),
}
52
/// Result alias used throughout the harness, with [`HarnessError`] as the error type.
pub type HarnessResult<T> = Result<T, HarnessError>;
55
/// A background process spawned and tracked by the harness.
#[derive(Debug)]
pub struct ProcessInfo {
    /// Logical name the process is registered under in the harness.
    pub name: String,
    /// OS process id reported by `Child::id` at spawn time.
    pub pid: u32,
    /// Instant recorded right after the process was spawned.
    pub started_at: Instant,
    /// Owned child handle; kept private so the harness controls lifecycle operations.
    child: Child,
}
64
65impl ProcessInfo {
66 pub fn is_running(&mut self) -> bool {
68 matches!(self.child.try_wait(), Ok(None))
69 }
70
71 pub fn try_exit_status(&mut self) -> Option<ExitStatus> {
73 self.child.try_wait().ok().flatten()
74 }
75
76 pub fn kill(&mut self) -> std::io::Result<()> {
78 self.child.kill()
79 }
80
81 pub fn wait(&mut self) -> std::io::Result<ExitStatus> {
83 self.child.wait()
84 }
85
86 pub fn take_stdout(&mut self) -> Option<std::process::ChildStdout> {
88 self.child.stdout.take()
89 }
90
91 pub fn take_stderr(&mut self) -> Option<std::process::ChildStderr> {
93 self.child.stderr.take()
94 }
95}
96
/// Captured outcome of a command that ran to completion.
#[derive(Debug, Clone)]
pub struct CommandResult {
    /// Process exit code; `0` is treated as success by [`CommandResult::success`].
    /// NOTE(review): how signal-terminated processes map to this value is decided
    /// by the producer of this struct (not visible here) — confirm.
    pub exit_code: i32,
    /// Captured standard output, as text.
    pub stdout: String,
    /// Captured standard error, as text.
    pub stderr: String,
    /// Wall-clock time the command took.
    pub duration: Duration,
}
105
106impl CommandResult {
107 pub fn success(&self) -> bool {
109 self.exit_code == 0
110 }
111
112 pub fn stdout_contains(&self, pattern: &str) -> bool {
114 self.stdout.contains(pattern)
115 }
116
117 pub fn stderr_contains(&self, pattern: &str) -> bool {
119 self.stderr.contains(pattern)
120 }
121
122 pub fn combined_output(&self) -> String {
124 format!("{}\n{}", self.stdout, self.stderr)
125 }
126}
127
/// Configuration shared by every [`TestHarness`] instance.
#[derive(Debug, Clone)]
pub struct HarnessConfig {
    /// Base directory; each harness creates its own timestamped subdirectory here.
    pub temp_dir: PathBuf,
    /// Timeout for lifecycle commands that do not set `timeout_secs`.
    pub default_timeout: Duration,
    /// Whether harness artifacts should be removed when a test passes.
    /// NOTE(review): consumed by cleanup code not visible in this section — confirm.
    pub cleanup_on_success: bool,
    /// Whether harness artifacts should be removed when a test fails.
    /// NOTE(review): consumed by cleanup code not visible in this section — confirm.
    pub cleanup_on_failure: bool,
    /// Path to the `rch` binary under test.
    pub rch_binary: PathBuf,
    /// Path to the `rchd` daemon binary (used by `start_daemon`).
    pub rchd_binary: PathBuf,
    /// Path to the `rch-wkr` worker binary.
    pub rch_wkr_binary: PathBuf,
    /// Environment variables applied to every process spawned via `spawn_process`.
    pub env_vars: HashMap<String, String>,
}
148
149impl Default for HarnessConfig {
150 fn default() -> Self {
151 fn cargo_bin_exe(candidates: &[&str]) -> Option<PathBuf> {
152 for candidate in candidates {
153 let key = format!("CARGO_BIN_EXE_{candidate}");
154 if let Ok(value) = std::env::var(&key) {
155 let trimmed = value.trim();
156 if !trimmed.is_empty() {
157 return Some(PathBuf::from(trimmed));
158 }
159 }
160 }
161 None
162 }
163
164 let manifest_dir = std::env::var("CARGO_MANIFEST_DIR")
169 .map(PathBuf::from)
170 .unwrap_or_else(|_| PathBuf::from(env!("CARGO_MANIFEST_DIR")));
171 let manifest_dir = manifest_dir.canonicalize().unwrap_or(manifest_dir);
172 let workspace_root = manifest_dir
173 .parent()
174 .unwrap_or(manifest_dir.as_path())
175 .to_path_buf();
176 let cargo_target = std::env::var("CARGO_TARGET_DIR")
177 .map(PathBuf::from)
178 .unwrap_or_else(|_| workspace_root.join("target"));
179 let cargo_target = if cargo_target.is_absolute() {
180 cargo_target
181 } else {
182 workspace_root.join(cargo_target)
183 };
184
185 let profile = if cfg!(debug_assertions) {
186 "debug"
187 } else {
188 "release"
189 };
190
191 let default_bin_dir = cargo_target.join(profile);
192 let bin_dir = if std::env::var("LLVM_PROFILE_FILE")
193 .map(|value| !value.trim().is_empty())
194 .unwrap_or(false)
195 {
196 let llvm_cov_dir = cargo_target.join("llvm-cov-target").join(profile);
197 if llvm_cov_dir.is_dir() {
198 llvm_cov_dir
199 } else {
200 default_bin_dir.clone()
201 }
202 } else {
203 default_bin_dir.clone()
204 };
205
206 let mut env_vars = HashMap::new();
207 if std::env::var("CI")
208 .map(|v| v == "1" || v.to_lowercase() == "true")
209 .unwrap_or(false)
210 {
211 env_vars.insert("RCH_MOCK_SSH".to_string(), "1".to_string());
212 }
213
214 Self {
215 temp_dir: std::env::temp_dir().join("rch_e2e_tests"),
216 default_timeout: Duration::from_secs(30),
217 cleanup_on_success: true,
218 cleanup_on_failure: false,
219 rch_binary: cargo_bin_exe(&["rch"])
220 .map(|path| {
221 if path.is_relative() {
222 workspace_root.join(path)
223 } else {
224 path
225 }
226 })
227 .unwrap_or_else(|| bin_dir.join("rch")),
228 rchd_binary: cargo_bin_exe(&["rchd"])
229 .map(|path| {
230 if path.is_relative() {
231 workspace_root.join(path)
232 } else {
233 path
234 }
235 })
236 .unwrap_or_else(|| bin_dir.join("rchd")),
237 rch_wkr_binary: cargo_bin_exe(&["rch-wkr", "rch_wkr"])
238 .map(|path| {
239 if path.is_relative() {
240 workspace_root.join(path)
241 } else {
242 path
243 }
244 })
245 .unwrap_or_else(|| bin_dir.join("rch-wkr")),
246 env_vars,
247 }
248 }
249}
250
/// Fault-injection hooks a reliability scenario may request.
///
/// Serialized in snake_case (e.g. `network_cut`), matching the `Display` labels.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ReliabilityFailureHook {
    /// Simulated network cut.
    NetworkCut,
    /// Simulated sync timeout.
    SyncTimeout,
    /// Simulated partial update.
    PartialUpdate,
    /// Simulated daemon restart.
    DaemonRestart,
}
260
261impl std::fmt::Display for ReliabilityFailureHook {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 let label = match self {
264 Self::NetworkCut => "network_cut",
265 Self::SyncTimeout => "sync_timeout",
266 Self::PartialUpdate => "partial_update",
267 Self::DaemonRestart => "daemon_restart",
268 };
269 write!(f, "{label}")
270 }
271}
272
/// Explicit opt-in switches for each failure hook.
///
/// All flags default to `false`. A scenario that requests a hook whose flag is
/// off is rejected during setup.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct ReliabilityFailureHookFlags {
    /// Permit [`ReliabilityFailureHook::NetworkCut`].
    pub allow_network_cut: bool,
    /// Permit [`ReliabilityFailureHook::SyncTimeout`].
    pub allow_sync_timeout: bool,
    /// Permit [`ReliabilityFailureHook::PartialUpdate`].
    pub allow_partial_update: bool,
    /// Permit [`ReliabilityFailureHook::DaemonRestart`].
    pub allow_daemon_restart: bool,
}
281
282impl ReliabilityFailureHookFlags {
283 pub fn allow_all() -> Self {
285 Self {
286 allow_network_cut: true,
287 allow_sync_timeout: true,
288 allow_partial_update: true,
289 allow_daemon_restart: true,
290 }
291 }
292
293 pub fn allows(&self, hook: ReliabilityFailureHook) -> bool {
295 match hook {
296 ReliabilityFailureHook::NetworkCut => self.allow_network_cut,
297 ReliabilityFailureHook::SyncTimeout => self.allow_sync_timeout,
298 ReliabilityFailureHook::PartialUpdate => self.allow_partial_update,
299 ReliabilityFailureHook::DaemonRestart => self.allow_daemon_restart,
300 }
301 }
302}
303
/// Serde default for `ReliabilityLifecycleCommand::required_success`.
///
/// Commands missing the field in serialized form are treated as required.
fn default_required_success() -> bool {
    const REQUIRED_BY_DEFAULT: bool = true;
    REQUIRED_BY_DEFAULT
}
307
/// One command run at a lifecycle stage of a reliability scenario.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReliabilityLifecycleCommand {
    /// Human-readable name, used in artifact names, triage markers and log messages.
    pub name: String,
    /// Program to execute.
    pub program: String,
    /// Arguments passed to `program`.
    pub args: Vec<String>,
    /// Per-command timeout in seconds; `None` means `HarnessConfig::default_timeout`.
    pub timeout_secs: Option<u64>,
    /// If `true` (the serde default), a non-zero exit fails the current phase.
    #[serde(default = "default_required_success")]
    pub required_success: bool,
    /// If `true`, the command is run as `rch exec -- <program> <args...>`.
    #[serde(default)]
    pub via_rch_exec: bool,
}
320
321impl ReliabilityLifecycleCommand {
322 pub fn new(
324 name: impl Into<String>,
325 program: impl Into<String>,
326 args: impl IntoIterator<Item = impl Into<String>>,
327 ) -> Self {
328 Self {
329 name: name.into(),
330 program: program.into(),
331 args: args.into_iter().map(Into::into).collect(),
332 timeout_secs: None,
333 required_success: true,
334 via_rch_exec: false,
335 }
336 }
337
338 pub fn with_timeout_secs(mut self, timeout_secs: u64) -> Self {
340 self.timeout_secs = Some(timeout_secs);
341 self
342 }
343
344 pub fn optional(mut self) -> Self {
346 self.required_success = false;
347 self
348 }
349
350 pub fn via_rch_exec(mut self) -> Self {
352 self.via_rch_exec = true;
353 self
354 }
355}
356
/// Lifecycle command lists, grouped by the scenario stage that runs them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct ReliabilityWorkerLifecycleHooks {
    /// Run during the Setup phase, after failure hooks are armed.
    pub pre_checks: Vec<ReliabilityLifecycleCommand>,
    /// Run during the Execute phase, after `execute_commands`.
    pub remote_probes: Vec<ReliabilityLifecycleCommand>,
    /// Run during the Verify phase.
    pub post_checks: Vec<ReliabilityLifecycleCommand>,
    /// Run during the Cleanup phase, which always executes.
    pub cleanup_verification: Vec<ReliabilityLifecycleCommand>,
}
365
/// Declarative description of a reliability scenario, consumed by
/// `TestHarness::run_reliability_scenario`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReliabilityScenarioSpec {
    /// Identifier used in log events, artifact names and the report.
    pub scenario_id: String,
    /// Optional worker id attached to every logged event.
    pub worker_id: Option<String>,
    /// Repositories attached to logged events; when empty, the test directory is used instead.
    pub repo_set: Vec<String>,
    /// Optional pressure-state label attached to every logged event.
    pub pressure_state: Option<String>,
    /// Initial triage actions; the harness appends to a working copy while running.
    pub triage_actions: Vec<String>,
    /// Stage-specific lifecycle commands.
    pub lifecycle: ReliabilityWorkerLifecycleHooks,
    /// Commands run first in the Execute phase.
    pub execute_commands: Vec<ReliabilityLifecycleCommand>,
    /// Failure hooks to arm during Setup; each must be enabled in `failure_hook_flags`.
    pub requested_failure_hooks: Vec<ReliabilityFailureHook>,
    /// Opt-in switches gating `requested_failure_hooks`.
    pub failure_hook_flags: ReliabilityFailureHookFlags,
}
379
380impl ReliabilityScenarioSpec {
381 pub fn new(scenario_id: impl Into<String>) -> Self {
383 Self {
384 scenario_id: scenario_id.into(),
385 worker_id: None,
386 repo_set: Vec::new(),
387 pressure_state: None,
388 triage_actions: Vec::new(),
389 lifecycle: ReliabilityWorkerLifecycleHooks::default(),
390 execute_commands: Vec::new(),
391 requested_failure_hooks: Vec::new(),
392 failure_hook_flags: ReliabilityFailureHookFlags::default(),
393 }
394 }
395
396 pub fn with_worker_id(mut self, worker_id: impl Into<String>) -> Self {
397 self.worker_id = Some(worker_id.into());
398 self
399 }
400
401 pub fn with_repo_set(mut self, repos: impl IntoIterator<Item = impl Into<String>>) -> Self {
402 self.repo_set = repos.into_iter().map(Into::into).collect();
403 self
404 }
405
406 pub fn with_pressure_state(mut self, pressure_state: impl Into<String>) -> Self {
407 self.pressure_state = Some(pressure_state.into());
408 self
409 }
410
411 pub fn add_triage_action(mut self, action: impl Into<String>) -> Self {
412 self.triage_actions.push(action.into());
413 self
414 }
415
416 pub fn add_pre_check(mut self, command: ReliabilityLifecycleCommand) -> Self {
417 self.lifecycle.pre_checks.push(command);
418 self
419 }
420
421 pub fn add_remote_probe(mut self, command: ReliabilityLifecycleCommand) -> Self {
422 self.lifecycle.remote_probes.push(command);
423 self
424 }
425
426 pub fn add_post_check(mut self, command: ReliabilityLifecycleCommand) -> Self {
427 self.lifecycle.post_checks.push(command);
428 self
429 }
430
431 pub fn add_cleanup_verification(mut self, command: ReliabilityLifecycleCommand) -> Self {
432 self.lifecycle.cleanup_verification.push(command);
433 self
434 }
435
436 pub fn add_execute_command(mut self, command: ReliabilityLifecycleCommand) -> Self {
437 self.execute_commands.push(command);
438 self
439 }
440
441 pub fn request_failure_hook(mut self, hook: ReliabilityFailureHook) -> Self {
442 self.requested_failure_hooks.push(hook);
443 self
444 }
445
446 pub fn with_failure_hook_flags(mut self, flags: ReliabilityFailureHookFlags) -> Self {
447 self.failure_hook_flags = flags;
448 self
449 }
450}
451
/// Record of one lifecycle command execution within a scenario.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReliabilityCommandRecord {
    /// Phase the command ran in.
    pub phase: ReliabilityPhase,
    /// Stage label within the phase (e.g. `pre_checks`, `execute`, `remote_probes`).
    pub stage: String,
    /// `ReliabilityLifecycleCommand::name` of the command.
    pub command_name: String,
    /// Program actually invoked (`rch` when wrapped via `rch exec`).
    pub invoked_program: String,
    /// Arguments actually passed, including any `exec --` wrapper prefix.
    pub invoked_args: Vec<String>,
    /// Exit code reported for the command.
    pub exit_code: i32,
    /// Wall-clock duration in milliseconds.
    pub duration_ms: u64,
    /// Whether a failure of this command fails the phase.
    pub required_success: bool,
    /// Whether the command exited with code 0.
    pub succeeded: bool,
    /// Paths of trace/stdout/stderr artifacts captured for this command.
    pub artifact_paths: Vec<String>,
}
466
/// Summary returned by a successful `TestHarness::run_reliability_scenario`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReliabilityScenarioReport {
    /// Copy of `RELIABILITY_EVENT_SCHEMA_VERSION` at report creation.
    pub schema_version: String,
    /// Id of the scenario this report describes.
    pub scenario_id: String,
    /// Phases in the order they were entered (skipped phases included).
    pub phase_order: Vec<ReliabilityPhase>,
    /// Failure hooks that were armed during Setup.
    pub activated_failure_hooks: Vec<ReliabilityFailureHook>,
    /// One entry per lifecycle command that produced a result.
    pub command_records: Vec<ReliabilityCommandRecord>,
    /// De-duplicated list of every artifact path captured during the run.
    pub artifact_paths: Vec<String>,
    /// Path of the `scenario_artifact_index` manifest, if it was captured.
    pub manifest_path: Option<PathBuf>,
}
478
479impl ReliabilityScenarioReport {
480 fn new(scenario_id: &str) -> Self {
481 Self {
482 schema_version: RELIABILITY_EVENT_SCHEMA_VERSION.to_string(),
483 scenario_id: scenario_id.to_string(),
484 phase_order: Vec::new(),
485 activated_failure_hooks: Vec::new(),
486 command_records: Vec::new(),
487 artifact_paths: Vec::new(),
488 manifest_path: None,
489 }
490 }
491}
492
/// Removes per-test directories under `base_dir` that are older than `max_age`.
///
/// Each immediate subdirectory whose modification time is older than
/// `max_age` is removed recursively. `*.sock` files directly inside it are
/// deleted first so they can be counted in the summary line printed to stderr.
/// Every failure is ignored, because this is best-effort housekeeping that must
/// never fail a test.
///
/// Only real directories are considered. Entries are classified with
/// `DirEntry::file_type`, which does not follow symbolic links. A symlink
/// placed in `base_dir` therefore can never cause files outside `base_dir` to
/// be deleted. The previous `path.is_dir()` check followed links and did allow
/// this.
pub fn cleanup_stale_test_artifacts(base_dir: &Path, max_age: Duration) {
    // A missing or unreadable base directory just means there is nothing to clean.
    let Ok(entries) = std::fs::read_dir(base_dir) else {
        return;
    };

    let now = std::time::SystemTime::now();
    let mut cleaned_sockets = 0usize;
    let mut cleaned_dirs = 0usize;

    for entry in entries.flatten() {
        if !entry.file_type().is_ok_and(|kind| kind.is_dir()) {
            continue;
        }

        // Unknown or future modification times are treated as "not stale".
        let is_stale = entry
            .metadata()
            .and_then(|meta| meta.modified())
            .ok()
            .and_then(|modified| now.duration_since(modified).ok())
            .is_some_and(|age| age > max_age);
        if !is_stale {
            continue;
        }

        let path = entry.path();
        if let Ok(dir_entries) = std::fs::read_dir(&path) {
            for file_path in dir_entries.flatten().map(|file_entry| file_entry.path()) {
                if file_path.extension().is_some_and(|ext| ext == "sock")
                    && std::fs::remove_file(&file_path).is_ok()
                {
                    cleaned_sockets += 1;
                }
            }
        }

        if std::fs::remove_dir_all(&path).is_ok() {
            cleaned_dirs += 1;
        }
    }

    if cleaned_sockets > 0 || cleaned_dirs > 0 {
        eprintln!(
            "[e2e::harness] Pre-test cleanup: removed {} stale sockets, {} stale directories",
            cleaned_sockets, cleaned_dirs
        );
    }
}
559
/// Per-test harness: owns a unique test directory, a logger, and the
/// processes and files created during the test.
pub struct TestHarness {
    /// Configuration this harness was created with.
    pub config: HarnessConfig,
    /// Logger writing into `<test_dir>/logs`.
    pub logger: TestLogger,
    /// Unique directory for this test, under `config.temp_dir`.
    test_dir: PathBuf,
    /// Processes started via `spawn_process`, keyed by logical name.
    managed_processes: Arc<Mutex<HashMap<String, ProcessInfo>>>,
    /// Files created via `create_file`.
    created_files: Arc<Mutex<Vec<PathBuf>>>,
    /// Directories created via `create_dir`.
    created_dirs: Arc<Mutex<Vec<PathBuf>>>,
    /// Pass/fail flag; initialised to `false`.
    /// NOTE(review): set and read by code outside this section — confirm semantics.
    test_passed: Arc<Mutex<bool>>,
}
570
571impl TestHarness {
572 pub fn new(test_name: &str, config: HarnessConfig) -> HarnessResult<Self> {
574 cleanup_stale_test_artifacts(&config.temp_dir, Duration::from_secs(3600));
576
577 let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S_%3f");
579 let test_dir =
580 config
581 .temp_dir
582 .join(format!("{}_{}", test_name.replace("::", "_"), timestamp));
583
584 std::fs::create_dir_all(&test_dir)?;
585
586 let logger = TestLoggerBuilder::new(test_name)
588 .log_dir(test_dir.join("logs"))
589 .build();
590
591 logger.info(format!("Test harness initialized: {test_name}"));
592 logger.debug(format!("Test directory: {}", test_dir.display()));
593 logger.log_reliability_event(ReliabilityEventInput {
594 level: LogLevel::Info,
595 phase: ReliabilityPhase::Setup,
596 scenario_id: test_name.to_string(),
597 message: "Test harness initialized".to_string(),
598 context: ReliabilityContext {
599 worker_id: None,
600 repo_set: vec![test_dir.display().to_string()],
601 pressure_state: None,
602 triage_actions: Vec::new(),
603 decision_code: "HARNESS_INIT".to_string(),
604 fallback_reason: None,
605 },
606 artifact_paths: Vec::new(),
607 });
608
609 Ok(Self {
610 config,
611 logger,
612 test_dir,
613 managed_processes: Arc::new(Mutex::new(HashMap::new())),
614 created_files: Arc::new(Mutex::new(Vec::new())),
615 created_dirs: Arc::new(Mutex::new(vec![])),
616 test_passed: Arc::new(Mutex::new(false)),
617 })
618 }
619
620 pub fn default_for_test(test_name: &str) -> HarnessResult<Self> {
622 Self::new(test_name, HarnessConfig::default())
623 }
624
625 pub fn test_dir(&self) -> &Path {
627 &self.test_dir
628 }
629
    /// Runs `scenario` through the fixed phase sequence Setup → Execute →
    /// Verify → Cleanup and returns a report of everything that ran.
    ///
    /// * Setup arms the requested failure hooks, then runs
    ///   `lifecycle.pre_checks`.
    /// * Execute runs `execute_commands`, then `lifecycle.remote_probes`. It is
    ///   skipped (logged as `SCENARIO_EXECUTE_SKIPPED`) if setup failed.
    /// * Verify runs `lifecycle.post_checks`. It is skipped (logged as
    ///   `SCENARIO_VERIFY_SKIPPED`) if setup or execute failed.
    /// * Cleanup always runs `lifecycle.cleanup_verification`. It then captures
    ///   a `scenario_artifact_index` JSON manifest of the report built so far.
    ///
    /// Each phase is appended to `report.phase_order` whether it ran or was
    /// skipped.
    ///
    /// # Errors
    ///
    /// Returns the first error among setup, execute, verify and cleanup, in
    /// that order. On error the report itself is not returned. The manifest
    /// artifact has still been written if its capture succeeded.
    pub fn run_reliability_scenario(
        &self,
        scenario: &ReliabilityScenarioSpec,
    ) -> HarnessResult<ReliabilityScenarioReport> {
        let mut report = ReliabilityScenarioReport::new(&scenario.scenario_id);
        // Working copy. Hook arming and optional-command failures append entries to it.
        let mut triage_actions = scenario.triage_actions.clone();

        // ---- Setup ----
        report.phase_order.push(ReliabilityPhase::Setup);
        self.log_scenario_event(
            scenario,
            ReliabilityPhase::Setup,
            LogLevel::Info,
            "Reliability setup phase started",
            "SCENARIO_SETUP_START",
            &triage_actions,
            None,
            Vec::new(),
        );

        // Immediately-invoked closure: `?` short-circuits the rest of setup
        // without returning from this function, so later phases still get logged.
        let setup_result: HarnessResult<()> = (|| {
            report.activated_failure_hooks =
                self.activate_failure_hooks(scenario, &mut report, &mut triage_actions)?;

            self.run_phase_lifecycle_commands(
                scenario,
                ReliabilityPhase::Setup,
                "pre_checks",
                &scenario.lifecycle.pre_checks,
                &mut report,
                &mut triage_actions,
            )
        })();

        if let Err(error) = setup_result.as_ref() {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Setup,
                LogLevel::Error,
                format!("Reliability setup phase failed: {error}"),
                "SCENARIO_SETUP_FAIL",
                &triage_actions,
                Some(error.to_string()),
                Vec::new(),
            );
        } else {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Setup,
                LogLevel::Info,
                "Reliability setup phase completed",
                "SCENARIO_SETUP_DONE",
                &triage_actions,
                None,
                Vec::new(),
            );
        }

        // ---- Execute (only if setup succeeded) ----
        report.phase_order.push(ReliabilityPhase::Execute);
        let execute_result: HarnessResult<()> = if setup_result.is_ok() {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Execute,
                LogLevel::Info,
                "Reliability execute phase started",
                "SCENARIO_EXECUTE_START",
                &triage_actions,
                None,
                Vec::new(),
            );

            // Remote probes run only if every required execute command passed.
            let result = (|| {
                self.run_phase_lifecycle_commands(
                    scenario,
                    ReliabilityPhase::Execute,
                    "execute",
                    &scenario.execute_commands,
                    &mut report,
                    &mut triage_actions,
                )?;
                self.run_phase_lifecycle_commands(
                    scenario,
                    ReliabilityPhase::Execute,
                    "remote_probes",
                    &scenario.lifecycle.remote_probes,
                    &mut report,
                    &mut triage_actions,
                )
            })();

            if let Err(error) = result.as_ref() {
                self.log_scenario_event(
                    scenario,
                    ReliabilityPhase::Execute,
                    LogLevel::Error,
                    format!("Reliability execute phase failed: {error}"),
                    "SCENARIO_EXECUTE_FAIL",
                    &triage_actions,
                    Some(error.to_string()),
                    Vec::new(),
                );
            } else {
                self.log_scenario_event(
                    scenario,
                    ReliabilityPhase::Execute,
                    LogLevel::Info,
                    "Reliability execute phase completed",
                    "SCENARIO_EXECUTE_DONE",
                    &triage_actions,
                    None,
                    Vec::new(),
                );
            }

            result
        } else {
            // The setup error is reported once, via `setup_result?` below.
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Execute,
                LogLevel::Warn,
                "Reliability execute phase skipped due setup failure",
                "SCENARIO_EXECUTE_SKIPPED",
                &triage_actions,
                Some("setup phase failed".to_string()),
                Vec::new(),
            );
            Ok(())
        };

        // ---- Verify (only if setup and execute succeeded) ----
        report.phase_order.push(ReliabilityPhase::Verify);
        let verify_result: HarnessResult<()> = if setup_result.is_ok() && execute_result.is_ok() {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Verify,
                LogLevel::Info,
                "Reliability verify phase started",
                "SCENARIO_VERIFY_START",
                &triage_actions,
                None,
                Vec::new(),
            );

            let result = self.run_phase_lifecycle_commands(
                scenario,
                ReliabilityPhase::Verify,
                "post_checks",
                &scenario.lifecycle.post_checks,
                &mut report,
                &mut triage_actions,
            );

            if let Err(error) = result.as_ref() {
                self.log_scenario_event(
                    scenario,
                    ReliabilityPhase::Verify,
                    LogLevel::Error,
                    format!("Reliability verify phase failed: {error}"),
                    "SCENARIO_VERIFY_FAIL",
                    &triage_actions,
                    Some(error.to_string()),
                    Vec::new(),
                );
            } else {
                self.log_scenario_event(
                    scenario,
                    ReliabilityPhase::Verify,
                    LogLevel::Info,
                    "Reliability verify phase completed",
                    "SCENARIO_VERIFY_DONE",
                    &triage_actions,
                    None,
                    Vec::new(),
                );
            }

            result
        } else {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Verify,
                LogLevel::Warn,
                "Reliability verify phase skipped due earlier failure",
                "SCENARIO_VERIFY_SKIPPED",
                &triage_actions,
                Some("setup or execute phase failed".to_string()),
                Vec::new(),
            );
            Ok(())
        };

        // ---- Cleanup (always runs) ----
        report.phase_order.push(ReliabilityPhase::Cleanup);
        self.log_scenario_event(
            scenario,
            ReliabilityPhase::Cleanup,
            LogLevel::Info,
            "Reliability cleanup phase started",
            "SCENARIO_CLEANUP_START",
            &triage_actions,
            None,
            Vec::new(),
        );

        let cleanup_result = self.run_phase_lifecycle_commands(
            scenario,
            ReliabilityPhase::Cleanup,
            "cleanup_verification",
            &scenario.lifecycle.cleanup_verification,
            &mut report,
            &mut triage_actions,
        );

        // The manifest is written after cleanup commands, so it includes their records.
        let manifest_payload = serde_json::json!({
            "schema_version": report.schema_version,
            "scenario_id": report.scenario_id,
            "phase_order": report.phase_order.iter().map(ToString::to_string).collect::<Vec<_>>(),
            "activated_failure_hooks": report
                .activated_failure_hooks
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>(),
            "command_records": report.command_records,
            "artifact_paths": report.artifact_paths,
        });
        let mut cleanup_artifacts = Vec::new();
        // Best effort: a failed manifest capture is ignored and `manifest_path` stays `None`.
        if let Ok(path) = self.logger.capture_artifact_json(
            &scenario.scenario_id,
            "scenario_artifact_index",
            &manifest_payload,
        ) {
            let as_string = path.display().to_string();
            report.manifest_path = Some(path);
            Self::push_unique_string(&mut report.artifact_paths, as_string.clone());
            cleanup_artifacts.push(as_string);
        }

        if let Err(error) = cleanup_result.as_ref() {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Cleanup,
                LogLevel::Error,
                format!("Reliability cleanup phase failed: {error}"),
                "SCENARIO_CLEANUP_FAIL",
                &triage_actions,
                Some(error.to_string()),
                cleanup_artifacts,
            );
        } else {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Cleanup,
                LogLevel::Info,
                "Reliability cleanup phase completed",
                "SCENARIO_CLEANUP_DONE",
                &triage_actions,
                None,
                cleanup_artifacts,
            );
        }

        // Surface the earliest failure first.
        setup_result?;
        execute_result?;
        verify_result?;
        cleanup_result?;

        Ok(report)
    }
900
901 fn run_phase_lifecycle_commands(
902 &self,
903 scenario: &ReliabilityScenarioSpec,
904 phase: ReliabilityPhase,
905 stage: &str,
906 commands: &[ReliabilityLifecycleCommand],
907 report: &mut ReliabilityScenarioReport,
908 triage_actions: &mut Vec<String>,
909 ) -> HarnessResult<()> {
910 for command in commands {
911 self.execute_lifecycle_command(
912 scenario,
913 phase,
914 stage,
915 command,
916 report,
917 triage_actions,
918 )?;
919 }
920 Ok(())
921 }
922
923 fn execute_lifecycle_command(
924 &self,
925 scenario: &ReliabilityScenarioSpec,
926 phase: ReliabilityPhase,
927 stage: &str,
928 command: &ReliabilityLifecycleCommand,
929 report: &mut ReliabilityScenarioReport,
930 triage_actions: &mut Vec<String>,
931 ) -> HarnessResult<()> {
932 let mut invoked_args = command.args.clone();
933 let invoked_program = if command.via_rch_exec {
934 let mut wrapped = vec![
935 "exec".to_string(),
936 "--".to_string(),
937 command.program.clone(),
938 ];
939 wrapped.extend(invoked_args);
940 invoked_args = wrapped;
941 "rch".to_string()
942 } else {
943 command.program.clone()
944 };
945
946 let timeout = command
947 .timeout_secs
948 .map(Duration::from_secs)
949 .unwrap_or(self.config.default_timeout);
950 let result = self.exec_with_timeout(
951 &invoked_program,
952 invoked_args.iter().map(String::as_str),
953 timeout,
954 )?;
955
956 let artifact_prefix = format!(
957 "{}_{}_{}",
958 Self::sanitize_artifact_component(stage),
959 Self::sanitize_artifact_component(&command.name),
960 chrono::Utc::now().format("%Y%m%d_%H%M%S_%3f")
961 );
962 let mut command_artifacts = Vec::new();
963 let trace_payload = serde_json::json!({
964 "scenario_id": scenario.scenario_id,
965 "phase": phase.to_string(),
966 "stage": stage,
967 "command_name": command.name,
968 "invoked_program": invoked_program,
969 "invoked_args": invoked_args,
970 "required_success": command.required_success,
971 "exit_code": result.exit_code,
972 "duration_ms": result.duration.as_millis(),
973 });
974 if let Ok(path) = self.logger.capture_artifact_json(
975 &scenario.scenario_id,
976 &format!("{artifact_prefix}_trace"),
977 &trace_payload,
978 ) {
979 command_artifacts.push(path.display().to_string());
980 }
981 if !result.stdout.is_empty()
982 && let Ok(path) = self.logger.capture_artifact_text(
983 &scenario.scenario_id,
984 &format!("{artifact_prefix}_stdout"),
985 &result.stdout,
986 )
987 {
988 command_artifacts.push(path.display().to_string());
989 }
990 if !result.stderr.is_empty()
991 && let Ok(path) = self.logger.capture_artifact_text(
992 &scenario.scenario_id,
993 &format!("{artifact_prefix}_stderr"),
994 &result.stderr,
995 )
996 {
997 command_artifacts.push(path.display().to_string());
998 }
999
1000 for artifact_path in &command_artifacts {
1001 Self::push_unique_string(&mut report.artifact_paths, artifact_path.clone());
1002 }
1003
1004 let mut event_triage = triage_actions.clone();
1005 if result.success() {
1006 Self::push_unique_string(
1007 &mut event_triage,
1008 format!("{}_{}_pass", stage, command.name),
1009 );
1010 } else {
1011 Self::push_unique_string(
1012 &mut event_triage,
1013 format!("{}_{}_fail", stage, command.name),
1014 );
1015 }
1016
1017 let decision_code = format!(
1018 "{}_{}",
1019 Self::sanitize_decision_token(stage),
1020 if result.success() { "PASS" } else { "FAIL" }
1021 );
1022
1023 self.log_scenario_event(
1024 scenario,
1025 phase,
1026 if result.success() {
1027 LogLevel::Info
1028 } else {
1029 LogLevel::Warn
1030 },
1031 format!(
1032 "Lifecycle command '{}' in stage '{}' finished with exit {}",
1033 command.name, stage, result.exit_code
1034 ),
1035 decision_code,
1036 &event_triage,
1037 if result.success() {
1038 None
1039 } else {
1040 Some(format!(
1041 "lifecycle command '{}' failed with exit {}",
1042 command.name, result.exit_code
1043 ))
1044 },
1045 command_artifacts.clone(),
1046 );
1047
1048 report.command_records.push(ReliabilityCommandRecord {
1049 phase,
1050 stage: stage.to_string(),
1051 command_name: command.name.clone(),
1052 invoked_program: invoked_program.clone(),
1053 invoked_args: invoked_args.clone(),
1054 exit_code: result.exit_code,
1055 duration_ms: result.duration.as_millis() as u64,
1056 required_success: command.required_success,
1057 succeeded: result.success(),
1058 artifact_paths: command_artifacts,
1059 });
1060
1061 if !result.success() && command.required_success {
1062 return Err(HarnessError::AssertionFailed(format!(
1063 "required lifecycle command '{}' failed in stage '{}' (exit={})",
1064 command.name, stage, result.exit_code
1065 )));
1066 }
1067 if !result.success() && !command.required_success {
1068 Self::push_unique_string(
1069 triage_actions,
1070 format!("optional_command_failed:{}:{}", stage, command.name),
1071 );
1072 }
1073
1074 Ok(())
1075 }
1076
1077 fn activate_failure_hooks(
1078 &self,
1079 scenario: &ReliabilityScenarioSpec,
1080 report: &mut ReliabilityScenarioReport,
1081 triage_actions: &mut Vec<String>,
1082 ) -> HarnessResult<Vec<ReliabilityFailureHook>> {
1083 let mut activated = Vec::new();
1084 if scenario.requested_failure_hooks.is_empty() {
1085 return Ok(activated);
1086 }
1087
1088 let marker_dir = self.test_dir.join(".reliability-hooks");
1089 std::fs::create_dir_all(&marker_dir)?;
1090
1091 for hook in &scenario.requested_failure_hooks {
1092 if !scenario.failure_hook_flags.allows(*hook) {
1093 self.log_scenario_event(
1094 scenario,
1095 ReliabilityPhase::Setup,
1096 LogLevel::Error,
1097 format!("Failure hook denied: {hook}"),
1098 "FAILURE_HOOK_DENIED",
1099 triage_actions,
1100 Some(format!("hook {hook} requested without explicit allow flag")),
1101 Vec::new(),
1102 );
1103 return Err(HarnessError::SetupFailed(format!(
1104 "failure hook '{hook}' requested but not enabled"
1105 )));
1106 }
1107
1108 let marker_path = marker_dir.join(format!("{hook}.enabled"));
1109 std::fs::write(
1110 &marker_path,
1111 format!(
1112 "scenario_id={}\nhook={hook}\narmed_at={}\n",
1113 scenario.scenario_id,
1114 chrono::Utc::now().to_rfc3339()
1115 ),
1116 )?;
1117
1118 let mut hook_artifacts = Vec::new();
1119 if let Ok(path) = self.logger.capture_artifact_text(
1120 &scenario.scenario_id,
1121 &format!("failure_hook_{hook}_marker"),
1122 &format!("marker_path={}", marker_path.display()),
1123 ) {
1124 hook_artifacts.push(path.display().to_string());
1125 }
1126 let hook_payload = serde_json::json!({
1127 "scenario_id": scenario.scenario_id,
1128 "hook": hook.to_string(),
1129 "marker_path": marker_path.display().to_string(),
1130 "armed_at": chrono::Utc::now().to_rfc3339(),
1131 });
1132 if let Ok(path) = self.logger.capture_artifact_json(
1133 &scenario.scenario_id,
1134 &format!("failure_hook_{hook}_payload"),
1135 &hook_payload,
1136 ) {
1137 hook_artifacts.push(path.display().to_string());
1138 }
1139
1140 for artifact_path in &hook_artifacts {
1141 Self::push_unique_string(&mut report.artifact_paths, artifact_path.clone());
1142 }
1143
1144 Self::push_unique_string(triage_actions, format!("failure_hook:{hook}:armed"));
1145
1146 self.log_scenario_event(
1147 scenario,
1148 ReliabilityPhase::Setup,
1149 LogLevel::Info,
1150 format!("Failure hook armed: {hook}"),
1151 "FAILURE_HOOK_ARMED",
1152 triage_actions,
1153 None,
1154 hook_artifacts,
1155 );
1156
1157 activated.push(*hook);
1158 }
1159
1160 Ok(activated)
1161 }
1162
1163 #[allow(clippy::too_many_arguments)]
1164 fn log_scenario_event(
1165 &self,
1166 scenario: &ReliabilityScenarioSpec,
1167 phase: ReliabilityPhase,
1168 level: LogLevel,
1169 message: impl Into<String>,
1170 decision_code: impl Into<String>,
1171 triage_actions: &[String],
1172 fallback_reason: Option<String>,
1173 artifact_paths: Vec<String>,
1174 ) {
1175 let repo_set = if scenario.repo_set.is_empty() {
1176 vec![self.test_dir.display().to_string()]
1177 } else {
1178 scenario.repo_set.clone()
1179 };
1180
1181 self.logger.log_reliability_event(ReliabilityEventInput {
1182 level,
1183 phase,
1184 scenario_id: scenario.scenario_id.clone(),
1185 message: message.into(),
1186 context: ReliabilityContext {
1187 worker_id: scenario.worker_id.clone(),
1188 repo_set,
1189 pressure_state: scenario.pressure_state.clone(),
1190 triage_actions: triage_actions.to_vec(),
1191 decision_code: decision_code.into(),
1192 fallback_reason,
1193 },
1194 artifact_paths,
1195 });
1196 }
1197
1198 fn push_unique_string(values: &mut Vec<String>, value: String) {
1199 if !values.iter().any(|existing| existing == &value) {
1200 values.push(value);
1201 }
1202 }
1203
1204 fn sanitize_decision_token(raw: &str) -> String {
1205 let mut token = String::with_capacity(raw.len());
1206 for ch in raw.chars() {
1207 if ch.is_ascii_alphanumeric() {
1208 token.push(ch.to_ascii_uppercase());
1209 } else {
1210 token.push('_');
1211 }
1212 }
1213 if token.is_empty() {
1214 "PHASE".to_string()
1215 } else {
1216 token
1217 }
1218 }
1219
1220 pub fn create_dir(&self, name: &str) -> HarnessResult<PathBuf> {
1222 let path = self.test_dir.join(name);
1223 std::fs::create_dir_all(&path)?;
1224 self.created_dirs.lock().unwrap().push(path.clone());
1225 self.logger
1226 .debug(format!("Created directory: {}", path.display()));
1227 Ok(path)
1228 }
1229
1230 pub fn create_file(&self, name: &str, content: &str) -> HarnessResult<PathBuf> {
1232 let path = self.test_dir.join(name);
1233 if let Some(parent) = path.parent() {
1234 std::fs::create_dir_all(parent)?;
1235 }
1236 std::fs::write(&path, content)?;
1237 self.created_files.lock().unwrap().push(path.clone());
1238 self.logger
1239 .debug(format!("Created file: {}", path.display()));
1240 Ok(path)
1241 }
1242
1243 pub fn create_daemon_config(&self, config_content: &str) -> HarnessResult<PathBuf> {
1245 let config_dir = self.create_dir("config")?;
1246 let config_path = config_dir.join("daemon.toml");
1247 std::fs::write(&config_path, config_content)?;
1248 self.logger
1249 .info(format!("Created daemon config: {}", config_path.display()));
1250 Ok(config_path)
1251 }
1252
1253 pub fn create_workers_config(&self, config_content: &str) -> HarnessResult<PathBuf> {
1255 let config_dir = self.test_dir.join("config");
1256 std::fs::create_dir_all(&config_dir)?;
1257 let config_path = config_dir.join("workers.toml");
1258 std::fs::write(&config_path, config_content)?;
1259 self.logger
1260 .info(format!("Created workers config: {}", config_path.display()));
1261 Ok(config_path)
1262 }
1263
1264 pub fn spawn_process<I, S>(
1266 &self,
1267 name: &str,
1268 program: &Path,
1269 args: I,
1270 env: Option<&HashMap<String, String>>,
1271 ) -> HarnessResult<u32>
1272 where
1273 I: IntoIterator<Item = S>,
1274 S: AsRef<OsStr>,
1275 {
1276 let mut cmd = Command::new(program);
1277 cmd.args(args)
1282 .current_dir(&self.test_dir)
1283 .stdout(Stdio::null())
1284 .stderr(Stdio::null());
1285
1286 for (k, v) in &self.config.env_vars {
1288 cmd.env(k, v);
1289 }
1290
1291 if let Some(env_vars) = env {
1293 for (k, v) in env_vars {
1294 cmd.env(k, v);
1295 }
1296 }
1297
1298 self.logger.info(format!(
1299 "Spawning process: {} {:?}",
1300 name,
1301 program.display()
1302 ));
1303
1304 let child = cmd.spawn().map_err(|e| {
1305 HarnessError::ProcessStartFailed(format!("{}: {}", program.display(), e))
1306 })?;
1307
1308 let pid = child.id();
1309 let info = ProcessInfo {
1310 name: name.to_string(),
1311 pid,
1312 started_at: Instant::now(),
1313 child,
1314 };
1315
1316 self.managed_processes
1317 .lock()
1318 .unwrap()
1319 .insert(name.to_string(), info);
1320
1321 self.logger.log_with_context(
1322 LogLevel::Info,
1323 LogSource::Harness,
1324 format!("Process spawned: {name}"),
1325 vec![("pid".to_string(), pid.to_string())],
1326 );
1327
1328 Ok(pid)
1329 }
1330
1331 pub fn start_daemon(&self, extra_args: &[&str]) -> HarnessResult<u32> {
1333 let workers_config = self.test_dir.join("config").join("workers.toml");
1334 let workers_config_str: String = workers_config.to_string_lossy().into_owned();
1335 let mut args: Vec<&str> = vec!["--workers-config", &workers_config_str];
1336 args.extend(extra_args);
1337
1338 self.spawn_process("daemon", &self.config.rchd_binary, &args, None)
1339 }
1340
1341 pub fn stop_process(&self, name: &str) -> HarnessResult<()> {
1343 let mut processes = self.managed_processes.lock().unwrap();
1344 if let Some(mut info) = processes.remove(name) {
1345 self.logger
1346 .info(format!("Stopping process: {} (pid={})", name, info.pid));
1347 info.kill()?;
1348 let status = info.wait()?;
1349 self.logger
1350 .debug(format!("Process {} exited with status: {:?}", name, status));
1351 Ok(())
1352 } else {
1353 Err(HarnessError::ProcessNotFound(name.to_string()))
1354 }
1355 }
1356
1357 pub fn wait_for_process_exit(
1359 &self,
1360 name: &str,
1361 timeout: Duration,
1362 ) -> HarnessResult<ExitStatus> {
1363 self.logger
1364 .debug(format!("Waiting for process to exit: {name} ({timeout:?})"));
1365
1366 let start = Instant::now();
1367 while start.elapsed() < timeout {
1368 let exit_status = {
1369 let mut processes = self.managed_processes.lock().unwrap();
1370 let Some(info) = processes.get_mut(name) else {
1371 return Err(HarnessError::ProcessNotFound(name.to_string()));
1372 };
1373
1374 match info.try_exit_status() {
1375 Some(status) => {
1376 self.logger.debug(format!(
1377 "Process {name} exited after {:?} with status: {:?}",
1378 start.elapsed(),
1379 status
1380 ));
1381 Some(status)
1382 }
1383 None => None,
1384 }
1385 };
1386
1387 if let Some(status) = exit_status {
1388 self.managed_processes.lock().unwrap().remove(name);
1389 return Ok(status);
1390 }
1391
1392 thread::sleep(Duration::from_millis(50));
1393 }
1394
1395 self.logger
1396 .error(format!("Timeout waiting for process to exit: {name}"));
1397 Err(HarnessError::Timeout(timeout))
1398 }
1399
1400 pub fn stop_all_processes(&self) {
1402 let mut processes = self.managed_processes.lock().unwrap();
1403 for (name, mut info) in processes.drain() {
1404 self.logger
1405 .info(format!("Stopping process: {} (pid={})", name, info.pid));
1406 if let Err(e) = info.kill() {
1407 self.logger.warn(format!("Failed to kill {}: {}", name, e));
1408 }
1409 match info.wait() {
1410 Ok(status) => {
1411 self.logger
1412 .debug(format!("Process {} exited: {:?}", name, status));
1413 }
1414 Err(e) => {
1415 self.logger
1416 .warn(format!("Failed to wait for {}: {}", name, e));
1417 }
1418 }
1419 }
1420 }
1421
1422 pub fn exec<I, S>(&self, program: &str, args: I) -> HarnessResult<CommandResult>
1424 where
1425 I: IntoIterator<Item = S>,
1426 S: AsRef<OsStr>,
1427 {
1428 self.exec_with_timeout(program, args, self.config.default_timeout)
1429 }
1430
1431 pub fn exec_with_timeout<I, S>(
1435 &self,
1436 program: &str,
1437 args: I,
1438 timeout: Duration,
1439 ) -> HarnessResult<CommandResult>
1440 where
1441 I: IntoIterator<Item = S>,
1442 S: AsRef<OsStr>,
1443 {
1444 let args: Vec<_> = args.into_iter().collect();
1445 let args_display: Vec<_> = args.iter().map(|s| s.as_ref().to_string_lossy()).collect();
1446
1447 self.logger
1448 .debug(format!("Executing: {} {}", program, args_display.join(" ")));
1449
1450 let start = Instant::now();
1451
1452 let mut cmd = Command::new(program);
1453 cmd.args(&args)
1454 .current_dir(&self.test_dir)
1455 .stdout(Stdio::piped())
1456 .stderr(Stdio::piped());
1457
1458 for (k, v) in &self.config.env_vars {
1460 cmd.env(k, v);
1461 }
1462
1463 let mut child = cmd.spawn()?;
1464 let stdout_handle = child
1465 .stdout
1466 .take()
1467 .map(|mut stdout| thread::spawn(move || Self::read_to_string(&mut stdout)));
1468 let stderr_handle = child
1469 .stderr
1470 .take()
1471 .map(|mut stderr| thread::spawn(move || Self::read_to_string(&mut stderr)));
1472
1473 let mut timed_out = false;
1474 let exit_status = loop {
1475 if let Some(status) = child.try_wait()? {
1476 break Some(status);
1477 }
1478
1479 if start.elapsed() >= timeout {
1480 timed_out = true;
1481 let _ = child.kill();
1482 break child.wait().ok();
1483 }
1484
1485 thread::sleep(Duration::from_millis(10));
1486 };
1487
1488 let duration = start.elapsed();
1489 let stdout = Self::join_output(stdout_handle);
1490 let mut stderr = Self::join_output(stderr_handle);
1491 if timed_out {
1492 if !stderr.is_empty() {
1493 stderr.push('\n');
1494 }
1495 stderr.push_str(&format!("Process timed out after {:?}.", timeout));
1496 }
1497
1498 let exit_code = exit_status
1499 .and_then(|status| status.code())
1500 .unwrap_or(if timed_out { 124 } else { -1 });
1501
1502 let result = CommandResult {
1503 exit_code,
1504 stdout,
1505 stderr,
1506 duration,
1507 };
1508
1509 let command_line = format!("{} {}", program, args_display.join(" "));
1510 let artifact_prefix = format!(
1511 "{}_{}",
1512 Self::sanitize_artifact_component(program),
1513 chrono::Utc::now().format("%Y%m%d_%H%M%S_%3f")
1514 );
1515 let mut artifact_paths = Vec::new();
1516 let trace_payload = serde_json::json!({
1517 "command": command_line,
1518 "program": program,
1519 "args": args_display,
1520 "exit_code": result.exit_code,
1521 "duration_ms": duration.as_millis(),
1522 "timed_out": timed_out,
1523 "stdout_len_bytes": result.stdout.len(),
1524 "stderr_len_bytes": result.stderr.len(),
1525 });
1526 if let Ok(path) = self.logger.capture_artifact_json(
1527 self.logger.test_name(),
1528 &format!("{artifact_prefix}_trace"),
1529 &trace_payload,
1530 ) {
1531 artifact_paths.push(path.display().to_string());
1532 }
1533 if !result.stdout.is_empty()
1534 && let Ok(path) = self.logger.capture_artifact_text(
1535 self.logger.test_name(),
1536 &format!("{artifact_prefix}_stdout"),
1537 &result.stdout,
1538 )
1539 {
1540 artifact_paths.push(path.display().to_string());
1541 }
1542 if !result.stderr.is_empty()
1543 && let Ok(path) = self.logger.capture_artifact_text(
1544 self.logger.test_name(),
1545 &format!("{artifact_prefix}_stderr"),
1546 &result.stderr,
1547 )
1548 {
1549 artifact_paths.push(path.display().to_string());
1550 }
1551
1552 self.logger.log_with_context(
1553 if result.success() {
1554 LogLevel::Debug
1555 } else {
1556 LogLevel::Warn
1557 },
1558 LogSource::Harness,
1559 format!("Command completed: {program}"),
1560 vec![
1561 ("exit_code".to_string(), result.exit_code.to_string()),
1562 ("duration_ms".to_string(), duration.as_millis().to_string()),
1563 ("timed_out".to_string(), timed_out.to_string()),
1564 ],
1565 );
1566
1567 let decision_code = if timed_out {
1568 "CMD_TIMEOUT"
1569 } else if result.success() {
1570 "CMD_SUCCESS"
1571 } else {
1572 "CMD_FAILURE"
1573 };
1574 self.logger.log_reliability_event(ReliabilityEventInput {
1575 level: if result.success() {
1576 LogLevel::Info
1577 } else {
1578 LogLevel::Warn
1579 },
1580 phase: ReliabilityPhase::Execute,
1581 scenario_id: self.logger.test_name().to_string(),
1582 message: format!("Command execution finished: {program}"),
1583 context: ReliabilityContext {
1584 worker_id: None,
1585 repo_set: vec![self.test_dir.display().to_string()],
1586 pressure_state: None,
1587 triage_actions: if timed_out {
1588 vec!["process_killed_after_timeout".to_string()]
1589 } else {
1590 Vec::new()
1591 },
1592 decision_code: decision_code.to_string(),
1593 fallback_reason: if timed_out {
1594 Some(format!("command exceeded timeout {:?}", timeout))
1595 } else {
1596 None
1597 },
1598 },
1599 artifact_paths,
1600 });
1601
1602 if !result.stdout.is_empty() {
1603 for line in result.stdout.lines() {
1604 self.logger.log(
1605 LogLevel::Trace,
1606 LogSource::Custom(program.to_string()),
1607 line,
1608 );
1609 }
1610 }
1611
1612 if !result.stderr.is_empty() {
1613 for line in result.stderr.lines() {
1614 self.logger.log(
1615 LogLevel::Trace,
1616 LogSource::Custom(format!("{program}:stderr")),
1617 line,
1618 );
1619 }
1620 }
1621
1622 Ok(result)
1623 }
1624
1625 fn read_to_string<R: Read>(reader: &mut R) -> String {
1626 let mut buffer = Vec::new();
1627 if reader.read_to_end(&mut buffer).is_ok() {
1628 String::from_utf8_lossy(&buffer).to_string()
1629 } else {
1630 String::new()
1631 }
1632 }
1633
1634 fn join_output(handle: Option<thread::JoinHandle<String>>) -> String {
1635 match handle {
1636 Some(handle) => handle.join().unwrap_or_default(),
1637 None => String::new(),
1638 }
1639 }
1640
1641 fn sanitize_artifact_component(raw: &str) -> String {
1642 let mut sanitized = String::with_capacity(raw.len());
1643 for ch in raw.chars() {
1644 if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
1645 sanitized.push(ch);
1646 } else {
1647 sanitized.push('_');
1648 }
1649 }
1650 if sanitized.is_empty() {
1651 "artifact".to_string()
1652 } else {
1653 sanitized
1654 }
1655 }
1656
1657 pub fn exec_rch<I, S>(&self, args: I) -> HarnessResult<CommandResult>
1659 where
1660 I: IntoIterator<Item = S>,
1661 S: AsRef<OsStr>,
1662 {
1663 let binary = self.config.rch_binary.to_string_lossy().to_string();
1664 self.exec(&binary, args)
1665 }
1666
1667 pub fn exec_rchd<I, S>(&self, args: I) -> HarnessResult<CommandResult>
1669 where
1670 I: IntoIterator<Item = S>,
1671 S: AsRef<OsStr>,
1672 {
1673 let binary = self.config.rchd_binary.to_string_lossy().to_string();
1674 self.exec(&binary, args)
1675 }
1676
1677 pub fn exec_rch_wkr<I, S>(&self, args: I) -> HarnessResult<CommandResult>
1679 where
1680 I: IntoIterator<Item = S>,
1681 S: AsRef<OsStr>,
1682 {
1683 let binary = self.config.rch_wkr_binary.to_string_lossy().to_string();
1684 self.exec(&binary, args)
1685 }
1686
1687 pub fn wait_for<F>(
1689 &self,
1690 description: &str,
1691 timeout: Duration,
1692 interval: Duration,
1693 condition: F,
1694 ) -> HarnessResult<()>
1695 where
1696 F: Fn() -> bool,
1697 {
1698 self.logger
1699 .debug(format!("Waiting for: {description} (timeout: {timeout:?})"));
1700
1701 let start = Instant::now();
1702 while start.elapsed() < timeout {
1703 if condition() {
1704 self.logger.debug(format!(
1705 "Condition satisfied: {description} after {:?}",
1706 start.elapsed()
1707 ));
1708 return Ok(());
1709 }
1710 std::thread::sleep(interval);
1711 }
1712
1713 self.logger.error(format!(
1714 "Timeout waiting for: {description} after {timeout:?}"
1715 ));
1716 Err(HarnessError::Timeout(timeout))
1717 }
1718
1719 pub fn wait_for_file(&self, path: &Path, timeout: Duration) -> HarnessResult<()> {
1721 let path_display = path.display().to_string();
1722 self.wait_for(
1723 &format!("file to exist: {path_display}"),
1724 timeout,
1725 Duration::from_millis(100),
1726 || path.exists(),
1727 )
1728 }
1729
1730 pub fn wait_for_socket(&self, socket_path: &Path, timeout: Duration) -> HarnessResult<()> {
1732 self.wait_for_socket_with_backoff(socket_path, timeout)
1733 }
1734
1735 pub fn wait_for_socket_with_backoff(
1740 &self,
1741 socket_path: &Path,
1742 max_wait: Duration,
1743 ) -> HarnessResult<()> {
1744 let socket_display = socket_path.display().to_string();
1745 self.logger.debug(format!(
1746 "Waiting for socket with backoff: {socket_display} (timeout: {max_wait:?})"
1747 ));
1748
1749 let start = std::time::Instant::now();
1750 let mut delay = Duration::from_millis(10);
1751 let max_delay = Duration::from_millis(500);
1752
1753 while start.elapsed() < max_wait {
1754 if socket_path.exists() {
1755 #[cfg(unix)]
1756 {
1757 match std::os::unix::net::UnixStream::connect(socket_path) {
1760 Ok(stream) => {
1761 drop(stream);
1762 self.logger.info(format!(
1763 "Socket ready after {:?}: {socket_display}",
1764 start.elapsed()
1765 ));
1766 return Ok(());
1767 }
1768 Err(err) => {
1769 self.logger.debug(format!(
1770 "Socket exists but not connectable yet ({err}); retrying..."
1771 ));
1772 }
1773 }
1774 }
1775
1776 #[cfg(not(unix))]
1777 {
1778 self.logger.info(format!(
1779 "Socket ready after {:?}: {socket_display}",
1780 start.elapsed()
1781 ));
1782 return Ok(());
1783 }
1784 }
1785 std::thread::sleep(delay);
1786 delay = (delay * 2).min(max_delay);
1787 }
1788
1789 self.logger.error(format!(
1790 "Socket timeout after {:?}: {socket_display}",
1791 max_wait
1792 ));
1793 Err(HarnessError::Timeout(max_wait))
1794 }
1795
1796 pub fn mark_passed(&self) {
1798 *self.test_passed.lock().unwrap() = true;
1799 self.logger.info("Test marked as PASSED");
1800 }
1801
1802 pub fn mark_failed(&self, reason: &str) {
1804 *self.test_passed.lock().unwrap() = false;
1805 self.logger
1806 .error(format!("Test marked as FAILED: {reason}"));
1807 }
1808
1809 pub fn passed(&self) -> bool {
1811 *self.test_passed.lock().unwrap()
1812 }
1813
1814 pub fn assert(&self, condition: bool, message: &str) -> HarnessResult<()> {
1816 if condition {
1817 self.logger.debug(format!("Assertion passed: {message}"));
1818 self.logger
1819 .log_reliability_event(ReliabilityEventInput::with_decision(
1820 ReliabilityPhase::Verify,
1821 self.logger.test_name().to_string(),
1822 format!("Assertion passed: {message}"),
1823 "ASSERT_PASS",
1824 ));
1825 Ok(())
1826 } else {
1827 self.logger.error(format!("Assertion failed: {message}"));
1828 self.logger
1829 .log_reliability_event(ReliabilityEventInput::with_decision(
1830 ReliabilityPhase::Verify,
1831 self.logger.test_name().to_string(),
1832 format!("Assertion failed: {message}"),
1833 "ASSERT_FAIL",
1834 ));
1835 Err(HarnessError::AssertionFailed(message.to_string()))
1836 }
1837 }
1838
1839 pub fn assert_eq<T: PartialEq + std::fmt::Debug>(
1841 &self,
1842 actual: T,
1843 expected: T,
1844 message: &str,
1845 ) -> HarnessResult<()> {
1846 if actual == expected {
1847 self.logger.debug(format!("Assertion passed: {message}"));
1848 self.logger
1849 .log_reliability_event(ReliabilityEventInput::with_decision(
1850 ReliabilityPhase::Verify,
1851 self.logger.test_name().to_string(),
1852 format!("Equality assertion passed: {message}"),
1853 "ASSERT_EQ_PASS",
1854 ));
1855 Ok(())
1856 } else {
1857 let msg = format!("{}: expected {:?}, got {:?}", message, expected, actual);
1858 self.logger.error(format!("Assertion failed: {msg}"));
1859 self.logger
1860 .log_reliability_event(ReliabilityEventInput::with_decision(
1861 ReliabilityPhase::Verify,
1862 self.logger.test_name().to_string(),
1863 format!("Equality assertion failed: {msg}"),
1864 "ASSERT_EQ_FAIL",
1865 ));
1866 Err(HarnessError::AssertionFailed(msg))
1867 }
1868 }
1869
1870 pub fn assert_success(&self, result: &CommandResult, context: &str) -> HarnessResult<()> {
1872 if result.success() {
1873 self.logger.debug(format!("Command succeeded: {context}"));
1874 self.logger
1875 .log_reliability_event(ReliabilityEventInput::with_decision(
1876 ReliabilityPhase::Verify,
1877 self.logger.test_name().to_string(),
1878 format!("Command success assertion passed: {context}"),
1879 "CMD_ASSERT_SUCCESS",
1880 ));
1881 Ok(())
1882 } else {
1883 let msg = format!(
1884 "{}: command failed with exit code {} - stdout: {}, stderr: {}",
1885 context,
1886 result.exit_code,
1887 result.stdout.trim(),
1888 result.stderr.trim()
1889 );
1890 self.logger.error(&msg);
1891 self.logger
1892 .log_reliability_event(ReliabilityEventInput::with_decision(
1893 ReliabilityPhase::Verify,
1894 self.logger.test_name().to_string(),
1895 format!("Command success assertion failed: {msg}"),
1896 "CMD_ASSERT_FAIL",
1897 ));
1898 Err(HarnessError::AssertionFailed(msg))
1899 }
1900 }
1901
1902 pub fn assert_stdout_contains(
1904 &self,
1905 result: &CommandResult,
1906 pattern: &str,
1907 context: &str,
1908 ) -> HarnessResult<()> {
1909 if result.stdout_contains(pattern) {
1910 self.logger.debug(format!(
1911 "Stdout contains expected pattern: {context} -> {pattern}"
1912 ));
1913 self.logger
1914 .log_reliability_event(ReliabilityEventInput::with_decision(
1915 ReliabilityPhase::Verify,
1916 self.logger.test_name().to_string(),
1917 format!("Stdout assertion passed for pattern '{pattern}'"),
1918 "STDOUT_PATTERN_PASS",
1919 ));
1920 Ok(())
1921 } else {
1922 let msg = format!(
1923 "{}: stdout does not contain '{}'. Actual stdout: {}",
1924 context,
1925 pattern,
1926 result.stdout.trim()
1927 );
1928 self.logger.error(&msg);
1929 self.logger
1930 .log_reliability_event(ReliabilityEventInput::with_decision(
1931 ReliabilityPhase::Verify,
1932 self.logger.test_name().to_string(),
1933 format!("Stdout assertion failed for pattern '{pattern}'"),
1934 "STDOUT_PATTERN_FAIL",
1935 ));
1936 Err(HarnessError::AssertionFailed(msg))
1937 }
1938 }
1939
1940 pub fn cleanup(&self) {
1942 self.logger.info("Starting cleanup");
1943 self.logger
1944 .log_reliability_event(ReliabilityEventInput::with_decision(
1945 ReliabilityPhase::Cleanup,
1946 self.logger.test_name().to_string(),
1947 "Cleanup started",
1948 "CLEANUP_START",
1949 ));
1950
1951 self.stop_all_processes();
1953
1954 let should_cleanup = if *self.test_passed.lock().unwrap() {
1956 self.config.cleanup_on_success
1957 } else {
1958 self.config.cleanup_on_failure
1959 };
1960
1961 if should_cleanup {
1962 self.logger.debug(format!(
1963 "Removing test directory: {}",
1964 self.test_dir.display()
1965 ));
1966 if let Err(e) = std::fs::remove_dir_all(&self.test_dir) {
1967 self.logger
1968 .warn(format!("Failed to remove test directory: {}", e));
1969 }
1970 } else {
1971 self.logger.info(format!(
1972 "Preserving test directory for inspection: {}",
1973 self.test_dir.display()
1974 ));
1975 }
1976
1977 self.logger
1978 .log_reliability_event(ReliabilityEventInput::with_decision(
1979 ReliabilityPhase::Cleanup,
1980 self.logger.test_name().to_string(),
1981 "Cleanup finished",
1982 "CLEANUP_DONE",
1983 ));
1984
1985 self.logger.print_summary();
1986 }
1987}
1988
1989impl Drop for TestHarness {
1990 fn drop(&mut self) {
1991 self.cleanup();
1992 }
1993}
1994
1995pub struct TestHarnessBuilder {
1997 test_name: String,
1998 config: HarnessConfig,
1999}
2000
2001impl TestHarnessBuilder {
2002 pub fn new(test_name: &str) -> Self {
2004 Self {
2005 test_name: test_name.to_string(),
2006 config: HarnessConfig::default(),
2007 }
2008 }
2009
2010 pub fn temp_dir(mut self, dir: impl Into<PathBuf>) -> Self {
2012 self.config.temp_dir = dir.into();
2013 self
2014 }
2015
2016 pub fn default_timeout(mut self, timeout: Duration) -> Self {
2018 self.config.default_timeout = timeout;
2019 self
2020 }
2021
2022 pub fn cleanup_on_success(mut self, cleanup: bool) -> Self {
2024 self.config.cleanup_on_success = cleanup;
2025 self
2026 }
2027
2028 pub fn cleanup_on_failure(mut self, cleanup: bool) -> Self {
2030 self.config.cleanup_on_failure = cleanup;
2031 self
2032 }
2033
2034 pub fn rch_binary(mut self, path: impl Into<PathBuf>) -> Self {
2036 self.config.rch_binary = path.into();
2037 self
2038 }
2039
2040 pub fn rchd_binary(mut self, path: impl Into<PathBuf>) -> Self {
2042 self.config.rchd_binary = path.into();
2043 self
2044 }
2045
2046 pub fn rch_wkr_binary(mut self, path: impl Into<PathBuf>) -> Self {
2048 self.config.rch_wkr_binary = path.into();
2049 self
2050 }
2051
2052 pub fn env(mut self, key: &str, value: &str) -> Self {
2054 self.config
2055 .env_vars
2056 .insert(key.to_string(), value.to_string());
2057 self
2058 }
2059
2060 pub fn build(self) -> HarnessResult<TestHarness> {
2062 TestHarness::new(&self.test_name, self.config)
2063 }
2064}
2065
#[cfg(test)]
mod tests {
    use super::*;

    // Each test marks itself passed once its checks succeed. `cleanup` only
    // honours `cleanup_on_success(true)` when the pass flag is set; without
    // the mark, test directories would not be removed on success.

    #[test]
    fn test_harness_creation() {
        let harness = TestHarnessBuilder::new("test_creation")
            .cleanup_on_success(true)
            .build()
            .unwrap();

        assert!(harness.test_dir().exists());
        harness.mark_passed();
    }

    #[test]
    fn test_harness_file_creation() {
        let harness = TestHarnessBuilder::new("test_files")
            .cleanup_on_success(true)
            .build()
            .unwrap();

        let file_path = harness.create_file("test.txt", "hello world").unwrap();
        assert!(file_path.exists());

        let content = std::fs::read_to_string(&file_path).unwrap();
        assert_eq!(content, "hello world");
        harness.mark_passed();
    }

    #[test]
    fn test_harness_exec() {
        let harness = TestHarnessBuilder::new("test_exec")
            .cleanup_on_success(true)
            .build()
            .unwrap();

        let result = harness.exec("echo", ["hello"]).unwrap();
        assert!(result.success());
        assert!(result.stdout_contains("hello"));
        harness.mark_passed();
    }

    #[cfg(unix)]
    #[test]
    fn test_harness_exec_timeout() {
        let harness = TestHarnessBuilder::new("test_exec_timeout")
            .cleanup_on_success(true)
            .build()
            .unwrap();

        let result = harness
            .exec_with_timeout("sleep", ["1"], Duration::from_millis(50))
            .unwrap();

        assert!(!result.success());
        assert_eq!(result.exit_code, 124);
        assert!(result.stderr_contains("timed out"));
        harness.mark_passed();
    }

    #[test]
    fn test_command_result() {
        let result = CommandResult {
            exit_code: 0,
            stdout: "hello world\n".to_string(),
            stderr: "".to_string(),
            duration: Duration::from_millis(10),
        };

        assert!(result.success());
        assert!(result.stdout_contains("hello"));
        assert!(!result.stderr_contains("error"));
    }

    #[test]
    fn test_harness_assertions() {
        let harness = TestHarnessBuilder::new("test_assertions")
            .cleanup_on_success(true)
            .build()
            .unwrap();

        harness.assert(true, "should pass").unwrap();
        harness.assert_eq(1, 1, "numbers equal").unwrap();

        let result = CommandResult {
            exit_code: 0,
            stdout: "success".to_string(),
            stderr: "".to_string(),
            duration: Duration::from_millis(1),
        };
        harness.assert_success(&result, "echo command").unwrap();

        harness.mark_passed();
        assert!(harness.passed());
    }

    #[test]
    fn test_reliability_harness_phase_order_and_manifest_index() {
        let harness = TestHarnessBuilder::new("reliability_harness_phase_order")
            .cleanup_on_success(true)
            .build()
            .unwrap();

        let scenario = ReliabilityScenarioSpec::new("reliability_harness_phase_order")
            .with_worker_id("worker-a")
            .with_repo_set([harness.test_dir().display().to_string()])
            .with_pressure_state("disk:normal,memory:normal")
            .add_triage_action("initial_context")
            .add_pre_check(ReliabilityLifecycleCommand::new(
                "pre-check",
                "echo",
                ["pre-check"],
            ))
            .add_execute_command(ReliabilityLifecycleCommand::new(
                "execute-build",
                "echo",
                ["execute-build"],
            ))
            .add_remote_probe(ReliabilityLifecycleCommand::new(
                "remote-probe",
                "echo",
                ["remote-probe"],
            ))
            .add_post_check(ReliabilityLifecycleCommand::new(
                "post-check",
                "echo",
                ["post-check"],
            ))
            .add_cleanup_verification(ReliabilityLifecycleCommand::new(
                "cleanup-check",
                "echo",
                ["cleanup-check"],
            ));

        let report = harness.run_reliability_scenario(&scenario).unwrap();
        assert_eq!(
            report.phase_order,
            vec![
                ReliabilityPhase::Setup,
                ReliabilityPhase::Execute,
                ReliabilityPhase::Verify,
                ReliabilityPhase::Cleanup
            ]
        );
        let stages: Vec<_> = report
            .command_records
            .iter()
            .map(|record| record.stage.as_str())
            .collect();
        assert_eq!(
            stages,
            vec![
                "pre_checks",
                "execute",
                "remote_probes",
                "post_checks",
                "cleanup_verification"
            ]
        );

        let manifest_path = report.manifest_path.expect("manifest path should exist");
        assert!(manifest_path.exists());
        let manifest = std::fs::read_to_string(manifest_path).unwrap();
        assert!(manifest.contains("\"scenario_id\": \"reliability_harness_phase_order\""));
        harness.mark_passed();
    }

    #[test]
    fn test_reliability_harness_denies_unflagged_failure_hooks() {
        let harness = TestHarnessBuilder::new("reliability_harness_hook_denied")
            .cleanup_on_success(true)
            .build()
            .unwrap();

        let scenario = ReliabilityScenarioSpec::new("reliability_harness_hook_denied")
            .request_failure_hook(ReliabilityFailureHook::NetworkCut);

        let err = harness
            .run_reliability_scenario(&scenario)
            .expect_err("unflagged failure hook must be rejected");
        assert!(matches!(err, HarnessError::SetupFailed(_)));
        assert!(err.to_string().contains("not enabled"));
        harness.mark_passed();
    }

    #[test]
    fn test_reliability_harness_arms_explicit_failure_hooks() {
        let harness = TestHarnessBuilder::new("reliability_harness_hook_enabled")
            .cleanup_on_success(true)
            .build()
            .unwrap();

        let flags = ReliabilityFailureHookFlags::allow_all();

        let scenario = ReliabilityScenarioSpec::new("reliability_harness_hook_enabled")
            .with_repo_set([harness.test_dir().display().to_string()])
            .request_failure_hook(ReliabilityFailureHook::NetworkCut)
            .request_failure_hook(ReliabilityFailureHook::SyncTimeout)
            .request_failure_hook(ReliabilityFailureHook::PartialUpdate)
            .request_failure_hook(ReliabilityFailureHook::DaemonRestart)
            .with_failure_hook_flags(flags)
            .add_cleanup_verification(ReliabilityLifecycleCommand::new(
                "cleanup-check",
                "echo",
                ["cleanup-check"],
            ));

        let report = harness.run_reliability_scenario(&scenario).unwrap();
        assert_eq!(
            report.activated_failure_hooks,
            vec![
                ReliabilityFailureHook::NetworkCut,
                ReliabilityFailureHook::SyncTimeout,
                ReliabilityFailureHook::PartialUpdate,
                ReliabilityFailureHook::DaemonRestart
            ]
        );
        assert!(
            report
                .artifact_paths
                .iter()
                .any(|path| path.contains("failure_hook_network_cut"))
        );
        assert!(
            harness
                .test_dir()
                .join(".reliability-hooks/network_cut.enabled")
                .exists()
        );
        assert!(
            harness
                .test_dir()
                .join(".reliability-hooks/sync_timeout.enabled")
                .exists()
        );
        assert!(
            harness
                .test_dir()
                .join(".reliability-hooks/partial_update.enabled")
                .exists()
        );
        assert!(
            harness
                .test_dir()
                .join(".reliability-hooks/daemon_restart.enabled")
                .exists()
        );
        harness.mark_passed();
    }

    #[test]
    fn test_reliability_harness_primitives_cover_downstream_scenarios() {
        let harness = TestHarnessBuilder::new("reliability_harness_downstream")
            .cleanup_on_success(true)
            .build()
            .unwrap();

        let scenario_ids = ["bd-vvmd.2.8", "bd-vvmd.3.6", "bd-vvmd.4.6", "bd-vvmd.5.6"];
        for scenario_id in scenario_ids {
            let scenario = ReliabilityScenarioSpec::new(scenario_id)
                .with_repo_set([harness.test_dir().display().to_string()])
                .add_execute_command(ReliabilityLifecycleCommand::new(
                    "smoke",
                    "echo",
                    [format!("scenario={scenario_id}")],
                ));

            let report = harness
                .run_reliability_scenario(&scenario)
                .unwrap_or_else(|error| panic!("scenario {scenario_id} failed: {error}"));
            assert!(
                report.manifest_path.is_some(),
                "missing manifest for {scenario_id}"
            );
            assert!(
                report
                    .command_records
                    .iter()
                    .any(|record| record.stage == "execute"),
                "missing execute stage record for {scenario_id}"
            );
        }
        harness.mark_passed();
    }
}