use std::path::Path;
use std::time::Duration;
use anyhow::{Context, Result};
use async_trait::async_trait;
use ryra_vm::machine::{ExecOutput, Machine, scp_dir_from_vm};
#[async_trait]
pub trait Executor: Send + Sync {
async fn exec(&self, cmd: &str) -> Result<ExecOutput>;
async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput>;
async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()>;
async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()>;
}
pub struct VmExecutor<'a> {
pub vm: &'a Machine,
}
impl<'a> VmExecutor<'a> {
pub fn new(vm: &'a Machine) -> Self {
Self { vm }
}
}
#[async_trait]
impl Executor for VmExecutor<'_> {
async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
self.vm.exec(cmd).await
}
async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
self.vm.exec_streaming(cmd, prefix).await
}
async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()> {
self.vm.wait_for_service(unit, timeout).await
}
async fn fetch_dir(&self, remote_path: &str, local_path: &Path) -> Result<()> {
if let Some(parent) = local_path.parent() {
tokio::fs::create_dir_all(parent).await.ok();
}
scp_dir_from_vm(self.vm, remote_path, local_path).await
}
}
pub struct LocalExecutor;
#[async_trait]
impl Executor for LocalExecutor {
async fn exec(&self, cmd: &str) -> Result<ExecOutput> {
let output = tokio::process::Command::new("bash")
.args(["-c", cmd])
.output()
.await
.with_context(|| format!("failed to exec locally: {cmd}"))?;
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if !output.status.success() {
anyhow::bail!(
"command failed locally (exit {}): {cmd}\nstdout: {stdout}\nstderr: {stderr}",
output.status,
);
}
Ok(ExecOutput { stdout, stderr })
}
async fn exec_streaming(&self, cmd: &str, prefix: &str) -> Result<ExecOutput> {
use tokio::io::{AsyncBufReadExt, BufReader};
let mut child = tokio::process::Command::new("bash")
.args(["-c", cmd])
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.with_context(|| format!("failed to exec locally: {cmd}"))?;
let stdout_pipe = child.stdout.take();
let stderr_pipe = child.stderr.take();
let prefix_out = prefix.to_string();
let prefix_err = prefix.to_string();
let stdout_handle = tokio::spawn(async move {
let mut lines = String::new();
if let Some(pipe) = stdout_pipe {
let mut reader = BufReader::new(pipe).lines();
while let Ok(Some(line)) = reader.next_line().await {
if prefix_out.is_empty() {
println!(" {line}");
} else {
println!("[{prefix_out}] {line}");
}
lines.push_str(&line);
lines.push('\n');
}
}
lines
});
let stderr_handle = tokio::spawn(async move {
let mut lines = String::new();
if let Some(pipe) = stderr_pipe {
let mut reader = BufReader::new(pipe).lines();
while let Ok(Some(line)) = reader.next_line().await {
if prefix_err.is_empty() {
eprintln!(" {line}");
} else {
eprintln!("[{prefix_err}] {line}");
}
lines.push_str(&line);
lines.push('\n');
}
}
lines
});
let status = child.wait().await?;
let stdout_buf = stdout_handle.await.context("stdout reader task panicked")?;
let stderr_buf = stderr_handle.await.context("stderr reader task panicked")?;
if !status.success() {
anyhow::bail!(
"command failed locally (exit {}): {cmd}\nstdout: {stdout_buf}\nstderr: {stderr_buf}",
status,
);
}
Ok(ExecOutput {
stdout: stdout_buf,
stderr: stderr_buf,
})
}
async fn wait_for_service(&self, unit: &str, timeout: Duration) -> Result<()> {
let start = std::time::Instant::now();
loop {
let cmd = format!(
"a=$(systemctl --user is-active {unit} 2>/dev/null || true); \
f=$(systemctl --user is-failed {unit} 2>/dev/null || true); \
echo \"$a|$f\""
);
let out = self.exec(&cmd).await?;
let line = out.stdout.trim();
let mut parts = line.split('|');
let active = parts.next().unwrap_or("");
let failed = parts.next().unwrap_or("");
if active == "active" {
return Ok(());
}
if failed == "failed" {
anyhow::bail!("service {unit} entered failed state");
}
if start.elapsed() > timeout {
anyhow::bail!("service {unit} not active after {}s", timeout.as_secs());
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
async fn fetch_dir(&self, _remote_path: &str, _local_path: &Path) -> Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn local_executor_runs_command() {
let exec = LocalExecutor;
let out = exec.exec("echo hello").await.unwrap();
assert_eq!(out.stdout.trim(), "hello");
}
#[tokio::test]
async fn local_executor_fails_on_bad_command() {
let exec = LocalExecutor;
let result = exec.exec("false").await;
assert!(result.is_err());
}
#[tokio::test]
async fn local_executor_streams_output() {
let exec = LocalExecutor;
let out = exec.exec_streaming("echo streamed", "test").await.unwrap();
assert_eq!(out.stdout.trim(), "streamed");
}
#[tokio::test]
async fn local_wait_for_nonexistent_service_times_out() {
let exec = LocalExecutor;
let result = exec
.wait_for_service(
"definitely-not-a-real-unit-xyz123.service",
Duration::from_secs(2),
)
.await;
assert!(result.is_err());
}
}