1use std::io::{
2 BufRead as _,
3 BufReader,
4 IsTerminal as _,
5};
6use std::process::{
7 Child,
8 ExitStatus,
9 Stdio,
10};
11use std::thread;
12#[cfg(unix)]
13use std::time::Duration;
14
15use anyhow::Context as _;
16#[cfg(unix)]
17use console::Term;
18use indicatif::ProgressDrawTarget;
19use schemars::JsonSchema;
20use serde::Deserialize;
21
22#[cfg(unix)]
23use std::os::fd::AsRawFd as _;
24#[cfg(unix)]
25use std::os::unix::process::CommandExt as _;
26
27use crate::defaults::{
28 default_ignore_errors,
29 default_verbose,
30};
31use crate::handle_output;
32#[cfg(unix)]
33use crate::schema::ExecutionInterrupted;
34use crate::schema::{
35 get_output_handler,
36 interpolate_template_string,
37 Shell,
38 TaskContext,
39};
40
41#[derive(Debug, Deserialize, Clone, JsonSchema)]
42pub struct LocalRun {
43 pub command: String,
45
46 #[serde(default)]
48 pub shell: Option<Shell>,
49
50 #[serde(default)]
53 pub test: Option<String>,
54
55 #[serde(default)]
57 pub work_dir: Option<String>,
58
59 #[serde(default)]
62 pub interactive: Option<bool>,
63
64 #[serde(default)]
66 pub retrigger: Option<bool>,
67
68 #[serde(default)]
70 pub ignore_errors: Option<bool>,
71
72 #[serde(default)]
74 pub save_output_as: Option<String>,
75
76 #[serde(default)]
78 pub verbose: Option<bool>,
79}
80
81impl LocalRun {
82 pub fn execute(&self, context: &TaskContext) -> anyhow::Result<()> {
83 assert!(!self.command.is_empty());
84
85 let command = interpolate_template_string(&self.command, context)?;
86 let interactive = self.interactive_enabled();
87 let retrigger = self.retrigger_enabled();
88 if interactive && retrigger {
89 anyhow::bail!("retrigger is only supported for non-interactive local commands");
90 }
91 let ignore_errors = self.ignore_errors(context);
92 let capture_output = self.save_output_as.is_some();
93 let verbose = interactive || self.verbose(context);
97
98 if self.test(context).is_err() {
100 return Ok(());
101 }
102
103 if retrigger {
104 return self.execute_with_retrigger(context, &command, ignore_errors, capture_output, verbose);
105 }
106
107 let (status, captured_stdout) = self
108 .spawn_command(context, &command, capture_output, verbose, interactive)?
109 .wait_for_completion()?;
110 self.finish_execution(context, &command, status, captured_stdout, ignore_errors)
111 }
112
113 fn spawn_command(
114 &self,
115 context: &TaskContext,
116 command: &str,
117 capture_output: bool,
118 verbose: bool,
119 interactive: bool,
120 ) -> anyhow::Result<SpawnedLocalCommand> {
121 let mut cmd = self
122 .shell
123 .as_ref()
124 .map(|shell| shell.proc())
125 .unwrap_or_else(|| context.shell().proc());
126
127 cmd.arg(command);
128
129 if capture_output {
130 cmd.stdout(Stdio::piped());
131 if interactive {
132 context.multi.set_draw_target(ProgressDrawTarget::hidden());
133 cmd.stdin(Stdio::inherit()).stderr(Stdio::inherit());
134 } else {
135 cmd.stderr(get_output_handler(verbose));
136 }
137 } else if verbose {
138 if interactive {
139 context.multi.set_draw_target(ProgressDrawTarget::hidden());
140
141 cmd
142 .stdin(Stdio::inherit())
143 .stdout(Stdio::inherit())
144 .stderr(Stdio::inherit());
145 } else {
146 let stdout = get_output_handler(verbose);
147 let stderr = get_output_handler(verbose);
148 cmd.stdout(stdout).stderr(stderr);
149 }
150 }
151
152 if let Some(work_dir) = self.resolved_work_dir(context) {
153 cmd.current_dir(work_dir);
154 }
155
156 #[cfg(unix)]
157 if self.retrigger_enabled() && !interactive {
158 unsafe {
159 cmd.pre_exec(|| {
160 if libc::setpgid(0, 0) != 0 {
161 return Err(std::io::Error::last_os_error());
162 }
163 Ok(())
164 });
165 }
166 }
167
168 for (key, value) in context.env_vars.iter() {
170 cmd.env(key, value);
171 }
172
173 let mut child = cmd.spawn()?;
174 let stdout_handle = if capture_output {
175 let stdout = child.stdout.take().context("Failed to open stdout")?;
176 let multi = context.multi.clone();
177 Some(thread::spawn(move || -> anyhow::Result<String> {
178 let reader = BufReader::new(stdout);
179 let mut output = String::new();
180 for line in reader.lines() {
181 let line = line?;
182 if verbose {
183 let _ = multi.println(line.clone());
184 }
185 output.push_str(&line);
186 output.push('\n');
187 }
188 Ok(output.trim_end_matches(['\r', '\n']).to_string())
189 }))
190 } else {
191 None
192 };
193
194 if verbose && !interactive && !capture_output {
195 handle_output!(child.stdout, context);
196 handle_output!(child.stderr, context);
197 } else if verbose && !interactive && capture_output {
198 handle_output!(child.stderr, context);
199 }
200
201 Ok(SpawnedLocalCommand { child, stdout_handle })
202 }
203
204 fn finish_execution(
205 &self,
206 context: &TaskContext,
207 command: &str,
208 status: ExitStatus,
209 captured_stdout: Option<String>,
210 ignore_errors: bool,
211 ) -> anyhow::Result<()> {
212 if !status.success() && !ignore_errors {
213 anyhow::bail!("Command failed - {}", command);
214 }
215
216 if status.success() {
217 if let (Some(output_name), Some(output_value)) = (&self.save_output_as, captured_stdout) {
218 context.insert_task_output(output_name.clone(), output_value)?;
219 }
220 }
221
222 Ok(())
223 }
224
225 fn execute_with_retrigger(
226 &self,
227 context: &TaskContext,
228 command: &str,
229 ignore_errors: bool,
230 capture_output: bool,
231 verbose: bool,
232 ) -> anyhow::Result<()> {
233 if !std::io::stdin().is_terminal() || context.json_events {
234 return self.execute_without_retrigger(
235 context,
236 command,
237 ignore_errors,
238 capture_output,
239 verbose,
240 "Manual retrigger requires an attached terminal and is disabled for `--json-events`.",
241 );
242 }
243
244 #[cfg(not(unix))]
245 {
246 return self.execute_without_retrigger(
247 context,
248 command,
249 ignore_errors,
250 capture_output,
251 verbose,
252 "Manual retrigger is currently supported on Unix terminals only.",
253 );
254 }
255
256 #[cfg(unix)]
257 {
258 let _raw_mode = RawModeGuard::acquire()?;
259 let term = Term::stderr();
260 let _ = term.write_line("Press R or r to restart the running command.");
261 drain_retrigger_input()?;
262
263 loop {
264 let spawned = self.spawn_command(context, command, capture_output, verbose, false)?;
265 match spawned.wait_for_completion_or_retrigger() {
266 Ok(CommandOutcome::Completed {
267 status,
268 captured_stdout,
269 }) => {
270 return self.finish_execution(context, command, status, captured_stdout, ignore_errors);
271 },
272 Ok(CommandOutcome::RestartRequested) => {
273 let _ = term.write_line("Restarting command...");
274 },
275 Ok(CommandOutcome::Interrupted) => {
276 return Err(ExecutionInterrupted.into());
277 },
278 Err(error) => return Err(error),
279 }
280 }
281 }
282 }
283
284 fn execute_without_retrigger(
285 &self,
286 context: &TaskContext,
287 command: &str,
288 ignore_errors: bool,
289 capture_output: bool,
290 verbose: bool,
291 reason: &str,
292 ) -> anyhow::Result<()> {
293 if !context.json_events {
294 let _ = context.multi.println(reason);
295 }
296 let (status, captured_stdout) = self
297 .spawn_command(context, command, capture_output, verbose, false)?
298 .wait_for_completion()?;
299 self.finish_execution(context, command, status, captured_stdout, ignore_errors)
300 }
301
302 pub fn is_parallel_safe(&self) -> bool {
305 !self.interactive_enabled() && !self.retrigger_enabled()
306 }
307
308 pub fn interactive_enabled(&self) -> bool {
309 self.interactive.unwrap_or(false)
310 }
311
312 pub fn retrigger_enabled(&self) -> bool {
313 self.retrigger.unwrap_or(false)
314 }
315
316 fn test(&self, context: &TaskContext) -> anyhow::Result<()> {
317 let verbose = self.verbose(context);
318
319 let stdout = get_output_handler(verbose);
320 let stderr = get_output_handler(verbose);
321
322 if let Some(test) = &self.test {
323 let test = interpolate_template_string(test, context)?;
324 let mut cmd = self
325 .shell
326 .as_ref()
327 .map(|shell| shell.proc())
328 .unwrap_or_else(|| context.shell().proc());
329 cmd.arg(&test).stdout(stdout).stderr(stderr);
330
331 if let Some(work_dir) = self.resolved_work_dir(context) {
332 cmd.current_dir(work_dir);
333 }
334
335 let mut cmd = cmd.spawn()?;
336 if verbose {
337 handle_output!(cmd.stdout, context);
338 handle_output!(cmd.stderr, context);
339 }
340
341 let status = cmd.wait()?;
342
343 log::trace!("Test status: {:?}", status.success());
344 if !status.success() {
345 anyhow::bail!("Command test failed - {}", test);
346 }
347 }
348
349 Ok(())
350 }
351
352 fn ignore_errors(&self, context: &TaskContext) -> bool {
353 self
354 .ignore_errors
355 .or(context.ignore_errors)
356 .unwrap_or(default_ignore_errors())
357 }
358
359 fn verbose(&self, context: &TaskContext) -> bool {
360 self.verbose.or(context.verbose).unwrap_or(default_verbose())
361 }
362
363 pub fn resolved_work_dir(&self, context: &TaskContext) -> Option<std::path::PathBuf> {
364 self
365 .work_dir
366 .as_ref()
367 .map(|work_dir| context.resolve_from_config(work_dir))
368 }
369}
370
371struct SpawnedLocalCommand {
372 child: Child,
373 stdout_handle: Option<thread::JoinHandle<anyhow::Result<String>>>,
374}
375
376impl SpawnedLocalCommand {
377 fn wait_for_completion(mut self) -> anyhow::Result<(ExitStatus, Option<String>)> {
378 let status = self.child.wait()?;
379 let captured_stdout = self.join_stdout_handle()?;
380 Ok((status, captured_stdout))
381 }
382
383 fn join_stdout_handle(&mut self) -> anyhow::Result<Option<String>> {
384 self
385 .stdout_handle
386 .take()
387 .map(|handle| {
388 handle
389 .join()
390 .map_err(|_| anyhow::anyhow!("Failed to join stdout capture thread"))?
391 })
392 .transpose()
393 }
394
395 #[cfg(unix)]
396 fn wait_for_completion_or_retrigger(mut self) -> anyhow::Result<CommandOutcome> {
397 loop {
398 if let Some(status) = self.child.try_wait()? {
399 let captured_stdout = self.join_stdout_handle()?;
400 return Ok(CommandOutcome::Completed {
401 status,
402 captured_stdout,
403 });
404 }
405
406 match read_control_byte(Duration::from_millis(100))? {
407 Some(b'R' | b'r') => {
408 self.kill_for_restart()?;
409 let _ = self.child.wait()?;
410 let _ = self.join_stdout_handle()?;
411 drain_retrigger_input()?;
412 return Ok(CommandOutcome::RestartRequested);
413 },
414 Some(3) => {
415 self.kill_for_restart()?;
416 let _ = self.child.wait()?;
417 let _ = self.join_stdout_handle()?;
418 drain_retrigger_input()?;
419 return Ok(CommandOutcome::Interrupted);
420 },
421 _ => {},
422 }
423 }
424 }
425
426 #[cfg(unix)]
427 fn kill_for_restart(&mut self) -> anyhow::Result<()> {
428 let pid = self.child.id() as i32;
429 let kill_result = unsafe { libc::killpg(pid, libc::SIGKILL) };
430 if kill_result == 0 {
431 return Ok(());
432 }
433
434 let error = std::io::Error::last_os_error();
435 let raw_error = error.raw_os_error();
436 if raw_error == Some(libc::ESRCH) || raw_error == Some(libc::EPERM) {
437 match self.child.kill() {
438 Ok(()) => return Ok(()),
439 Err(child_error) if child_error.kind() == std::io::ErrorKind::InvalidInput => return Ok(()),
440 Err(child_error) => return Err(child_error.into()),
441 }
442 }
443
444 Err(error.into())
445 }
446}
447
448#[cfg(unix)]
449enum CommandOutcome {
450 Completed {
451 status: ExitStatus,
452 captured_stdout: Option<String>,
453 },
454 RestartRequested,
455 Interrupted,
456}
457
458#[cfg(unix)]
459struct RawModeGuard {
460 fd: std::os::fd::RawFd,
461 original: libc::termios,
462}
463
464#[cfg(unix)]
465impl RawModeGuard {
466 fn acquire() -> anyhow::Result<Self> {
467 let fd = std::io::stdin().as_raw_fd();
468 let mut original = std::mem::MaybeUninit::<libc::termios>::uninit();
469 let get_attr_result = unsafe { libc::tcgetattr(fd, original.as_mut_ptr()) };
470 if get_attr_result != 0 {
471 return Err(std::io::Error::last_os_error().into());
472 }
473
474 let original = unsafe { original.assume_init() };
475 let mut raw = original;
476 raw.c_lflag &= !(libc::ICANON | libc::ECHO | libc::ISIG);
477 raw.c_cc[libc::VMIN] = 0;
478 raw.c_cc[libc::VTIME] = 0;
479
480 let set_attr_result = unsafe { libc::tcsetattr(fd, libc::TCSANOW, &raw) };
481 if set_attr_result != 0 {
482 return Err(std::io::Error::last_os_error().into());
483 }
484
485 Ok(Self { fd, original })
486 }
487}
488
489#[cfg(unix)]
490impl Drop for RawModeGuard {
491 fn drop(&mut self) {
492 let _ = unsafe { libc::tcsetattr(self.fd, libc::TCSANOW, &self.original) };
493 }
494}
495
496#[cfg(unix)]
497fn read_control_byte(timeout: Duration) -> anyhow::Result<Option<u8>> {
498 let fd = std::io::stdin().as_raw_fd();
499 let timeout_ms = timeout.as_millis().min(libc::c_int::MAX as u128) as libc::c_int;
500 let mut poll_fd = libc::pollfd {
501 fd,
502 events: libc::POLLIN,
503 revents: 0,
504 };
505
506 let poll_result = unsafe { libc::poll(&mut poll_fd, 1, timeout_ms) };
507 if poll_result < 0 {
508 return Err(std::io::Error::last_os_error().into());
509 }
510 if poll_result == 0 || poll_fd.revents & libc::POLLIN == 0 {
511 return Ok(None);
512 }
513
514 let mut byte = [0_u8; 1];
515 let read_result = unsafe { libc::read(fd, byte.as_mut_ptr().cast(), 1) };
516 if read_result < 0 {
517 return Err(std::io::Error::last_os_error().into());
518 }
519 if read_result == 0 {
520 return Ok(None);
521 }
522
523 Ok(Some(byte[0]))
524}
525
526#[cfg(unix)]
527fn drain_retrigger_input() -> anyhow::Result<()> {
528 while read_control_byte(Duration::ZERO)?.is_some() {}
529 Ok(())
530}
531
532#[cfg(test)]
533mod test {
534 use super::*;
535
536 #[test]
537 fn test_local_run_1() -> anyhow::Result<()> {
538 {
539 let yaml = "
540 command: echo 'Hello, World!'
541 ignore_errors: false
542 verbose: false
543 ";
544 let local_run = serde_yaml::from_str::<LocalRun>(yaml)?;
545
546 assert_eq!(local_run.command, "echo 'Hello, World!'");
547 assert_eq!(local_run.work_dir, None);
548 assert_eq!(local_run.ignore_errors, Some(false));
549 assert_eq!(local_run.verbose, Some(false));
550 assert_eq!(local_run.retrigger, None);
551 assert_eq!(local_run.save_output_as, None);
552
553 Ok(())
554 }
555 }
556
557 #[test]
558 fn test_local_run_2() -> anyhow::Result<()> {
559 {
560 let yaml = "
561 command: echo 'Hello, World!'
562 test: test $(uname) = 'Linux'
563 ignore_errors: false
564 verbose: false
565 ";
566 let local_run = serde_yaml::from_str::<LocalRun>(yaml)?;
567
568 assert_eq!(local_run.command, "echo 'Hello, World!'");
569 assert_eq!(local_run.test, Some("test $(uname) = 'Linux'".to_string()));
570 assert_eq!(local_run.work_dir, None);
571 assert_eq!(local_run.ignore_errors, Some(false));
572 assert_eq!(local_run.verbose, Some(false));
573 assert_eq!(local_run.save_output_as, None);
574
575 Ok(())
576 }
577 }
578
579 #[test]
580 fn test_local_run_3() -> anyhow::Result<()> {
581 {
582 let yaml = "
583 command: echo 'Hello, World!'
584 test: test $(uname) = 'Linux'
585 shell: bash
586 ignore_errors: false
587 verbose: false
588 interactive: true
589 ";
590 let local_run = serde_yaml::from_str::<LocalRun>(yaml)?;
591
592 assert_eq!(local_run.command, "echo 'Hello, World!'");
593 assert_eq!(local_run.test, Some("test $(uname) = 'Linux'".to_string()));
594 assert_eq!(local_run.shell, Some(Shell::String("bash".to_string())));
595 assert_eq!(local_run.work_dir, None);
596 assert_eq!(local_run.ignore_errors, Some(false));
597 assert_eq!(local_run.verbose, Some(false));
598 assert_eq!(local_run.interactive, Some(true));
599 assert_eq!(local_run.retrigger, None);
600 assert_eq!(local_run.save_output_as, None);
601
602 Ok(())
603 }
604 }
605
606 #[test]
607 fn test_local_run_4() -> anyhow::Result<()> {
608 let yaml = "
609 command: go run .
610 retrigger: true
611 ";
612 let local_run = serde_yaml::from_str::<LocalRun>(yaml)?;
613
614 assert_eq!(local_run.command, "go run .");
615 assert_eq!(local_run.retrigger, Some(true));
616 assert!(!local_run.interactive_enabled());
617 assert!(!local_run.is_parallel_safe());
618
619 Ok(())
620 }
621
622 #[test]
623 fn test_local_run_5_rejects_interactive_retrigger_combo_at_execution() {
624 let yaml = "
625 command: cat
626 interactive: true
627 retrigger: true
628 ";
629 let local_run = serde_yaml::from_str::<LocalRun>(yaml).expect("valid local run");
630 let context = TaskContext::empty();
631
632 let error = local_run
633 .execute(&context)
634 .expect_err("expected execution to fail");
635 assert!(error
636 .to_string()
637 .contains("retrigger is only supported for non-interactive local commands"));
638 }
639}