1use std::borrow::Cow;
17use std::ffi::{OsStr, OsString};
18use std::fmt;
19use std::io::{self, Read, Write};
20use std::path::{Path, PathBuf};
21use std::process::{Command, ExitStatus, Stdio};
22use std::sync::Arc;
23use std::thread;
24use std::time::{Duration, Instant};
25
26use backon::BlockingRetryable;
27use shared_child::SharedChild;
28use wait_timeout::ChildExt;
29
30use crate::cmd_display::CmdDisplay;
31use crate::error::{RunError, truncate_suffix, truncate_suffix_string};
32use crate::redirection::Redirection;
33use crate::retry::RetryPolicy;
34use crate::spawned::SpawnedProcess;
35use crate::stdin::StdinData;
36
37pub type BeforeSpawnHook = Arc<dyn Fn(&mut Command) -> io::Result<()> + Send + Sync>;
44
45#[derive(Debug, Clone)]
50pub struct RunOutput {
51 pub stdout: Vec<u8>,
52 pub stderr: String,
53}
54
55impl RunOutput {
56 pub fn stdout_lossy(&self) -> Cow<'_, str> {
58 String::from_utf8_lossy(&self.stdout)
59 }
60}
61
62#[must_use = "Cmd does nothing until .run() is called"]
68pub struct Cmd {
69 program: OsString,
70 args: Vec<OsString>,
71 cwd: Option<PathBuf>,
72 env_clear: bool,
73 env_remove: Vec<OsString>,
74 envs: Vec<(OsString, OsString)>,
75 stdin: Option<StdinData>,
76 stderr_mode: Redirection,
77 timeout: Option<Duration>,
78 deadline: Option<Instant>,
79 retry: Option<RetryPolicy>,
80 before_spawn: Option<BeforeSpawnHook>,
81 secret: bool,
82}
83
84impl fmt::Debug for Cmd {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 f.debug_struct("Cmd")
87 .field("program", &self.program)
88 .field("args", &self.args)
89 .field("cwd", &self.cwd)
90 .field("env_clear", &self.env_clear)
91 .field("envs", &self.envs)
92 .field("stdin", &self.stdin)
93 .field("stderr_mode", &self.stderr_mode)
94 .field("timeout", &self.timeout)
95 .field("deadline", &self.deadline)
96 .field("retry", &self.retry)
97 .field("secret", &self.secret)
98 .finish()
99 }
100}
101
102impl Cmd {
103 pub fn new(program: impl Into<OsString>) -> Self {
105 Self {
106 program: program.into(),
107 args: Vec::new(),
108 cwd: None,
109 env_clear: false,
110 env_remove: Vec::new(),
111 envs: Vec::new(),
112 stdin: None,
113 stderr_mode: Redirection::default(),
114 timeout: None,
115 deadline: None,
116 retry: None,
117 before_spawn: None,
118 secret: false,
119 }
120 }
121
122 pub fn arg(mut self, arg: impl Into<OsString>) -> Self {
124 self.args.push(arg.into());
125 self
126 }
127
128 pub fn args<I, S>(mut self, args: I) -> Self
130 where
131 I: IntoIterator<Item = S>,
132 S: Into<OsString>,
133 {
134 self.args.extend(args.into_iter().map(Into::into));
135 self
136 }
137
138 pub fn in_dir(mut self, dir: impl AsRef<Path>) -> Self {
140 self.cwd = Some(dir.as_ref().to_path_buf());
141 self
142 }
143
144 pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
146 self.envs.push((key.into(), value.into()));
147 self
148 }
149
150 pub fn envs<I, K, V>(mut self, vars: I) -> Self
152 where
153 I: IntoIterator<Item = (K, V)>,
154 K: Into<OsString>,
155 V: Into<OsString>,
156 {
157 self.envs
158 .extend(vars.into_iter().map(|(k, v)| (k.into(), v.into())));
159 self
160 }
161
162 pub fn env_remove(mut self, key: impl Into<OsString>) -> Self {
164 self.env_remove.push(key.into());
165 self
166 }
167
168 pub fn env_clear(mut self) -> Self {
170 self.env_clear = true;
171 self
172 }
173
174 pub fn stdin(mut self, data: impl Into<StdinData>) -> Self {
180 self.stdin = Some(data.into());
181 self
182 }
183
184 pub fn stderr(mut self, mode: Redirection) -> Self {
186 self.stderr_mode = mode;
187 self
188 }
189
190 pub fn timeout(mut self, timeout: Duration) -> Self {
192 self.timeout = Some(timeout);
193 self
194 }
195
196 pub fn deadline(mut self, deadline: Instant) -> Self {
198 self.deadline = Some(deadline);
199 self
200 }
201
202 pub fn retry(mut self, policy: RetryPolicy) -> Self {
204 self.retry = Some(policy);
205 self
206 }
207
208 pub fn retry_when(mut self, f: impl Fn(&RunError) -> bool + Send + Sync + 'static) -> Self {
213 let policy = self.retry.take().unwrap_or_default();
214 self.retry = Some(policy.when(f));
215 self
216 }
217
218 pub fn secret(mut self) -> Self {
223 self.secret = true;
224 self
225 }
226
227 pub fn before_spawn<F>(mut self, hook: F) -> Self
229 where
230 F: Fn(&mut Command) -> io::Result<()> + Send + Sync + 'static,
231 {
232 self.before_spawn = Some(Arc::new(hook));
233 self
234 }
235
236 pub fn to_command(&self) -> Command {
242 let mut cmd = Command::new(&self.program);
243 cmd.args(&self.args);
244 if let Some(dir) = &self.cwd {
245 cmd.current_dir(dir);
246 }
247 if self.env_clear {
248 cmd.env_clear();
249 }
250 for key in &self.env_remove {
251 cmd.env_remove(key);
252 }
253 for (k, v) in &self.envs {
254 cmd.env(k, v);
255 }
256 cmd
257 }
258
259 pub fn display(&self) -> CmdDisplay {
261 CmdDisplay::new(self.program.clone(), self.args.clone(), self.secret)
262 }
263
264 pub fn spawn(mut self) -> Result<SpawnedProcess, RunError> {
281 let display = self.display();
282 let stdin_data = self.stdin.take();
283
284 let mut cmd = Command::new(&self.program);
285 cmd.args(&self.args);
286 if let Some(dir) = &self.cwd {
287 cmd.current_dir(dir);
288 }
289 if self.env_clear {
290 cmd.env_clear();
291 }
292 for key in &self.env_remove {
293 cmd.env_remove(key);
294 }
295 for (k, v) in &self.envs {
296 cmd.env(k, v);
297 }
298
299 cmd.stdin(Stdio::piped());
300 cmd.stdout(Stdio::piped());
301 match &self.stderr_mode {
302 Redirection::Capture => {
303 cmd.stderr(Stdio::piped());
304 }
305 Redirection::Inherit => {
306 cmd.stderr(Stdio::inherit());
307 }
308 Redirection::Null => {
309 cmd.stderr(Stdio::null());
310 }
311 Redirection::File(f) => {
312 let cloned = f.try_clone().map_err(|source| RunError::Spawn {
313 command: display.clone(),
314 source,
315 })?;
316 cmd.stderr(Stdio::from(cloned));
317 }
318 }
319
320 if let Some(hook) = &self.before_spawn {
321 hook(&mut cmd).map_err(|source| RunError::Spawn {
322 command: display.clone(),
323 source,
324 })?;
325 }
326
327 let child = SharedChild::spawn(&mut cmd).map_err(|source| RunError::Spawn {
328 command: display.clone(),
329 source,
330 })?;
331 let child = Arc::new(child);
332
333 if let Some(data) = stdin_data
336 && let Some(mut pipe) = child.take_stdin()
337 {
338 thread::spawn(move || {
339 use std::io::Write;
340 match data {
341 StdinData::Bytes(b) => {
342 let _ = pipe.write_all(&b);
343 }
344 StdinData::Reader(mut r) => {
345 let _ = io::copy(&mut r, &mut pipe);
346 }
347 }
348 });
349 }
350
351 let stderr_thread = if matches!(self.stderr_mode, Redirection::Capture)
353 && let Some(pipe) = child.take_stderr()
354 {
355 Some(thread::spawn(move || read_to_end(pipe)))
356 } else {
357 None
358 };
359
360 Ok(SpawnedProcess::new(child, stderr_thread, display))
361 }
362
363 pub fn spawn_and_collect_lines<F>(self, mut f: F) -> Result<RunOutput, RunError>
381 where
382 F: FnMut(&str) -> io::Result<()>,
383 {
384 let proc = self.spawn()?;
385 let stdout = proc
386 .take_stdout()
387 .expect("spawn always pipes stdout");
388 let reader = std::io::BufReader::new(stdout);
389 use std::io::BufRead;
390 for line in reader.lines() {
391 let line = match line {
392 Ok(l) => l,
393 Err(source) => {
394 let _ = proc.kill();
395 let _ = proc.wait();
396 return Err(RunError::Spawn {
397 command: proc.command().clone(),
398 source,
399 });
400 }
401 };
402 if let Err(source) = f(&line) {
403 let _ = proc.kill();
404 let _ = proc.wait();
405 return Err(RunError::Spawn {
406 command: proc.command().clone(),
407 source,
408 });
409 }
410 }
411 proc.wait()
412 }
413
414 pub fn run(mut self) -> Result<RunOutput, RunError> {
416 let display = self.display();
417 let mut stdin_holder = StdinHolder::from_opt(self.stdin.take());
418 let retry = self.retry.take();
419 let ctx = ExecContext {
420 program: &self.program,
421 args: &self.args,
422 cwd: self.cwd.as_deref(),
423 env_clear: self.env_clear,
424 env_remove: &self.env_remove,
425 envs: &self.envs,
426 stderr_mode: &self.stderr_mode,
427 before_spawn: self.before_spawn.as_ref(),
428 display: &display,
429 };
430
431 match retry {
432 None => execute_once(&ctx, stdin_holder.take_for_attempt(), self.per_attempt_timeout(Instant::now())),
433 Some(policy) => run_with_retry(&ctx, &mut stdin_holder, policy, self.timeout, self.deadline),
434 }
435 }
436
437 fn per_attempt_timeout(&self, now: Instant) -> Option<Duration> {
438 match (self.timeout, self.deadline) {
439 (None, None) => None,
440 (Some(t), None) => Some(t),
441 (None, Some(d)) => Some(d.saturating_duration_since(now)),
442 (Some(t), Some(d)) => {
443 let remaining = d.saturating_duration_since(now);
444 Some(t.min(remaining))
445 }
446 }
447 }
448}
449
450fn run_with_retry(
451 ctx: &ExecContext<'_>,
452 stdin_holder: &mut StdinHolder,
453 policy: RetryPolicy,
454 timeout: Option<Duration>,
455 deadline: Option<Instant>,
456) -> Result<RunOutput, RunError> {
457 let predicate = policy.predicate.clone();
458 let op = || {
459 let now = Instant::now();
460 if let Some(d) = deadline
461 && now >= d
462 {
463 return Err(RunError::Timeout {
465 command: ctx.display.clone(),
466 elapsed: Duration::ZERO,
467 stdout: Vec::new(),
468 stderr: String::new(),
469 });
470 }
471 let per_attempt = match (timeout, deadline) {
472 (None, None) => None,
473 (Some(t), None) => Some(t),
474 (None, Some(d)) => Some(d.saturating_duration_since(now)),
475 (Some(t), Some(d)) => Some(t.min(d.saturating_duration_since(now))),
476 };
477 let stdin = stdin_holder.take_for_attempt();
478 execute_once(ctx, stdin, per_attempt)
479 };
480 op.retry(policy.backoff)
481 .when(move |e: &RunError| predicate(e))
482 .call()
483}
484
485struct ExecContext<'a> {
486 program: &'a OsStr,
487 args: &'a [OsString],
488 cwd: Option<&'a Path>,
489 env_clear: bool,
490 env_remove: &'a [OsString],
491 envs: &'a [(OsString, OsString)],
492 stderr_mode: &'a Redirection,
493 before_spawn: Option<&'a BeforeSpawnHook>,
494 display: &'a CmdDisplay,
495}
496
497enum StdinHolder {
498 None,
499 Bytes(Vec<u8>),
500 OneShotReader(Option<Box<dyn Read + Send + Sync>>),
501}
502
503impl StdinHolder {
504 fn from_opt(d: Option<StdinData>) -> Self {
505 match d {
506 None => Self::None,
507 Some(StdinData::Bytes(b)) => Self::Bytes(b),
508 Some(StdinData::Reader(r)) => Self::OneShotReader(Some(r)),
509 }
510 }
511
512 fn take_for_attempt(&mut self) -> StdinForAttempt {
513 match self {
514 Self::None => StdinForAttempt::None,
515 Self::Bytes(b) => StdinForAttempt::Bytes(b.clone()),
516 Self::OneShotReader(slot) => match slot.take() {
517 Some(r) => StdinForAttempt::Reader(r),
518 None => StdinForAttempt::None,
519 },
520 }
521 }
522}
523
524enum StdinForAttempt {
525 None,
526 Bytes(Vec<u8>),
527 Reader(Box<dyn Read + Send + Sync>),
528}
529
530enum Outcome {
531 Exited(ExitStatus),
532 TimedOut(Duration),
533 WaitFailed(io::Error),
534}
535
536fn execute_once(
537 ctx: &ExecContext<'_>,
538 stdin: StdinForAttempt,
539 timeout: Option<Duration>,
540) -> Result<RunOutput, RunError> {
541 let mut cmd = build_command(ctx, &stdin)?;
542
543 if let Some(hook) = ctx.before_spawn {
544 hook(&mut cmd).map_err(|source| RunError::Spawn {
545 command: ctx.display.clone(),
546 source,
547 })?;
548 }
549
550 let mut child = cmd.spawn().map_err(|source| RunError::Spawn {
551 command: ctx.display.clone(),
552 source,
553 })?;
554
555 let stdin_thread = spawn_stdin_feeder(&mut child, stdin);
556 let stdout_thread = {
557 let pipe = child.stdout.take().expect("stdout piped");
558 Some(thread::spawn(move || read_to_end(pipe)))
559 };
560 let stderr_thread = if matches!(ctx.stderr_mode, Redirection::Capture) {
561 let pipe = child.stderr.take().expect("stderr piped");
562 Some(thread::spawn(move || read_to_end(pipe)))
563 } else {
564 None
565 };
566
567 let start = Instant::now();
568 let outcome = match timeout {
569 Some(t) => match child.wait_timeout(t) {
570 Ok(Some(status)) => Outcome::Exited(status),
571 Ok(None) => {
572 let _ = child.kill();
573 let _ = child.wait();
574 Outcome::TimedOut(start.elapsed())
575 }
576 Err(e) => {
577 let _ = child.kill();
578 let _ = child.wait();
579 Outcome::WaitFailed(e)
580 }
581 },
582 None => match child.wait() {
583 Ok(status) => Outcome::Exited(status),
584 Err(e) => Outcome::WaitFailed(e),
585 },
586 };
587
588 if let Some(t) = stdin_thread {
589 let _ = t.join();
590 }
591 let stdout_bytes = stdout_thread
592 .map(|t| t.join().unwrap_or_default())
593 .unwrap_or_default();
594 let stderr_bytes = stderr_thread
595 .map(|t| t.join().unwrap_or_default())
596 .unwrap_or_default();
597 let stderr_str = String::from_utf8_lossy(&stderr_bytes).into_owned();
598
599 finalize_outcome(ctx, outcome, stdout_bytes, stderr_str)
600}
601
602fn finalize_outcome(
603 ctx: &ExecContext<'_>,
604 outcome: Outcome,
605 stdout_bytes: Vec<u8>,
606 stderr_str: String,
607) -> Result<RunOutput, RunError> {
608 match outcome {
609 Outcome::Exited(status) if status.success() => Ok(RunOutput {
610 stdout: stdout_bytes,
611 stderr: stderr_str,
612 }),
613 Outcome::Exited(status) => Err(RunError::NonZeroExit {
614 command: ctx.display.clone(),
615 status,
616 stdout: truncate_suffix(stdout_bytes),
617 stderr: truncate_suffix_string(stderr_str),
618 }),
619 Outcome::TimedOut(elapsed) => Err(RunError::Timeout {
620 command: ctx.display.clone(),
621 elapsed,
622 stdout: truncate_suffix(stdout_bytes),
623 stderr: truncate_suffix_string(stderr_str),
624 }),
625 Outcome::WaitFailed(source) => Err(RunError::Spawn {
626 command: ctx.display.clone(),
627 source,
628 }),
629 }
630}
631
632fn build_command(ctx: &ExecContext<'_>, stdin: &StdinForAttempt) -> Result<Command, RunError> {
633 let mut cmd = Command::new(ctx.program);
634 cmd.args(ctx.args);
635 if let Some(dir) = ctx.cwd {
636 cmd.current_dir(dir);
637 }
638 if ctx.env_clear {
639 cmd.env_clear();
640 }
641 for key in ctx.env_remove {
642 cmd.env_remove(key);
643 }
644 for (k, v) in ctx.envs {
645 cmd.env(k, v);
646 }
647
648 match stdin {
649 StdinForAttempt::None => {}
650 StdinForAttempt::Bytes(_) | StdinForAttempt::Reader(_) => {
651 cmd.stdin(Stdio::piped());
652 }
653 }
654 cmd.stdout(Stdio::piped());
655
656 match ctx.stderr_mode {
657 Redirection::Capture => {
658 cmd.stderr(Stdio::piped());
659 }
660 Redirection::Inherit => {
661 cmd.stderr(Stdio::inherit());
662 }
663 Redirection::Null => {
664 cmd.stderr(Stdio::null());
665 }
666 Redirection::File(f) => {
667 let cloned = f.try_clone().map_err(|source| RunError::Spawn {
668 command: ctx.display.clone(),
669 source,
670 })?;
671 cmd.stderr(Stdio::from(cloned));
672 }
673 }
674 Ok(cmd)
675}
676
677fn spawn_stdin_feeder(
678 child: &mut std::process::Child,
679 stdin: StdinForAttempt,
680) -> Option<thread::JoinHandle<()>> {
681 match stdin {
682 StdinForAttempt::None => None,
683 StdinForAttempt::Bytes(bytes) => {
684 let mut pipe = child.stdin.take().expect("stdin piped");
685 Some(thread::spawn(move || {
686 let _ = pipe.write_all(&bytes);
687 }))
688 }
689 StdinForAttempt::Reader(mut reader) => {
690 let mut pipe = child.stdin.take().expect("stdin piped");
691 Some(thread::spawn(move || {
692 let _ = io::copy(&mut reader, &mut pipe);
693 }))
694 }
695 }
696}
697
698fn read_to_end<R: Read>(mut reader: R) -> Vec<u8> {
699 let mut buf = Vec::new();
700 let _ = reader.read_to_end(&mut buf);
701 buf
702}
703
704#[cfg(test)]
705mod tests {
706 use super::*;
707
708 #[test]
709 fn must_use_annotation_present() {
710 let _ = Cmd::new("x");
711 }
713
714 #[test]
715 fn builder_accumulates_args() {
716 let cmd = Cmd::new("git").arg("status").args(["-s", "--short"]);
717 assert_eq!(cmd.args.len(), 3);
718 }
719
720 #[test]
721 fn env_builder() {
722 let cmd = Cmd::new("x")
723 .env("A", "1")
724 .envs([("B", "2"), ("C", "3")])
725 .env_remove("D")
726 .env_clear();
727 assert_eq!(cmd.envs.len(), 3);
728 assert_eq!(cmd.env_remove.len(), 1);
729 assert!(cmd.env_clear);
730 }
731
732 #[test]
733 fn stdin_bytes_is_reusable() {
734 let cmd = Cmd::new("x").stdin("hello");
735 match cmd.stdin.as_ref() {
736 Some(StdinData::Bytes(b)) => assert_eq!(b, b"hello"),
737 _ => panic!("expected Bytes"),
738 }
739 }
740
741 #[test]
742 fn secret_flag_reaches_display() {
743 let cmd = Cmd::new("docker").arg("login").arg("-p").arg("hunter2").secret();
744 let d = cmd.display();
745 assert!(d.is_secret());
746 assert_eq!(d.to_string(), "docker <secret>");
747 }
748
749 #[test]
750 fn to_command_mirrors_config() {
751 let cmd = Cmd::new("git").args(["status"]).env("K", "V").in_dir("/tmp");
752 let std_cmd = cmd.to_command();
753 assert_eq!(std_cmd.get_program(), "git");
756 }
757
758 #[test]
759 fn retry_when_installs_default_policy() {
760 let cmd = Cmd::new("x").retry_when(|_| true);
761 assert!(cmd.retry.is_some());
762 }
763
764 #[test]
765 fn per_attempt_timeout_respects_both_bounds() {
766 let cmd = Cmd::new("x")
767 .timeout(Duration::from_secs(60))
768 .deadline(Instant::now() + Duration::from_secs(5));
769 let t = cmd.per_attempt_timeout(Instant::now()).unwrap();
770 assert!(t <= Duration::from_secs(60));
771 assert!(t <= Duration::from_secs(6));
772 }
773}