1use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use ryra_vm::machine::{ExecOutput, Machine, scp_dir_from_vm};
7
/// Abstraction over the place where shell commands are executed.
///
/// Implemented in this file by `VmExecutor`, which forwards to a `Machine`,
/// and by `LocalExecutor`, which spawns `bash -c` on the host.
#[async_trait]
pub trait Executor: Send + Sync {
    /// Runs `cmd` and returns its captured stdout and stderr.
    ///
    /// `LocalExecutor` reports a non-success exit status as an error; what
    /// `VmExecutor` does in that case is decided by `Machine::exec`.
    async fn exec(&self, cmd: &str) -> Result<ExecOutput>;

    /// Runs `cmd` like [`Executor::exec`], additionally taking a `prefix`.
    ///
    /// `LocalExecutor` echoes every output line while the command is still
    /// running, tagged with `[prefix]` (or a single leading space when
    /// `prefix` is empty), and still returns the full captured output.
    /// `VmExecutor` hands `prefix` to `Machine::exec_streaming`.
    async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput>;

    /// Waits for the service `unit` to become active, giving up after `timeout`.
    ///
    /// `LocalExecutor` polls `systemctl --user` and fails early if the unit
    /// reports the `failed` state.
    async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()>;

    /// Fetches the directory `remote_path` into `local_path`.
    ///
    /// `VmExecutor` copies it with `scp_dir_from_vm`; `LocalExecutor` does
    /// nothing and returns `Ok(())`.
    async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()>;
}
24
/// Registry directory that `VmExecutor::new` exports to every command it runs.
pub const VM_REGISTRY_PATH: &str = "/opt/ryra-test-registry";
29
30fn registry_env_prefix(path: &str) -> String {
34 format!("export {}={}; ", ryra_core::REGISTRY_DIR_ENV, path)
35}
36
37pub struct VmExecutor<'a> {
39 pub vm: &'a Machine,
40 env_prefix: String,
41}
42
43impl<'a> VmExecutor<'a> {
44 pub fn new(vm: &'a Machine) -> Self {
45 Self {
46 vm,
47 env_prefix: registry_env_prefix(VM_REGISTRY_PATH),
48 }
49 }
50}
51
52#[async_trait]
53impl Executor for VmExecutor<'_> {
54 async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
55 let wrapped = format!("{}{cmd}", self.env_prefix);
56 self.vm.exec(&wrapped).await
57 }
58
59 async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
60 let wrapped = format!("{}{cmd}", self.env_prefix);
61 self.vm.exec_streaming(&wrapped, prefix).await
62 }
63
64 async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()> {
65 self.vm.wait_for_service(unit, timeout).await
66 }
67
68 async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()> {
69 if let Some(parent) = local_path.parent() {
71 tokio::fs::create_dir_all(parent).await.ok();
72 }
73 scp_dir_from_vm(self.vm, remote_path, local_path).await
74 }
75}
76
77pub struct LocalExecutor {
83 env_prefix: String,
84}
85
86impl LocalExecutor {
87 pub fn with_registry(registry_path: &Path) -> Self {
91 Self {
92 env_prefix: registry_env_prefix(®istry_path.display().to_string()),
93 }
94 }
95}
96
97impl Default for LocalExecutor {
98 fn default() -> Self {
102 Self {
103 env_prefix: String::new(),
104 }
105 }
106}
107
108#[async_trait]
109impl Executor for LocalExecutor {
110 async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
111 let wrapped = format!("{}{cmd}", self.env_prefix);
112 let output = tokio::process::Command::new("bash")
113 .args(["-c", &wrapped])
114 .output()
115 .await
116 .with_context(|| format!("failed to exec locally: {cmd}"))?;
117
118 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
119 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
120
121 if !output.status.success() {
122 anyhow::bail!(
123 "command failed locally (exit {}): {cmd}\nstdout: {stdout}\nstderr: {stderr}",
124 output.status,
125 );
126 }
127
128 Ok(ExecOutput { stdout, stderr })
129 }
130
131 async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
132 use tokio::io::{AsyncBufReadExt, BufReader};
133
134 let wrapped = format!("{}{cmd}", self.env_prefix);
135 let mut child = tokio::process::Command::new("bash")
136 .args(["-c", &wrapped])
137 .stdout(std::process::Stdio::piped())
138 .stderr(std::process::Stdio::piped())
139 .spawn()
140 .with_context(|| format!("failed to exec locally: {cmd}"))?;
141
142 let stdout_pipe = child.stdout.take();
143 let stderr_pipe = child.stderr.take();
144
145 let prefix_out = prefix.to_string();
146 let prefix_err = prefix.to_string();
147
148 let stdout_handle = tokio::spawn(async move {
149 let mut lines = String::new();
150 if let Some(pipe) = stdout_pipe {
151 let mut reader = BufReader::new(pipe).lines();
152 while let Ok(Some(line)) = reader.next_line().await {
153 if prefix_out.is_empty() {
154 println!(" {line}");
155 } else {
156 println!("[{prefix_out}] {line}");
157 }
158 lines.push_str(&line);
159 lines.push('\n');
160 }
161 }
162 lines
163 });
164
165 let stderr_handle = tokio::spawn(async move {
166 let mut lines = String::new();
167 if let Some(pipe) = stderr_pipe {
168 let mut reader = BufReader::new(pipe).lines();
169 while let Ok(Some(line)) = reader.next_line().await {
170 if prefix_err.is_empty() {
171 eprintln!(" {line}");
172 } else {
173 eprintln!("[{prefix_err}] {line}");
174 }
175 lines.push_str(&line);
176 lines.push('\n');
177 }
178 }
179 lines
180 });
181
182 let status = child.wait().await?;
183 let stdout_buf = stdout_handle.await.context("stdout reader task panicked")?;
184 let stderr_buf = stderr_handle.await.context("stderr reader task panicked")?;
185
186 if !status.success() {
187 anyhow::bail!(
188 "command failed locally (exit {}): {cmd}\nstdout: {stdout_buf}\nstderr: {stderr_buf}",
189 status,
190 );
191 }
192
193 Ok(ExecOutput {
194 stdout: stdout_buf,
195 stderr: stderr_buf,
196 })
197 }
198
199 async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()> {
200 let start = std::time::Instant::now();
201 loop {
202 let cmd = format!(
203 "a=$(systemctl --user is-active {unit} 2>/dev/null || true); \
204 f=$(systemctl --user is-failed {unit} 2>/dev/null || true); \
205 echo \"$a|$f\""
206 );
207 let out = self.exec(&cmd).await?;
208 let line = out.stdout.trim();
209 let mut parts = line.split('|');
210 let active = parts.next().unwrap_or("");
211 let failed = parts.next().unwrap_or("");
212
213 if active == "active" {
214 return Ok(());
215 }
216 if failed == "failed" {
217 anyhow::bail!("service {unit} entered failed state");
218 }
219 if start.elapsed() > timeout {
220 anyhow::bail!("service {unit} not active after {}s", timeout.as_secs());
221 }
222 tokio::time::sleep(Duration::from_secs(1)).await;
223 }
224 }
225
226 async fn fetch_dir(&self, _remote_path: &str, _local_path: &Path) -> Result<()> {
227 Ok(())
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// A successful command has its stdout captured.
    #[tokio::test]
    async fn local_executor_runs_command() {
        let runner = LocalExecutor::default();
        let captured = runner.exec("echo hello").await.unwrap();
        assert_eq!(captured.stdout.trim(), "hello");
    }

    /// A non-zero exit status is reported as an error.
    #[tokio::test]
    async fn local_executor_fails_on_bad_command() {
        let runner = LocalExecutor::default();
        let result = runner.exec("false").await;
        assert!(result.is_err());
    }

    /// The streaming variant still returns the captured stdout.
    #[tokio::test]
    async fn local_executor_streams_output() {
        let runner = LocalExecutor::default();
        let captured = runner.exec_streaming("echo streamed", "test").await.unwrap();
        assert_eq!(captured.stdout.trim(), "streamed");
    }

    /// Waiting on a unit that does not exist ends in an error.
    #[tokio::test]
    async fn local_wait_for_nonexistent_service_times_out() {
        let runner = LocalExecutor::default();
        let patience = Duration::from_secs(2);
        let result = runner
            .wait_for_service("definitely-not-a-real-unit-xyz123.service", patience)
            .await;
        assert!(result.is_err());
    }
}