use std::ffi::{OsStr, OsString};
use std::path::Path;
use std::time::Duration;
use fraisier_core::adapter_axes::{AdapterError, AdapterErrorKind};
/// How many times `spawn_with_etxtbsy_retry` re-runs a command, after the initial attempt,
/// while it keeps failing with `std::io::ErrorKind::ExecutableFileBusy`.
const ETXTBSY_RETRIES: u32 = 5;
/// Pause inserted after an `ExecutableFileBusy` failure before the command is tried again.
const ETXTBSY_BACKOFF: Duration = Duration::from_millis(20);
// Public submodules; their bodies live in separate source files.
pub mod staging;
pub mod transport;
// Re-export these `transport` items so callers can name them from this module directly.
pub use transport::{SshTransport, Transport};
/// Result of a finished child process: its exit code and captured output streams.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Captured {
    /// Exit code of the process, or `None` when the exit status carried no code.
    pub code: Option<i32>,
    /// Everything the process wrote to standard output, as text.
    pub stdout: String,
    /// Everything the process wrote to standard error, as text.
    pub stderr: String,
}

impl Captured {
    /// Returns `true` only when the process reported exit code `0`.
    ///
    /// A missing exit code (`None`) counts as failure.
    #[must_use]
    pub fn succeeded(&self) -> bool {
        matches!(self.code, Some(0))
    }

    /// Returns a copy of `stderr`, or `None` when it is empty or whitespace-only.
    ///
    /// The returned text is the full, untrimmed `stderr`; trimming is used only to decide
    /// whether there is anything worth reporting.
    #[must_use]
    pub fn stderr_opt(&self) -> Option<String> {
        if self.stderr.trim().is_empty() {
            None
        } else {
            Some(self.stderr.clone())
        }
    }
}
pub async fn run_command(
program: &OsStr,
args: &[OsString],
envs: &[(OsString, OsString)],
cwd: Option<&Path>,
adapter: &str,
operation: &str,
) -> Result<Captured, AdapterError> {
let mut command = tokio::process::Command::new(program);
command.args(args);
for (key, value) in envs {
command.env(key, value);
}
if let Some(dir) = cwd {
command.current_dir(dir);
}
command.kill_on_drop(true);
let output = spawn_with_etxtbsy_retry(&mut command)
.await
.map_err(|cause| {
let program = program.to_string_lossy();
error(
AdapterErrorKind::Execution,
adapter,
operation,
format!("failed to spawn '{program}': {cause}"),
None,
)
})?;
Ok(Captured {
code: output.status.code(),
stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
})
}
async fn spawn_with_etxtbsy_retry(
command: &mut tokio::process::Command,
) -> std::io::Result<std::process::Output> {
for _ in 0..ETXTBSY_RETRIES {
match command.output().await {
Err(cause) if cause.kind() == std::io::ErrorKind::ExecutableFileBusy => {
tokio::time::sleep(ETXTBSY_BACKOFF).await;
}
other => return other,
}
}
command.output().await
}
/// Builds an [`AdapterError`] tagged with the adapter and operation it came from.
///
/// The error starts from `AdapterError::new(kind, message)`; its `adapter`, `operation`
/// and `stderr` fields are then overwritten with the supplied values.
#[must_use]
pub fn error(
    kind: AdapterErrorKind,
    adapter: &str,
    operation: &str,
    message: String,
    stderr: Option<String>,
) -> AdapterError {
    let mut tagged = AdapterError::new(kind, message);
    tagged.adapter = Some(adapter.to_owned());
    tagged.operation = Some(operation.to_owned());
    tagged.stderr = stderr;
    tagged
}
/// Calls `op` until it succeeds or `attempts` calls have been made.
///
/// * `attempts` - maximum number of calls; `0` is treated as `1`, so `op` always runs once.
/// * `delay` - pause before each retry; a zero duration skips the sleep entirely.
/// * `op` - factory producing a fresh future for every attempt.
///
/// Returns the first `Ok`, or the `Err` of the final attempt when every call failed.
pub async fn retry_on_err<T, E, F, Fut>(attempts: u32, delay: Duration, mut op: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let budget = attempts.max(1);
    let mut outcome = op().await;
    // The first call is already done; the range covers the remaining `budget - 1` retries.
    for _ in 1..budget {
        if outcome.is_ok() {
            break;
        }
        if !delay.is_zero() {
            tokio::time::sleep(delay).await;
        }
        outcome = op().await;
    }
    outcome
}
#[cfg(test)]
mod tests {
    use super::{error, retry_on_err, run_command, Captured};
    use fraisier_core::adapter_axes::AdapterErrorKind;
    use std::ffi::OsString;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// `succeeded` requires exit code 0; `stderr_opt` hides whitespace-only stderr.
    #[test]
    fn captured_helpers() {
        let clean = Captured {
            code: Some(0),
            stdout: "out".into(),
            stderr: " ".into(),
        };
        assert!(clean.succeeded());
        assert_eq!(clean.stderr_opt(), None);

        let broken = Captured {
            code: Some(2),
            stdout: String::new(),
            stderr: "boom".into(),
        };
        assert!(!broken.succeeded());
        assert_eq!(broken.stderr_opt().as_deref(), Some("boom"));
    }

    /// `error` copies kind, adapter, operation and stderr into the result.
    #[test]
    fn error_is_tagged() {
        let tagged = error(
            AdapterErrorKind::InvalidConfig,
            "command",
            "up",
            "bad".into(),
            Some("stderr".into()),
        );
        assert_eq!(tagged.kind, AdapterErrorKind::InvalidConfig);
        assert_eq!(tagged.adapter.as_deref(), Some("command"));
        assert_eq!(tagged.operation.as_deref(), Some("up"));
        assert_eq!(tagged.stderr.as_deref(), Some("stderr"));
    }

    /// A real `printf` invocation succeeds and its stdout is captured verbatim.
    #[tokio::test]
    async fn run_command_captures_output() {
        let argv = [OsString::from("hello")];
        let captured = run_command("printf".as_ref(), &argv, &[], None, "test", "printf")
            .await
            .expect("printf spawns");
        assert!(captured.succeeded());
        assert_eq!(captured.stdout, "hello");
    }

    /// A binary that does not exist surfaces as an `Execution` error carrying the operation.
    #[tokio::test]
    async fn run_command_maps_spawn_failure() {
        let failure = run_command(
            "fraisier-no-such-binary-xyz".as_ref(),
            &[],
            &[],
            None,
            "test",
            "missing",
        )
        .await
        .expect_err("a missing binary must fail to spawn");
        assert_eq!(failure.kind, AdapterErrorKind::Execution);
        assert_eq!(failure.operation.as_deref(), Some("missing"));
    }

    /// Retrying stops at the first `Ok`: two failures, then success on the third call.
    #[tokio::test]
    async fn retry_returns_first_ok() {
        let calls = AtomicU32::new(0);
        let outcome: Result<u32, ()> = retry_on_err(5, Duration::ZERO, || async {
            match calls.fetch_add(1, Ordering::SeqCst) {
                seen if seen >= 2 => Ok(seen),
                _ => Err(()),
            }
        })
        .await;
        assert_eq!(outcome, Ok(2));
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    /// When every attempt fails, the error of the final attempt is returned.
    #[tokio::test]
    async fn retry_exhausts_and_returns_last_err() {
        let calls = AtomicU32::new(0);
        let outcome: Result<(), u32> = retry_on_err(3, Duration::ZERO, || async {
            Err(calls.fetch_add(1, Ordering::SeqCst))
        })
        .await;
        assert_eq!(outcome, Err(2));
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }
}