Skip to main content

mk_lib/schema/command/
local_run.rs

1use std::io::{
2  BufRead as _,
3  BufReader,
4  IsTerminal as _,
5};
6use std::process::{
7  Child,
8  ExitStatus,
9  Stdio,
10};
11use std::thread;
12#[cfg(unix)]
13use std::time::Duration;
14
15use anyhow::Context as _;
16#[cfg(unix)]
17use console::Term;
18use indicatif::ProgressDrawTarget;
19use schemars::JsonSchema;
20use serde::Deserialize;
21
22#[cfg(unix)]
23use std::os::fd::AsRawFd as _;
24#[cfg(unix)]
25use std::os::unix::process::CommandExt as _;
26
27use crate::defaults::{
28  default_ignore_errors,
29  default_verbose,
30};
31use crate::handle_output;
32#[cfg(unix)]
33use crate::schema::ExecutionInterrupted;
34use crate::schema::{
35  get_output_handler,
36  interpolate_template_string,
37  Shell,
38  TaskContext,
39};
40
41#[derive(Debug, Deserialize, Clone, JsonSchema)]
42pub struct LocalRun {
43  /// The command to run
44  pub command: String,
45
46  /// The shell to use to run the command
47  #[serde(default)]
48  pub shell: Option<Shell>,
49
50  /// The test to run before running command
51  /// If the test fails, the command will not run
52  #[serde(default)]
53  pub test: Option<String>,
54
55  /// The working directory to run the command in
56  #[serde(default)]
57  pub work_dir: Option<String>,
58
59  /// Interactive mode
60  /// If true, the command will be interactive accepting user input
61  #[serde(default)]
62  pub interactive: Option<bool>,
63
64  /// Allow pressing `R` to manually stop and restart a non-interactive command.
65  #[serde(default)]
66  pub retrigger: Option<bool>,
67
68  /// Ignore errors if the command fails
69  #[serde(default)]
70  pub ignore_errors: Option<bool>,
71
72  /// Save the command stdout to a task-scoped output name
73  #[serde(default)]
74  pub save_output_as: Option<String>,
75
76  /// Show verbose output
77  #[serde(default)]
78  pub verbose: Option<bool>,
79}
80
81impl LocalRun {
82  pub fn execute(&self, context: &TaskContext) -> anyhow::Result<()> {
83    assert!(!self.command.is_empty());
84
85    let command = interpolate_template_string(&self.command, context)?;
86    let interactive = self.interactive_enabled();
87    let retrigger = self.retrigger_enabled();
88    if interactive && retrigger {
89      anyhow::bail!("retrigger is only supported for non-interactive local commands");
90    }
91    let ignore_errors = self.ignore_errors(context);
92    let capture_output = self.save_output_as.is_some();
93    // If interactive mode is enabled, we don't need to redirect the output
94    // to the parent process. This is because the command will be run in the
95    // foreground and the user will be able to see the output.
96    let verbose = interactive || self.verbose(context);
97
98    // Skip the command if the test fails
99    if self.test(context).is_err() {
100      return Ok(());
101    }
102
103    if retrigger {
104      return self.execute_with_retrigger(context, &command, ignore_errors, capture_output, verbose);
105    }
106
107    let (status, captured_stdout) = self
108      .spawn_command(context, &command, capture_output, verbose, interactive)?
109      .wait_for_completion()?;
110    self.finish_execution(context, &command, status, captured_stdout, ignore_errors)
111  }
112
113  fn spawn_command(
114    &self,
115    context: &TaskContext,
116    command: &str,
117    capture_output: bool,
118    verbose: bool,
119    interactive: bool,
120  ) -> anyhow::Result<SpawnedLocalCommand> {
121    let mut cmd = self
122      .shell
123      .as_ref()
124      .map(|shell| shell.proc())
125      .unwrap_or_else(|| context.shell().proc());
126
127    cmd.arg(command);
128
129    if capture_output {
130      cmd.stdout(Stdio::piped());
131      if interactive {
132        context.multi.set_draw_target(ProgressDrawTarget::hidden());
133        cmd.stdin(Stdio::inherit()).stderr(Stdio::inherit());
134      } else {
135        cmd.stderr(get_output_handler(verbose));
136      }
137    } else if verbose {
138      if interactive {
139        context.multi.set_draw_target(ProgressDrawTarget::hidden());
140
141        cmd
142          .stdin(Stdio::inherit())
143          .stdout(Stdio::inherit())
144          .stderr(Stdio::inherit());
145      } else {
146        let stdout = get_output_handler(verbose);
147        let stderr = get_output_handler(verbose);
148        cmd.stdout(stdout).stderr(stderr);
149      }
150    }
151
152    if let Some(work_dir) = self.resolved_work_dir(context) {
153      cmd.current_dir(work_dir);
154    }
155
156    #[cfg(unix)]
157    if self.retrigger_enabled() && !interactive {
158      unsafe {
159        cmd.pre_exec(|| {
160          if libc::setpgid(0, 0) != 0 {
161            return Err(std::io::Error::last_os_error());
162          }
163          Ok(())
164        });
165      }
166    }
167
168    // Inject environment variables
169    for (key, value) in context.env_vars.iter() {
170      cmd.env(key, value);
171    }
172
173    let mut child = cmd.spawn()?;
174    let stdout_handle = if capture_output {
175      let stdout = child.stdout.take().context("Failed to open stdout")?;
176      let multi = context.multi.clone();
177      Some(thread::spawn(move || -> anyhow::Result<String> {
178        let reader = BufReader::new(stdout);
179        let mut output = String::new();
180        for line in reader.lines() {
181          let line = line?;
182          if verbose {
183            let _ = multi.println(line.clone());
184          }
185          output.push_str(&line);
186          output.push('\n');
187        }
188        Ok(output.trim_end_matches(['\r', '\n']).to_string())
189      }))
190    } else {
191      None
192    };
193
194    if verbose && !interactive && !capture_output {
195      handle_output!(child.stdout, context);
196      handle_output!(child.stderr, context);
197    } else if verbose && !interactive && capture_output {
198      handle_output!(child.stderr, context);
199    }
200
201    Ok(SpawnedLocalCommand { child, stdout_handle })
202  }
203
204  fn finish_execution(
205    &self,
206    context: &TaskContext,
207    command: &str,
208    status: ExitStatus,
209    captured_stdout: Option<String>,
210    ignore_errors: bool,
211  ) -> anyhow::Result<()> {
212    if !status.success() && !ignore_errors {
213      anyhow::bail!("Command failed - {}", command);
214    }
215
216    if status.success() {
217      if let (Some(output_name), Some(output_value)) = (&self.save_output_as, captured_stdout) {
218        context.insert_task_output(output_name.clone(), output_value)?;
219      }
220    }
221
222    Ok(())
223  }
224
225  fn execute_with_retrigger(
226    &self,
227    context: &TaskContext,
228    command: &str,
229    ignore_errors: bool,
230    capture_output: bool,
231    verbose: bool,
232  ) -> anyhow::Result<()> {
233    if !std::io::stdin().is_terminal() || context.json_events {
234      return self.execute_without_retrigger(
235        context,
236        command,
237        ignore_errors,
238        capture_output,
239        verbose,
240        "Manual retrigger requires an attached terminal and is disabled for `--json-events`.",
241      );
242    }
243
244    #[cfg(not(unix))]
245    {
246      return self.execute_without_retrigger(
247        context,
248        command,
249        ignore_errors,
250        capture_output,
251        verbose,
252        "Manual retrigger is currently supported on Unix terminals only.",
253      );
254    }
255
256    #[cfg(unix)]
257    {
258      let _raw_mode = RawModeGuard::acquire()?;
259      let term = Term::stderr();
260      let _ = term.write_line("Press R or r to restart the running command.");
261      drain_retrigger_input()?;
262
263      loop {
264        let spawned = self.spawn_command(context, command, capture_output, verbose, false)?;
265        match spawned.wait_for_completion_or_retrigger() {
266          Ok(CommandOutcome::Completed {
267            status,
268            captured_stdout,
269          }) => {
270            return self.finish_execution(context, command, status, captured_stdout, ignore_errors);
271          },
272          Ok(CommandOutcome::RestartRequested) => {
273            let _ = term.write_line("Restarting command...");
274          },
275          Ok(CommandOutcome::Interrupted) => {
276            return Err(ExecutionInterrupted.into());
277          },
278          Err(error) => return Err(error),
279        }
280      }
281    }
282  }
283
284  fn execute_without_retrigger(
285    &self,
286    context: &TaskContext,
287    command: &str,
288    ignore_errors: bool,
289    capture_output: bool,
290    verbose: bool,
291    reason: &str,
292  ) -> anyhow::Result<()> {
293    if !context.json_events {
294      let _ = context.multi.println(reason);
295    }
296    let (status, captured_stdout) = self
297      .spawn_command(context, command, capture_output, verbose, false)?
298      .wait_for_completion()?;
299    self.finish_execution(context, command, status, captured_stdout, ignore_errors)
300  }
301
302  /// Check if the local run task is parallel safe
303  /// If the task is interactive or retriggerable, it is not parallel safe
304  pub fn is_parallel_safe(&self) -> bool {
305    !self.interactive_enabled() && !self.retrigger_enabled()
306  }
307
308  pub fn interactive_enabled(&self) -> bool {
309    self.interactive.unwrap_or(false)
310  }
311
312  pub fn retrigger_enabled(&self) -> bool {
313    self.retrigger.unwrap_or(false)
314  }
315
316  fn test(&self, context: &TaskContext) -> anyhow::Result<()> {
317    let verbose = self.verbose(context);
318
319    let stdout = get_output_handler(verbose);
320    let stderr = get_output_handler(verbose);
321
322    if let Some(test) = &self.test {
323      let test = interpolate_template_string(test, context)?;
324      let mut cmd = self
325        .shell
326        .as_ref()
327        .map(|shell| shell.proc())
328        .unwrap_or_else(|| context.shell().proc());
329      cmd.arg(&test).stdout(stdout).stderr(stderr);
330
331      if let Some(work_dir) = self.resolved_work_dir(context) {
332        cmd.current_dir(work_dir);
333      }
334
335      let mut cmd = cmd.spawn()?;
336      if verbose {
337        handle_output!(cmd.stdout, context);
338        handle_output!(cmd.stderr, context);
339      }
340
341      let status = cmd.wait()?;
342
343      log::trace!("Test status: {:?}", status.success());
344      if !status.success() {
345        anyhow::bail!("Command test failed - {}", test);
346      }
347    }
348
349    Ok(())
350  }
351
352  fn ignore_errors(&self, context: &TaskContext) -> bool {
353    self
354      .ignore_errors
355      .or(context.ignore_errors)
356      .unwrap_or(default_ignore_errors())
357  }
358
359  fn verbose(&self, context: &TaskContext) -> bool {
360    self.verbose.or(context.verbose).unwrap_or(default_verbose())
361  }
362
363  pub fn resolved_work_dir(&self, context: &TaskContext) -> Option<std::path::PathBuf> {
364    self
365      .work_dir
366      .as_ref()
367      .map(|work_dir| context.resolve_from_config(work_dir))
368  }
369}
370
371struct SpawnedLocalCommand {
372  child: Child,
373  stdout_handle: Option<thread::JoinHandle<anyhow::Result<String>>>,
374}
375
376impl SpawnedLocalCommand {
377  fn wait_for_completion(mut self) -> anyhow::Result<(ExitStatus, Option<String>)> {
378    let status = self.child.wait()?;
379    let captured_stdout = self.join_stdout_handle()?;
380    Ok((status, captured_stdout))
381  }
382
383  fn join_stdout_handle(&mut self) -> anyhow::Result<Option<String>> {
384    self
385      .stdout_handle
386      .take()
387      .map(|handle| {
388        handle
389          .join()
390          .map_err(|_| anyhow::anyhow!("Failed to join stdout capture thread"))?
391      })
392      .transpose()
393  }
394
395  #[cfg(unix)]
396  fn wait_for_completion_or_retrigger(mut self) -> anyhow::Result<CommandOutcome> {
397    loop {
398      if let Some(status) = self.child.try_wait()? {
399        let captured_stdout = self.join_stdout_handle()?;
400        return Ok(CommandOutcome::Completed {
401          status,
402          captured_stdout,
403        });
404      }
405
406      match read_control_byte(Duration::from_millis(100))? {
407        Some(b'R' | b'r') => {
408          self.kill_for_restart()?;
409          let _ = self.child.wait()?;
410          let _ = self.join_stdout_handle()?;
411          drain_retrigger_input()?;
412          return Ok(CommandOutcome::RestartRequested);
413        },
414        Some(3) => {
415          self.kill_for_restart()?;
416          let _ = self.child.wait()?;
417          let _ = self.join_stdout_handle()?;
418          drain_retrigger_input()?;
419          return Ok(CommandOutcome::Interrupted);
420        },
421        _ => {},
422      }
423    }
424  }
425
426  #[cfg(unix)]
427  fn kill_for_restart(&mut self) -> anyhow::Result<()> {
428    let pid = self.child.id() as i32;
429    let kill_result = unsafe { libc::killpg(pid, libc::SIGKILL) };
430    if kill_result == 0 {
431      return Ok(());
432    }
433
434    let error = std::io::Error::last_os_error();
435    let raw_error = error.raw_os_error();
436    if raw_error == Some(libc::ESRCH) || raw_error == Some(libc::EPERM) {
437      match self.child.kill() {
438        Ok(()) => return Ok(()),
439        Err(child_error) if child_error.kind() == std::io::ErrorKind::InvalidInput => return Ok(()),
440        Err(child_error) => return Err(child_error.into()),
441      }
442    }
443
444    Err(error.into())
445  }
446}
447
/// Result of waiting on a retriggerable command.
#[cfg(unix)]
enum CommandOutcome {
  /// The command exited on its own.
  Completed { status: ExitStatus, captured_stdout: Option<String> },
  /// The user asked for a restart; the command has been killed.
  RestartRequested,
  /// The user pressed Ctrl-C; the command has been killed.
  Interrupted,
}
457
458#[cfg(unix)]
459struct RawModeGuard {
460  fd: std::os::fd::RawFd,
461  original: libc::termios,
462}
463
464#[cfg(unix)]
465impl RawModeGuard {
466  fn acquire() -> anyhow::Result<Self> {
467    let fd = std::io::stdin().as_raw_fd();
468    let mut original = std::mem::MaybeUninit::<libc::termios>::uninit();
469    let get_attr_result = unsafe { libc::tcgetattr(fd, original.as_mut_ptr()) };
470    if get_attr_result != 0 {
471      return Err(std::io::Error::last_os_error().into());
472    }
473
474    let original = unsafe { original.assume_init() };
475    let mut raw = original;
476    raw.c_lflag &= !(libc::ICANON | libc::ECHO | libc::ISIG);
477    raw.c_cc[libc::VMIN] = 0;
478    raw.c_cc[libc::VTIME] = 0;
479
480    let set_attr_result = unsafe { libc::tcsetattr(fd, libc::TCSANOW, &raw) };
481    if set_attr_result != 0 {
482      return Err(std::io::Error::last_os_error().into());
483    }
484
485    Ok(Self { fd, original })
486  }
487}
488
489#[cfg(unix)]
490impl Drop for RawModeGuard {
491  fn drop(&mut self) {
492    let _ = unsafe { libc::tcsetattr(self.fd, libc::TCSANOW, &self.original) };
493  }
494}
495
496#[cfg(unix)]
497fn read_control_byte(timeout: Duration) -> anyhow::Result<Option<u8>> {
498  let fd = std::io::stdin().as_raw_fd();
499  let timeout_ms = timeout.as_millis().min(libc::c_int::MAX as u128) as libc::c_int;
500  let mut poll_fd = libc::pollfd {
501    fd,
502    events: libc::POLLIN,
503    revents: 0,
504  };
505
506  let poll_result = unsafe { libc::poll(&mut poll_fd, 1, timeout_ms) };
507  if poll_result < 0 {
508    return Err(std::io::Error::last_os_error().into());
509  }
510  if poll_result == 0 || poll_fd.revents & libc::POLLIN == 0 {
511    return Ok(None);
512  }
513
514  let mut byte = [0_u8; 1];
515  let read_result = unsafe { libc::read(fd, byte.as_mut_ptr().cast(), 1) };
516  if read_result < 0 {
517    return Err(std::io::Error::last_os_error().into());
518  }
519  if read_result == 0 {
520    return Ok(None);
521  }
522
523  Ok(Some(byte[0]))
524}
525
526#[cfg(unix)]
527fn drain_retrigger_input() -> anyhow::Result<()> {
528  while read_control_byte(Duration::ZERO)?.is_some() {}
529  Ok(())
530}
531
#[cfg(test)]
mod test {
  use super::*;

  /// Deserializes a `LocalRun` from a YAML snippet.
  fn parse(yaml: &str) -> Result<LocalRun, serde_yaml::Error> {
    serde_yaml::from_str::<LocalRun>(yaml)
  }

  /// A minimal definition leaves every optional field that was not given unset.
  #[test]
  fn test_local_run_1() -> anyhow::Result<()> {
    let parsed = parse(
      "
        command: echo 'Hello, World!'
        ignore_errors: false
        verbose: false
      ",
    )?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert!(parsed.work_dir.is_none());
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert!(parsed.retrigger.is_none());
    assert!(parsed.save_output_as.is_none());

    Ok(())
  }

  /// The `test` precondition is kept verbatim.
  #[test]
  fn test_local_run_2() -> anyhow::Result<()> {
    let parsed = parse(
      "
        command: echo 'Hello, World!'
        test: test $(uname) = 'Linux'
        ignore_errors: false
        verbose: false
      ",
    )?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert_eq!(parsed.test.as_deref(), Some("test $(uname) = 'Linux'"));
    assert!(parsed.work_dir.is_none());
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert!(parsed.save_output_as.is_none());

    Ok(())
  }

  /// Shell and interactive settings are picked up from the definition.
  #[test]
  fn test_local_run_3() -> anyhow::Result<()> {
    let parsed = parse(
      "
        command: echo 'Hello, World!'
        test: test $(uname) = 'Linux'
        shell: bash
        ignore_errors: false
        verbose: false
        interactive: true
      ",
    )?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert_eq!(parsed.test.as_deref(), Some("test $(uname) = 'Linux'"));
    assert_eq!(parsed.shell, Some(Shell::String("bash".to_string())));
    assert!(parsed.work_dir.is_none());
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert_eq!(parsed.interactive, Some(true));
    assert!(parsed.retrigger.is_none());
    assert!(parsed.save_output_as.is_none());

    Ok(())
  }

  /// A retriggerable command is non-interactive and not parallel safe.
  #[test]
  fn test_local_run_4() -> anyhow::Result<()> {
    let parsed = parse(
      "
      command: go run .
      retrigger: true
    ",
    )?;

    assert_eq!(parsed.command, "go run .");
    assert_eq!(parsed.retrigger, Some(true));
    assert!(!parsed.interactive_enabled());
    assert!(!parsed.is_parallel_safe());

    Ok(())
  }

  /// Combining `interactive` with `retrigger` is rejected when executing.
  #[test]
  fn test_local_run_5_rejects_interactive_retrigger_combo_at_execution() {
    let parsed = parse(
      "
      command: cat
      interactive: true
      retrigger: true
    ",
    )
    .expect("valid local run");
    let context = TaskContext::empty();

    let failure = parsed
      .execute(&context)
      .expect_err("expected execution to fail");
    let message = failure.to_string();
    assert!(message.contains("retrigger is only supported for non-interactive local commands"));
  }
}