1use std::io::{
2 BufRead as _,
3 BufReader,
4 IsTerminal as _,
5};
6use std::process::{
7 Child,
8 ExitStatus,
9 Stdio,
10};
11use std::thread;
12#[cfg(unix)]
13use std::time::Duration;
14
15use anyhow::Context as _;
16#[cfg(unix)]
17use console::Term;
18use indicatif::ProgressDrawTarget;
19use schemars::JsonSchema;
20use serde::Deserialize;
21
22#[cfg(unix)]
23use std::os::fd::AsRawFd as _;
24#[cfg(unix)]
25use std::os::unix::process::CommandExt as _;
26
27use crate::defaults::{
28 default_ignore_errors,
29 default_verbose,
30};
31use crate::handle_output;
32#[cfg(unix)]
33use crate::schema::ExecutionInterrupted;
34use crate::schema::{
35 get_output_handler,
36 interpolate_template_string,
37 Shell,
38 TaskContext,
39};
40
/// A task step that runs a shell command on the local machine.
///
/// Only `command` is required; every other field is optional and defaults to `None`.
#[derive(Debug, Deserialize, Clone, JsonSchema)]
pub struct LocalRun {
  /// Command line to run. It is passed through `interpolate_template_string` and handed to the
  /// shell process as a single argument.
  pub command: String,

  /// Shell used for both `command` and `test`; the context's shell is used when unset.
  #[serde(default)]
  pub shell: Option<Shell>,

  /// Optional pre-check command. When it does not succeed (or cannot be run), `command` is
  /// skipped and execution still reports success.
  #[serde(default)]
  pub test: Option<String>,

  /// Working directory for `command` and `test`, resolved through
  /// `TaskContext::resolve_from_config`.
  #[serde(default)]
  pub work_dir: Option<String>,

  /// When true, the command inherits the terminal's stdin/stderr (and stdout unless output is
  /// captured), the progress display is hidden, and verbose output is forced on.
  /// Cannot be combined with `retrigger`.
  #[serde(default)]
  pub interactive: Option<bool>,

  /// When true, the running command can be restarted by pressing `R`/`r`.
  /// Cannot be combined with `interactive`.
  #[serde(default)]
  pub retrigger: Option<bool>,

  /// Whether a failing exit status is tolerated. Falls back to the context setting, then to
  /// `default_ignore_errors()`.
  #[serde(default)]
  pub ignore_errors: Option<bool>,

  /// When set, stdout is captured and, on success, stored in the context as a task output
  /// under this name.
  #[serde(default)]
  pub save_output_as: Option<String>,

  /// Whether command output is shown. Falls back to the context setting, then to
  /// `default_verbose()`.
  #[serde(default)]
  pub verbose: Option<bool>,
}
80
81impl LocalRun {
82 pub fn execute(&self, context: &TaskContext) -> anyhow::Result<()> {
83 self.execute_internal(context, false)
84 }
85
86 pub fn execute_cancellable(&self, context: &TaskContext) -> anyhow::Result<()> {
87 self.execute_internal(context, true)
88 }
89
90 fn execute_internal(&self, context: &TaskContext, allow_cancellation: bool) -> anyhow::Result<()> {
91 assert!(!self.command.is_empty());
92
93 let command = interpolate_template_string(&self.command, context)?;
94 let interactive = self.interactive_enabled();
95 let retrigger = self.retrigger_enabled();
96 if interactive && retrigger {
97 anyhow::bail!("retrigger is only supported for non-interactive local commands");
98 }
99 let ignore_errors = self.ignore_errors(context);
100 let capture_output = self.save_output_as.is_some();
101 let verbose = interactive || self.verbose(context);
105
106 if self.test(context).is_err() {
108 return Ok(());
109 }
110
111 if retrigger {
112 return self.execute_with_retrigger(context, &command, ignore_errors, capture_output, verbose);
113 }
114
115 let spawned = self.spawn_command(context, &command, capture_output, verbose, interactive)?;
116 let (status, captured_stdout) = if allow_cancellation {
117 spawned.wait_for_completion_or_cancellation(context, &command)?
118 } else {
119 spawned.wait_for_completion()?
120 };
121 self.finish_execution(context, &command, status, captured_stdout, ignore_errors)
122 }
123
124 fn spawn_command(
125 &self,
126 context: &TaskContext,
127 command: &str,
128 capture_output: bool,
129 verbose: bool,
130 interactive: bool,
131 ) -> anyhow::Result<SpawnedLocalCommand> {
132 let mut cmd = self
133 .shell
134 .as_ref()
135 .map(|shell| shell.proc())
136 .unwrap_or_else(|| context.shell().proc());
137
138 cmd.arg(command);
139
140 if capture_output {
141 cmd.stdout(Stdio::piped());
142 if interactive {
143 context.multi.set_draw_target(ProgressDrawTarget::hidden());
144 cmd.stdin(Stdio::inherit()).stderr(Stdio::inherit());
145 } else {
146 cmd.stderr(get_output_handler(verbose));
147 }
148 } else if verbose {
149 if interactive {
150 context.multi.set_draw_target(ProgressDrawTarget::hidden());
151
152 cmd
153 .stdin(Stdio::inherit())
154 .stdout(Stdio::inherit())
155 .stderr(Stdio::inherit());
156 } else {
157 let stdout = get_output_handler(verbose);
158 let stderr = get_output_handler(verbose);
159 cmd.stdout(stdout).stderr(stderr);
160 }
161 }
162
163 if let Some(work_dir) = self.resolved_work_dir(context) {
164 cmd.current_dir(work_dir);
165 }
166
167 #[cfg(unix)]
168 if !interactive {
169 unsafe {
170 cmd.pre_exec(|| {
171 if libc::setpgid(0, 0) != 0 {
172 return Err(std::io::Error::last_os_error());
173 }
174 Ok(())
175 });
176 }
177 }
178
179 for (key, value) in context.env_vars.iter() {
181 cmd.env(key, value);
182 }
183
184 let mut child = cmd.spawn()?;
185 let stdout_handle = if capture_output {
186 let stdout = child.stdout.take().context("Failed to open stdout")?;
187 let multi = context.multi.clone();
188 Some(thread::spawn(move || -> anyhow::Result<String> {
189 let reader = BufReader::new(stdout);
190 let mut output = String::new();
191 for line in reader.lines() {
192 let line = line?;
193 if verbose {
194 let _ = multi.println(line.clone());
195 }
196 output.push_str(&line);
197 output.push('\n');
198 }
199 Ok(output.trim_end_matches(['\r', '\n']).to_string())
200 }))
201 } else {
202 None
203 };
204
205 if verbose && !interactive && !capture_output {
206 handle_output!(child.stdout, context);
207 handle_output!(child.stderr, context);
208 } else if verbose && !interactive && capture_output {
209 handle_output!(child.stderr, context);
210 }
211
212 Ok(SpawnedLocalCommand { child, stdout_handle })
213 }
214
215 fn finish_execution(
216 &self,
217 context: &TaskContext,
218 command: &str,
219 status: ExitStatus,
220 captured_stdout: Option<String>,
221 ignore_errors: bool,
222 ) -> anyhow::Result<()> {
223 if !status.success() && !ignore_errors {
224 anyhow::bail!("Command failed - {}", command);
225 }
226
227 if status.success() {
228 if let (Some(output_name), Some(output_value)) = (&self.save_output_as, captured_stdout) {
229 context.insert_task_output(output_name.clone(), output_value)?;
230 }
231 }
232
233 Ok(())
234 }
235
236 fn execute_with_retrigger(
237 &self,
238 context: &TaskContext,
239 command: &str,
240 ignore_errors: bool,
241 capture_output: bool,
242 verbose: bool,
243 ) -> anyhow::Result<()> {
244 if !std::io::stdin().is_terminal() || context.json_events {
245 return self.execute_without_retrigger(
246 context,
247 command,
248 ignore_errors,
249 capture_output,
250 verbose,
251 "Manual retrigger requires an attached terminal and is disabled for `--json-events`.",
252 );
253 }
254
255 #[cfg(not(unix))]
256 {
257 return self.execute_without_retrigger(
258 context,
259 command,
260 ignore_errors,
261 capture_output,
262 verbose,
263 "Manual retrigger is currently supported on Unix terminals only.",
264 );
265 }
266
267 #[cfg(unix)]
268 {
269 let _raw_mode = RawModeGuard::acquire()?;
270 let term = Term::stderr();
271 let _ = term.write_line("Press R or r to restart the running command.");
272 drain_retrigger_input()?;
273
274 loop {
275 let spawned = self.spawn_command(context, command, capture_output, verbose, false)?;
276 match spawned.wait_for_completion_or_retrigger() {
277 Ok(CommandOutcome::Completed {
278 status,
279 captured_stdout,
280 }) => {
281 return self.finish_execution(context, command, status, captured_stdout, ignore_errors);
282 },
283 Ok(CommandOutcome::RestartRequested) => {
284 let _ = term.write_line("Restarting command...");
285 },
286 Ok(CommandOutcome::Interrupted) => {
287 return Err(ExecutionInterrupted.into());
288 },
289 Err(error) => return Err(error),
290 }
291 }
292 }
293 }
294
295 fn execute_without_retrigger(
296 &self,
297 context: &TaskContext,
298 command: &str,
299 ignore_errors: bool,
300 capture_output: bool,
301 verbose: bool,
302 reason: &str,
303 ) -> anyhow::Result<()> {
304 if !context.json_events {
305 let _ = context.multi.println(reason);
306 }
307 let (status, captured_stdout) = self
308 .spawn_command(context, command, capture_output, verbose, false)?
309 .wait_for_completion()?;
310 self.finish_execution(context, command, status, captured_stdout, ignore_errors)
311 }
312
313 pub fn is_parallel_safe(&self) -> bool {
316 !self.interactive_enabled() && !self.retrigger_enabled()
317 }
318
319 pub fn interactive_enabled(&self) -> bool {
320 self.interactive.unwrap_or(false)
321 }
322
323 pub fn retrigger_enabled(&self) -> bool {
324 self.retrigger.unwrap_or(false)
325 }
326
327 fn test(&self, context: &TaskContext) -> anyhow::Result<()> {
328 let verbose = self.verbose(context);
329
330 let stdout = get_output_handler(verbose);
331 let stderr = get_output_handler(verbose);
332
333 if let Some(test) = &self.test {
334 let test = interpolate_template_string(test, context)?;
335 let mut cmd = self
336 .shell
337 .as_ref()
338 .map(|shell| shell.proc())
339 .unwrap_or_else(|| context.shell().proc());
340 cmd.arg(&test).stdout(stdout).stderr(stderr);
341
342 if let Some(work_dir) = self.resolved_work_dir(context) {
343 cmd.current_dir(work_dir);
344 }
345
346 let mut cmd = cmd.spawn()?;
347 if verbose {
348 handle_output!(cmd.stdout, context);
349 handle_output!(cmd.stderr, context);
350 }
351
352 let status = cmd.wait()?;
353
354 log::trace!("Test status: {:?}", status.success());
355 if !status.success() {
356 anyhow::bail!("Command test failed - {}", test);
357 }
358 }
359
360 Ok(())
361 }
362
363 fn ignore_errors(&self, context: &TaskContext) -> bool {
364 self
365 .ignore_errors
366 .or(context.ignore_errors)
367 .unwrap_or(default_ignore_errors())
368 }
369
370 fn verbose(&self, context: &TaskContext) -> bool {
371 self.verbose.or(context.verbose).unwrap_or(default_verbose())
372 }
373
374 pub fn resolved_work_dir(&self, context: &TaskContext) -> Option<std::path::PathBuf> {
375 self
376 .work_dir
377 .as_ref()
378 .map(|work_dir| context.resolve_from_config(work_dir))
379 }
380}
381
/// A command that has been spawned but not yet waited on.
struct SpawnedLocalCommand {
  /// Handle to the running process.
  child: Child,
  /// Thread collecting the command's stdout; `None` when output is not being captured.
  stdout_handle: Option<thread::JoinHandle<anyhow::Result<String>>>,
}
386
387impl SpawnedLocalCommand {
388 fn wait_for_completion(mut self) -> anyhow::Result<(ExitStatus, Option<String>)> {
389 let status = self.child.wait()?;
390 let captured_stdout = self.join_stdout_handle()?;
391 Ok((status, captured_stdout))
392 }
393
394 fn wait_for_completion_or_cancellation(
395 mut self,
396 context: &TaskContext,
397 command: &str,
398 ) -> anyhow::Result<(ExitStatus, Option<String>)> {
399 loop {
400 if let Some(status) = self.child.try_wait()? {
401 let captured_stdout = self.join_stdout_handle()?;
402 return Ok((status, captured_stdout));
403 }
404
405 if context.cancellation_requested() {
406 self.kill_for_cancellation()?;
407 let _ = self.child.wait()?;
408 let _ = self.join_stdout_handle()?;
409 anyhow::bail!("Command cancelled - {}", command);
410 }
411
412 thread::sleep(std::time::Duration::from_millis(25));
413 }
414 }
415
416 fn join_stdout_handle(&mut self) -> anyhow::Result<Option<String>> {
417 self
418 .stdout_handle
419 .take()
420 .map(|handle| {
421 handle
422 .join()
423 .map_err(|_| anyhow::anyhow!("Failed to join stdout capture thread"))?
424 })
425 .transpose()
426 }
427
428 #[cfg(unix)]
429 fn wait_for_completion_or_retrigger(mut self) -> anyhow::Result<CommandOutcome> {
430 loop {
431 if let Some(status) = self.child.try_wait()? {
432 let captured_stdout = self.join_stdout_handle()?;
433 return Ok(CommandOutcome::Completed {
434 status,
435 captured_stdout,
436 });
437 }
438
439 match read_control_byte(Duration::from_millis(100))? {
440 Some(b'R' | b'r') => {
441 self.kill_for_cancellation()?;
442 let _ = self.child.wait()?;
443 let _ = self.join_stdout_handle()?;
444 drain_retrigger_input()?;
445 return Ok(CommandOutcome::RestartRequested);
446 },
447 Some(3) => {
448 self.kill_for_cancellation()?;
449 let _ = self.child.wait()?;
450 let _ = self.join_stdout_handle()?;
451 drain_retrigger_input()?;
452 return Ok(CommandOutcome::Interrupted);
453 },
454 _ => {},
455 }
456 }
457 }
458
459 #[cfg(unix)]
460 fn kill_for_cancellation(&mut self) -> anyhow::Result<()> {
461 let pid = self.child.id() as i32;
462 let kill_result = unsafe { libc::killpg(pid, libc::SIGKILL) };
463 if kill_result == 0 {
464 return Ok(());
465 }
466
467 let error = std::io::Error::last_os_error();
468 let raw_error = error.raw_os_error();
469 if raw_error == Some(libc::ESRCH) || raw_error == Some(libc::EPERM) {
470 match self.child.kill() {
471 Ok(()) => return Ok(()),
472 Err(child_error) if child_error.kind() == std::io::ErrorKind::InvalidInput => return Ok(()),
473 Err(child_error) => return Err(child_error.into()),
474 }
475 }
476
477 Err(error.into())
478 }
479
480 #[cfg(not(unix))]
481 fn kill_for_cancellation(&mut self) -> anyhow::Result<()> {
482 match self.child.kill() {
483 Ok(()) => Ok(()),
484 Err(error) if error.kind() == std::io::ErrorKind::InvalidInput => Ok(()),
485 Err(error) => Err(error.into()),
486 }
487 }
488}
489
/// Result of waiting on a command while retrigger keys are being watched.
#[cfg(unix)]
enum CommandOutcome {
  /// The command exited on its own.
  Completed {
    /// Exit status reported by the process.
    status: ExitStatus,
    /// Captured stdout, present only when output capture was enabled.
    captured_stdout: Option<String>,
  },
  /// `R` or `r` was pressed; the command has been killed and should be started again.
  RestartRequested,
  /// Byte 3 (Ctrl-C) was read; the command has been killed.
  Interrupted,
}
499
/// Remembers a terminal's settings so they can be restored later.
#[cfg(unix)]
struct RawModeGuard {
  /// File descriptor of the terminal whose settings were changed.
  fd: std::os::fd::RawFd,
  /// Terminal attributes as they were before the change.
  original: libc::termios,
}
505
506#[cfg(unix)]
507impl RawModeGuard {
508 fn acquire() -> anyhow::Result<Self> {
509 let fd = std::io::stdin().as_raw_fd();
510 let mut original = std::mem::MaybeUninit::<libc::termios>::uninit();
511 let get_attr_result = unsafe { libc::tcgetattr(fd, original.as_mut_ptr()) };
512 if get_attr_result != 0 {
513 return Err(std::io::Error::last_os_error().into());
514 }
515
516 let original = unsafe { original.assume_init() };
517 let mut raw = original;
518 raw.c_lflag &= !(libc::ICANON | libc::ECHO | libc::ISIG);
519 raw.c_cc[libc::VMIN] = 0;
520 raw.c_cc[libc::VTIME] = 0;
521
522 let set_attr_result = unsafe { libc::tcsetattr(fd, libc::TCSANOW, &raw) };
523 if set_attr_result != 0 {
524 return Err(std::io::Error::last_os_error().into());
525 }
526
527 Ok(Self { fd, original })
528 }
529}
530
531#[cfg(unix)]
532impl Drop for RawModeGuard {
533 fn drop(&mut self) {
534 let _ = unsafe { libc::tcsetattr(self.fd, libc::TCSANOW, &self.original) };
535 }
536}
537
538#[cfg(unix)]
539fn read_control_byte(timeout: Duration) -> anyhow::Result<Option<u8>> {
540 let fd = std::io::stdin().as_raw_fd();
541 let timeout_ms = timeout.as_millis().min(libc::c_int::MAX as u128) as libc::c_int;
542 let mut poll_fd = libc::pollfd {
543 fd,
544 events: libc::POLLIN,
545 revents: 0,
546 };
547
548 let poll_result = unsafe { libc::poll(&mut poll_fd, 1, timeout_ms) };
549 if poll_result < 0 {
550 return Err(std::io::Error::last_os_error().into());
551 }
552 if poll_result == 0 || poll_fd.revents & libc::POLLIN == 0 {
553 return Ok(None);
554 }
555
556 let mut byte = [0_u8; 1];
557 let read_result = unsafe { libc::read(fd, byte.as_mut_ptr().cast(), 1) };
558 if read_result < 0 {
559 return Err(std::io::Error::last_os_error().into());
560 }
561 if read_result == 0 {
562 return Ok(None);
563 }
564
565 Ok(Some(byte[0]))
566}
567
568#[cfg(unix)]
569fn drain_retrigger_input() -> anyhow::Result<()> {
570 while read_control_byte(Duration::ZERO)?.is_some() {}
571 Ok(())
572}
573
#[cfg(test)]
mod test {
  use super::*;

  /// A minimal definition leaves every unspecified option unset.
  #[test]
  fn test_local_run_1() -> anyhow::Result<()> {
    let yaml = "
      command: echo 'Hello, World!'
      ignore_errors: false
      verbose: false
    ";
    let parsed: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert_eq!(parsed.work_dir, None);
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert_eq!(parsed.retrigger, None);
    assert_eq!(parsed.save_output_as, None);

    Ok(())
  }

  /// The `test` pre-check is parsed verbatim.
  #[test]
  fn test_local_run_2() -> anyhow::Result<()> {
    let yaml = "
      command: echo 'Hello, World!'
      test: test $(uname) = 'Linux'
      ignore_errors: false
      verbose: false
    ";
    let parsed: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert_eq!(parsed.test, Some("test $(uname) = 'Linux'".to_string()));
    assert_eq!(parsed.work_dir, None);
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert_eq!(parsed.save_output_as, None);

    Ok(())
  }

  /// `shell` and `interactive` are parsed alongside the other options.
  #[test]
  fn test_local_run_3() -> anyhow::Result<()> {
    let yaml = "
      command: echo 'Hello, World!'
      test: test $(uname) = 'Linux'
      shell: bash
      ignore_errors: false
      verbose: false
      interactive: true
    ";
    let parsed: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(parsed.command, "echo 'Hello, World!'");
    assert_eq!(parsed.test, Some("test $(uname) = 'Linux'".to_string()));
    assert_eq!(parsed.shell, Some(Shell::String("bash".to_string())));
    assert_eq!(parsed.work_dir, None);
    assert_eq!(parsed.ignore_errors, Some(false));
    assert_eq!(parsed.verbose, Some(false));
    assert_eq!(parsed.interactive, Some(true));
    assert_eq!(parsed.retrigger, None);
    assert_eq!(parsed.save_output_as, None);

    Ok(())
  }

  /// A retrigger command is not interactive and is not parallel safe.
  #[test]
  fn test_local_run_4() -> anyhow::Result<()> {
    let yaml = "
      command: go run .
      retrigger: true
    ";
    let parsed: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(parsed.command, "go run .");
    assert_eq!(parsed.retrigger, Some(true));
    assert!(!parsed.interactive_enabled());
    assert!(!parsed.is_parallel_safe());

    Ok(())
  }

  /// Combining `interactive` with `retrigger` parses but is rejected when executed.
  #[test]
  fn test_local_run_5_rejects_interactive_retrigger_combo_at_execution() {
    let yaml = "
      command: cat
      interactive: true
      retrigger: true
    ";
    let parsed = serde_yaml::from_str::<LocalRun>(yaml).expect("valid local run");
    let context = TaskContext::empty();

    let message = parsed
      .execute(&context)
      .expect_err("expected execution to fail")
      .to_string();
    assert!(message.contains("retrigger is only supported for non-interactive local commands"));
  }
}