Skip to main content

fraisier_adapter_support/
lib.rs

//! # fraisier-adapter-support
//!
//! Shared building blocks for fraisier's in-process adapters:
//!
//! - [`run_command`] — spawn a subprocess, capture its output, and turn a
//!   *spawn* failure into a tagged [`AdapterError`] (the caller interprets the
//!   exit code). Used by the shell-out adapters (`command`, `systemd`).
//! - [`retry_on_err`] — retry an async fallible operation with a fixed delay.
//!   Used by the network adapters (`http` health, `release` artifact download).
//! - [`Transport`] — run a shell-out command locally or on a remote host over
//!   `ssh`. The single-host adapters dispatch through it so a multi-host rollout
//!   can run the same commands on each host (see the [`transport`] module).
//!
//! These deliberately do *not* encode any axis-specific policy: error-kind
//! selection and result interpretation stay in each adapter.
16
17use std::ffi::{OsStr, OsString};
18use std::path::Path;
19use std::time::Duration;
20
21use fraisier_core::adapter_axes::{AdapterError, AdapterErrorKind};
22
/// Number of *additional* spawn attempts made after a spawn fails with
/// `ETXTBSY`; the total number of attempts is therefore one more than this.
const ETXTBSY_RETRIES: u32 = 5;

/// Pause inserted before each `ETXTBSY` retry.
const ETXTBSY_BACKOFF: Duration = Duration::from_millis(20);
28
29pub mod staging;
30pub mod transport;
31
32pub use transport::{SshTransport, Transport};
33
/// Exit status and captured output streams of a subprocess that has finished.
///
/// # Example
/// ```
/// # use fraisier_adapter_support::Captured;
/// let captured = Captured { code: Some(0), stdout: "ok".into(), stderr: String::new() };
/// assert!(captured.succeeded());
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Captured {
    /// Exit code of the process; `None` when it was terminated by a signal.
    pub code: Option<i32>,
    /// Everything the process wrote to standard output, decoded as lossy UTF-8.
    pub stdout: String,
    /// Everything the process wrote to standard error, decoded as lossy UTF-8.
    pub stderr: String,
}

impl Captured {
    /// Returns `true` only when the exit code is exactly 0. A missing exit
    /// code (`None`) counts as a failure.
    #[must_use]
    pub fn succeeded(&self) -> bool {
        matches!(self.code, Some(0))
    }

    /// Returns a copy of `stderr` when it holds at least one non-whitespace
    /// character, otherwise `None` — the shape [`AdapterError::stderr`]
    /// expects. The returned text is the untrimmed original.
    #[must_use]
    pub fn stderr_opt(&self) -> Option<String> {
        if self.stderr.trim().is_empty() {
            None
        } else {
            Some(self.stderr.clone())
        }
    }
}
66
67/// Spawn `program` with `args`, `envs`, and optional working directory, then
68/// capture its output.
69///
70/// Only a *spawn* failure (binary missing, permission denied, …) is an error
71/// here; a process that runs and exits non-zero returns `Ok(Captured)` so the
72/// caller can map the exit code to its own [`AdapterErrorKind`]. The error is
73/// tagged with `adapter` and `operation`.
74///
75/// # Errors
76/// [`AdapterError`] of kind [`AdapterErrorKind::Execution`] if the process
77/// cannot be spawned.
78///
79/// # Example
80/// ```no_run
81/// # use std::ffi::OsString;
82/// # async fn run() -> Result<(), fraisier_core::adapter_axes::AdapterError> {
83/// let out = fraisier_adapter_support::run_command(
84///     "echo".as_ref(),
85///     &[OsString::from("hi")],
86///     &[],
87///     None,
88///     "demo",
89///     "echo",
90/// )
91/// .await?;
92/// assert!(out.succeeded());
93/// # Ok(())
94/// # }
95/// ```
96pub async fn run_command(
97    program: &OsStr,
98    args: &[OsString],
99    envs: &[(OsString, OsString)],
100    cwd: Option<&Path>,
101    adapter: &str,
102    operation: &str,
103) -> Result<Captured, AdapterError> {
104    let mut command = tokio::process::Command::new(program);
105    command.args(args);
106    for (key, value) in envs {
107        command.env(key, value);
108    }
109    if let Some(dir) = cwd {
110        command.current_dir(dir);
111    }
112    // If this future is dropped before the child exits (e.g. a caller wraps the
113    // call in a timeout that elapses), kill the child rather than leaving it
114    // running detached. Harmless on the normal path: the child has already exited.
115    command.kill_on_drop(true);
116
117    let output = spawn_with_etxtbsy_retry(&mut command)
118        .await
119        .map_err(|cause| {
120            let program = program.to_string_lossy();
121            error(
122                AdapterErrorKind::Execution,
123                adapter,
124                operation,
125                format!("failed to spawn '{program}': {cause}"),
126                None,
127            )
128        })?;
129
130    Ok(Captured {
131        code: output.status.code(),
132        stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
133        stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
134    })
135}
136
137/// Run `command`, retrying a few times on `ETXTBSY` ("text file busy").
138///
139/// Spawning a just-written executable can transiently fail with `ETXTBSY` when
140/// another thread in this process forked (e.g. spawned its own child) while a
141/// writer file descriptor to the target was still open — a known multithreaded
142/// fork/exec race. It is always transient, so a short bounded retry resolves it
143/// without masking a genuine spawn failure (missing binary, no permission).
144async fn spawn_with_etxtbsy_retry(
145    command: &mut tokio::process::Command,
146) -> std::io::Result<std::process::Output> {
147    for _ in 0..ETXTBSY_RETRIES {
148        match command.output().await {
149            Err(cause) if cause.kind() == std::io::ErrorKind::ExecutableFileBusy => {
150                tokio::time::sleep(ETXTBSY_BACKOFF).await;
151            }
152            other => return other,
153        }
154    }
155    command.output().await
156}
157
158/// Build an [`AdapterError`] of `kind`, tagged with `adapter` and `operation`.
159///
160/// # Example
161/// ```
162/// # use fraisier_adapter_support::error;
163/// # use fraisier_core::adapter_axes::AdapterErrorKind;
164/// let err = error(AdapterErrorKind::Execution, "systemd", "restart", "boom".into(), None);
165/// assert_eq!(err.adapter.as_deref(), Some("systemd"));
166/// ```
167#[must_use]
168pub fn error(
169    kind: AdapterErrorKind,
170    adapter: &str,
171    operation: &str,
172    message: String,
173    stderr: Option<String>,
174) -> AdapterError {
175    AdapterError {
176        adapter: Some(adapter.to_owned()),
177        operation: Some(operation.to_owned()),
178        stderr,
179        ..AdapterError::new(kind, message)
180    }
181}
182
183/// Retry an async fallible operation up to `attempts` times (minimum one),
184/// sleeping `delay` between tries, returning the first `Ok` or the last `Err`.
185///
186/// # Errors
187/// Returns the `Err` from the final attempt if every attempt fails.
188///
189/// # Example
190/// ```no_run
191/// # use std::time::Duration;
192/// # async fn demo() {
193/// let result: Result<u32, ()> = fraisier_adapter_support::retry_on_err(
194///     3,
195///     Duration::from_millis(50),
196///     || async { Ok(7) },
197/// )
198/// .await;
199/// assert_eq!(result, Ok(7));
200/// # }
201/// ```
202pub async fn retry_on_err<T, E, F, Fut>(attempts: u32, delay: Duration, mut op: F) -> Result<T, E>
203where
204    F: FnMut() -> Fut,
205    Fut: std::future::Future<Output = Result<T, E>>,
206{
207    let attempts = attempts.max(1);
208    let mut result = op().await;
209    let mut done = 1;
210    while result.is_err() && done < attempts {
211        if !delay.is_zero() {
212            tokio::time::sleep(delay).await;
213        }
214        result = op().await;
215        done += 1;
216    }
217    result
218}
219
#[cfg(test)]
mod tests {
    use super::{error, retry_on_err, run_command, Captured};
    use fraisier_core::adapter_axes::AdapterErrorKind;
    use std::ffi::OsString;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// `succeeded` follows the exit code; `stderr_opt` hides blank stderr.
    #[test]
    fn captured_helpers() {
        let clean_exit = Captured {
            code: Some(0),
            stdout: String::from("out"),
            stderr: String::from("  "),
        };
        assert!(clean_exit.succeeded());
        // Whitespace-only stderr is reported as absent.
        assert_eq!(clean_exit.stderr_opt(), None);

        let bad_exit = Captured {
            code: Some(2),
            stdout: String::new(),
            stderr: String::from("boom"),
        };
        assert!(!bad_exit.succeeded());
        assert_eq!(bad_exit.stderr_opt().as_deref(), Some("boom"));
    }

    /// `error` carries kind, adapter, operation and stderr through unchanged.
    #[test]
    fn error_is_tagged() {
        let tagged = error(
            AdapterErrorKind::InvalidConfig,
            "command",
            "up",
            String::from("bad"),
            Some(String::from("stderr")),
        );
        assert_eq!(tagged.kind, AdapterErrorKind::InvalidConfig);
        assert_eq!(tagged.adapter.as_deref(), Some("command"));
        assert_eq!(tagged.operation.as_deref(), Some("up"));
        assert_eq!(tagged.stderr.as_deref(), Some("stderr"));
    }

    /// A command that runs to completion yields its stdout and a zero exit.
    #[tokio::test]
    async fn run_command_captures_output() {
        let args = [OsString::from("hello")];
        let captured = run_command("printf".as_ref(), &args, &[], None, "test", "printf")
            .await
            .expect("printf spawns");
        assert!(captured.succeeded());
        assert_eq!(captured.stdout, "hello");
    }

    /// A binary that does not exist surfaces as a tagged `Execution` error.
    #[tokio::test]
    async fn run_command_maps_spawn_failure() {
        let failure = run_command(
            "fraisier-no-such-binary-xyz".as_ref(),
            &[],
            &[],
            None,
            "test",
            "missing",
        )
        .await
        .expect_err("a missing binary must fail to spawn");
        assert_eq!(failure.kind, AdapterErrorKind::Execution);
        assert_eq!(failure.operation.as_deref(), Some("missing"));
    }

    /// Retrying stops at the first success and makes no further calls.
    #[tokio::test]
    async fn retry_returns_first_ok() {
        let calls = AtomicU32::new(0);
        let outcome: Result<u32, ()> = retry_on_err(5, Duration::ZERO, || async {
            // Fail on calls 0 and 1, succeed from call 2 onwards.
            match calls.fetch_add(1, Ordering::SeqCst) {
                seen if seen >= 2 => Ok(seen),
                _ => Err(()),
            }
        })
        .await;
        assert_eq!(outcome, Ok(2));
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }

    /// When every attempt fails, the error of the final attempt is returned.
    #[tokio::test]
    async fn retry_exhausts_and_returns_last_err() {
        let calls = AtomicU32::new(0);
        let outcome: Result<(), u32> = retry_on_err(3, Duration::ZERO, || async {
            Err(calls.fetch_add(1, Ordering::SeqCst))
        })
        .await;
        // Each error carries its zero-based call index, so the third and last
        // attempt reports 2.
        assert_eq!(outcome, Err(2));
        assert_eq!(calls.load(Ordering::SeqCst), 3);
    }
}
320}