fraisier_adapter_support/lib.rs
1//! # fraisier-adapter-support
2//!
3//! Shared building blocks for fraisier's in-process adapters:
4//!
5//! - [`run_command`] — spawn a subprocess, capture its output, and turn a
6//! *spawn* failure into a tagged [`AdapterError`] (the caller interprets the
7//! exit code). Used by the shell-out adapters (`command`, `systemd`).
8//! - [`retry_on_err`] — retry an async fallible operation with a fixed delay.
9//! Used by the network adapters (`http` health, `release` artifact download).
10//! - [`Transport`] — run a shell-out command locally or on a remote host over
11//! `ssh`. The single-host adapters dispatch through it so a multi-host rollout
12//! can run the same commands on each host (see the [`transport`] module).
13//!
14//! These deliberately do *not* encode any axis-specific policy: error-kind
15//! selection and result interpretation stay in each adapter.
16
17use std::ffi::{OsStr, OsString};
18use std::path::Path;
19use std::time::Duration;
20
21use fraisier_core::adapter_axes::{AdapterError, AdapterErrorKind};
22
/// Number of *additional* spawn attempts made after a spawn fails with
/// `ETXTBSY`, so a single spawn is tried at most `1 + ETXTBSY_RETRIES` times.
const ETXTBSY_RETRIES: u32 = 5;

/// Pause inserted before each `ETXTBSY` retry.
const ETXTBSY_BACKOFF: Duration = Duration::from_millis(20);
28
29pub mod staging;
30pub mod transport;
31
32pub use transport::{SshTransport, Transport};
33
/// Exit status plus captured output of a subprocess that ran to completion.
///
/// When produced by [`run_command`], both output streams have been decoded
/// with lossy UTF-8.
///
/// # Example
/// ```
/// # use fraisier_adapter_support::Captured;
/// let captured = Captured { code: Some(0), stdout: "ok".into(), stderr: String::new() };
/// assert!(captured.succeeded());
/// ```
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Captured {
    /// Exit code of the process; `None` when it was terminated by a signal.
    pub code: Option<i32>,
    /// Text the process wrote to standard output (lossy UTF-8).
    pub stdout: String,
    /// Text the process wrote to standard error (lossy UTF-8).
    pub stderr: String,
}
51
52impl Captured {
53 /// Whether the process exited with code 0.
54 #[must_use]
55 pub fn succeeded(&self) -> bool {
56 self.code == Some(0)
57 }
58
59 /// `stderr` if it carries any non-whitespace text, else `None` — the shape
60 /// [`AdapterError::stderr`] expects.
61 #[must_use]
62 pub fn stderr_opt(&self) -> Option<String> {
63 (!self.stderr.trim().is_empty()).then(|| self.stderr.clone())
64 }
65}
66
67/// Spawn `program` with `args`, `envs`, and optional working directory, then
68/// capture its output.
69///
70/// Only a *spawn* failure (binary missing, permission denied, …) is an error
71/// here; a process that runs and exits non-zero returns `Ok(Captured)` so the
72/// caller can map the exit code to its own [`AdapterErrorKind`]. The error is
73/// tagged with `adapter` and `operation`.
74///
75/// # Errors
76/// [`AdapterError`] of kind [`AdapterErrorKind::Execution`] if the process
77/// cannot be spawned.
78///
79/// # Example
80/// ```no_run
81/// # use std::ffi::OsString;
82/// # async fn run() -> Result<(), fraisier_core::adapter_axes::AdapterError> {
83/// let out = fraisier_adapter_support::run_command(
84/// "echo".as_ref(),
85/// &[OsString::from("hi")],
86/// &[],
87/// None,
88/// "demo",
89/// "echo",
90/// )
91/// .await?;
92/// assert!(out.succeeded());
93/// # Ok(())
94/// # }
95/// ```
96pub async fn run_command(
97 program: &OsStr,
98 args: &[OsString],
99 envs: &[(OsString, OsString)],
100 cwd: Option<&Path>,
101 adapter: &str,
102 operation: &str,
103) -> Result<Captured, AdapterError> {
104 let mut command = tokio::process::Command::new(program);
105 command.args(args);
106 for (key, value) in envs {
107 command.env(key, value);
108 }
109 if let Some(dir) = cwd {
110 command.current_dir(dir);
111 }
112 // If this future is dropped before the child exits (e.g. a caller wraps the
113 // call in a timeout that elapses), kill the child rather than leaving it
114 // running detached. Harmless on the normal path: the child has already exited.
115 command.kill_on_drop(true);
116
117 let output = spawn_with_etxtbsy_retry(&mut command)
118 .await
119 .map_err(|cause| {
120 let program = program.to_string_lossy();
121 error(
122 AdapterErrorKind::Execution,
123 adapter,
124 operation,
125 format!("failed to spawn '{program}': {cause}"),
126 None,
127 )
128 })?;
129
130 Ok(Captured {
131 code: output.status.code(),
132 stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
133 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
134 })
135}
136
137/// Run `command`, retrying a few times on `ETXTBSY` ("text file busy").
138///
139/// Spawning a just-written executable can transiently fail with `ETXTBSY` when
140/// another thread in this process forked (e.g. spawned its own child) while a
141/// writer file descriptor to the target was still open — a known multithreaded
142/// fork/exec race. It is always transient, so a short bounded retry resolves it
143/// without masking a genuine spawn failure (missing binary, no permission).
144async fn spawn_with_etxtbsy_retry(
145 command: &mut tokio::process::Command,
146) -> std::io::Result<std::process::Output> {
147 for _ in 0..ETXTBSY_RETRIES {
148 match command.output().await {
149 Err(cause) if cause.kind() == std::io::ErrorKind::ExecutableFileBusy => {
150 tokio::time::sleep(ETXTBSY_BACKOFF).await;
151 }
152 other => return other,
153 }
154 }
155 command.output().await
156}
157
158/// Build an [`AdapterError`] of `kind`, tagged with `adapter` and `operation`.
159///
160/// # Example
161/// ```
162/// # use fraisier_adapter_support::error;
163/// # use fraisier_core::adapter_axes::AdapterErrorKind;
164/// let err = error(AdapterErrorKind::Execution, "systemd", "restart", "boom".into(), None);
165/// assert_eq!(err.adapter.as_deref(), Some("systemd"));
166/// ```
167#[must_use]
168pub fn error(
169 kind: AdapterErrorKind,
170 adapter: &str,
171 operation: &str,
172 message: String,
173 stderr: Option<String>,
174) -> AdapterError {
175 AdapterError {
176 adapter: Some(adapter.to_owned()),
177 operation: Some(operation.to_owned()),
178 stderr,
179 ..AdapterError::new(kind, message)
180 }
181}
182
183/// Retry an async fallible operation up to `attempts` times (minimum one),
184/// sleeping `delay` between tries, returning the first `Ok` or the last `Err`.
185///
186/// # Errors
187/// Returns the `Err` from the final attempt if every attempt fails.
188///
189/// # Example
190/// ```no_run
191/// # use std::time::Duration;
192/// # async fn demo() {
193/// let result: Result<u32, ()> = fraisier_adapter_support::retry_on_err(
194/// 3,
195/// Duration::from_millis(50),
196/// || async { Ok(7) },
197/// )
198/// .await;
199/// assert_eq!(result, Ok(7));
200/// # }
201/// ```
202pub async fn retry_on_err<T, E, F, Fut>(attempts: u32, delay: Duration, mut op: F) -> Result<T, E>
203where
204 F: FnMut() -> Fut,
205 Fut: std::future::Future<Output = Result<T, E>>,
206{
207 let attempts = attempts.max(1);
208 let mut result = op().await;
209 let mut done = 1;
210 while result.is_err() && done < attempts {
211 if !delay.is_zero() {
212 tokio::time::sleep(delay).await;
213 }
214 result = op().await;
215 done += 1;
216 }
217 result
218}
219
#[cfg(test)]
mod tests {
    //! Unit tests for the subprocess, error-building, and retry helpers.
    //!
    //! The `run_command` tests spawn real processes (`printf`, `sh`), so they
    //! assume a Unix-like host.

    use super::{error, retry_on_err, run_command, Captured};
    use fraisier_core::adapter_axes::AdapterErrorKind;
    use std::ffi::OsString;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    #[test]
    fn captured_helpers() {
        let ok = Captured {
            code: Some(0),
            stdout: "out".into(),
            stderr: " ".into(),
        };
        assert!(ok.succeeded());
        assert_eq!(ok.stderr_opt(), None); // whitespace-only → None

        let failed = Captured {
            code: Some(2),
            stdout: String::new(),
            stderr: "boom".into(),
        };
        assert!(!failed.succeeded());
        assert_eq!(failed.stderr_opt().as_deref(), Some("boom"));
    }

    #[test]
    fn error_is_tagged() {
        let err = error(
            AdapterErrorKind::InvalidConfig,
            "command",
            "up",
            "bad".into(),
            Some("stderr".into()),
        );
        assert_eq!(err.kind, AdapterErrorKind::InvalidConfig);
        assert_eq!(err.adapter.as_deref(), Some("command"));
        assert_eq!(err.operation.as_deref(), Some("up"));
        assert_eq!(err.stderr.as_deref(), Some("stderr"));
    }

    #[tokio::test]
    async fn run_command_captures_output() {
        let out = run_command(
            "printf".as_ref(),
            &[OsString::from("hello")],
            &[],
            None,
            "test",
            "printf",
        )
        .await
        .expect("printf spawns");
        assert!(out.succeeded());
        assert_eq!(out.stdout, "hello");
    }

    #[tokio::test]
    async fn run_command_returns_ok_for_non_zero_exit() {
        // Only a spawn failure is an error; a non-zero exit is reported via
        // `Captured::code` so the caller can interpret it.
        let out = run_command(
            "sh".as_ref(),
            &[OsString::from("-c"), OsString::from("exit 3")],
            &[],
            None,
            "test",
            "exit",
        )
        .await
        .expect("sh spawns");
        assert_eq!(out.code, Some(3));
        assert!(!out.succeeded());
    }

    #[tokio::test]
    async fn run_command_passes_env() {
        let out = run_command(
            "sh".as_ref(),
            &[
                OsString::from("-c"),
                OsString::from("printf %s \"$FRAISIER_TEST_VAR\""),
            ],
            &[(
                OsString::from("FRAISIER_TEST_VAR"),
                OsString::from("berry"),
            )],
            None,
            "test",
            "env",
        )
        .await
        .expect("sh spawns");
        assert!(out.succeeded());
        assert_eq!(out.stdout, "berry");
    }

    #[tokio::test]
    async fn run_command_maps_spawn_failure() {
        let err = run_command(
            "fraisier-no-such-binary-xyz".as_ref(),
            &[],
            &[],
            None,
            "test",
            "missing",
        )
        .await
        .expect_err("a missing binary must fail to spawn");
        assert_eq!(err.kind, AdapterErrorKind::Execution);
        assert_eq!(err.operation.as_deref(), Some("missing"));
    }

    #[tokio::test]
    async fn retry_returns_first_ok() {
        let tries = AtomicU32::new(0);
        let result: Result<u32, ()> = retry_on_err(5, Duration::from_millis(0), || async {
            let n = tries.fetch_add(1, Ordering::SeqCst);
            if n >= 2 {
                Ok(n)
            } else {
                Err(())
            }
        })
        .await;
        assert_eq!(result, Ok(2));
        assert_eq!(tries.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn retry_exhausts_and_returns_last_err() {
        let tries = AtomicU32::new(0);
        let result: Result<(), u32> = retry_on_err(3, Duration::from_millis(0), || async {
            Err(tries.fetch_add(1, Ordering::SeqCst))
        })
        .await;
        // `fetch_add` yields 0, 1, 2 across the three attempts; the error from
        // the third (final) attempt is the one returned.
        assert_eq!(result, Err(2));
        assert_eq!(tries.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn retry_with_zero_attempts_still_runs_once() {
        let tries = AtomicU32::new(0);
        let result: Result<(), u32> = retry_on_err(0, Duration::from_millis(0), || async {
            Err(tries.fetch_add(1, Ordering::SeqCst))
        })
        .await;
        assert_eq!(result, Err(0));
        assert_eq!(tries.load(Ordering::SeqCst), 1);
    }
}