// mk_lib/schema/command/local_run.rs

1use std::io::{
2  BufRead as _,
3  BufReader,
4  IsTerminal as _,
5};
6use std::process::{
7  Child,
8  ExitStatus,
9  Stdio,
10};
11use std::thread;
12use std::time::Duration;
13
14use anyhow::Context as _;
15use console::Term;
16use indicatif::ProgressDrawTarget;
17use serde::Deserialize;
18
19#[cfg(unix)]
20use std::os::fd::AsRawFd as _;
21#[cfg(unix)]
22use std::os::unix::process::CommandExt as _;
23
24use crate::defaults::{
25  default_ignore_errors,
26  default_verbose,
27};
28use crate::handle_output;
29use crate::schema::{
30  get_output_handler,
31  interpolate_template_string,
32  ExecutionInterrupted,
33  Shell,
34  TaskContext,
35};
36
37#[derive(Debug, Deserialize, Clone)]
38pub struct LocalRun {
39  /// The command to run
40  pub command: String,
41
42  /// The shell to use to run the command
43  #[serde(default)]
44  pub shell: Option<Shell>,
45
46  /// The test to run before running command
47  /// If the test fails, the command will not run
48  #[serde(default)]
49  pub test: Option<String>,
50
51  /// The working directory to run the command in
52  #[serde(default)]
53  pub work_dir: Option<String>,
54
55  /// Interactive mode
56  /// If true, the command will be interactive accepting user input
57  #[serde(default)]
58  pub interactive: Option<bool>,
59
60  /// Allow pressing `R` to manually stop and restart a non-interactive command.
61  #[serde(default)]
62  pub retrigger: Option<bool>,
63
64  /// Ignore errors if the command fails
65  #[serde(default)]
66  pub ignore_errors: Option<bool>,
67
68  /// Save the command stdout to a task-scoped output name
69  #[serde(default)]
70  pub save_output_as: Option<String>,
71
72  /// Show verbose output
73  #[serde(default)]
74  pub verbose: Option<bool>,
75}
76
77impl LocalRun {
78  pub fn execute(&self, context: &TaskContext) -> anyhow::Result<()> {
79    assert!(!self.command.is_empty());
80
81    let command = interpolate_template_string(&self.command, context)?;
82    let interactive = self.interactive_enabled();
83    let retrigger = self.retrigger_enabled();
84    if interactive && retrigger {
85      anyhow::bail!("retrigger is only supported for non-interactive local commands");
86    }
87    let ignore_errors = self.ignore_errors(context);
88    let capture_output = self.save_output_as.is_some();
89    // If interactive mode is enabled, we don't need to redirect the output
90    // to the parent process. This is because the command will be run in the
91    // foreground and the user will be able to see the output.
92    let verbose = interactive || self.verbose(context);
93
94    // Skip the command if the test fails
95    if self.test(context).is_err() {
96      return Ok(());
97    }
98
99    if retrigger {
100      return self.execute_with_retrigger(context, &command, ignore_errors, capture_output, verbose);
101    }
102
103    let (status, captured_stdout) = self
104      .spawn_command(context, &command, capture_output, verbose, interactive)?
105      .wait_for_completion()?;
106    self.finish_execution(context, &command, status, captured_stdout, ignore_errors)
107  }
108
109  fn spawn_command(
110    &self,
111    context: &TaskContext,
112    command: &str,
113    capture_output: bool,
114    verbose: bool,
115    interactive: bool,
116  ) -> anyhow::Result<SpawnedLocalCommand> {
117    let mut cmd = self
118      .shell
119      .as_ref()
120      .map(|shell| shell.proc())
121      .unwrap_or_else(|| context.shell().proc());
122
123    cmd.arg(command);
124
125    if capture_output {
126      cmd.stdout(Stdio::piped());
127      if interactive {
128        context.multi.set_draw_target(ProgressDrawTarget::hidden());
129        cmd.stdin(Stdio::inherit()).stderr(Stdio::inherit());
130      } else {
131        cmd.stderr(get_output_handler(verbose));
132      }
133    } else if verbose {
134      if interactive {
135        context.multi.set_draw_target(ProgressDrawTarget::hidden());
136
137        cmd
138          .stdin(Stdio::inherit())
139          .stdout(Stdio::inherit())
140          .stderr(Stdio::inherit());
141      } else {
142        let stdout = get_output_handler(verbose);
143        let stderr = get_output_handler(verbose);
144        cmd.stdout(stdout).stderr(stderr);
145      }
146    }
147
148    if let Some(work_dir) = self.resolved_work_dir(context) {
149      cmd.current_dir(work_dir);
150    }
151
152    #[cfg(unix)]
153    if self.retrigger_enabled() && !interactive {
154      unsafe {
155        cmd.pre_exec(|| {
156          if libc::setpgid(0, 0) != 0 {
157            return Err(std::io::Error::last_os_error());
158          }
159          Ok(())
160        });
161      }
162    }
163
164    // Inject environment variables
165    for (key, value) in context.env_vars.iter() {
166      cmd.env(key, value);
167    }
168
169    let mut child = cmd.spawn()?;
170    let stdout_handle = if capture_output {
171      let stdout = child.stdout.take().context("Failed to open stdout")?;
172      let multi = context.multi.clone();
173      Some(thread::spawn(move || -> anyhow::Result<String> {
174        let reader = BufReader::new(stdout);
175        let mut output = String::new();
176        for line in reader.lines() {
177          let line = line?;
178          if verbose {
179            let _ = multi.println(line.clone());
180          }
181          output.push_str(&line);
182          output.push('\n');
183        }
184        Ok(output.trim_end_matches(['\r', '\n']).to_string())
185      }))
186    } else {
187      None
188    };
189
190    if verbose && !interactive && !capture_output {
191      handle_output!(child.stdout, context);
192      handle_output!(child.stderr, context);
193    } else if verbose && !interactive && capture_output {
194      handle_output!(child.stderr, context);
195    }
196
197    Ok(SpawnedLocalCommand { child, stdout_handle })
198  }
199
200  fn finish_execution(
201    &self,
202    context: &TaskContext,
203    command: &str,
204    status: ExitStatus,
205    captured_stdout: Option<String>,
206    ignore_errors: bool,
207  ) -> anyhow::Result<()> {
208    if !status.success() && !ignore_errors {
209      anyhow::bail!("Command failed - {}", command);
210    }
211
212    if status.success() {
213      if let (Some(output_name), Some(output_value)) = (&self.save_output_as, captured_stdout) {
214        context.insert_task_output(output_name.clone(), output_value)?;
215      }
216    }
217
218    Ok(())
219  }
220
221  fn execute_with_retrigger(
222    &self,
223    context: &TaskContext,
224    command: &str,
225    ignore_errors: bool,
226    capture_output: bool,
227    verbose: bool,
228  ) -> anyhow::Result<()> {
229    if !std::io::stdin().is_terminal() || context.json_events {
230      return self.execute_without_retrigger(
231        context,
232        command,
233        ignore_errors,
234        capture_output,
235        verbose,
236        "Manual retrigger requires an attached terminal and is disabled for `--json-events`.",
237      );
238    }
239
240    #[cfg(not(unix))]
241    {
242      return self.execute_without_retrigger(
243        context,
244        command,
245        ignore_errors,
246        capture_output,
247        verbose,
248        "Manual retrigger is currently supported on Unix terminals only.",
249      );
250    }
251
252    #[cfg(unix)]
253    {
254      let _raw_mode = RawModeGuard::acquire()?;
255      let term = Term::stderr();
256      let _ = term.write_line("Press R or r to restart the running command.");
257      drain_retrigger_input()?;
258
259      loop {
260        let spawned = self.spawn_command(context, command, capture_output, verbose, false)?;
261        match spawned.wait_for_completion_or_retrigger() {
262          Ok(CommandOutcome::Completed {
263            status,
264            captured_stdout,
265          }) => {
266            return self.finish_execution(context, command, status, captured_stdout, ignore_errors);
267          },
268          Ok(CommandOutcome::RestartRequested) => {
269            let _ = term.write_line("Restarting command...");
270          },
271          Ok(CommandOutcome::Interrupted) => {
272            return Err(ExecutionInterrupted.into());
273          },
274          Err(error) => return Err(error),
275        }
276      }
277    }
278  }
279
280  fn execute_without_retrigger(
281    &self,
282    context: &TaskContext,
283    command: &str,
284    ignore_errors: bool,
285    capture_output: bool,
286    verbose: bool,
287    reason: &str,
288  ) -> anyhow::Result<()> {
289    if !context.json_events {
290      let _ = context.multi.println(reason);
291    }
292    let (status, captured_stdout) = self
293      .spawn_command(context, command, capture_output, verbose, false)?
294      .wait_for_completion()?;
295    self.finish_execution(context, command, status, captured_stdout, ignore_errors)
296  }
297
298  /// Check if the local run task is parallel safe
299  /// If the task is interactive or retriggerable, it is not parallel safe
300  pub fn is_parallel_safe(&self) -> bool {
301    !self.interactive_enabled() && !self.retrigger_enabled()
302  }
303
304  pub fn interactive_enabled(&self) -> bool {
305    self.interactive.unwrap_or(false)
306  }
307
308  pub fn retrigger_enabled(&self) -> bool {
309    self.retrigger.unwrap_or(false)
310  }
311
312  fn test(&self, context: &TaskContext) -> anyhow::Result<()> {
313    let verbose = self.verbose(context);
314
315    let stdout = get_output_handler(verbose);
316    let stderr = get_output_handler(verbose);
317
318    if let Some(test) = &self.test {
319      let test = interpolate_template_string(test, context)?;
320      let mut cmd = self
321        .shell
322        .as_ref()
323        .map(|shell| shell.proc())
324        .unwrap_or_else(|| context.shell().proc());
325      cmd.arg(&test).stdout(stdout).stderr(stderr);
326
327      if let Some(work_dir) = self.resolved_work_dir(context) {
328        cmd.current_dir(work_dir);
329      }
330
331      let mut cmd = cmd.spawn()?;
332      if verbose {
333        handle_output!(cmd.stdout, context);
334        handle_output!(cmd.stderr, context);
335      }
336
337      let status = cmd.wait()?;
338
339      log::trace!("Test status: {:?}", status.success());
340      if !status.success() {
341        anyhow::bail!("Command test failed - {}", test);
342      }
343    }
344
345    Ok(())
346  }
347
348  fn ignore_errors(&self, context: &TaskContext) -> bool {
349    self
350      .ignore_errors
351      .or(context.ignore_errors)
352      .unwrap_or(default_ignore_errors())
353  }
354
355  fn verbose(&self, context: &TaskContext) -> bool {
356    self.verbose.or(context.verbose).unwrap_or(default_verbose())
357  }
358
359  pub fn resolved_work_dir(&self, context: &TaskContext) -> Option<std::path::PathBuf> {
360    self
361      .work_dir
362      .as_ref()
363      .map(|work_dir| context.resolve_from_config(work_dir))
364  }
365}
366
367struct SpawnedLocalCommand {
368  child: Child,
369  stdout_handle: Option<thread::JoinHandle<anyhow::Result<String>>>,
370}
371
372impl SpawnedLocalCommand {
373  fn wait_for_completion(mut self) -> anyhow::Result<(ExitStatus, Option<String>)> {
374    let status = self.child.wait()?;
375    let captured_stdout = self.join_stdout_handle()?;
376    Ok((status, captured_stdout))
377  }
378
379  fn join_stdout_handle(&mut self) -> anyhow::Result<Option<String>> {
380    self
381      .stdout_handle
382      .take()
383      .map(|handle| {
384        handle
385          .join()
386          .map_err(|_| anyhow::anyhow!("Failed to join stdout capture thread"))?
387      })
388      .transpose()
389  }
390
391  #[cfg(unix)]
392  fn wait_for_completion_or_retrigger(mut self) -> anyhow::Result<CommandOutcome> {
393    loop {
394      if let Some(status) = self.child.try_wait()? {
395        let captured_stdout = self.join_stdout_handle()?;
396        return Ok(CommandOutcome::Completed {
397          status,
398          captured_stdout,
399        });
400      }
401
402      match read_control_byte(Duration::from_millis(100))? {
403        Some(b'R' | b'r') => {
404          self.kill_for_restart()?;
405          let _ = self.child.wait()?;
406          let _ = self.join_stdout_handle()?;
407          drain_retrigger_input()?;
408          return Ok(CommandOutcome::RestartRequested);
409        },
410        Some(3) => {
411          self.kill_for_restart()?;
412          let _ = self.child.wait()?;
413          let _ = self.join_stdout_handle()?;
414          drain_retrigger_input()?;
415          return Ok(CommandOutcome::Interrupted);
416        },
417        _ => {},
418      }
419    }
420  }
421
422  #[cfg(unix)]
423  fn kill_for_restart(&mut self) -> anyhow::Result<()> {
424    let pid = self.child.id() as i32;
425    let kill_result = unsafe { libc::killpg(pid, libc::SIGKILL) };
426    if kill_result == 0 {
427      return Ok(());
428    }
429
430    let error = std::io::Error::last_os_error();
431    let raw_error = error.raw_os_error();
432    if raw_error == Some(libc::ESRCH) || raw_error == Some(libc::EPERM) {
433      match self.child.kill() {
434        Ok(()) => return Ok(()),
435        Err(child_error) if child_error.kind() == std::io::ErrorKind::InvalidInput => return Ok(()),
436        Err(child_error) => return Err(child_error.into()),
437      }
438    }
439
440    Err(error.into())
441  }
442}
443
/// Result of supervising one run of a retriggerable command.
#[cfg(unix)]
enum CommandOutcome {
  /// The command exited on its own.
  Completed {
    /// Exit status reported for the child.
    status: ExitStatus,
    /// Captured stdout, when capturing was enabled.
    captured_stdout: Option<String>,
  },
  /// `R`/`r` was pressed; the command was killed and should be started again.
  RestartRequested,
  /// Ctrl-C was pressed; the command was killed and execution should stop.
  Interrupted,
}
453
454#[cfg(unix)]
455struct RawModeGuard {
456  fd: std::os::fd::RawFd,
457  original: libc::termios,
458}
459
460#[cfg(unix)]
461impl RawModeGuard {
462  fn acquire() -> anyhow::Result<Self> {
463    let fd = std::io::stdin().as_raw_fd();
464    let mut original = std::mem::MaybeUninit::<libc::termios>::uninit();
465    let get_attr_result = unsafe { libc::tcgetattr(fd, original.as_mut_ptr()) };
466    if get_attr_result != 0 {
467      return Err(std::io::Error::last_os_error().into());
468    }
469
470    let original = unsafe { original.assume_init() };
471    let mut raw = original;
472    raw.c_lflag &= !(libc::ICANON | libc::ECHO | libc::ISIG);
473    raw.c_cc[libc::VMIN] = 0;
474    raw.c_cc[libc::VTIME] = 0;
475
476    let set_attr_result = unsafe { libc::tcsetattr(fd, libc::TCSANOW, &raw) };
477    if set_attr_result != 0 {
478      return Err(std::io::Error::last_os_error().into());
479    }
480
481    Ok(Self { fd, original })
482  }
483}
484
485#[cfg(unix)]
486impl Drop for RawModeGuard {
487  fn drop(&mut self) {
488    let _ = unsafe { libc::tcsetattr(self.fd, libc::TCSANOW, &self.original) };
489  }
490}
491
492#[cfg(unix)]
493fn read_control_byte(timeout: Duration) -> anyhow::Result<Option<u8>> {
494  let fd = std::io::stdin().as_raw_fd();
495  let timeout_ms = timeout.as_millis().min(libc::c_int::MAX as u128) as libc::c_int;
496  let mut poll_fd = libc::pollfd {
497    fd,
498    events: libc::POLLIN,
499    revents: 0,
500  };
501
502  let poll_result = unsafe { libc::poll(&mut poll_fd, 1, timeout_ms) };
503  if poll_result < 0 {
504    return Err(std::io::Error::last_os_error().into());
505  }
506  if poll_result == 0 || poll_fd.revents & libc::POLLIN == 0 {
507    return Ok(None);
508  }
509
510  let mut byte = [0_u8; 1];
511  let read_result = unsafe { libc::read(fd, byte.as_mut_ptr().cast(), 1) };
512  if read_result < 0 {
513    return Err(std::io::Error::last_os_error().into());
514  }
515  if read_result == 0 {
516    return Ok(None);
517  }
518
519  Ok(Some(byte[0]))
520}
521
522#[cfg(unix)]
523fn drain_retrigger_input() -> anyhow::Result<()> {
524  while read_control_byte(Duration::ZERO)?.is_some() {}
525  Ok(())
526}
527
#[cfg(test)]
mod test {
  use super::*;

  /// Deserializes a `LocalRun` from a YAML snippet.
  fn parse(yaml: &str) -> Result<LocalRun, serde_yaml::Error> {
    serde_yaml::from_str::<LocalRun>(yaml)
  }

  /// A minimal definition leaves every optional field it does not mention unset.
  #[test]
  fn test_local_run_1() -> anyhow::Result<()> {
    let parsed = parse(
      "
        command: echo 'Hello, World!'
        ignore_errors: false
        verbose: false
      ",
    )?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert_eq!(parsed.work_dir, None);
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert_eq!(parsed.retrigger, None);
    assert_eq!(parsed.save_output_as, None);

    Ok(())
  }

  /// The `test` precondition is kept verbatim.
  #[test]
  fn test_local_run_2() -> anyhow::Result<()> {
    let parsed = parse(
      "
        command: echo 'Hello, World!'
        test: test $(uname) = 'Linux'
        ignore_errors: false
        verbose: false
      ",
    )?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert_eq!(parsed.test, Some("test $(uname) = 'Linux'".to_string()));
    assert_eq!(parsed.work_dir, None);
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert_eq!(parsed.save_output_as, None);

    Ok(())
  }

  /// Shell and interactive settings are picked up alongside the basics.
  #[test]
  fn test_local_run_3() -> anyhow::Result<()> {
    let parsed = parse(
      "
        command: echo 'Hello, World!'
        test: test $(uname) = 'Linux'
        shell: bash
        ignore_errors: false
        verbose: false
        interactive: true
      ",
    )?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert_eq!(parsed.test, Some("test $(uname) = 'Linux'".to_string()));
    assert_eq!(parsed.shell, Some(Shell::String("bash".to_string())));
    assert_eq!(parsed.work_dir, None);
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert_eq!(parsed.interactive, Some(true));
    assert_eq!(parsed.retrigger, None);
    assert_eq!(parsed.save_output_as, None);

    Ok(())
  }

  /// Enabling retrigger makes the task unsafe to run in parallel.
  #[test]
  fn test_local_run_4() -> anyhow::Result<()> {
    let parsed = parse(
      "
      command: go run .
      retrigger: true
    ",
    )?;

    assert_eq!(parsed.command, "go run .");
    assert_eq!(parsed.retrigger, Some(true));
    assert!(!parsed.interactive_enabled());
    assert!(!parsed.is_parallel_safe());

    Ok(())
  }

  /// Combining interactive mode with retrigger is rejected when executing.
  #[test]
  fn test_local_run_5_rejects_interactive_retrigger_combo_at_execution() {
    let parsed = parse(
      "
      command: cat
      interactive: true
      retrigger: true
    ",
    )
    .expect("valid local run");
    let context = TaskContext::empty();

    let message = parsed
      .execute(&context)
      .expect_err("expected execution to fail")
      .to_string();
    assert!(message.contains("retrigger is only supported for non-interactive local commands"));
  }
}