Skip to main content

mk_lib/schema/command/
local_run.rs

1use std::io::{
2  BufRead as _,
3  BufReader,
4  IsTerminal as _,
5};
6use std::process::{
7  Child,
8  ExitStatus,
9  Stdio,
10};
11use std::thread;
12#[cfg(unix)]
13use std::time::Duration;
14
15use anyhow::Context as _;
16#[cfg(unix)]
17use console::Term;
18use indicatif::ProgressDrawTarget;
19use schemars::JsonSchema;
20use serde::Deserialize;
21
22#[cfg(unix)]
23use std::os::fd::AsRawFd as _;
24#[cfg(unix)]
25use std::os::unix::process::CommandExt as _;
26
27use crate::defaults::{
28  default_ignore_errors,
29  default_verbose,
30};
31use crate::handle_output;
32#[cfg(unix)]
33use crate::schema::ExecutionInterrupted;
34use crate::schema::{
35  get_output_handler,
36  interpolate_template_string,
37  Shell,
38  TaskContext,
39};
40
// Configuration for a single command that is run on the local machine through
// a shell process.
//
// NOTE(review): the `///` comments on this type are intentionally left as-is;
// the `JsonSchema` derive may surface them in generated schemas — confirm
// before rewording them.
#[derive(Debug, Deserialize, Clone, JsonSchema)]
pub struct LocalRun {
  // Template string; it is passed through `interpolate_template_string` before
  // being handed to the shell. Execution asserts that it is not empty.
  /// The command to run
  pub command: String,

  // When absent, the shell supplied by the task context is used instead.
  /// The shell to use to run the command
  #[serde(default)]
  pub shell: Option<Shell>,

  // A failing (or unspawnable) test makes execution return `Ok(())` without
  // running `command`.
  /// The test to run before running command
  /// If the test fails, the command will not run
  #[serde(default)]
  pub test: Option<String>,

  // Resolved through `TaskContext::resolve_from_config` before use.
  /// The working directory to run the command in
  #[serde(default)]
  pub work_dir: Option<String>,

  // Interactive commands inherit the parent's stdin/stderr (and stdout unless
  // output is being captured) and always count as verbose.
  /// Interactive mode
  /// If true, the command will be interactive accepting user input
  #[serde(default)]
  pub interactive: Option<bool>,

  // Combining this with `interactive` is rejected at execution time.
  /// Allow pressing `R` to manually stop and restart a non-interactive command.
  #[serde(default)]
  pub retrigger: Option<bool>,

  // Falls back to the context-level setting, then to `default_ignore_errors()`.
  /// Ignore errors if the command fails
  #[serde(default)]
  pub ignore_errors: Option<bool>,

  // Only stored when the command exits successfully; trailing CR/LF characters
  // are trimmed from the captured text.
  /// Save the command stdout to a task-scoped output name
  #[serde(default)]
  pub save_output_as: Option<String>,

  // Falls back to the context-level setting, then to `default_verbose()`.
  /// Show verbose output
  #[serde(default)]
  pub verbose: Option<bool>,
}
80
81impl LocalRun {
82  pub fn execute(&self, context: &TaskContext) -> anyhow::Result<()> {
83    self.execute_internal(context, false)
84  }
85
86  pub fn execute_cancellable(&self, context: &TaskContext) -> anyhow::Result<()> {
87    self.execute_internal(context, true)
88  }
89
90  fn execute_internal(&self, context: &TaskContext, allow_cancellation: bool) -> anyhow::Result<()> {
91    assert!(!self.command.is_empty());
92
93    let command = interpolate_template_string(&self.command, context)?;
94    let interactive = self.interactive_enabled();
95    let retrigger = self.retrigger_enabled();
96    if interactive && retrigger {
97      anyhow::bail!("retrigger is only supported for non-interactive local commands");
98    }
99    let ignore_errors = self.ignore_errors(context);
100    let capture_output = self.save_output_as.is_some();
101    // If interactive mode is enabled, we don't need to redirect the output
102    // to the parent process. This is because the command will be run in the
103    // foreground and the user will be able to see the output.
104    let verbose = interactive || self.verbose(context);
105
106    // Skip the command if the test fails
107    if self.test(context).is_err() {
108      return Ok(());
109    }
110
111    if retrigger {
112      return self.execute_with_retrigger(context, &command, ignore_errors, capture_output, verbose);
113    }
114
115    let spawned = self.spawn_command(context, &command, capture_output, verbose, interactive)?;
116    let (status, captured_stdout) = if allow_cancellation {
117      spawned.wait_for_completion_or_cancellation(context, &command)?
118    } else {
119      spawned.wait_for_completion()?
120    };
121    self.finish_execution(context, &command, status, captured_stdout, ignore_errors)
122  }
123
124  fn spawn_command(
125    &self,
126    context: &TaskContext,
127    command: &str,
128    capture_output: bool,
129    verbose: bool,
130    interactive: bool,
131  ) -> anyhow::Result<SpawnedLocalCommand> {
132    let mut cmd = self
133      .shell
134      .as_ref()
135      .map(|shell| shell.proc())
136      .unwrap_or_else(|| context.shell().proc());
137
138    cmd.arg(command);
139
140    if capture_output {
141      cmd.stdout(Stdio::piped());
142      if interactive {
143        context.multi.set_draw_target(ProgressDrawTarget::hidden());
144        cmd.stdin(Stdio::inherit()).stderr(Stdio::inherit());
145      } else {
146        cmd.stderr(get_output_handler(verbose));
147      }
148    } else if verbose {
149      if interactive {
150        context.multi.set_draw_target(ProgressDrawTarget::hidden());
151
152        cmd
153          .stdin(Stdio::inherit())
154          .stdout(Stdio::inherit())
155          .stderr(Stdio::inherit());
156      } else {
157        let stdout = get_output_handler(verbose);
158        let stderr = get_output_handler(verbose);
159        cmd.stdout(stdout).stderr(stderr);
160      }
161    }
162
163    if let Some(work_dir) = self.resolved_work_dir(context) {
164      cmd.current_dir(work_dir);
165    }
166
167    #[cfg(unix)]
168    if !interactive {
169      unsafe {
170        cmd.pre_exec(|| {
171          if libc::setpgid(0, 0) != 0 {
172            return Err(std::io::Error::last_os_error());
173          }
174          Ok(())
175        });
176      }
177    }
178
179    // Inject environment variables
180    for (key, value) in context.env_vars.iter() {
181      cmd.env(key, value);
182    }
183
184    let mut child = cmd.spawn()?;
185    let stdout_handle = if capture_output {
186      let stdout = child.stdout.take().context("Failed to open stdout")?;
187      let multi = context.multi.clone();
188      Some(thread::spawn(move || -> anyhow::Result<String> {
189        let reader = BufReader::new(stdout);
190        let mut output = String::new();
191        for line in reader.lines() {
192          let line = line?;
193          if verbose {
194            let _ = multi.println(line.clone());
195          }
196          output.push_str(&line);
197          output.push('\n');
198        }
199        Ok(output.trim_end_matches(['\r', '\n']).to_string())
200      }))
201    } else {
202      None
203    };
204
205    if verbose && !interactive && !capture_output {
206      handle_output!(child.stdout, context);
207      handle_output!(child.stderr, context);
208    } else if verbose && !interactive && capture_output {
209      handle_output!(child.stderr, context);
210    }
211
212    Ok(SpawnedLocalCommand { child, stdout_handle })
213  }
214
215  fn finish_execution(
216    &self,
217    context: &TaskContext,
218    command: &str,
219    status: ExitStatus,
220    captured_stdout: Option<String>,
221    ignore_errors: bool,
222  ) -> anyhow::Result<()> {
223    if !status.success() && !ignore_errors {
224      anyhow::bail!("Command failed - {}", command);
225    }
226
227    if status.success() {
228      if let (Some(output_name), Some(output_value)) = (&self.save_output_as, captured_stdout) {
229        context.insert_task_output(output_name.clone(), output_value)?;
230      }
231    }
232
233    Ok(())
234  }
235
236  fn execute_with_retrigger(
237    &self,
238    context: &TaskContext,
239    command: &str,
240    ignore_errors: bool,
241    capture_output: bool,
242    verbose: bool,
243  ) -> anyhow::Result<()> {
244    if !std::io::stdin().is_terminal() || context.json_events {
245      return self.execute_without_retrigger(
246        context,
247        command,
248        ignore_errors,
249        capture_output,
250        verbose,
251        "Manual retrigger requires an attached terminal and is disabled for `--json-events`.",
252      );
253    }
254
255    #[cfg(not(unix))]
256    {
257      return self.execute_without_retrigger(
258        context,
259        command,
260        ignore_errors,
261        capture_output,
262        verbose,
263        "Manual retrigger is currently supported on Unix terminals only.",
264      );
265    }
266
267    #[cfg(unix)]
268    {
269      let _raw_mode = RawModeGuard::acquire()?;
270      let term = Term::stderr();
271      let _ = term.write_line("Press R or r to restart the running command.");
272      drain_retrigger_input()?;
273
274      loop {
275        let spawned = self.spawn_command(context, command, capture_output, verbose, false)?;
276        match spawned.wait_for_completion_or_retrigger() {
277          Ok(CommandOutcome::Completed {
278            status,
279            captured_stdout,
280          }) => {
281            return self.finish_execution(context, command, status, captured_stdout, ignore_errors);
282          },
283          Ok(CommandOutcome::RestartRequested) => {
284            let _ = term.write_line("Restarting command...");
285          },
286          Ok(CommandOutcome::Interrupted) => {
287            return Err(ExecutionInterrupted.into());
288          },
289          Err(error) => return Err(error),
290        }
291      }
292    }
293  }
294
295  fn execute_without_retrigger(
296    &self,
297    context: &TaskContext,
298    command: &str,
299    ignore_errors: bool,
300    capture_output: bool,
301    verbose: bool,
302    reason: &str,
303  ) -> anyhow::Result<()> {
304    if !context.json_events {
305      let _ = context.multi.println(reason);
306    }
307    let (status, captured_stdout) = self
308      .spawn_command(context, command, capture_output, verbose, false)?
309      .wait_for_completion()?;
310    self.finish_execution(context, command, status, captured_stdout, ignore_errors)
311  }
312
313  /// Check if the local run task is parallel safe
314  /// If the task is interactive or retriggerable, it is not parallel safe
315  pub fn is_parallel_safe(&self) -> bool {
316    !self.interactive_enabled() && !self.retrigger_enabled()
317  }
318
319  pub fn interactive_enabled(&self) -> bool {
320    self.interactive.unwrap_or(false)
321  }
322
323  pub fn retrigger_enabled(&self) -> bool {
324    self.retrigger.unwrap_or(false)
325  }
326
327  fn test(&self, context: &TaskContext) -> anyhow::Result<()> {
328    let verbose = self.verbose(context);
329
330    let stdout = get_output_handler(verbose);
331    let stderr = get_output_handler(verbose);
332
333    if let Some(test) = &self.test {
334      let test = interpolate_template_string(test, context)?;
335      let mut cmd = self
336        .shell
337        .as_ref()
338        .map(|shell| shell.proc())
339        .unwrap_or_else(|| context.shell().proc());
340      cmd.arg(&test).stdout(stdout).stderr(stderr);
341
342      if let Some(work_dir) = self.resolved_work_dir(context) {
343        cmd.current_dir(work_dir);
344      }
345
346      let mut cmd = cmd.spawn()?;
347      if verbose {
348        handle_output!(cmd.stdout, context);
349        handle_output!(cmd.stderr, context);
350      }
351
352      let status = cmd.wait()?;
353
354      log::trace!("Test status: {:?}", status.success());
355      if !status.success() {
356        anyhow::bail!("Command test failed - {}", test);
357      }
358    }
359
360    Ok(())
361  }
362
363  fn ignore_errors(&self, context: &TaskContext) -> bool {
364    self
365      .ignore_errors
366      .or(context.ignore_errors)
367      .unwrap_or(default_ignore_errors())
368  }
369
370  fn verbose(&self, context: &TaskContext) -> bool {
371    self.verbose.or(context.verbose).unwrap_or(default_verbose())
372  }
373
374  pub fn resolved_work_dir(&self, context: &TaskContext) -> Option<std::path::PathBuf> {
375    self
376      .work_dir
377      .as_ref()
378      .map(|work_dir| context.resolve_from_config(work_dir))
379  }
380}
381
/// A running local command together with the optional thread that is
/// collecting its stdout.
struct SpawnedLocalCommand {
  /// Handle to the spawned shell process.
  child: Child,
  /// Present only when stdout is being captured; joining it yields the
  /// collected text. It is taken (set to `None`) once joined.
  stdout_handle: Option<thread::JoinHandle<anyhow::Result<String>>>,
}
386
387impl SpawnedLocalCommand {
388  fn wait_for_completion(mut self) -> anyhow::Result<(ExitStatus, Option<String>)> {
389    let status = self.child.wait()?;
390    let captured_stdout = self.join_stdout_handle()?;
391    Ok((status, captured_stdout))
392  }
393
394  fn wait_for_completion_or_cancellation(
395    mut self,
396    context: &TaskContext,
397    command: &str,
398  ) -> anyhow::Result<(ExitStatus, Option<String>)> {
399    loop {
400      if let Some(status) = self.child.try_wait()? {
401        let captured_stdout = self.join_stdout_handle()?;
402        return Ok((status, captured_stdout));
403      }
404
405      if context.cancellation_requested() {
406        self.kill_for_cancellation()?;
407        let _ = self.child.wait()?;
408        let _ = self.join_stdout_handle()?;
409        anyhow::bail!("Command cancelled - {}", command);
410      }
411
412      thread::sleep(std::time::Duration::from_millis(25));
413    }
414  }
415
416  fn join_stdout_handle(&mut self) -> anyhow::Result<Option<String>> {
417    self
418      .stdout_handle
419      .take()
420      .map(|handle| {
421        handle
422          .join()
423          .map_err(|_| anyhow::anyhow!("Failed to join stdout capture thread"))?
424      })
425      .transpose()
426  }
427
428  #[cfg(unix)]
429  fn wait_for_completion_or_retrigger(mut self) -> anyhow::Result<CommandOutcome> {
430    loop {
431      if let Some(status) = self.child.try_wait()? {
432        let captured_stdout = self.join_stdout_handle()?;
433        return Ok(CommandOutcome::Completed {
434          status,
435          captured_stdout,
436        });
437      }
438
439      match read_control_byte(Duration::from_millis(100))? {
440        Some(b'R' | b'r') => {
441          self.kill_for_cancellation()?;
442          let _ = self.child.wait()?;
443          let _ = self.join_stdout_handle()?;
444          drain_retrigger_input()?;
445          return Ok(CommandOutcome::RestartRequested);
446        },
447        Some(3) => {
448          self.kill_for_cancellation()?;
449          let _ = self.child.wait()?;
450          let _ = self.join_stdout_handle()?;
451          drain_retrigger_input()?;
452          return Ok(CommandOutcome::Interrupted);
453        },
454        _ => {},
455      }
456    }
457  }
458
459  #[cfg(unix)]
460  fn kill_for_cancellation(&mut self) -> anyhow::Result<()> {
461    let pid = self.child.id() as i32;
462    let kill_result = unsafe { libc::killpg(pid, libc::SIGKILL) };
463    if kill_result == 0 {
464      return Ok(());
465    }
466
467    let error = std::io::Error::last_os_error();
468    let raw_error = error.raw_os_error();
469    if raw_error == Some(libc::ESRCH) || raw_error == Some(libc::EPERM) {
470      match self.child.kill() {
471        Ok(()) => return Ok(()),
472        Err(child_error) if child_error.kind() == std::io::ErrorKind::InvalidInput => return Ok(()),
473        Err(child_error) => return Err(child_error.into()),
474      }
475    }
476
477    Err(error.into())
478  }
479
480  #[cfg(not(unix))]
481  fn kill_for_cancellation(&mut self) -> anyhow::Result<()> {
482    match self.child.kill() {
483      Ok(()) => Ok(()),
484      Err(error) if error.kind() == std::io::ErrorKind::InvalidInput => Ok(()),
485      Err(error) => Err(error.into()),
486    }
487  }
488}
489
/// Result of waiting on a retriggerable command.
#[cfg(unix)]
enum CommandOutcome {
  /// The child exited on its own.
  Completed {
    /// Exit status reported for the child.
    status: ExitStatus,
    /// Collected stdout, present only when output capture was enabled.
    captured_stdout: Option<String>,
  },
  /// `R` or `r` was read from the terminal; the child has been killed.
  RestartRequested,
  /// Byte 3 (Ctrl-C) was read from the terminal; the child has been killed.
  Interrupted,
}
499
/// Holds the terminal settings that were active before raw mode was enabled
/// so they can be put back when the guard is dropped.
#[cfg(unix)]
struct RawModeGuard {
  /// File descriptor of stdin, the terminal whose settings were changed.
  fd: std::os::fd::RawFd,
  /// Terminal attributes as they were before `acquire` modified them.
  original: libc::termios,
}
505
506#[cfg(unix)]
507impl RawModeGuard {
508  fn acquire() -> anyhow::Result<Self> {
509    let fd = std::io::stdin().as_raw_fd();
510    let mut original = std::mem::MaybeUninit::<libc::termios>::uninit();
511    let get_attr_result = unsafe { libc::tcgetattr(fd, original.as_mut_ptr()) };
512    if get_attr_result != 0 {
513      return Err(std::io::Error::last_os_error().into());
514    }
515
516    let original = unsafe { original.assume_init() };
517    let mut raw = original;
518    raw.c_lflag &= !(libc::ICANON | libc::ECHO | libc::ISIG);
519    raw.c_cc[libc::VMIN] = 0;
520    raw.c_cc[libc::VTIME] = 0;
521
522    let set_attr_result = unsafe { libc::tcsetattr(fd, libc::TCSANOW, &raw) };
523    if set_attr_result != 0 {
524      return Err(std::io::Error::last_os_error().into());
525    }
526
527    Ok(Self { fd, original })
528  }
529}
530
531#[cfg(unix)]
532impl Drop for RawModeGuard {
533  fn drop(&mut self) {
534    let _ = unsafe { libc::tcsetattr(self.fd, libc::TCSANOW, &self.original) };
535  }
536}
537
538#[cfg(unix)]
539fn read_control_byte(timeout: Duration) -> anyhow::Result<Option<u8>> {
540  let fd = std::io::stdin().as_raw_fd();
541  let timeout_ms = timeout.as_millis().min(libc::c_int::MAX as u128) as libc::c_int;
542  let mut poll_fd = libc::pollfd {
543    fd,
544    events: libc::POLLIN,
545    revents: 0,
546  };
547
548  let poll_result = unsafe { libc::poll(&mut poll_fd, 1, timeout_ms) };
549  if poll_result < 0 {
550    return Err(std::io::Error::last_os_error().into());
551  }
552  if poll_result == 0 || poll_fd.revents & libc::POLLIN == 0 {
553    return Ok(None);
554  }
555
556  let mut byte = [0_u8; 1];
557  let read_result = unsafe { libc::read(fd, byte.as_mut_ptr().cast(), 1) };
558  if read_result < 0 {
559    return Err(std::io::Error::last_os_error().into());
560  }
561  if read_result == 0 {
562    return Ok(None);
563  }
564
565  Ok(Some(byte[0]))
566}
567
568#[cfg(unix)]
569fn drain_retrigger_input() -> anyhow::Result<()> {
570  while read_control_byte(Duration::ZERO)?.is_some() {}
571  Ok(())
572}
573
#[cfg(test)]
mod test {
  use super::*;

  // A minimal config parses and leaves every unspecified option unset.
  #[test]
  fn test_local_run_1() -> anyhow::Result<()> {
    let yaml = "
        command: echo 'Hello, World!'
        ignore_errors: false
        verbose: false
      ";
    let parsed: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert!(parsed.work_dir.is_none());
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert!(parsed.retrigger.is_none());
    assert!(parsed.save_output_as.is_none());

    Ok(())
  }

  // The `test` pre-check is carried through verbatim.
  #[test]
  fn test_local_run_2() -> anyhow::Result<()> {
    let yaml = "
        command: echo 'Hello, World!'
        test: test $(uname) = 'Linux'
        ignore_errors: false
        verbose: false
      ";
    let parsed: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert_eq!(parsed.test, Some("test $(uname) = 'Linux'".to_string()));
    assert!(parsed.work_dir.is_none());
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert!(parsed.save_output_as.is_none());

    Ok(())
  }

  // Shell and interactive settings are parsed alongside the basics.
  #[test]
  fn test_local_run_3() -> anyhow::Result<()> {
    let yaml = "
        command: echo 'Hello, World!'
        test: test $(uname) = 'Linux'
        shell: bash
        ignore_errors: false
        verbose: false
        interactive: true
      ";
    let parsed: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert_eq!(parsed.test, Some("test $(uname) = 'Linux'".to_string()));
    assert_eq!(parsed.shell, Some(Shell::String("bash".to_string())));
    assert!(parsed.work_dir.is_none());
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert_eq!(parsed.interactive, Some(true));
    assert!(parsed.retrigger.is_none());
    assert!(parsed.save_output_as.is_none());

    Ok(())
  }

  // A retriggerable command is non-interactive and not parallel safe.
  #[test]
  fn test_local_run_4() -> anyhow::Result<()> {
    let yaml = "
      command: go run .
      retrigger: true
    ";
    let parsed: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(parsed.command, "go run .");
    assert_eq!(parsed.retrigger, Some(true));
    assert!(!parsed.interactive_enabled());
    assert!(!parsed.is_parallel_safe());

    Ok(())
  }

  // `interactive` and `retrigger` together parse fine but fail at execution.
  #[test]
  fn test_local_run_5_rejects_interactive_retrigger_combo_at_execution() {
    let yaml = "
      command: cat
      interactive: true
      retrigger: true
    ";
    let parsed = serde_yaml::from_str::<LocalRun>(yaml).expect("valid local run");
    let context = TaskContext::empty();

    let message = parsed
      .execute(&context)
      .expect_err("expected execution to fail")
      .to_string();
    assert!(message.contains("retrigger is only supported for non-interactive local commands"));
  }
}