Skip to main content

ryra_test/
executor.rs

1use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use ryra_vm::machine::{ExecOutput, Machine, scp_dir_from_vm};
7
/// Abstraction over where commands run — VM (SSH) or local host.
///
/// Implemented in this file by [`VmExecutor`] and [`LocalExecutor`].
#[async_trait]
pub trait Executor: Send + Sync {
    /// Run a command and return its output. Fails if exit code != 0.
    ///
    /// `cmd` is a shell command line; both implementations in this file
    /// prepend their registry env prefix to it before running it.
    async fn exec(&self, cmd: &str) -> Result<ExecOutput>;

    /// Run a command, streaming stdout/stderr to the terminal.
    ///
    /// The output is also returned once the command finishes. In
    /// [`LocalExecutor`], a non-empty `prefix` is printed in square brackets
    /// in front of every echoed line; an empty `prefix` prints the line with
    /// indentation only.
    async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput>;

    /// Wait for a systemd user service to become active.
    ///
    /// `timeout` bounds how long to wait. [`LocalExecutor`] polls once per
    /// second and returns an error if the unit reports `failed` or the
    /// timeout elapses first.
    async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()>;

    /// Copy a directory from the execution environment to the host.
    /// No-op for local execution (files are already on the host).
    async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()>;
}
24
/// Path inside the VM where the test runner scps the registry fixtures.
/// Set as `RYRA_REGISTRY_DIR` so every `ryra` invocation resolves the
/// default registry to this local copy instead of cloning from GitHub.
///
/// [`VmExecutor::new`] builds its per-command env prefix from this path.
pub const VM_REGISTRY_PATH: &str = "/opt/ryra-test-registry";
29
30/// Build the env-var prefix the executors prepend to every command so
31/// `ryra` resolves the default registry against the test fixtures dir
32/// instead of cloning from the internet on each test.
33fn registry_env_prefix(path: &str) -> String {
34    format!("export {}={}; ", ryra_core::REGISTRY_DIR_ENV, path)
35}
36
37/// Executes commands inside a QEMU VM via SSH.
38pub struct VmExecutor<'a> {
39    pub vm: &'a Machine,
40    env_prefix: String,
41}
42
43impl<'a> VmExecutor<'a> {
44    pub fn new(vm: &'a Machine) -> Self {
45        Self {
46            vm,
47            env_prefix: registry_env_prefix(VM_REGISTRY_PATH),
48        }
49    }
50}
51
52#[async_trait]
53impl Executor for VmExecutor<'_> {
54    async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
55        let wrapped = format!("{}{cmd}", self.env_prefix);
56        self.vm.exec(&wrapped).await
57    }
58
59    async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
60        let wrapped = format!("{}{cmd}", self.env_prefix);
61        self.vm.exec_streaming(&wrapped, prefix).await
62    }
63
64    async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()> {
65        self.vm.wait_for_service(unit, timeout).await
66    }
67
68    async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()> {
69        // Ensure the local parent dir exists so scp writes into local_path/
70        if let Some(parent) = local_path.parent() {
71            tokio::fs::create_dir_all(parent).await.ok();
72        }
73        scp_dir_from_vm(self.vm, remote_path, local_path).await
74    }
75}
76
77/// Executes commands directly on the host machine.
78///
79/// Carries an optional `RYRA_REGISTRY_DIR` override prepended to every
80/// command — bare-mode tests use this to point ryra at the in-repo
81/// `registry/` checkout instead of cloning from GitHub on each run.
82pub struct LocalExecutor {
83    env_prefix: String,
84}
85
86impl LocalExecutor {
87    /// Construct a LocalExecutor that prepends `RYRA_REGISTRY_DIR=<path>`
88    /// to every command, so `ryra` resolves the default registry to the
89    /// local checkout under test.
90    pub fn with_registry(registry_path: &Path) -> Self {
91        Self {
92            env_prefix: registry_env_prefix(&registry_path.display().to_string()),
93        }
94    }
95}
96
97impl Default for LocalExecutor {
98    /// LocalExecutor with no registry override — only useful when the
99    /// commands run don't touch `ryra`. Most callers want
100    /// [`LocalExecutor::with_registry`].
101    fn default() -> Self {
102        Self {
103            env_prefix: String::new(),
104        }
105    }
106}
107
108#[async_trait]
109impl Executor for LocalExecutor {
110    async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
111        let wrapped = format!("{}{cmd}", self.env_prefix);
112        let output = tokio::process::Command::new("bash")
113            .args(["-c", &wrapped])
114            .output()
115            .await
116            .with_context(|| format!("failed to exec locally: {cmd}"))?;
117
118        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
119        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
120
121        if !output.status.success() {
122            anyhow::bail!(
123                "command failed locally (exit {}): {cmd}\nstdout: {stdout}\nstderr: {stderr}",
124                output.status,
125            );
126        }
127
128        Ok(ExecOutput { stdout, stderr })
129    }
130
131    async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
132        use tokio::io::{AsyncBufReadExt, BufReader};
133
134        let wrapped = format!("{}{cmd}", self.env_prefix);
135        let mut child = tokio::process::Command::new("bash")
136            .args(["-c", &wrapped])
137            .stdout(std::process::Stdio::piped())
138            .stderr(std::process::Stdio::piped())
139            .spawn()
140            .with_context(|| format!("failed to exec locally: {cmd}"))?;
141
142        let stdout_pipe = child.stdout.take();
143        let stderr_pipe = child.stderr.take();
144
145        let prefix_out = prefix.to_string();
146        let prefix_err = prefix.to_string();
147
148        let stdout_handle = tokio::spawn(async move {
149            let mut lines = String::new();
150            if let Some(pipe) = stdout_pipe {
151                let mut reader = BufReader::new(pipe).lines();
152                while let Ok(Some(line)) = reader.next_line().await {
153                    if prefix_out.is_empty() {
154                        println!("    {line}");
155                    } else {
156                        println!("[{prefix_out}]     {line}");
157                    }
158                    lines.push_str(&line);
159                    lines.push('\n');
160                }
161            }
162            lines
163        });
164
165        let stderr_handle = tokio::spawn(async move {
166            let mut lines = String::new();
167            if let Some(pipe) = stderr_pipe {
168                let mut reader = BufReader::new(pipe).lines();
169                while let Ok(Some(line)) = reader.next_line().await {
170                    if prefix_err.is_empty() {
171                        eprintln!("    {line}");
172                    } else {
173                        eprintln!("[{prefix_err}]     {line}");
174                    }
175                    lines.push_str(&line);
176                    lines.push('\n');
177                }
178            }
179            lines
180        });
181
182        let status = child.wait().await?;
183        let stdout_buf = stdout_handle.await.context("stdout reader task panicked")?;
184        let stderr_buf = stderr_handle.await.context("stderr reader task panicked")?;
185
186        if !status.success() {
187            anyhow::bail!(
188                "command failed locally (exit {}): {cmd}\nstdout: {stdout_buf}\nstderr: {stderr_buf}",
189                status,
190            );
191        }
192
193        Ok(ExecOutput {
194            stdout: stdout_buf,
195            stderr: stderr_buf,
196        })
197    }
198
199    async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()> {
200        let start = std::time::Instant::now();
201        loop {
202            let cmd = format!(
203                "a=$(systemctl --user is-active {unit} 2>/dev/null || true); \
204                 f=$(systemctl --user is-failed {unit} 2>/dev/null || true); \
205                 echo \"$a|$f\""
206            );
207            let out = self.exec(&cmd).await?;
208            let line = out.stdout.trim();
209            let mut parts = line.split('|');
210            let active = parts.next().unwrap_or("");
211            let failed = parts.next().unwrap_or("");
212
213            if active == "active" {
214                return Ok(());
215            }
216            if failed == "failed" {
217                anyhow::bail!("service {unit} entered failed state");
218            }
219            if start.elapsed() > timeout {
220                anyhow::bail!("service {unit} not active after {}s", timeout.as_secs());
221            }
222            tokio::time::sleep(Duration::from_secs(1)).await;
223        }
224    }
225
226    async fn fetch_dir(&self, _remote_path: &str, _local_path: &Path) -> Result<()> {
227        // No-op: in local mode the "remote" path is already on the host.
228        Ok(())
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// A successful command's stdout is captured and returned.
    #[tokio::test]
    async fn local_executor_runs_command() {
        let runner = LocalExecutor::default();
        let output = runner.exec("echo hello").await.unwrap();
        assert_eq!(output.stdout.trim(), "hello");
    }

    /// A command with a non-zero exit status surfaces as an error.
    #[tokio::test]
    async fn local_executor_fails_on_bad_command() {
        let runner = LocalExecutor::default();
        let outcome = runner.exec("false").await;
        assert!(outcome.is_err());
    }

    /// Streaming mode still hands back the captured stdout.
    #[tokio::test]
    async fn local_executor_streams_output() {
        let runner = LocalExecutor::default();
        let output = runner.exec_streaming("echo streamed", "test").await.unwrap();
        assert_eq!(output.stdout.trim(), "streamed");
    }

    /// Waiting on a unit that does not exist must end in an error (timeout
    /// or non-active report) rather than hanging.
    #[tokio::test]
    async fn local_wait_for_nonexistent_service_times_out() {
        let runner = LocalExecutor::default();
        let unit = "definitely-not-a-real-unit-xyz123.service";
        let limit = Duration::from_secs(2);
        let outcome = runner.wait_for_service(unit, limit).await;
        assert!(outcome.is_err());
    }
}