1use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{Context, Result};
5use async_trait::async_trait;
6use ryra_vm::machine::{ExecOutput, Machine, scp_dir_from_vm};
7
8#[async_trait]
10pub trait Executor: Send + Sync {
11 async fn exec(&self, cmd: &str) -> Result<ExecOutput>;
13
14 async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput>;
16
17 async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()>;
19
20 async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()>;
23}
24
25pub struct VmExecutor<'a> {
27 pub vm: &'a Machine,
28}
29
30impl<'a> VmExecutor<'a> {
31 pub fn new(vm: &'a Machine) -> Self {
32 Self { vm }
33 }
34}
35
36#[async_trait]
37impl Executor for VmExecutor<'_> {
38 async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
39 self.vm.exec(cmd).await
40 }
41
42 async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
43 self.vm.exec_streaming(cmd, prefix).await
44 }
45
46 async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()> {
47 self.vm.wait_for_service(unit, timeout).await
48 }
49
50 async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()> {
51 if let Some(parent) = local_path.parent() {
53 tokio::fs::create_dir_all(parent).await.ok();
54 }
55 scp_dir_from_vm(self.vm, remote_path, local_path).await
56 }
57}
58
/// Executor that runs commands directly on the current host.
///
/// It carries no state, so the unit value `LocalExecutor` is the only way
/// (and all that is needed) to construct one.
pub struct LocalExecutor;
61
62#[async_trait]
63impl Executor for LocalExecutor {
64 async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
65 let output = tokio::process::Command::new("bash")
66 .args(["-c", cmd])
67 .output()
68 .await
69 .with_context(|| format!("failed to exec locally: {cmd}"))?;
70
71 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
72 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
73
74 if !output.status.success() {
75 anyhow::bail!(
76 "command failed locally (exit {}): {cmd}\nstdout: {stdout}\nstderr: {stderr}",
77 output.status,
78 );
79 }
80
81 Ok(ExecOutput { stdout, stderr })
82 }
83
84 async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
85 use tokio::io::{AsyncBufReadExt, BufReader};
86
87 let mut child = tokio::process::Command::new("bash")
88 .args(["-c", cmd])
89 .stdout(std::process::Stdio::piped())
90 .stderr(std::process::Stdio::piped())
91 .spawn()
92 .with_context(|| format!("failed to exec locally: {cmd}"))?;
93
94 let stdout_pipe = child.stdout.take();
95 let stderr_pipe = child.stderr.take();
96
97 let prefix_out = prefix.to_string();
98 let prefix_err = prefix.to_string();
99
100 let stdout_handle = tokio::spawn(async move {
101 let mut lines = String::new();
102 if let Some(pipe) = stdout_pipe {
103 let mut reader = BufReader::new(pipe).lines();
104 while let Ok(Some(line)) = reader.next_line().await {
105 if prefix_out.is_empty() {
106 println!(" {line}");
107 } else {
108 println!("[{prefix_out}] {line}");
109 }
110 lines.push_str(&line);
111 lines.push('\n');
112 }
113 }
114 lines
115 });
116
117 let stderr_handle = tokio::spawn(async move {
118 let mut lines = String::new();
119 if let Some(pipe) = stderr_pipe {
120 let mut reader = BufReader::new(pipe).lines();
121 while let Ok(Some(line)) = reader.next_line().await {
122 if prefix_err.is_empty() {
123 eprintln!(" {line}");
124 } else {
125 eprintln!("[{prefix_err}] {line}");
126 }
127 lines.push_str(&line);
128 lines.push('\n');
129 }
130 }
131 lines
132 });
133
134 let status = child.wait().await?;
135 let stdout_buf = stdout_handle.await.context("stdout reader task panicked")?;
136 let stderr_buf = stderr_handle.await.context("stderr reader task panicked")?;
137
138 if !status.success() {
139 anyhow::bail!(
140 "command failed locally (exit {}): {cmd}\nstdout: {stdout_buf}\nstderr: {stderr_buf}",
141 status,
142 );
143 }
144
145 Ok(ExecOutput {
146 stdout: stdout_buf,
147 stderr: stderr_buf,
148 })
149 }
150
151 async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()> {
152 let start = std::time::Instant::now();
153 loop {
154 let cmd = format!(
155 "a=$(systemctl --user is-active {unit} 2>/dev/null || true); \
156 f=$(systemctl --user is-failed {unit} 2>/dev/null || true); \
157 echo \"$a|$f\""
158 );
159 let out = self.exec(&cmd).await?;
160 let line = out.stdout.trim();
161 let mut parts = line.split('|');
162 let active = parts.next().unwrap_or("");
163 let failed = parts.next().unwrap_or("");
164
165 if active == "active" {
166 return Ok(());
167 }
168 if failed == "failed" {
169 anyhow::bail!("service {unit} entered failed state");
170 }
171 if start.elapsed() > timeout {
172 anyhow::bail!("service {unit} not active after {}s", timeout.as_secs());
173 }
174 tokio::time::sleep(Duration::from_secs(1)).await;
175 }
176 }
177
178 async fn fetch_dir(&self, _remote_path: &str, _local_path: &Path) -> Result<()> {
179 Ok(())
181 }
182}
183
#[cfg(test)]
mod tests {
    use super::*;

    /// A successful command has its stdout captured.
    #[tokio::test]
    async fn local_executor_runs_command() {
        let output = LocalExecutor.exec("echo hello").await.unwrap();
        assert_eq!(output.stdout.trim(), "hello");
    }

    /// A command exiting non-zero is reported as an error.
    #[tokio::test]
    async fn local_executor_fails_on_bad_command() {
        assert!(LocalExecutor.exec("false").await.is_err());
    }

    /// Streaming execution still returns the captured stdout.
    #[tokio::test]
    async fn local_executor_streams_output() {
        let output = LocalExecutor
            .exec_streaming("echo streamed", "test")
            .await
            .unwrap();
        assert_eq!(output.stdout.trim(), "streamed");
    }

    /// Waiting on a unit that does not exist ends in an error rather than
    /// hanging.
    #[tokio::test]
    async fn local_wait_for_nonexistent_service_times_out() {
        let unit = "definitely-not-a-real-unit-xyz123.service";
        let outcome = LocalExecutor
            .wait_for_service(unit, Duration::from_secs(2))
            .await;
        assert!(outcome.is_err());
    }
}