1use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use ryra_vm::machine::{ExecOutput, Machine, scp_dir_from_vm};
7
8#[async_trait]
10pub trait Executor: Send + Sync {
11 async fn exec(&self, cmd: &str) -> Result<ExecOutput>;
13
14 async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput>;
16
17 async fn wait_for_service(&self, unit: &str, timeout: Duration, prefix: &str) -> Result<()>;
21
22 async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()>;
25
26 fn playwright_out_dir(&self, test_name: &str) -> String;
32}
33
/// Registry directory used for commands that run inside the VM.
///
/// [`VmExecutor::new`] exports this path under `ryra_core::REGISTRY_DIR_ENV`
/// in front of every command it executes.
pub const VM_REGISTRY_PATH: &str = "/opt/ryra-test-registry";
38
/// Go template given to `podman inspect --format`: prints `yes` when the
/// container has a `.State.Health` entry and `no` otherwise.
///
/// It is interpolated into the probe command as a *value*, so its `{{ }}`
/// braces are emitted literally rather than being read as `format!` escapes.
const HEALTH_DEFINED_FMT: &str = "{{if .State.Health}}yes{{else}}no{{end}}";
46
47fn registry_env_prefix(path: &str) -> String {
51 format!("export {}={}; ", ryra_core::REGISTRY_DIR_ENV, path)
52}
53
54pub struct VmExecutor<'a> {
56 pub vm: &'a Machine,
57 env_prefix: String,
58}
59
60impl<'a> VmExecutor<'a> {
61 pub fn new(vm: &'a Machine) -> Self {
62 Self {
63 vm,
64 env_prefix: registry_env_prefix(VM_REGISTRY_PATH),
65 }
66 }
67}
68
69#[async_trait]
70impl Executor for VmExecutor<'_> {
71 async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
72 let wrapped = format!("{}{cmd}", self.env_prefix);
73 self.vm.exec(&wrapped).await
74 }
75
76 async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
77 let wrapped = format!("{}{cmd}", self.env_prefix);
78 self.vm.exec_streaming(&wrapped, prefix).await
79 }
80
81 async fn wait_for_service(&self, unit: &str, timeout: Duration, prefix: &str) -> Result<()> {
82 self.vm.wait_for_service(unit, timeout, prefix).await
83 }
84
85 async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()> {
86 if let Some(parent) = local_path.parent() {
88 tokio::fs::create_dir_all(parent).await.ok();
89 }
90 scp_dir_from_vm(self.vm, remote_path, local_path).await
91 }
92
93 fn playwright_out_dir(&self, test_name: &str) -> String {
94 format!("/home/ryra/.local/share/services-test/reports/{test_name}/playwright")
97 }
98}
99
/// [`Executor`] that runs commands on the current host through `bash -c`.
pub struct LocalExecutor {
    /// Shell statements (`export …; `) placed in front of every command.
    /// Empty when no environment overrides were requested.
    env_prefix: String,
}
108
109impl LocalExecutor {
110 pub fn with_registry(registry_path: &Path) -> Self {
114 let mut env_prefix = String::new();
115 if let Ok(exe) = std::env::current_exe()
123 && let Some(dir) = exe.parent()
124 {
125 env_prefix.push_str(&format!("export PATH=\"{}:$PATH\"; ", dir.display()));
126 }
127 env_prefix.push_str(®istry_env_prefix(®istry_path.display().to_string()));
128 Self { env_prefix }
129 }
130
131 pub fn with_config_dir(mut self, config_dir: &Path) -> Self {
135 self.env_prefix.push_str(&format!(
136 "export {}={}; ",
137 ryra_core::CONFIG_DIR_ENV,
138 config_dir.display()
139 ));
140 self
141 }
142
143 pub fn with_data_dir(mut self, data_dir: &Path) -> Self {
148 self.env_prefix.push_str(&format!(
149 "export {}={}; ",
150 ryra_core::DATA_DIR_ENV,
151 data_dir.display()
152 ));
153 self
154 }
155}
156
157impl Default for LocalExecutor {
158 fn default() -> Self {
162 Self {
163 env_prefix: String::new(),
164 }
165 }
166}
167
168#[async_trait]
169impl Executor for LocalExecutor {
170 async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
171 let wrapped = format!("{}{cmd}", self.env_prefix);
172 let output = tokio::process::Command::new("bash")
173 .args(["-c", &wrapped])
174 .output()
175 .await
176 .with_context(|| format!("failed to exec locally: {cmd}"))?;
177
178 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
179 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
180
181 if !output.status.success() {
182 anyhow::bail!(
183 "command failed locally (exit {}): {cmd}\nstdout: {stdout}\nstderr: {stderr}",
184 output.status,
185 );
186 }
187
188 Ok(ExecOutput { stdout, stderr })
189 }
190
191 async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
192 use tokio::io::{AsyncBufReadExt, BufReader};
193
194 let wrapped = format!("{}{cmd}", self.env_prefix);
195 let mut child = tokio::process::Command::new("bash")
196 .args(["-c", &wrapped])
197 .stdout(std::process::Stdio::piped())
198 .stderr(std::process::Stdio::piped())
199 .spawn()
200 .with_context(|| format!("failed to exec locally: {cmd}"))?;
201
202 let stdout_pipe = child.stdout.take();
203 let stderr_pipe = child.stderr.take();
204
205 let prefix_out = prefix.to_string();
206 let prefix_err = prefix.to_string();
207
208 let stdout_handle = tokio::spawn(async move {
209 let mut lines = String::new();
210 if let Some(pipe) = stdout_pipe {
211 let mut reader = BufReader::new(pipe).lines();
212 while let Ok(Some(line)) = reader.next_line().await {
213 if prefix_out.is_empty() {
214 println!(" {line}");
215 } else {
216 println!("[{prefix_out}] {line}");
217 }
218 lines.push_str(&line);
219 lines.push('\n');
220 }
221 }
222 lines
223 });
224
225 let stderr_handle = tokio::spawn(async move {
226 let mut lines = String::new();
227 if let Some(pipe) = stderr_pipe {
228 let mut reader = BufReader::new(pipe).lines();
229 while let Ok(Some(line)) = reader.next_line().await {
230 if prefix_err.is_empty() {
231 eprintln!(" {line}");
232 } else {
233 eprintln!("[{prefix_err}] {line}");
234 }
235 lines.push_str(&line);
236 lines.push('\n');
237 }
238 }
239 lines
240 });
241
242 let status = child.wait().await?;
243 let stdout_buf = stdout_handle.await.context("stdout reader task panicked")?;
244 let stderr_buf = stderr_handle.await.context("stderr reader task panicked")?;
245
246 if !status.success() {
247 anyhow::bail!(
248 "command failed locally (exit {}): {cmd}\nstdout: {stdout_buf}\nstderr: {stderr_buf}",
249 status,
250 );
251 }
252
253 Ok(ExecOutput {
254 stdout: stdout_buf,
255 stderr: stderr_buf,
256 })
257 }
258
259 async fn wait_for_service(&self, unit: &str, timeout: Duration, prefix: &str) -> Result<()> {
260 let container = unit.trim_end_matches(".service");
262 let mut progress =
263 ryra_vm::progress::WaitProgress::new(unit, "systemctl + healthcheck", timeout)
264 .with_prefix(prefix);
265 loop {
266 let cmd = format!(
272 "a=$(systemctl --user is-active {unit} 2>/dev/null || true); \
273 f=$(systemctl --user is-failed {unit} 2>/dev/null || true); \
274 if [ \"$(podman inspect --format '{HEALTH_DEFINED_FMT}' {container} 2>/dev/null)\" = yes ]; then \
275 podman healthcheck run {container} >/dev/null 2>&1 && h=healthy || h=unhealthy; \
276 else h=none; fi; \
277 echo \"$a|$f|$h\""
278 );
279 let out = self.exec(&cmd).await?;
280 let line = out.stdout.trim();
281 let mut parts = line.split('|');
282 let active = parts.next().unwrap_or("");
283 let failed = parts.next().unwrap_or("");
284 let health = parts.next().unwrap_or("");
285
286 if failed == "failed" {
287 anyhow::bail!("service {unit} entered failed state");
288 }
289 if active == "active" {
290 match health {
297 "healthy" | "none" | "" => return Ok(()),
298 _ => {}
300 }
301 }
302 if progress.timed_out() {
303 anyhow::bail!(
304 "service {unit} not ready after {}s (active={active}, health={health})",
305 timeout.as_secs()
306 );
307 }
308 progress.tick();
309 tokio::time::sleep(Duration::from_secs(1)).await;
310 }
311 }
312
313 async fn fetch_dir(&self, _remote_path: &str, _local_path: &Path) -> Result<()> {
314 Ok(())
316 }
317
318 fn playwright_out_dir(&self, test_name: &str) -> String {
319 crate::reports::reports_dir()
322 .map(|d| d.join(test_name).join("playwright").display().to_string())
323 .unwrap_or_else(|_| format!("/tmp/ryra-test-reports/{test_name}/playwright"))
324 }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// A successful command returns its stdout.
    #[tokio::test]
    async fn local_executor_runs_command() {
        let output = LocalExecutor::default().exec("echo hello").await.unwrap();
        assert_eq!(output.stdout.trim(), "hello");
    }

    /// A command with a non-zero exit status surfaces as an error.
    #[tokio::test]
    async fn local_executor_fails_on_bad_command() {
        let outcome = LocalExecutor::default().exec("false").await;
        assert!(outcome.is_err());
    }

    /// Streaming execution still returns the captured stdout.
    #[tokio::test]
    async fn local_executor_streams_output() {
        let output = LocalExecutor::default()
            .exec_streaming("echo streamed", "test")
            .await
            .unwrap();
        assert_eq!(output.stdout.trim(), "streamed");
    }

    /// Waiting on a unit that does not exist ends in an error rather than
    /// hanging or succeeding.
    #[tokio::test]
    async fn local_wait_for_nonexistent_service_times_out() {
        let unit = "definitely-not-a-real-unit-xyz123.service";
        let limit = Duration::from_secs(2);
        let outcome = LocalExecutor::default()
            .wait_for_service(unit, limit, " ")
            .await;
        assert!(outcome.is_err());
    }
}