Skip to main content

ryra_test/
executor.rs

1use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use ryra_vm::machine::{ExecOutput, Machine, scp_dir_from_vm};
7
8/// Abstraction over where commands run — VM (SSH) or local host.
9#[async_trait]
10pub trait Executor: Send + Sync {
11    /// Run a command and return its output. Fails if exit code != 0.
12    async fn exec(&self, cmd: &str) -> Result<ExecOutput>;
13
14    /// Run a command, streaming stdout/stderr to the terminal.
15    async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput>;
16
17    /// Wait for a systemd user service to become active.
18    async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()>;
19
20    /// Copy a directory from the execution environment to the host.
21    /// No-op for local execution (files are already on the host).
22    async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()>;
23}
24
25/// Executes commands inside a QEMU VM via SSH.
26pub struct VmExecutor<'a> {
27    pub vm: &'a Machine,
28}
29
30impl<'a> VmExecutor<'a> {
31    pub fn new(vm: &'a Machine) -> Self {
32        Self { vm }
33    }
34}
35
36#[async_trait]
37impl Executor for VmExecutor<'_> {
38    async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
39        self.vm.exec(cmd).await
40    }
41
42    async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
43        self.vm.exec_streaming(cmd, prefix).await
44    }
45
46    async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()> {
47        self.vm.wait_for_service(unit, timeout).await
48    }
49
50    async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()> {
51        // Ensure the local parent dir exists so scp writes into local_path/
52        if let Some(parent) = local_path.parent() {
53            tokio::fs::create_dir_all(parent).await.ok();
54        }
55        scp_dir_from_vm(self.vm, remote_path, local_path).await
56    }
57}
58
/// Executes commands directly on the host machine.
///
/// This is a stateless unit struct, so it is cheap to copy and can be created
/// with `LocalExecutor` or `LocalExecutor::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LocalExecutor;
61
62#[async_trait]
63impl Executor for LocalExecutor {
64    async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
65        let output = tokio::process::Command::new("bash")
66            .args(["-c", cmd])
67            .output()
68            .await
69            .with_context(|| format!("failed to exec locally: {cmd}"))?;
70
71        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
72        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
73
74        if !output.status.success() {
75            anyhow::bail!(
76                "command failed locally (exit {}): {cmd}\nstdout: {stdout}\nstderr: {stderr}",
77                output.status,
78            );
79        }
80
81        Ok(ExecOutput { stdout, stderr })
82    }
83
84    async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
85        use tokio::io::{AsyncBufReadExt, BufReader};
86
87        let mut child = tokio::process::Command::new("bash")
88            .args(["-c", cmd])
89            .stdout(std::process::Stdio::piped())
90            .stderr(std::process::Stdio::piped())
91            .spawn()
92            .with_context(|| format!("failed to exec locally: {cmd}"))?;
93
94        let stdout_pipe = child.stdout.take();
95        let stderr_pipe = child.stderr.take();
96
97        let prefix_out = prefix.to_string();
98        let prefix_err = prefix.to_string();
99
100        let stdout_handle = tokio::spawn(async move {
101            let mut lines = String::new();
102            if let Some(pipe) = stdout_pipe {
103                let mut reader = BufReader::new(pipe).lines();
104                while let Ok(Some(line)) = reader.next_line().await {
105                    if prefix_out.is_empty() {
106                        println!("    {line}");
107                    } else {
108                        println!("[{prefix_out}]     {line}");
109                    }
110                    lines.push_str(&line);
111                    lines.push('\n');
112                }
113            }
114            lines
115        });
116
117        let stderr_handle = tokio::spawn(async move {
118            let mut lines = String::new();
119            if let Some(pipe) = stderr_pipe {
120                let mut reader = BufReader::new(pipe).lines();
121                while let Ok(Some(line)) = reader.next_line().await {
122                    if prefix_err.is_empty() {
123                        eprintln!("    {line}");
124                    } else {
125                        eprintln!("[{prefix_err}]     {line}");
126                    }
127                    lines.push_str(&line);
128                    lines.push('\n');
129                }
130            }
131            lines
132        });
133
134        let status = child.wait().await?;
135        let stdout_buf = stdout_handle.await.context("stdout reader task panicked")?;
136        let stderr_buf = stderr_handle.await.context("stderr reader task panicked")?;
137
138        if !status.success() {
139            anyhow::bail!(
140                "command failed locally (exit {}): {cmd}\nstdout: {stdout_buf}\nstderr: {stderr_buf}",
141                status,
142            );
143        }
144
145        Ok(ExecOutput {
146            stdout: stdout_buf,
147            stderr: stderr_buf,
148        })
149    }
150
151    async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()> {
152        let start = std::time::Instant::now();
153        loop {
154            let cmd = format!(
155                "a=$(systemctl --user is-active {unit} 2>/dev/null || true); \
156                 f=$(systemctl --user is-failed {unit} 2>/dev/null || true); \
157                 echo \"$a|$f\""
158            );
159            let out = self.exec(&cmd).await?;
160            let line = out.stdout.trim();
161            let mut parts = line.split('|');
162            let active = parts.next().unwrap_or("");
163            let failed = parts.next().unwrap_or("");
164
165            if active == "active" {
166                return Ok(());
167            }
168            if failed == "failed" {
169                anyhow::bail!("service {unit} entered failed state");
170            }
171            if start.elapsed() > timeout {
172                anyhow::bail!("service {unit} not active after {}s", timeout.as_secs());
173            }
174            tokio::time::sleep(Duration::from_secs(1)).await;
175        }
176    }
177
178    async fn fetch_dir(&self, _remote_path: &str, _local_path: &Path) -> Result<()> {
179        // No-op: in local mode the "remote" path is already on the host.
180        Ok(())
181    }
182}
183
#[cfg(test)]
mod tests {
    use super::*;

    /// A successful command yields its stdout.
    #[tokio::test]
    async fn local_executor_runs_command() {
        let output = LocalExecutor.exec("echo hello").await.unwrap();
        assert_eq!(output.stdout.trim(), "hello");
    }

    /// A non-zero exit status is surfaced as an error.
    #[tokio::test]
    async fn local_executor_fails_on_bad_command() {
        let outcome = LocalExecutor.exec("false").await;
        assert!(outcome.is_err());
    }

    /// Streaming mode still returns the captured stdout.
    #[tokio::test]
    async fn local_executor_streams_output() {
        let output = LocalExecutor
            .exec_streaming("echo streamed", "test")
            .await
            .unwrap();
        assert_eq!(output.stdout.trim(), "streamed");
    }

    /// Waiting on a unit that does not exist must end with an error
    /// (timeout or non-active report) rather than hang.
    #[tokio::test]
    async fn local_wait_for_nonexistent_service_times_out() {
        let unit = "definitely-not-a-real-unit-xyz123.service";
        let limit = Duration::from_secs(2);
        let outcome = LocalExecutor.wait_for_service(unit, limit).await;
        assert!(outcome.is_err());
    }
}
221}