1use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use ryra_vm::machine::{ExecOutput, Machine, scp_dir_from_vm};
7
/// Abstraction over the place where test commands are executed.
///
/// Implemented in this file by [`VmExecutor`] (forwards to a VM) and
/// [`LocalExecutor`] (runs through a local `bash`). Implementors must be
/// `Send + Sync`.
#[async_trait]
pub trait Executor: Send + Sync {
    /// Runs the shell command `cmd` and returns its captured output.
    async fn exec(&self, cmd: &str) -> Result<ExecOutput>;

    /// Runs `cmd` like [`Executor::exec`], additionally taking a `prefix`
    /// label for the output (the local implementation tags every echoed line
    /// with it).
    async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput>;

    /// Waits for the service `unit`, giving up after `timeout`.
    ///
    /// `prefix` is passed through to the progress reporting of the
    /// implementation.
    async fn wait_for_service(&self, unit: &str, timeout: Duration, prefix: &str) -> Result<()>;

    /// Makes the directory `remote_path` of the execution environment
    /// available at `local_path` on the host.
    async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()>;

    /// Returns the Playwright output directory for the test `test_name`, as a
    /// path string.
    fn playwright_out_dir(&self, test_name: &str) -> String;
}
33
/// Registry directory path that [`VmExecutor::new`] exports (under
/// `ryra_core::REGISTRY_DIR_ENV`) in front of every command it runs.
pub const VM_REGISTRY_PATH: &str = "/opt/ryra-test-registry";

/// Go template passed to `podman inspect --format` by the local service wait:
/// it prints `yes` when the container's `.State.Health` is set and `no`
/// otherwise.
///
/// This is a plain string literal, so the doubled braces are the template's
/// own `{{ … }}` delimiters. It is spliced into a `format!` call as an
/// argument, which keeps those braces from being parsed as format
/// placeholders.
const HEALTH_DEFINED_FMT: &str = "{{if .State.Health}}yes{{else}}no{{end}}";
46
47fn registry_env_prefix(path: &str) -> String {
51 format!("export {}={}; ", ryra_core::REGISTRY_DIR_ENV, path)
52}
53
/// [`Executor`] that forwards commands to a borrowed VM [`Machine`].
pub struct VmExecutor<'a> {
    /// The machine every command is forwarded to.
    pub vm: &'a Machine,
    /// Shell snippet prepended to each command passed to `exec` /
    /// `exec_streaming`.
    env_prefix: String,
}
59
60impl<'a> VmExecutor<'a> {
61 pub fn new(vm: &'a Machine) -> Self {
62 Self {
63 vm,
64 env_prefix: registry_env_prefix(VM_REGISTRY_PATH),
65 }
66 }
67}
68
69#[async_trait]
70impl Executor for VmExecutor<'_> {
71 async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
72 let wrapped = format!("{}{cmd}", self.env_prefix);
73 self.vm.exec(&wrapped).await
74 }
75
76 async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
77 let wrapped = format!("{}{cmd}", self.env_prefix);
78 self.vm.exec_streaming(&wrapped, prefix).await
79 }
80
81 async fn wait_for_service(&self, unit: &str, timeout: Duration, prefix: &str) -> Result<()> {
82 self.vm.wait_for_service(unit, timeout, prefix).await
83 }
84
85 async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()> {
86 if let Some(parent) = local_path.parent() {
88 tokio::fs::create_dir_all(parent).await.ok();
89 }
90 scp_dir_from_vm(self.vm, remote_path, local_path).await
91 }
92
93 fn playwright_out_dir(&self, test_name: &str) -> String {
94 format!("/home/ryra/.local/share/services-test/reports/{test_name}/playwright")
97 }
98}
99
/// [`Executor`] that runs commands on the local host through `bash -c`.
pub struct LocalExecutor {
    /// Shell snippet (a sequence of `export …; ` statements) prepended to
    /// every command before it is handed to `bash`.
    env_prefix: String,
}
108
109impl LocalExecutor {
110 pub fn new() -> Self {
115 let mut env_prefix = String::new();
116 if let Ok(exe) = std::env::current_exe()
124 && let Some(dir) = exe.parent()
125 {
126 env_prefix.push_str(&format!("export PATH=\"{}:$PATH\"; ", dir.display()));
127 }
128 Self { env_prefix }
129 }
130
131 pub fn with_registry(registry_path: &Path) -> Self {
135 let mut s = Self::new();
136 s.env_prefix
137 .push_str(®istry_env_prefix(®istry_path.display().to_string()));
138 s
139 }
140
141 pub fn with_config_dir(mut self, config_dir: &Path) -> Self {
145 self.env_prefix.push_str(&format!(
146 "export {}={}; ",
147 ryra_core::CONFIG_DIR_ENV,
148 config_dir.display()
149 ));
150 self
151 }
152
153 pub fn with_data_dir(mut self, data_dir: &Path) -> Self {
158 self.env_prefix.push_str(&format!(
159 "export {}={}; ",
160 ryra_core::DATA_DIR_ENV,
161 data_dir.display()
162 ));
163 self
164 }
165}
166
/// The default executor is the one built by [`LocalExecutor::new`].
impl Default for LocalExecutor {
    fn default() -> Self {
        Self::new()
    }
}
172
173#[async_trait]
174impl Executor for LocalExecutor {
175 async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
176 let wrapped = format!("{}{cmd}", self.env_prefix);
177 let output = tokio::process::Command::new("bash")
178 .args(["-c", &wrapped])
179 .output()
180 .await
181 .with_context(|| format!("failed to exec locally: {cmd}"))?;
182
183 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
184 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
185
186 if !output.status.success() {
187 anyhow::bail!(
188 "command failed locally (exit {}): {cmd}\nstdout: {stdout}\nstderr: {stderr}",
189 output.status,
190 );
191 }
192
193 Ok(ExecOutput { stdout, stderr })
194 }
195
196 async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
197 use tokio::io::{AsyncBufReadExt, BufReader};
198
199 let wrapped = format!("{}{cmd}", self.env_prefix);
200 let mut child = tokio::process::Command::new("bash")
201 .args(["-c", &wrapped])
202 .stdout(std::process::Stdio::piped())
203 .stderr(std::process::Stdio::piped())
204 .spawn()
205 .with_context(|| format!("failed to exec locally: {cmd}"))?;
206
207 let stdout_pipe = child.stdout.take();
208 let stderr_pipe = child.stderr.take();
209
210 let prefix_out = prefix.to_string();
211 let prefix_err = prefix.to_string();
212
213 let stdout_handle = tokio::spawn(async move {
214 let mut lines = String::new();
215 if let Some(pipe) = stdout_pipe {
216 let mut reader = BufReader::new(pipe).lines();
217 while let Ok(Some(line)) = reader.next_line().await {
218 if prefix_out.is_empty() {
219 println!(" {line}");
220 } else {
221 println!("[{prefix_out}] {line}");
222 }
223 lines.push_str(&line);
224 lines.push('\n');
225 }
226 }
227 lines
228 });
229
230 let stderr_handle = tokio::spawn(async move {
231 let mut lines = String::new();
232 if let Some(pipe) = stderr_pipe {
233 let mut reader = BufReader::new(pipe).lines();
234 while let Ok(Some(line)) = reader.next_line().await {
235 if prefix_err.is_empty() {
236 eprintln!(" {line}");
237 } else {
238 eprintln!("[{prefix_err}] {line}");
239 }
240 lines.push_str(&line);
241 lines.push('\n');
242 }
243 }
244 lines
245 });
246
247 let status = child.wait().await?;
248 let stdout_buf = stdout_handle.await.context("stdout reader task panicked")?;
249 let stderr_buf = stderr_handle.await.context("stderr reader task panicked")?;
250
251 if !status.success() {
252 anyhow::bail!(
253 "command failed locally (exit {}): {cmd}\nstdout: {stdout_buf}\nstderr: {stderr_buf}",
254 status,
255 );
256 }
257
258 Ok(ExecOutput {
259 stdout: stdout_buf,
260 stderr: stderr_buf,
261 })
262 }
263
264 async fn wait_for_service(&self, unit: &str, timeout: Duration, prefix: &str) -> Result<()> {
265 let container = unit.trim_end_matches(".service");
267 let mut progress =
268 ryra_vm::progress::WaitProgress::new(unit, "systemctl + healthcheck", timeout)
269 .with_prefix(prefix);
270 loop {
271 let cmd = format!(
277 "a=$(systemctl --user is-active {unit} 2>/dev/null || true); \
278 f=$(systemctl --user is-failed {unit} 2>/dev/null || true); \
279 if [ \"$(podman inspect --format '{HEALTH_DEFINED_FMT}' {container} 2>/dev/null)\" = yes ]; then \
280 podman healthcheck run {container} >/dev/null 2>&1 && h=healthy || h=unhealthy; \
281 else h=none; fi; \
282 echo \"$a|$f|$h\""
283 );
284 let out = self.exec(&cmd).await?;
285 let line = out.stdout.trim();
286 let mut parts = line.split('|');
287 let active = parts.next().unwrap_or("");
288 let failed = parts.next().unwrap_or("");
289 let health = parts.next().unwrap_or("");
290
291 if failed == "failed" {
292 anyhow::bail!("service {unit} entered failed state");
293 }
294 if active == "active" {
295 match health {
302 "healthy" | "none" | "" => return Ok(()),
303 _ => {}
305 }
306 }
307 if progress.timed_out() {
308 anyhow::bail!(
309 "service {unit} not ready after {}s (active={active}, health={health})",
310 timeout.as_secs()
311 );
312 }
313 progress.tick();
314 tokio::time::sleep(Duration::from_secs(1)).await;
315 }
316 }
317
318 async fn fetch_dir(&self, _remote_path: &str, _local_path: &Path) -> Result<()> {
319 Ok(())
321 }
322
323 fn playwright_out_dir(&self, test_name: &str) -> String {
324 crate::reports::reports_dir()
327 .map(|d| d.join(test_name).join("playwright").display().to_string())
328 .unwrap_or_else(|_| format!("/tmp/ryra-test-reports/{test_name}/playwright"))
329 }
330}
331
// Tests for `LocalExecutor`; they run real commands through the local `bash`.
#[cfg(test)]
mod tests {
    use super::*;

    // A successful command's stdout is captured and returned.
    #[tokio::test]
    async fn local_executor_runs_command() {
        let exec = LocalExecutor::default();
        let out = exec.exec("echo hello").await.unwrap();
        assert_eq!(out.stdout.trim(), "hello");
    }

    // A command with a non-zero exit status surfaces as an error.
    #[tokio::test]
    async fn local_executor_fails_on_bad_command() {
        let exec = LocalExecutor::default();
        let result = exec.exec("false").await;
        assert!(result.is_err());
    }

    // The streaming variant also returns the captured stdout.
    #[tokio::test]
    async fn local_executor_streams_output() {
        let exec = LocalExecutor::default();
        let out = exec.exec_streaming("echo streamed", "test").await.unwrap();
        assert_eq!(out.stdout.trim(), "streamed");
    }

    // Waiting on a unit that does not exist must end in an error rather than
    // succeed; the short timeout keeps the test fast.
    #[tokio::test]
    async fn local_wait_for_nonexistent_service_times_out() {
        let exec = LocalExecutor::default();
        let result = exec
            .wait_for_service(
                "definitely-not-a-real-unit-xyz123.service",
                Duration::from_secs(2),
                " ",
            )
            .await;
        assert!(result.is_err());
    }
}