Skip to main content

rch_common/e2e/
harness.rs

1//! E2E Test Harness Framework
2//!
3//! Provides infrastructure for running end-to-end tests including:
4//! - Process lifecycle management (start/stop daemon, workers)
5//! - Temporary directory and file management
6//! - Command execution with output capture
7//! - Assertions and matchers for test validation
8
9use std::collections::HashMap;
10use std::ffi::OsStr;
11use std::io::Read;
12use std::path::{Path, PathBuf};
13use std::process::{Child, Command, ExitStatus, Stdio};
14use std::sync::{Arc, Mutex};
15use std::thread;
16use std::time::{Duration, Instant};
17
18use serde::{Deserialize, Serialize};
19
20use super::logging::{
21    LogLevel, LogSource, RELIABILITY_EVENT_SCHEMA_VERSION, ReliabilityContext,
22    ReliabilityEventInput, ReliabilityPhase, TestLogger, TestLoggerBuilder,
23};
24
25/// Error type for test harness operations
26#[derive(Debug, thiserror::Error)]
27pub enum HarnessError {
28    #[error("Process failed to start: {0}")]
29    ProcessStartFailed(String),
30
31    #[error("Process exited with non-zero status: {0}")]
32    ProcessFailed(i32),
33
34    #[error("Process timed out after {0:?}")]
35    Timeout(Duration),
36
37    #[error("Process not found: {0}")]
38    ProcessNotFound(String),
39
40    #[error("IO error: {0}")]
41    Io(#[from] std::io::Error),
42
43    #[error("Assertion failed: {0}")]
44    AssertionFailed(String),
45
46    #[error("Setup failed: {0}")]
47    SetupFailed(String),
48
49    #[error("Cleanup failed: {0}")]
50    CleanupFailed(String),
51}
52
53/// Result type for harness operations
54pub type HarnessResult<T> = Result<T, HarnessError>;
55
/// Bookkeeping for one child process owned by the harness.
#[derive(Debug)]
pub struct ProcessInfo {
    /// Name this process is tracked under.
    pub name: String,
    /// Process id recorded for the child.
    pub pid: u32,
    /// Instant recorded as the start time of the process.
    pub started_at: Instant,
    /// Handle to the child; private, so all access goes through the methods below.
    child: Child,
}

impl ProcessInfo {
    /// Returns `true` while the child has not exited.
    ///
    /// If the status query itself fails, the process is reported as not running.
    pub fn is_running(&mut self) -> bool {
        match self.child.try_wait() {
            Ok(status) => status.is_none(),
            Err(_) => false,
        }
    }

    /// Returns the exit status if the child has already exited, without blocking.
    ///
    /// Yields `None` both when the child is still running and when the query fails.
    pub fn try_exit_status(&mut self) -> Option<ExitStatus> {
        match self.child.try_wait() {
            Ok(status) => status,
            Err(_) => None,
        }
    }

    /// Forcibly terminates the child.
    pub fn kill(&mut self) -> std::io::Result<()> {
        self.child.kill()
    }

    /// Blocks until the child exits and returns its exit status.
    pub fn wait(&mut self) -> std::io::Result<ExitStatus> {
        self.child.wait()
    }

    /// Hands out the child's stdout handle; later calls return `None`.
    pub fn take_stdout(&mut self) -> Option<std::process::ChildStdout> {
        std::mem::take(&mut self.child.stdout)
    }

    /// Hands out the child's stderr handle; later calls return `None`.
    pub fn take_stderr(&mut self) -> Option<std::process::ChildStderr> {
        std::mem::take(&mut self.child.stderr)
    }
}
96
/// Captured outcome of one command execution.
#[derive(Debug, Clone)]
pub struct CommandResult {
    /// Exit code reported for the command.
    pub exit_code: i32,
    /// Captured standard output.
    pub stdout: String,
    /// Captured standard error.
    pub stderr: String,
    /// Time recorded for the command run.
    pub duration: Duration,
}

impl CommandResult {
    /// Returns `true` when the exit code is zero.
    pub fn success(&self) -> bool {
        matches!(self.exit_code, 0)
    }

    /// Returns `true` when `pattern` occurs as a substring of stdout.
    pub fn stdout_contains(&self, pattern: &str) -> bool {
        let haystack: &str = &self.stdout;
        haystack.contains(pattern)
    }

    /// Returns `true` when `pattern` occurs as a substring of stderr.
    pub fn stderr_contains(&self, pattern: &str) -> bool {
        let haystack: &str = &self.stderr;
        haystack.contains(pattern)
    }

    /// Returns stdout followed by a single newline and then stderr.
    pub fn combined_output(&self) -> String {
        let mut combined = String::with_capacity(self.stdout.len() + 1 + self.stderr.len());
        combined.push_str(&self.stdout);
        combined.push('\n');
        combined.push_str(&self.stderr);
        combined
    }
}
127
/// Settings that control how the test harness runs.
#[derive(Debug, Clone)]
pub struct HarnessConfig {
    /// Root directory under which per-test artifact directories are created.
    pub temp_dir: PathBuf,
    /// Timeout applied to commands that do not specify their own.
    pub default_timeout: Duration,
    /// Remove temporary files when the test succeeds.
    pub cleanup_on_success: bool,
    /// Remove temporary files when the test fails.
    pub cleanup_on_failure: bool,
    /// Location of the `rch` binary.
    pub rch_binary: PathBuf,
    /// Location of the `rchd` binary.
    pub rchd_binary: PathBuf,
    /// Location of the `rch-wkr` binary.
    pub rch_wkr_binary: PathBuf,
    /// Environment variables applied to every spawned process.
    pub env_vars: HashMap<String, String>,
}
148
149impl Default for HarnessConfig {
150    fn default() -> Self {
151        fn cargo_bin_exe(candidates: &[&str]) -> Option<PathBuf> {
152            for candidate in candidates {
153                let key = format!("CARGO_BIN_EXE_{candidate}");
154                if let Ok(value) = std::env::var(&key) {
155                    let trimmed = value.trim();
156                    if !trimmed.is_empty() {
157                        return Some(PathBuf::from(trimmed));
158                    }
159                }
160            }
161            None
162        }
163
164        // Find binaries in target/debug or target/release.
165        //
166        // NOTE: The harness spawns processes with `current_dir = test_dir`, so we must resolve
167        // binary paths relative to the workspace root (not the per-test temp dir).
168        let manifest_dir = std::env::var("CARGO_MANIFEST_DIR")
169            .map(PathBuf::from)
170            .unwrap_or_else(|_| PathBuf::from(env!("CARGO_MANIFEST_DIR")));
171        let manifest_dir = manifest_dir.canonicalize().unwrap_or(manifest_dir);
172        let workspace_root = manifest_dir
173            .parent()
174            .unwrap_or(manifest_dir.as_path())
175            .to_path_buf();
176        let cargo_target = std::env::var("CARGO_TARGET_DIR")
177            .map(PathBuf::from)
178            .unwrap_or_else(|_| workspace_root.join("target"));
179        let cargo_target = if cargo_target.is_absolute() {
180            cargo_target
181        } else {
182            workspace_root.join(cargo_target)
183        };
184
185        let profile = if cfg!(debug_assertions) {
186            "debug"
187        } else {
188            "release"
189        };
190
191        let default_bin_dir = cargo_target.join(profile);
192        let bin_dir = if std::env::var("LLVM_PROFILE_FILE")
193            .map(|value| !value.trim().is_empty())
194            .unwrap_or(false)
195        {
196            let llvm_cov_dir = cargo_target.join("llvm-cov-target").join(profile);
197            if llvm_cov_dir.is_dir() {
198                llvm_cov_dir
199            } else {
200                default_bin_dir.clone()
201            }
202        } else {
203            default_bin_dir.clone()
204        };
205
206        let mut env_vars = HashMap::new();
207        if std::env::var("CI")
208            .map(|v| v == "1" || v.to_lowercase() == "true")
209            .unwrap_or(false)
210        {
211            env_vars.insert("RCH_MOCK_SSH".to_string(), "1".to_string());
212        }
213
214        Self {
215            temp_dir: std::env::temp_dir().join("rch_e2e_tests"),
216            default_timeout: Duration::from_secs(30),
217            cleanup_on_success: true,
218            cleanup_on_failure: false,
219            rch_binary: cargo_bin_exe(&["rch"])
220                .map(|path| {
221                    if path.is_relative() {
222                        workspace_root.join(path)
223                    } else {
224                        path
225                    }
226                })
227                .unwrap_or_else(|| bin_dir.join("rch")),
228            rchd_binary: cargo_bin_exe(&["rchd"])
229                .map(|path| {
230                    if path.is_relative() {
231                        workspace_root.join(path)
232                    } else {
233                        path
234                    }
235                })
236                .unwrap_or_else(|| bin_dir.join("rchd")),
237            rch_wkr_binary: cargo_bin_exe(&["rch-wkr", "rch_wkr"])
238                .map(|path| {
239                    if path.is_relative() {
240                        workspace_root.join(path)
241                    } else {
242                        path
243                    }
244                })
245                .unwrap_or_else(|| bin_dir.join("rch-wkr")),
246            env_vars,
247        }
248    }
249}
250
251/// Failure hooks that can be explicitly injected into reliability scenarios.
252#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
253#[serde(rename_all = "snake_case")]
254pub enum ReliabilityFailureHook {
255    NetworkCut,
256    SyncTimeout,
257    PartialUpdate,
258    DaemonRestart,
259}
260
261impl std::fmt::Display for ReliabilityFailureHook {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        let label = match self {
264            Self::NetworkCut => "network_cut",
265            Self::SyncTimeout => "sync_timeout",
266            Self::PartialUpdate => "partial_update",
267            Self::DaemonRestart => "daemon_restart",
268        };
269        write!(f, "{label}")
270    }
271}
272
273/// Explicit allowlist for failure hooks. Hooks are denied unless enabled here.
274#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
275pub struct ReliabilityFailureHookFlags {
276    pub allow_network_cut: bool,
277    pub allow_sync_timeout: bool,
278    pub allow_partial_update: bool,
279    pub allow_daemon_restart: bool,
280}
281
282impl ReliabilityFailureHookFlags {
283    /// Enable all hooks for explicit high-risk test scenarios.
284    pub fn allow_all() -> Self {
285        Self {
286            allow_network_cut: true,
287            allow_sync_timeout: true,
288            allow_partial_update: true,
289            allow_daemon_restart: true,
290        }
291    }
292
293    /// Returns true when the provided hook is explicitly enabled.
294    pub fn allows(&self, hook: ReliabilityFailureHook) -> bool {
295        match hook {
296            ReliabilityFailureHook::NetworkCut => self.allow_network_cut,
297            ReliabilityFailureHook::SyncTimeout => self.allow_sync_timeout,
298            ReliabilityFailureHook::PartialUpdate => self.allow_partial_update,
299            ReliabilityFailureHook::DaemonRestart => self.allow_daemon_restart,
300        }
301    }
302}
303
/// Serde default for `ReliabilityLifecycleCommand::required_success`: commands are
/// required to succeed unless a spec says otherwise.
fn default_required_success() -> bool {
    true
}
307
308/// One deterministic lifecycle command invoked by the reliability scenario runner.
309#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
310pub struct ReliabilityLifecycleCommand {
311    pub name: String,
312    pub program: String,
313    pub args: Vec<String>,
314    pub timeout_secs: Option<u64>,
315    #[serde(default = "default_required_success")]
316    pub required_success: bool,
317    #[serde(default)]
318    pub via_rch_exec: bool,
319}
320
321impl ReliabilityLifecycleCommand {
322    /// Build a command with required-success semantics.
323    pub fn new(
324        name: impl Into<String>,
325        program: impl Into<String>,
326        args: impl IntoIterator<Item = impl Into<String>>,
327    ) -> Self {
328        Self {
329            name: name.into(),
330            program: program.into(),
331            args: args.into_iter().map(Into::into).collect(),
332            timeout_secs: None,
333            required_success: true,
334            via_rch_exec: false,
335        }
336    }
337
338    /// Override timeout in seconds for this command.
339    pub fn with_timeout_secs(mut self, timeout_secs: u64) -> Self {
340        self.timeout_secs = Some(timeout_secs);
341        self
342    }
343
344    /// Marks this command optional; failures are logged but do not fail the scenario.
345    pub fn optional(mut self) -> Self {
346        self.required_success = false;
347        self
348    }
349
350    /// Run command via `rch exec -- <program> <args...>`.
351    pub fn via_rch_exec(mut self) -> Self {
352        self.via_rch_exec = true;
353        self
354    }
355}
356
357/// Worker lifecycle hooks consumed by reliability scenario suites.
358#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
359pub struct ReliabilityWorkerLifecycleHooks {
360    pub pre_checks: Vec<ReliabilityLifecycleCommand>,
361    pub remote_probes: Vec<ReliabilityLifecycleCommand>,
362    pub post_checks: Vec<ReliabilityLifecycleCommand>,
363    pub cleanup_verification: Vec<ReliabilityLifecycleCommand>,
364}
365
366/// Full reliability scenario definition used by the deterministic runner.
367#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
368pub struct ReliabilityScenarioSpec {
369    pub scenario_id: String,
370    pub worker_id: Option<String>,
371    pub repo_set: Vec<String>,
372    pub pressure_state: Option<String>,
373    pub triage_actions: Vec<String>,
374    pub lifecycle: ReliabilityWorkerLifecycleHooks,
375    pub execute_commands: Vec<ReliabilityLifecycleCommand>,
376    pub requested_failure_hooks: Vec<ReliabilityFailureHook>,
377    pub failure_hook_flags: ReliabilityFailureHookFlags,
378}
379
380impl ReliabilityScenarioSpec {
381    /// Creates a new scenario with deterministic defaults.
382    pub fn new(scenario_id: impl Into<String>) -> Self {
383        Self {
384            scenario_id: scenario_id.into(),
385            worker_id: None,
386            repo_set: Vec::new(),
387            pressure_state: None,
388            triage_actions: Vec::new(),
389            lifecycle: ReliabilityWorkerLifecycleHooks::default(),
390            execute_commands: Vec::new(),
391            requested_failure_hooks: Vec::new(),
392            failure_hook_flags: ReliabilityFailureHookFlags::default(),
393        }
394    }
395
396    pub fn with_worker_id(mut self, worker_id: impl Into<String>) -> Self {
397        self.worker_id = Some(worker_id.into());
398        self
399    }
400
401    pub fn with_repo_set(mut self, repos: impl IntoIterator<Item = impl Into<String>>) -> Self {
402        self.repo_set = repos.into_iter().map(Into::into).collect();
403        self
404    }
405
406    pub fn with_pressure_state(mut self, pressure_state: impl Into<String>) -> Self {
407        self.pressure_state = Some(pressure_state.into());
408        self
409    }
410
411    pub fn add_triage_action(mut self, action: impl Into<String>) -> Self {
412        self.triage_actions.push(action.into());
413        self
414    }
415
416    pub fn add_pre_check(mut self, command: ReliabilityLifecycleCommand) -> Self {
417        self.lifecycle.pre_checks.push(command);
418        self
419    }
420
421    pub fn add_remote_probe(mut self, command: ReliabilityLifecycleCommand) -> Self {
422        self.lifecycle.remote_probes.push(command);
423        self
424    }
425
426    pub fn add_post_check(mut self, command: ReliabilityLifecycleCommand) -> Self {
427        self.lifecycle.post_checks.push(command);
428        self
429    }
430
431    pub fn add_cleanup_verification(mut self, command: ReliabilityLifecycleCommand) -> Self {
432        self.lifecycle.cleanup_verification.push(command);
433        self
434    }
435
436    pub fn add_execute_command(mut self, command: ReliabilityLifecycleCommand) -> Self {
437        self.execute_commands.push(command);
438        self
439    }
440
441    pub fn request_failure_hook(mut self, hook: ReliabilityFailureHook) -> Self {
442        self.requested_failure_hooks.push(hook);
443        self
444    }
445
446    pub fn with_failure_hook_flags(mut self, flags: ReliabilityFailureHookFlags) -> Self {
447        self.failure_hook_flags = flags;
448        self
449    }
450}
451
452/// Recorded result of one lifecycle command execution.
453#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
454pub struct ReliabilityCommandRecord {
455    pub phase: ReliabilityPhase,
456    pub stage: String,
457    pub command_name: String,
458    pub invoked_program: String,
459    pub invoked_args: Vec<String>,
460    pub exit_code: i32,
461    pub duration_ms: u64,
462    pub required_success: bool,
463    pub succeeded: bool,
464    pub artifact_paths: Vec<String>,
465}
466
467/// Replay-focused artifact index for one scenario run.
468#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
469pub struct ReliabilityScenarioReport {
470    pub schema_version: String,
471    pub scenario_id: String,
472    pub phase_order: Vec<ReliabilityPhase>,
473    pub activated_failure_hooks: Vec<ReliabilityFailureHook>,
474    pub command_records: Vec<ReliabilityCommandRecord>,
475    pub artifact_paths: Vec<String>,
476    pub manifest_path: Option<PathBuf>,
477}
478
479impl ReliabilityScenarioReport {
480    fn new(scenario_id: &str) -> Self {
481        Self {
482            schema_version: RELIABILITY_EVENT_SCHEMA_VERSION.to_string(),
483            scenario_id: scenario_id.to_string(),
484            phase_order: Vec::new(),
485            activated_failure_hooks: Vec::new(),
486            command_records: Vec::new(),
487            artifact_paths: Vec::new(),
488            manifest_path: None,
489        }
490    }
491}
492
/// Clean up stale sockets and test directories from previous test runs.
///
/// Call this during harness setup so that leftover sockets do not cause connection
/// failures in new tests. Every failure is ignored: cleanup is best-effort and never
/// aborts the caller. A summary line is printed to stderr when anything was removed.
///
/// Only real directories directly under `base_dir` are considered. Symbolic links are
/// skipped without being followed, so a link that points outside the test tree can never
/// cause files elsewhere to be deleted.
///
/// # Arguments
/// * `base_dir` - The base directory containing test artifacts (e.g., `/tmp/rch_e2e_tests`).
///   A missing or unreadable directory makes this a no-op.
/// * `max_age` - Directories whose modification time is older than this are removed.
///   There is no built-in default; the caller always supplies the value.
pub fn cleanup_stale_test_artifacts(base_dir: &Path, max_age: Duration) {
    // A missing or unreadable base directory simply means there is nothing to clean.
    let Ok(entries) = std::fs::read_dir(base_dir) else {
        return;
    };

    let now = std::time::SystemTime::now();
    let mut cleaned_sockets = 0usize;
    let mut cleaned_dirs = 0usize;

    for entry in entries.flatten() {
        // `DirEntry::file_type` does not follow symlinks, unlike `Path::is_dir`.
        let is_real_dir = entry.file_type().map(|kind| kind.is_dir()).unwrap_or(false);
        if !is_real_dir {
            continue;
        }

        // Unknown or future modification times count as "not stale" so nothing fresh is lost.
        let is_stale = entry
            .metadata()
            .ok()
            .and_then(|meta| meta.modified().ok())
            .and_then(|modified| now.duration_since(modified).ok())
            .is_some_and(|age| age > max_age);
        if !is_stale {
            continue;
        }

        let path = entry.path();

        // Remove top-level socket files first so they are counted separately in the summary.
        if let Ok(dir_entries) = std::fs::read_dir(&path) {
            cleaned_sockets += dir_entries
                .flatten()
                .map(|file_entry| file_entry.path())
                .filter(|file_path| file_path.extension().is_some_and(|e| e == "sock"))
                .filter(|file_path| std::fs::remove_file(file_path).is_ok())
                .count();
        }

        // Then remove the stale directory and whatever else it contains.
        if std::fs::remove_dir_all(&path).is_ok() {
            cleaned_dirs += 1;
        }
    }

    if cleaned_sockets > 0 || cleaned_dirs > 0 {
        eprintln!(
            "[e2e::harness] Pre-test cleanup: removed {} stale sockets, {} stale directories",
            cleaned_sockets, cleaned_dirs
        );
    }
}
559
560/// E2E Test Harness for managing test execution
561pub struct TestHarness {
562    pub config: HarnessConfig,
563    pub logger: TestLogger,
564    test_dir: PathBuf,
565    managed_processes: Arc<Mutex<HashMap<String, ProcessInfo>>>,
566    created_files: Arc<Mutex<Vec<PathBuf>>>,
567    created_dirs: Arc<Mutex<Vec<PathBuf>>>,
568    test_passed: Arc<Mutex<bool>>,
569}
570
571impl TestHarness {
572    /// Create a new test harness with the given configuration
573    pub fn new(test_name: &str, config: HarnessConfig) -> HarnessResult<Self> {
574        // Clean up stale artifacts from previous test runs (older than 1 hour)
575        cleanup_stale_test_artifacts(&config.temp_dir, Duration::from_secs(3600));
576
577        // Create unique test directory
578        let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S_%3f");
579        let test_dir =
580            config
581                .temp_dir
582                .join(format!("{}_{}", test_name.replace("::", "_"), timestamp));
583
584        std::fs::create_dir_all(&test_dir)?;
585
586        // Create logger with log dir in test directory
587        let logger = TestLoggerBuilder::new(test_name)
588            .log_dir(test_dir.join("logs"))
589            .build();
590
591        logger.info(format!("Test harness initialized: {test_name}"));
592        logger.debug(format!("Test directory: {}", test_dir.display()));
593        logger.log_reliability_event(ReliabilityEventInput {
594            level: LogLevel::Info,
595            phase: ReliabilityPhase::Setup,
596            scenario_id: test_name.to_string(),
597            message: "Test harness initialized".to_string(),
598            context: ReliabilityContext {
599                worker_id: None,
600                repo_set: vec![test_dir.display().to_string()],
601                pressure_state: None,
602                triage_actions: Vec::new(),
603                decision_code: "HARNESS_INIT".to_string(),
604                fallback_reason: None,
605            },
606            artifact_paths: Vec::new(),
607        });
608
609        Ok(Self {
610            config,
611            logger,
612            test_dir,
613            managed_processes: Arc::new(Mutex::new(HashMap::new())),
614            created_files: Arc::new(Mutex::new(Vec::new())),
615            created_dirs: Arc::new(Mutex::new(vec![])),
616            test_passed: Arc::new(Mutex::new(false)),
617        })
618    }
619
620    /// Create a harness with default configuration
621    pub fn default_for_test(test_name: &str) -> HarnessResult<Self> {
622        Self::new(test_name, HarnessConfig::default())
623    }
624
625    /// Get the test directory path
626    pub fn test_dir(&self) -> &Path {
627        &self.test_dir
628    }
629
    /// Run a reliability scenario with deterministic setup/execute/verify/cleanup phases.
    ///
    /// This is the shared foundation consumed by scenario-specific suites (path dependencies,
    /// repo convergence, disk pressure, and process triage) so they can reuse one orchestration
    /// engine instead of bespoke scaffolding.
    ///
    /// Phase behavior:
    /// - All four phases are always appended to `phase_order`, in that order, even when a
    ///   phase is skipped.
    /// - Setup activates the requested failure hooks, then runs `pre_checks`.
    /// - Execute runs `execute_commands` followed by `remote_probes`; it is skipped when
    ///   setup failed.
    /// - Verify runs `post_checks`; it is skipped when setup or execute failed.
    /// - Cleanup runs `cleanup_verification` unconditionally and then tries to capture a
    ///   JSON manifest of the report through the logger.
    ///
    /// # Errors
    /// Returns the error of the first failing phase (setup, execute, verify, cleanup — in
    /// that order). Errors are only propagated after cleanup has run, and the report is
    /// returned only when every phase succeeded.
    pub fn run_reliability_scenario(
        &self,
        scenario: &ReliabilityScenarioSpec,
    ) -> HarnessResult<ReliabilityScenarioReport> {
        let mut report = ReliabilityScenarioReport::new(&scenario.scenario_id);
        // Working copy of the triage actions; the helpers below receive it mutably.
        let mut triage_actions = scenario.triage_actions.clone();

        report.phase_order.push(ReliabilityPhase::Setup);
        self.log_scenario_event(
            scenario,
            ReliabilityPhase::Setup,
            LogLevel::Info,
            "Reliability setup phase started",
            "SCENARIO_SETUP_START",
            &triage_actions,
            None,
            Vec::new(),
        );

        // Immediately-invoked closure so `?` short-circuits the phase without returning
        // from the whole function.
        let setup_result: HarnessResult<()> = (|| {
            report.activated_failure_hooks =
                self.activate_failure_hooks(scenario, &mut report, &mut triage_actions)?;

            self.run_phase_lifecycle_commands(
                scenario,
                ReliabilityPhase::Setup,
                "pre_checks",
                &scenario.lifecycle.pre_checks,
                &mut report,
                &mut triage_actions,
            )
        })();

        if let Err(error) = setup_result.as_ref() {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Setup,
                LogLevel::Error,
                format!("Reliability setup phase failed: {error}"),
                "SCENARIO_SETUP_FAIL",
                &triage_actions,
                Some(error.to_string()),
                Vec::new(),
            );
        } else {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Setup,
                LogLevel::Info,
                "Reliability setup phase completed",
                "SCENARIO_SETUP_DONE",
                &triage_actions,
                None,
                Vec::new(),
            );
        }

        report.phase_order.push(ReliabilityPhase::Execute);
        let execute_result: HarnessResult<()> = if setup_result.is_ok() {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Execute,
                LogLevel::Info,
                "Reliability execute phase started",
                "SCENARIO_EXECUTE_START",
                &triage_actions,
                None,
                Vec::new(),
            );

            // Remote probes only run when every execute command succeeded.
            let result = (|| {
                self.run_phase_lifecycle_commands(
                    scenario,
                    ReliabilityPhase::Execute,
                    "execute",
                    &scenario.execute_commands,
                    &mut report,
                    &mut triage_actions,
                )?;
                self.run_phase_lifecycle_commands(
                    scenario,
                    ReliabilityPhase::Execute,
                    "remote_probes",
                    &scenario.lifecycle.remote_probes,
                    &mut report,
                    &mut triage_actions,
                )
            })();

            if let Err(error) = result.as_ref() {
                self.log_scenario_event(
                    scenario,
                    ReliabilityPhase::Execute,
                    LogLevel::Error,
                    format!("Reliability execute phase failed: {error}"),
                    "SCENARIO_EXECUTE_FAIL",
                    &triage_actions,
                    Some(error.to_string()),
                    Vec::new(),
                );
            } else {
                self.log_scenario_event(
                    scenario,
                    ReliabilityPhase::Execute,
                    LogLevel::Info,
                    "Reliability execute phase completed",
                    "SCENARIO_EXECUTE_DONE",
                    &triage_actions,
                    None,
                    Vec::new(),
                );
            }

            result
        } else {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Execute,
                LogLevel::Warn,
                "Reliability execute phase skipped due setup failure",
                "SCENARIO_EXECUTE_SKIPPED",
                &triage_actions,
                Some("setup phase failed".to_string()),
                Vec::new(),
            );
            // A skipped phase counts as Ok; the setup error is propagated at the end.
            Ok(())
        };

        report.phase_order.push(ReliabilityPhase::Verify);
        let verify_result: HarnessResult<()> = if setup_result.is_ok() && execute_result.is_ok() {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Verify,
                LogLevel::Info,
                "Reliability verify phase started",
                "SCENARIO_VERIFY_START",
                &triage_actions,
                None,
                Vec::new(),
            );

            let result = self.run_phase_lifecycle_commands(
                scenario,
                ReliabilityPhase::Verify,
                "post_checks",
                &scenario.lifecycle.post_checks,
                &mut report,
                &mut triage_actions,
            );

            if let Err(error) = result.as_ref() {
                self.log_scenario_event(
                    scenario,
                    ReliabilityPhase::Verify,
                    LogLevel::Error,
                    format!("Reliability verify phase failed: {error}"),
                    "SCENARIO_VERIFY_FAIL",
                    &triage_actions,
                    Some(error.to_string()),
                    Vec::new(),
                );
            } else {
                self.log_scenario_event(
                    scenario,
                    ReliabilityPhase::Verify,
                    LogLevel::Info,
                    "Reliability verify phase completed",
                    "SCENARIO_VERIFY_DONE",
                    &triage_actions,
                    None,
                    Vec::new(),
                );
            }

            result
        } else {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Verify,
                LogLevel::Warn,
                "Reliability verify phase skipped due earlier failure",
                "SCENARIO_VERIFY_SKIPPED",
                &triage_actions,
                Some("setup or execute phase failed".to_string()),
                Vec::new(),
            );
            // As above: skipped counts as Ok, the earlier error is propagated at the end.
            Ok(())
        };

        // Cleanup runs regardless of how the earlier phases went.
        report.phase_order.push(ReliabilityPhase::Cleanup);
        self.log_scenario_event(
            scenario,
            ReliabilityPhase::Cleanup,
            LogLevel::Info,
            "Reliability cleanup phase started",
            "SCENARIO_CLEANUP_START",
            &triage_actions,
            None,
            Vec::new(),
        );

        let cleanup_result = self.run_phase_lifecycle_commands(
            scenario,
            ReliabilityPhase::Cleanup,
            "cleanup_verification",
            &scenario.lifecycle.cleanup_verification,
            &mut report,
            &mut triage_actions,
        );

        // The manifest is a snapshot taken before its own path is added to
        // `report.artifact_paths`, so it does not list itself.
        let manifest_payload = serde_json::json!({
            "schema_version": report.schema_version,
            "scenario_id": report.scenario_id,
            "phase_order": report.phase_order.iter().map(ToString::to_string).collect::<Vec<_>>(),
            "activated_failure_hooks": report
                .activated_failure_hooks
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>(),
            "command_records": report.command_records,
            "artifact_paths": report.artifact_paths,
        });
        let mut cleanup_artifacts = Vec::new();
        // Best effort: if the manifest cannot be captured, `manifest_path` stays `None`
        // and the failure is not reported.
        if let Ok(path) = self.logger.capture_artifact_json(
            &scenario.scenario_id,
            "scenario_artifact_index",
            &manifest_payload,
        ) {
            let as_string = path.display().to_string();
            report.manifest_path = Some(path);
            Self::push_unique_string(&mut report.artifact_paths, as_string.clone());
            cleanup_artifacts.push(as_string);
        }

        if let Err(error) = cleanup_result.as_ref() {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Cleanup,
                LogLevel::Error,
                format!("Reliability cleanup phase failed: {error}"),
                "SCENARIO_CLEANUP_FAIL",
                &triage_actions,
                Some(error.to_string()),
                cleanup_artifacts,
            );
        } else {
            self.log_scenario_event(
                scenario,
                ReliabilityPhase::Cleanup,
                LogLevel::Info,
                "Reliability cleanup phase completed",
                "SCENARIO_CLEANUP_DONE",
                &triage_actions,
                None,
                cleanup_artifacts,
            );
        }

        // Propagate the earliest failure only now, after cleanup and manifest capture.
        setup_result?;
        execute_result?;
        verify_result?;
        cleanup_result?;

        Ok(report)
    }
900
901    fn run_phase_lifecycle_commands(
902        &self,
903        scenario: &ReliabilityScenarioSpec,
904        phase: ReliabilityPhase,
905        stage: &str,
906        commands: &[ReliabilityLifecycleCommand],
907        report: &mut ReliabilityScenarioReport,
908        triage_actions: &mut Vec<String>,
909    ) -> HarnessResult<()> {
910        for command in commands {
911            self.execute_lifecycle_command(
912                scenario,
913                phase,
914                stage,
915                command,
916                report,
917                triage_actions,
918            )?;
919        }
920        Ok(())
921    }
922
923    fn execute_lifecycle_command(
924        &self,
925        scenario: &ReliabilityScenarioSpec,
926        phase: ReliabilityPhase,
927        stage: &str,
928        command: &ReliabilityLifecycleCommand,
929        report: &mut ReliabilityScenarioReport,
930        triage_actions: &mut Vec<String>,
931    ) -> HarnessResult<()> {
932        let mut invoked_args = command.args.clone();
933        let invoked_program = if command.via_rch_exec {
934            let mut wrapped = vec![
935                "exec".to_string(),
936                "--".to_string(),
937                command.program.clone(),
938            ];
939            wrapped.extend(invoked_args);
940            invoked_args = wrapped;
941            "rch".to_string()
942        } else {
943            command.program.clone()
944        };
945
946        let timeout = command
947            .timeout_secs
948            .map(Duration::from_secs)
949            .unwrap_or(self.config.default_timeout);
950        let result = self.exec_with_timeout(
951            &invoked_program,
952            invoked_args.iter().map(String::as_str),
953            timeout,
954        )?;
955
956        let artifact_prefix = format!(
957            "{}_{}_{}",
958            Self::sanitize_artifact_component(stage),
959            Self::sanitize_artifact_component(&command.name),
960            chrono::Utc::now().format("%Y%m%d_%H%M%S_%3f")
961        );
962        let mut command_artifacts = Vec::new();
963        let trace_payload = serde_json::json!({
964            "scenario_id": scenario.scenario_id,
965            "phase": phase.to_string(),
966            "stage": stage,
967            "command_name": command.name,
968            "invoked_program": invoked_program,
969            "invoked_args": invoked_args,
970            "required_success": command.required_success,
971            "exit_code": result.exit_code,
972            "duration_ms": result.duration.as_millis(),
973        });
974        if let Ok(path) = self.logger.capture_artifact_json(
975            &scenario.scenario_id,
976            &format!("{artifact_prefix}_trace"),
977            &trace_payload,
978        ) {
979            command_artifacts.push(path.display().to_string());
980        }
981        if !result.stdout.is_empty()
982            && let Ok(path) = self.logger.capture_artifact_text(
983                &scenario.scenario_id,
984                &format!("{artifact_prefix}_stdout"),
985                &result.stdout,
986            )
987        {
988            command_artifacts.push(path.display().to_string());
989        }
990        if !result.stderr.is_empty()
991            && let Ok(path) = self.logger.capture_artifact_text(
992                &scenario.scenario_id,
993                &format!("{artifact_prefix}_stderr"),
994                &result.stderr,
995            )
996        {
997            command_artifacts.push(path.display().to_string());
998        }
999
1000        for artifact_path in &command_artifacts {
1001            Self::push_unique_string(&mut report.artifact_paths, artifact_path.clone());
1002        }
1003
1004        let mut event_triage = triage_actions.clone();
1005        if result.success() {
1006            Self::push_unique_string(
1007                &mut event_triage,
1008                format!("{}_{}_pass", stage, command.name),
1009            );
1010        } else {
1011            Self::push_unique_string(
1012                &mut event_triage,
1013                format!("{}_{}_fail", stage, command.name),
1014            );
1015        }
1016
1017        let decision_code = format!(
1018            "{}_{}",
1019            Self::sanitize_decision_token(stage),
1020            if result.success() { "PASS" } else { "FAIL" }
1021        );
1022
1023        self.log_scenario_event(
1024            scenario,
1025            phase,
1026            if result.success() {
1027                LogLevel::Info
1028            } else {
1029                LogLevel::Warn
1030            },
1031            format!(
1032                "Lifecycle command '{}' in stage '{}' finished with exit {}",
1033                command.name, stage, result.exit_code
1034            ),
1035            decision_code,
1036            &event_triage,
1037            if result.success() {
1038                None
1039            } else {
1040                Some(format!(
1041                    "lifecycle command '{}' failed with exit {}",
1042                    command.name, result.exit_code
1043                ))
1044            },
1045            command_artifacts.clone(),
1046        );
1047
1048        report.command_records.push(ReliabilityCommandRecord {
1049            phase,
1050            stage: stage.to_string(),
1051            command_name: command.name.clone(),
1052            invoked_program: invoked_program.clone(),
1053            invoked_args: invoked_args.clone(),
1054            exit_code: result.exit_code,
1055            duration_ms: result.duration.as_millis() as u64,
1056            required_success: command.required_success,
1057            succeeded: result.success(),
1058            artifact_paths: command_artifacts,
1059        });
1060
1061        if !result.success() && command.required_success {
1062            return Err(HarnessError::AssertionFailed(format!(
1063                "required lifecycle command '{}' failed in stage '{}' (exit={})",
1064                command.name, stage, result.exit_code
1065            )));
1066        }
1067        if !result.success() && !command.required_success {
1068            Self::push_unique_string(
1069                triage_actions,
1070                format!("optional_command_failed:{}:{}", stage, command.name),
1071            );
1072        }
1073
1074        Ok(())
1075    }
1076
1077    fn activate_failure_hooks(
1078        &self,
1079        scenario: &ReliabilityScenarioSpec,
1080        report: &mut ReliabilityScenarioReport,
1081        triage_actions: &mut Vec<String>,
1082    ) -> HarnessResult<Vec<ReliabilityFailureHook>> {
1083        let mut activated = Vec::new();
1084        if scenario.requested_failure_hooks.is_empty() {
1085            return Ok(activated);
1086        }
1087
1088        let marker_dir = self.test_dir.join(".reliability-hooks");
1089        std::fs::create_dir_all(&marker_dir)?;
1090
1091        for hook in &scenario.requested_failure_hooks {
1092            if !scenario.failure_hook_flags.allows(*hook) {
1093                self.log_scenario_event(
1094                    scenario,
1095                    ReliabilityPhase::Setup,
1096                    LogLevel::Error,
1097                    format!("Failure hook denied: {hook}"),
1098                    "FAILURE_HOOK_DENIED",
1099                    triage_actions,
1100                    Some(format!("hook {hook} requested without explicit allow flag")),
1101                    Vec::new(),
1102                );
1103                return Err(HarnessError::SetupFailed(format!(
1104                    "failure hook '{hook}' requested but not enabled"
1105                )));
1106            }
1107
1108            let marker_path = marker_dir.join(format!("{hook}.enabled"));
1109            std::fs::write(
1110                &marker_path,
1111                format!(
1112                    "scenario_id={}\nhook={hook}\narmed_at={}\n",
1113                    scenario.scenario_id,
1114                    chrono::Utc::now().to_rfc3339()
1115                ),
1116            )?;
1117
1118            let mut hook_artifacts = Vec::new();
1119            if let Ok(path) = self.logger.capture_artifact_text(
1120                &scenario.scenario_id,
1121                &format!("failure_hook_{hook}_marker"),
1122                &format!("marker_path={}", marker_path.display()),
1123            ) {
1124                hook_artifacts.push(path.display().to_string());
1125            }
1126            let hook_payload = serde_json::json!({
1127                "scenario_id": scenario.scenario_id,
1128                "hook": hook.to_string(),
1129                "marker_path": marker_path.display().to_string(),
1130                "armed_at": chrono::Utc::now().to_rfc3339(),
1131            });
1132            if let Ok(path) = self.logger.capture_artifact_json(
1133                &scenario.scenario_id,
1134                &format!("failure_hook_{hook}_payload"),
1135                &hook_payload,
1136            ) {
1137                hook_artifacts.push(path.display().to_string());
1138            }
1139
1140            for artifact_path in &hook_artifacts {
1141                Self::push_unique_string(&mut report.artifact_paths, artifact_path.clone());
1142            }
1143
1144            Self::push_unique_string(triage_actions, format!("failure_hook:{hook}:armed"));
1145
1146            self.log_scenario_event(
1147                scenario,
1148                ReliabilityPhase::Setup,
1149                LogLevel::Info,
1150                format!("Failure hook armed: {hook}"),
1151                "FAILURE_HOOK_ARMED",
1152                triage_actions,
1153                None,
1154                hook_artifacts,
1155            );
1156
1157            activated.push(*hook);
1158        }
1159
1160        Ok(activated)
1161    }
1162
1163    #[allow(clippy::too_many_arguments)]
1164    fn log_scenario_event(
1165        &self,
1166        scenario: &ReliabilityScenarioSpec,
1167        phase: ReliabilityPhase,
1168        level: LogLevel,
1169        message: impl Into<String>,
1170        decision_code: impl Into<String>,
1171        triage_actions: &[String],
1172        fallback_reason: Option<String>,
1173        artifact_paths: Vec<String>,
1174    ) {
1175        let repo_set = if scenario.repo_set.is_empty() {
1176            vec![self.test_dir.display().to_string()]
1177        } else {
1178            scenario.repo_set.clone()
1179        };
1180
1181        self.logger.log_reliability_event(ReliabilityEventInput {
1182            level,
1183            phase,
1184            scenario_id: scenario.scenario_id.clone(),
1185            message: message.into(),
1186            context: ReliabilityContext {
1187                worker_id: scenario.worker_id.clone(),
1188                repo_set,
1189                pressure_state: scenario.pressure_state.clone(),
1190                triage_actions: triage_actions.to_vec(),
1191                decision_code: decision_code.into(),
1192                fallback_reason,
1193            },
1194            artifact_paths,
1195        });
1196    }
1197
1198    fn push_unique_string(values: &mut Vec<String>, value: String) {
1199        if !values.iter().any(|existing| existing == &value) {
1200            values.push(value);
1201        }
1202    }
1203
1204    fn sanitize_decision_token(raw: &str) -> String {
1205        let mut token = String::with_capacity(raw.len());
1206        for ch in raw.chars() {
1207            if ch.is_ascii_alphanumeric() {
1208                token.push(ch.to_ascii_uppercase());
1209            } else {
1210                token.push('_');
1211            }
1212        }
1213        if token.is_empty() {
1214            "PHASE".to_string()
1215        } else {
1216            token
1217        }
1218    }
1219
1220    /// Create a subdirectory in the test directory
1221    pub fn create_dir(&self, name: &str) -> HarnessResult<PathBuf> {
1222        let path = self.test_dir.join(name);
1223        std::fs::create_dir_all(&path)?;
1224        self.created_dirs.lock().unwrap().push(path.clone());
1225        self.logger
1226            .debug(format!("Created directory: {}", path.display()));
1227        Ok(path)
1228    }
1229
1230    /// Create a file in the test directory
1231    pub fn create_file(&self, name: &str, content: &str) -> HarnessResult<PathBuf> {
1232        let path = self.test_dir.join(name);
1233        if let Some(parent) = path.parent() {
1234            std::fs::create_dir_all(parent)?;
1235        }
1236        std::fs::write(&path, content)?;
1237        self.created_files.lock().unwrap().push(path.clone());
1238        self.logger
1239            .debug(format!("Created file: {}", path.display()));
1240        Ok(path)
1241    }
1242
1243    /// Create a config file for the daemon
1244    pub fn create_daemon_config(&self, config_content: &str) -> HarnessResult<PathBuf> {
1245        let config_dir = self.create_dir("config")?;
1246        let config_path = config_dir.join("daemon.toml");
1247        std::fs::write(&config_path, config_content)?;
1248        self.logger
1249            .info(format!("Created daemon config: {}", config_path.display()));
1250        Ok(config_path)
1251    }
1252
1253    /// Create a workers config file
1254    pub fn create_workers_config(&self, config_content: &str) -> HarnessResult<PathBuf> {
1255        let config_dir = self.test_dir.join("config");
1256        std::fs::create_dir_all(&config_dir)?;
1257        let config_path = config_dir.join("workers.toml");
1258        std::fs::write(&config_path, config_content)?;
1259        self.logger
1260            .info(format!("Created workers config: {}", config_path.display()));
1261        Ok(config_path)
1262    }
1263
1264    /// Spawn a managed process
1265    pub fn spawn_process<I, S>(
1266        &self,
1267        name: &str,
1268        program: &Path,
1269        args: I,
1270        env: Option<&HashMap<String, String>>,
1271    ) -> HarnessResult<u32>
1272    where
1273        I: IntoIterator<Item = S>,
1274        S: AsRef<OsStr>,
1275    {
1276        let mut cmd = Command::new(program);
1277        // Use Stdio::null() to prevent pipe buffer blocking. Long-running
1278        // daemon processes can fill the 64KB pipe buffer on Linux, blocking
1279        // on write and becoming unresponsive to socket requests. Since no
1280        // test reads from the daemon's piped stdout/stderr, null is correct.
1281        cmd.args(args)
1282            .current_dir(&self.test_dir)
1283            .stdout(Stdio::null())
1284            .stderr(Stdio::null());
1285
1286        // Set default environment variables
1287        for (k, v) in &self.config.env_vars {
1288            cmd.env(k, v);
1289        }
1290
1291        // Set additional environment variables
1292        if let Some(env_vars) = env {
1293            for (k, v) in env_vars {
1294                cmd.env(k, v);
1295            }
1296        }
1297
1298        self.logger.info(format!(
1299            "Spawning process: {} {:?}",
1300            name,
1301            program.display()
1302        ));
1303
1304        let child = cmd.spawn().map_err(|e| {
1305            HarnessError::ProcessStartFailed(format!("{}: {}", program.display(), e))
1306        })?;
1307
1308        let pid = child.id();
1309        let info = ProcessInfo {
1310            name: name.to_string(),
1311            pid,
1312            started_at: Instant::now(),
1313            child,
1314        };
1315
1316        self.managed_processes
1317            .lock()
1318            .unwrap()
1319            .insert(name.to_string(), info);
1320
1321        self.logger.log_with_context(
1322            LogLevel::Info,
1323            LogSource::Harness,
1324            format!("Process spawned: {name}"),
1325            vec![("pid".to_string(), pid.to_string())],
1326        );
1327
1328        Ok(pid)
1329    }
1330
1331    /// Start the daemon process
1332    pub fn start_daemon(&self, extra_args: &[&str]) -> HarnessResult<u32> {
1333        let workers_config = self.test_dir.join("config").join("workers.toml");
1334        let workers_config_str: String = workers_config.to_string_lossy().into_owned();
1335        let mut args: Vec<&str> = vec!["--workers-config", &workers_config_str];
1336        args.extend(extra_args);
1337
1338        self.spawn_process("daemon", &self.config.rchd_binary, &args, None)
1339    }
1340
1341    /// Stop a managed process by name
1342    pub fn stop_process(&self, name: &str) -> HarnessResult<()> {
1343        let mut processes = self.managed_processes.lock().unwrap();
1344        if let Some(mut info) = processes.remove(name) {
1345            self.logger
1346                .info(format!("Stopping process: {} (pid={})", name, info.pid));
1347            info.kill()?;
1348            let status = info.wait()?;
1349            self.logger
1350                .debug(format!("Process {} exited with status: {:?}", name, status));
1351            Ok(())
1352        } else {
1353            Err(HarnessError::ProcessNotFound(name.to_string()))
1354        }
1355    }
1356
1357    /// Wait for a managed process to exit without killing it.
1358    pub fn wait_for_process_exit(
1359        &self,
1360        name: &str,
1361        timeout: Duration,
1362    ) -> HarnessResult<ExitStatus> {
1363        self.logger
1364            .debug(format!("Waiting for process to exit: {name} ({timeout:?})"));
1365
1366        let start = Instant::now();
1367        while start.elapsed() < timeout {
1368            let exit_status = {
1369                let mut processes = self.managed_processes.lock().unwrap();
1370                let Some(info) = processes.get_mut(name) else {
1371                    return Err(HarnessError::ProcessNotFound(name.to_string()));
1372                };
1373
1374                match info.try_exit_status() {
1375                    Some(status) => {
1376                        self.logger.debug(format!(
1377                            "Process {name} exited after {:?} with status: {:?}",
1378                            start.elapsed(),
1379                            status
1380                        ));
1381                        Some(status)
1382                    }
1383                    None => None,
1384                }
1385            };
1386
1387            if let Some(status) = exit_status {
1388                self.managed_processes.lock().unwrap().remove(name);
1389                return Ok(status);
1390            }
1391
1392            thread::sleep(Duration::from_millis(50));
1393        }
1394
1395        self.logger
1396            .error(format!("Timeout waiting for process to exit: {name}"));
1397        Err(HarnessError::Timeout(timeout))
1398    }
1399
1400    /// Stop all managed processes
1401    pub fn stop_all_processes(&self) {
1402        let mut processes = self.managed_processes.lock().unwrap();
1403        for (name, mut info) in processes.drain() {
1404            self.logger
1405                .info(format!("Stopping process: {} (pid={})", name, info.pid));
1406            if let Err(e) = info.kill() {
1407                self.logger.warn(format!("Failed to kill {}: {}", name, e));
1408            }
1409            match info.wait() {
1410                Ok(status) => {
1411                    self.logger
1412                        .debug(format!("Process {} exited: {:?}", name, status));
1413                }
1414                Err(e) => {
1415                    self.logger
1416                        .warn(format!("Failed to wait for {}: {}", name, e));
1417                }
1418            }
1419        }
1420    }
1421
1422    /// Execute a command and capture output
1423    pub fn exec<I, S>(&self, program: &str, args: I) -> HarnessResult<CommandResult>
1424    where
1425        I: IntoIterator<Item = S>,
1426        S: AsRef<OsStr>,
1427    {
1428        self.exec_with_timeout(program, args, self.config.default_timeout)
1429    }
1430
1431    /// Execute a command with a specific timeout
1432    ///
1433    /// Terminates the process if it exceeds the provided timeout.
1434    pub fn exec_with_timeout<I, S>(
1435        &self,
1436        program: &str,
1437        args: I,
1438        timeout: Duration,
1439    ) -> HarnessResult<CommandResult>
1440    where
1441        I: IntoIterator<Item = S>,
1442        S: AsRef<OsStr>,
1443    {
1444        let args: Vec<_> = args.into_iter().collect();
1445        let args_display: Vec<_> = args.iter().map(|s| s.as_ref().to_string_lossy()).collect();
1446
1447        self.logger
1448            .debug(format!("Executing: {} {}", program, args_display.join(" ")));
1449
1450        let start = Instant::now();
1451
1452        let mut cmd = Command::new(program);
1453        cmd.args(&args)
1454            .current_dir(&self.test_dir)
1455            .stdout(Stdio::piped())
1456            .stderr(Stdio::piped());
1457
1458        // Set default environment variables
1459        for (k, v) in &self.config.env_vars {
1460            cmd.env(k, v);
1461        }
1462
1463        let mut child = cmd.spawn()?;
1464        let stdout_handle = child
1465            .stdout
1466            .take()
1467            .map(|mut stdout| thread::spawn(move || Self::read_to_string(&mut stdout)));
1468        let stderr_handle = child
1469            .stderr
1470            .take()
1471            .map(|mut stderr| thread::spawn(move || Self::read_to_string(&mut stderr)));
1472
1473        let mut timed_out = false;
1474        let exit_status = loop {
1475            if let Some(status) = child.try_wait()? {
1476                break Some(status);
1477            }
1478
1479            if start.elapsed() >= timeout {
1480                timed_out = true;
1481                let _ = child.kill();
1482                break child.wait().ok();
1483            }
1484
1485            thread::sleep(Duration::from_millis(10));
1486        };
1487
1488        let duration = start.elapsed();
1489        let stdout = Self::join_output(stdout_handle);
1490        let mut stderr = Self::join_output(stderr_handle);
1491        if timed_out {
1492            if !stderr.is_empty() {
1493                stderr.push('\n');
1494            }
1495            stderr.push_str(&format!("Process timed out after {:?}.", timeout));
1496        }
1497
1498        let exit_code = exit_status
1499            .and_then(|status| status.code())
1500            .unwrap_or(if timed_out { 124 } else { -1 });
1501
1502        let result = CommandResult {
1503            exit_code,
1504            stdout,
1505            stderr,
1506            duration,
1507        };
1508
1509        let command_line = format!("{} {}", program, args_display.join(" "));
1510        let artifact_prefix = format!(
1511            "{}_{}",
1512            Self::sanitize_artifact_component(program),
1513            chrono::Utc::now().format("%Y%m%d_%H%M%S_%3f")
1514        );
1515        let mut artifact_paths = Vec::new();
1516        let trace_payload = serde_json::json!({
1517            "command": command_line,
1518            "program": program,
1519            "args": args_display,
1520            "exit_code": result.exit_code,
1521            "duration_ms": duration.as_millis(),
1522            "timed_out": timed_out,
1523            "stdout_len_bytes": result.stdout.len(),
1524            "stderr_len_bytes": result.stderr.len(),
1525        });
1526        if let Ok(path) = self.logger.capture_artifact_json(
1527            self.logger.test_name(),
1528            &format!("{artifact_prefix}_trace"),
1529            &trace_payload,
1530        ) {
1531            artifact_paths.push(path.display().to_string());
1532        }
1533        if !result.stdout.is_empty()
1534            && let Ok(path) = self.logger.capture_artifact_text(
1535                self.logger.test_name(),
1536                &format!("{artifact_prefix}_stdout"),
1537                &result.stdout,
1538            )
1539        {
1540            artifact_paths.push(path.display().to_string());
1541        }
1542        if !result.stderr.is_empty()
1543            && let Ok(path) = self.logger.capture_artifact_text(
1544                self.logger.test_name(),
1545                &format!("{artifact_prefix}_stderr"),
1546                &result.stderr,
1547            )
1548        {
1549            artifact_paths.push(path.display().to_string());
1550        }
1551
1552        self.logger.log_with_context(
1553            if result.success() {
1554                LogLevel::Debug
1555            } else {
1556                LogLevel::Warn
1557            },
1558            LogSource::Harness,
1559            format!("Command completed: {program}"),
1560            vec![
1561                ("exit_code".to_string(), result.exit_code.to_string()),
1562                ("duration_ms".to_string(), duration.as_millis().to_string()),
1563                ("timed_out".to_string(), timed_out.to_string()),
1564            ],
1565        );
1566
1567        let decision_code = if timed_out {
1568            "CMD_TIMEOUT"
1569        } else if result.success() {
1570            "CMD_SUCCESS"
1571        } else {
1572            "CMD_FAILURE"
1573        };
1574        self.logger.log_reliability_event(ReliabilityEventInput {
1575            level: if result.success() {
1576                LogLevel::Info
1577            } else {
1578                LogLevel::Warn
1579            },
1580            phase: ReliabilityPhase::Execute,
1581            scenario_id: self.logger.test_name().to_string(),
1582            message: format!("Command execution finished: {program}"),
1583            context: ReliabilityContext {
1584                worker_id: None,
1585                repo_set: vec![self.test_dir.display().to_string()],
1586                pressure_state: None,
1587                triage_actions: if timed_out {
1588                    vec!["process_killed_after_timeout".to_string()]
1589                } else {
1590                    Vec::new()
1591                },
1592                decision_code: decision_code.to_string(),
1593                fallback_reason: if timed_out {
1594                    Some(format!("command exceeded timeout {:?}", timeout))
1595                } else {
1596                    None
1597                },
1598            },
1599            artifact_paths,
1600        });
1601
1602        if !result.stdout.is_empty() {
1603            for line in result.stdout.lines() {
1604                self.logger.log(
1605                    LogLevel::Trace,
1606                    LogSource::Custom(program.to_string()),
1607                    line,
1608                );
1609            }
1610        }
1611
1612        if !result.stderr.is_empty() {
1613            for line in result.stderr.lines() {
1614                self.logger.log(
1615                    LogLevel::Trace,
1616                    LogSource::Custom(format!("{program}:stderr")),
1617                    line,
1618                );
1619            }
1620        }
1621
1622        Ok(result)
1623    }
1624
1625    fn read_to_string<R: Read>(reader: &mut R) -> String {
1626        let mut buffer = Vec::new();
1627        if reader.read_to_end(&mut buffer).is_ok() {
1628            String::from_utf8_lossy(&buffer).to_string()
1629        } else {
1630            String::new()
1631        }
1632    }
1633
1634    fn join_output(handle: Option<thread::JoinHandle<String>>) -> String {
1635        match handle {
1636            Some(handle) => handle.join().unwrap_or_default(),
1637            None => String::new(),
1638        }
1639    }
1640
1641    fn sanitize_artifact_component(raw: &str) -> String {
1642        let mut sanitized = String::with_capacity(raw.len());
1643        for ch in raw.chars() {
1644            if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
1645                sanitized.push(ch);
1646            } else {
1647                sanitized.push('_');
1648            }
1649        }
1650        if sanitized.is_empty() {
1651            "artifact".to_string()
1652        } else {
1653            sanitized
1654        }
1655    }
1656
1657    /// Execute the rch binary
1658    pub fn exec_rch<I, S>(&self, args: I) -> HarnessResult<CommandResult>
1659    where
1660        I: IntoIterator<Item = S>,
1661        S: AsRef<OsStr>,
1662    {
1663        let binary = self.config.rch_binary.to_string_lossy().to_string();
1664        self.exec(&binary, args)
1665    }
1666
1667    /// Execute the rchd binary
1668    pub fn exec_rchd<I, S>(&self, args: I) -> HarnessResult<CommandResult>
1669    where
1670        I: IntoIterator<Item = S>,
1671        S: AsRef<OsStr>,
1672    {
1673        let binary = self.config.rchd_binary.to_string_lossy().to_string();
1674        self.exec(&binary, args)
1675    }
1676
1677    /// Execute the rch-wkr binary
1678    pub fn exec_rch_wkr<I, S>(&self, args: I) -> HarnessResult<CommandResult>
1679    where
1680        I: IntoIterator<Item = S>,
1681        S: AsRef<OsStr>,
1682    {
1683        let binary = self.config.rch_wkr_binary.to_string_lossy().to_string();
1684        self.exec(&binary, args)
1685    }
1686
1687    /// Wait for a condition to become true
1688    pub fn wait_for<F>(
1689        &self,
1690        description: &str,
1691        timeout: Duration,
1692        interval: Duration,
1693        condition: F,
1694    ) -> HarnessResult<()>
1695    where
1696        F: Fn() -> bool,
1697    {
1698        self.logger
1699            .debug(format!("Waiting for: {description} (timeout: {timeout:?})"));
1700
1701        let start = Instant::now();
1702        while start.elapsed() < timeout {
1703            if condition() {
1704                self.logger.debug(format!(
1705                    "Condition satisfied: {description} after {:?}",
1706                    start.elapsed()
1707                ));
1708                return Ok(());
1709            }
1710            std::thread::sleep(interval);
1711        }
1712
1713        self.logger.error(format!(
1714            "Timeout waiting for: {description} after {timeout:?}"
1715        ));
1716        Err(HarnessError::Timeout(timeout))
1717    }
1718
1719    /// Wait for a file to exist
1720    pub fn wait_for_file(&self, path: &Path, timeout: Duration) -> HarnessResult<()> {
1721        let path_display = path.display().to_string();
1722        self.wait_for(
1723            &format!("file to exist: {path_display}"),
1724            timeout,
1725            Duration::from_millis(100),
1726            || path.exists(),
1727        )
1728    }
1729
1730    /// Wait for a socket to be available
1731    pub fn wait_for_socket(&self, socket_path: &Path, timeout: Duration) -> HarnessResult<()> {
1732        self.wait_for_socket_with_backoff(socket_path, timeout)
1733    }
1734
1735    /// Wait for a socket using exponential backoff for more reliable detection.
1736    ///
1737    /// Starts with a 10ms delay and doubles up to a maximum of 500ms per iteration.
1738    /// This reduces CPU usage while maintaining responsiveness for fast-starting daemons.
1739    pub fn wait_for_socket_with_backoff(
1740        &self,
1741        socket_path: &Path,
1742        max_wait: Duration,
1743    ) -> HarnessResult<()> {
1744        let socket_display = socket_path.display().to_string();
1745        self.logger.debug(format!(
1746            "Waiting for socket with backoff: {socket_display} (timeout: {max_wait:?})"
1747        ));
1748
1749        let start = std::time::Instant::now();
1750        let mut delay = Duration::from_millis(10);
1751        let max_delay = Duration::from_millis(500);
1752
1753        while start.elapsed() < max_wait {
1754            if socket_path.exists() {
1755                #[cfg(unix)]
1756                {
1757                    // A socket file can exist while the daemon is not yet accepting connections
1758                    // (or after a previous process died). Prefer probing connect() to avoid flakes.
1759                    match std::os::unix::net::UnixStream::connect(socket_path) {
1760                        Ok(stream) => {
1761                            drop(stream);
1762                            self.logger.info(format!(
1763                                "Socket ready after {:?}: {socket_display}",
1764                                start.elapsed()
1765                            ));
1766                            return Ok(());
1767                        }
1768                        Err(err) => {
1769                            self.logger.debug(format!(
1770                                "Socket exists but not connectable yet ({err}); retrying..."
1771                            ));
1772                        }
1773                    }
1774                }
1775
1776                #[cfg(not(unix))]
1777                {
1778                    self.logger.info(format!(
1779                        "Socket ready after {:?}: {socket_display}",
1780                        start.elapsed()
1781                    ));
1782                    return Ok(());
1783                }
1784            }
1785            std::thread::sleep(delay);
1786            delay = (delay * 2).min(max_delay);
1787        }
1788
1789        self.logger.error(format!(
1790            "Socket timeout after {:?}: {socket_display}",
1791            max_wait
1792        ));
1793        Err(HarnessError::Timeout(max_wait))
1794    }
1795
1796    /// Mark the test as passed
1797    pub fn mark_passed(&self) {
1798        *self.test_passed.lock().unwrap() = true;
1799        self.logger.info("Test marked as PASSED");
1800    }
1801
1802    /// Mark the test as failed with a reason
1803    pub fn mark_failed(&self, reason: &str) {
1804        *self.test_passed.lock().unwrap() = false;
1805        self.logger
1806            .error(format!("Test marked as FAILED: {reason}"));
1807    }
1808
1809    /// Check if the test passed
1810    pub fn passed(&self) -> bool {
1811        *self.test_passed.lock().unwrap()
1812    }
1813
1814    /// Assert that a condition is true
1815    pub fn assert(&self, condition: bool, message: &str) -> HarnessResult<()> {
1816        if condition {
1817            self.logger.debug(format!("Assertion passed: {message}"));
1818            self.logger
1819                .log_reliability_event(ReliabilityEventInput::with_decision(
1820                    ReliabilityPhase::Verify,
1821                    self.logger.test_name().to_string(),
1822                    format!("Assertion passed: {message}"),
1823                    "ASSERT_PASS",
1824                ));
1825            Ok(())
1826        } else {
1827            self.logger.error(format!("Assertion failed: {message}"));
1828            self.logger
1829                .log_reliability_event(ReliabilityEventInput::with_decision(
1830                    ReliabilityPhase::Verify,
1831                    self.logger.test_name().to_string(),
1832                    format!("Assertion failed: {message}"),
1833                    "ASSERT_FAIL",
1834                ));
1835            Err(HarnessError::AssertionFailed(message.to_string()))
1836        }
1837    }
1838
1839    /// Assert that two values are equal
1840    pub fn assert_eq<T: PartialEq + std::fmt::Debug>(
1841        &self,
1842        actual: T,
1843        expected: T,
1844        message: &str,
1845    ) -> HarnessResult<()> {
1846        if actual == expected {
1847            self.logger.debug(format!("Assertion passed: {message}"));
1848            self.logger
1849                .log_reliability_event(ReliabilityEventInput::with_decision(
1850                    ReliabilityPhase::Verify,
1851                    self.logger.test_name().to_string(),
1852                    format!("Equality assertion passed: {message}"),
1853                    "ASSERT_EQ_PASS",
1854                ));
1855            Ok(())
1856        } else {
1857            let msg = format!("{}: expected {:?}, got {:?}", message, expected, actual);
1858            self.logger.error(format!("Assertion failed: {msg}"));
1859            self.logger
1860                .log_reliability_event(ReliabilityEventInput::with_decision(
1861                    ReliabilityPhase::Verify,
1862                    self.logger.test_name().to_string(),
1863                    format!("Equality assertion failed: {msg}"),
1864                    "ASSERT_EQ_FAIL",
1865                ));
1866            Err(HarnessError::AssertionFailed(msg))
1867        }
1868    }
1869
1870    /// Assert that a command result succeeded
1871    pub fn assert_success(&self, result: &CommandResult, context: &str) -> HarnessResult<()> {
1872        if result.success() {
1873            self.logger.debug(format!("Command succeeded: {context}"));
1874            self.logger
1875                .log_reliability_event(ReliabilityEventInput::with_decision(
1876                    ReliabilityPhase::Verify,
1877                    self.logger.test_name().to_string(),
1878                    format!("Command success assertion passed: {context}"),
1879                    "CMD_ASSERT_SUCCESS",
1880                ));
1881            Ok(())
1882        } else {
1883            let msg = format!(
1884                "{}: command failed with exit code {} - stdout: {}, stderr: {}",
1885                context,
1886                result.exit_code,
1887                result.stdout.trim(),
1888                result.stderr.trim()
1889            );
1890            self.logger.error(&msg);
1891            self.logger
1892                .log_reliability_event(ReliabilityEventInput::with_decision(
1893                    ReliabilityPhase::Verify,
1894                    self.logger.test_name().to_string(),
1895                    format!("Command success assertion failed: {msg}"),
1896                    "CMD_ASSERT_FAIL",
1897                ));
1898            Err(HarnessError::AssertionFailed(msg))
1899        }
1900    }
1901
1902    /// Assert that a command result contains expected output
1903    pub fn assert_stdout_contains(
1904        &self,
1905        result: &CommandResult,
1906        pattern: &str,
1907        context: &str,
1908    ) -> HarnessResult<()> {
1909        if result.stdout_contains(pattern) {
1910            self.logger.debug(format!(
1911                "Stdout contains expected pattern: {context} -> {pattern}"
1912            ));
1913            self.logger
1914                .log_reliability_event(ReliabilityEventInput::with_decision(
1915                    ReliabilityPhase::Verify,
1916                    self.logger.test_name().to_string(),
1917                    format!("Stdout assertion passed for pattern '{pattern}'"),
1918                    "STDOUT_PATTERN_PASS",
1919                ));
1920            Ok(())
1921        } else {
1922            let msg = format!(
1923                "{}: stdout does not contain '{}'. Actual stdout: {}",
1924                context,
1925                pattern,
1926                result.stdout.trim()
1927            );
1928            self.logger.error(&msg);
1929            self.logger
1930                .log_reliability_event(ReliabilityEventInput::with_decision(
1931                    ReliabilityPhase::Verify,
1932                    self.logger.test_name().to_string(),
1933                    format!("Stdout assertion failed for pattern '{pattern}'"),
1934                    "STDOUT_PATTERN_FAIL",
1935                ));
1936            Err(HarnessError::AssertionFailed(msg))
1937        }
1938    }
1939
1940    /// Perform cleanup
1941    pub fn cleanup(&self) {
1942        self.logger.info("Starting cleanup");
1943        self.logger
1944            .log_reliability_event(ReliabilityEventInput::with_decision(
1945                ReliabilityPhase::Cleanup,
1946                self.logger.test_name().to_string(),
1947                "Cleanup started",
1948                "CLEANUP_START",
1949            ));
1950
1951        // Stop all managed processes
1952        self.stop_all_processes();
1953
1954        // Determine if we should clean up files
1955        let should_cleanup = if *self.test_passed.lock().unwrap() {
1956            self.config.cleanup_on_success
1957        } else {
1958            self.config.cleanup_on_failure
1959        };
1960
1961        if should_cleanup {
1962            self.logger.debug(format!(
1963                "Removing test directory: {}",
1964                self.test_dir.display()
1965            ));
1966            if let Err(e) = std::fs::remove_dir_all(&self.test_dir) {
1967                self.logger
1968                    .warn(format!("Failed to remove test directory: {}", e));
1969            }
1970        } else {
1971            self.logger.info(format!(
1972                "Preserving test directory for inspection: {}",
1973                self.test_dir.display()
1974            ));
1975        }
1976
1977        self.logger
1978            .log_reliability_event(ReliabilityEventInput::with_decision(
1979                ReliabilityPhase::Cleanup,
1980                self.logger.test_name().to_string(),
1981                "Cleanup finished",
1982                "CLEANUP_DONE",
1983            ));
1984
1985        self.logger.print_summary();
1986    }
1987}
1988
1989impl Drop for TestHarness {
1990    fn drop(&mut self) {
1991        self.cleanup();
1992    }
1993}
1994
1995/// Builder for creating a TestHarness with custom configuration
1996pub struct TestHarnessBuilder {
1997    test_name: String,
1998    config: HarnessConfig,
1999}
2000
2001impl TestHarnessBuilder {
2002    /// Create a new builder for the given test name
2003    pub fn new(test_name: &str) -> Self {
2004        Self {
2005            test_name: test_name.to_string(),
2006            config: HarnessConfig::default(),
2007        }
2008    }
2009
2010    /// Set the temp directory
2011    pub fn temp_dir(mut self, dir: impl Into<PathBuf>) -> Self {
2012        self.config.temp_dir = dir.into();
2013        self
2014    }
2015
2016    /// Set the default command timeout
2017    pub fn default_timeout(mut self, timeout: Duration) -> Self {
2018        self.config.default_timeout = timeout;
2019        self
2020    }
2021
2022    /// Set whether to cleanup on success
2023    pub fn cleanup_on_success(mut self, cleanup: bool) -> Self {
2024        self.config.cleanup_on_success = cleanup;
2025        self
2026    }
2027
2028    /// Set whether to cleanup on failure
2029    pub fn cleanup_on_failure(mut self, cleanup: bool) -> Self {
2030        self.config.cleanup_on_failure = cleanup;
2031        self
2032    }
2033
2034    /// Set the rch binary path
2035    pub fn rch_binary(mut self, path: impl Into<PathBuf>) -> Self {
2036        self.config.rch_binary = path.into();
2037        self
2038    }
2039
2040    /// Set the rchd binary path
2041    pub fn rchd_binary(mut self, path: impl Into<PathBuf>) -> Self {
2042        self.config.rchd_binary = path.into();
2043        self
2044    }
2045
2046    /// Set the rch-wkr binary path
2047    pub fn rch_wkr_binary(mut self, path: impl Into<PathBuf>) -> Self {
2048        self.config.rch_wkr_binary = path.into();
2049        self
2050    }
2051
2052    /// Add an environment variable
2053    pub fn env(mut self, key: &str, value: &str) -> Self {
2054        self.config
2055            .env_vars
2056            .insert(key.to_string(), value.to_string());
2057        self
2058    }
2059
2060    /// Build the TestHarness
2061    pub fn build(self) -> HarnessResult<TestHarness> {
2062        TestHarness::new(&self.test_name, self.config)
2063    }
2064}
2065
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a harness named `test_name` with `cleanup_on_success` enabled,
    /// panicking if construction fails.
    fn new_harness(test_name: &str) -> TestHarness {
        let builder = TestHarnessBuilder::new(test_name).cleanup_on_success(true);
        builder.build().unwrap()
    }

    /// A freshly built harness has an existing test directory.
    #[test]
    fn test_harness_creation() {
        let harness = new_harness("test_creation");

        let test_dir = harness.test_dir();
        assert!(test_dir.exists());
        // The harness runs its cleanup when dropped at the end of this scope.
    }

    /// `create_file` writes the given content to disk.
    #[test]
    fn test_harness_file_creation() {
        let harness = new_harness("test_files");

        let created = harness.create_file("test.txt", "hello world").unwrap();
        assert!(created.exists());

        let written = std::fs::read_to_string(&created).unwrap();
        assert_eq!(written, "hello world");
    }

    /// `exec` runs a command and captures its stdout.
    #[test]
    fn test_harness_exec() {
        let harness = new_harness("test_exec");

        let output = harness.exec("echo", ["hello"]).unwrap();
        assert!(output.success());
        assert!(output.stdout_contains("hello"));
    }

    /// A command that outlives its timeout is reported with exit code 124.
    #[cfg(unix)]
    #[test]
    fn test_harness_exec_timeout() {
        let harness = new_harness("test_exec_timeout");

        let limit = Duration::from_millis(50);
        let output = harness.exec_with_timeout("sleep", ["1"], limit).unwrap();

        assert!(!output.success());
        assert_eq!(output.exit_code, 124);
        assert!(output.stderr_contains("timed out"));
    }

    /// `CommandResult` helpers reflect the exit code and captured streams.
    #[test]
    fn test_command_result() {
        let outcome = CommandResult {
            exit_code: 0,
            stdout: String::from("hello world\n"),
            stderr: String::new(),
            duration: Duration::from_millis(10),
        };

        assert!(outcome.success());
        assert!(outcome.stdout_contains("hello"));
        assert!(!outcome.stderr_contains("error"));
    }

    /// Passing assertions return `Ok` and `mark_passed` flips the flag.
    #[test]
    fn test_harness_assertions() {
        let harness = new_harness("test_assertions");

        harness.assert(true, "should pass").unwrap();
        harness.assert_eq(1, 1, "numbers equal").unwrap();

        let succeeded = CommandResult {
            exit_code: 0,
            stdout: String::from("success"),
            stderr: String::new(),
            duration: Duration::from_millis(1),
        };
        harness.assert_success(&succeeded, "echo command").unwrap();

        harness.mark_passed();
        assert!(harness.passed());
    }

    /// A full scenario reports its phases and command stages in lifecycle
    /// order and writes a manifest naming the scenario.
    #[test]
    fn test_reliability_harness_phase_order_and_manifest_index() {
        let harness = new_harness("reliability_harness_phase_order");
        let repo_root = harness.test_dir().display().to_string();

        let scenario = ReliabilityScenarioSpec::new("reliability_harness_phase_order")
            .with_worker_id("worker-a")
            .with_repo_set([repo_root])
            .with_pressure_state("disk:normal,memory:normal")
            .add_triage_action("initial_context")
            .add_pre_check(ReliabilityLifecycleCommand::new(
                "pre-check",
                "echo",
                ["pre-check"],
            ))
            .add_execute_command(ReliabilityLifecycleCommand::new(
                "execute-build",
                "echo",
                ["execute-build"],
            ))
            .add_remote_probe(ReliabilityLifecycleCommand::new(
                "remote-probe",
                "echo",
                ["remote-probe"],
            ))
            .add_post_check(ReliabilityLifecycleCommand::new(
                "post-check",
                "echo",
                ["post-check"],
            ))
            .add_cleanup_verification(ReliabilityLifecycleCommand::new(
                "cleanup-check",
                "echo",
                ["cleanup-check"],
            ));

        let report = harness.run_reliability_scenario(&scenario).unwrap();

        let expected_phases = vec![
            ReliabilityPhase::Setup,
            ReliabilityPhase::Execute,
            ReliabilityPhase::Verify,
            ReliabilityPhase::Cleanup,
        ];
        assert_eq!(report.phase_order, expected_phases);

        let expected_stages = vec![
            "pre_checks",
            "execute",
            "remote_probes",
            "post_checks",
            "cleanup_verification",
        ];
        let observed_stages: Vec<_> = report
            .command_records
            .iter()
            .map(|record| record.stage.as_str())
            .collect();
        assert_eq!(observed_stages, expected_stages);

        let manifest_path = report.manifest_path.expect("manifest path should exist");
        assert!(manifest_path.exists());
        let manifest_text = std::fs::read_to_string(manifest_path).unwrap();
        assert!(manifest_text.contains("\"scenario_id\": \"reliability_harness_phase_order\""));
    }

    /// Requesting a failure hook without enabling its flag is a setup error.
    #[test]
    fn test_reliability_harness_denies_unflagged_failure_hooks() {
        let harness = new_harness("reliability_harness_hook_denied");

        let scenario = ReliabilityScenarioSpec::new("reliability_harness_hook_denied")
            .request_failure_hook(ReliabilityFailureHook::NetworkCut);

        let outcome = harness.run_reliability_scenario(&scenario);
        let err = outcome.expect_err("unflagged failure hook must be rejected");
        assert!(matches!(err, HarnessError::SetupFailed(_)));
        assert!(err.to_string().contains("not enabled"));
    }

    /// With every hook flag allowed, all requested hooks are activated in
    /// request order and leave marker files in the test directory.
    #[test]
    fn test_reliability_harness_arms_explicit_failure_hooks() {
        let harness = new_harness("reliability_harness_hook_enabled");
        let repo_root = harness.test_dir().display().to_string();

        let flags = ReliabilityFailureHookFlags::allow_all();

        let scenario = ReliabilityScenarioSpec::new("reliability_harness_hook_enabled")
            .with_repo_set([repo_root])
            .request_failure_hook(ReliabilityFailureHook::NetworkCut)
            .request_failure_hook(ReliabilityFailureHook::SyncTimeout)
            .request_failure_hook(ReliabilityFailureHook::PartialUpdate)
            .request_failure_hook(ReliabilityFailureHook::DaemonRestart)
            .with_failure_hook_flags(flags)
            .add_cleanup_verification(ReliabilityLifecycleCommand::new(
                "cleanup-check",
                "echo",
                ["cleanup-check"],
            ));

        let report = harness.run_reliability_scenario(&scenario).unwrap();

        let expected_hooks = vec![
            ReliabilityFailureHook::NetworkCut,
            ReliabilityFailureHook::SyncTimeout,
            ReliabilityFailureHook::PartialUpdate,
            ReliabilityFailureHook::DaemonRestart,
        ];
        assert_eq!(report.activated_failure_hooks, expected_hooks);

        let has_network_cut_artifact = report
            .artifact_paths
            .iter()
            .any(|path| path.contains("failure_hook_network_cut"));
        assert!(has_network_cut_artifact);

        // One marker file per armed hook, in the same order as the requests.
        let hooks_dir = harness.test_dir().join(".reliability-hooks");
        let expected_markers = [
            "network_cut.enabled",
            "sync_timeout.enabled",
            "partial_update.enabled",
            "daemon_restart.enabled",
        ];
        for marker in expected_markers {
            assert!(
                hooks_dir.join(marker).exists(),
                "missing hook marker: {marker}"
            );
        }
    }

    /// A minimal execute-only scenario works for each downstream scenario id.
    #[test]
    fn test_reliability_harness_primitives_cover_downstream_scenarios() {
        let harness = new_harness("reliability_harness_downstream");

        for scenario_id in ["bd-vvmd.2.8", "bd-vvmd.3.6", "bd-vvmd.4.6", "bd-vvmd.5.6"] {
            let smoke = ReliabilityLifecycleCommand::new(
                "smoke",
                "echo",
                [format!("scenario={scenario_id}")],
            );
            let scenario = ReliabilityScenarioSpec::new(scenario_id)
                .with_repo_set([harness.test_dir().display().to_string()])
                .add_execute_command(smoke);

            let report = harness
                .run_reliability_scenario(&scenario)
                .unwrap_or_else(|error| panic!("scenario {scenario_id} failed: {error}"));

            assert!(
                report.manifest_path.is_some(),
                "missing manifest for {scenario_id}"
            );

            let ran_execute_stage = report
                .command_records
                .iter()
                .any(|record| record.stage == "execute");
            assert!(
                ran_execute_stage,
                "missing execute stage record for {scenario_id}"
            );
        }
    }
}