1use std::io::{
2 BufRead as _,
3 BufReader,
4 IsTerminal as _,
5};
6use std::process::{
7 Child,
8 ExitStatus,
9 Stdio,
10};
11use std::thread;
12use std::time::Duration;
13
14use anyhow::Context as _;
15use console::Term;
16use indicatif::ProgressDrawTarget;
17use serde::Deserialize;
18
19#[cfg(unix)]
20use std::os::fd::AsRawFd as _;
21#[cfg(unix)]
22use std::os::unix::process::CommandExt as _;
23
24use crate::defaults::{
25 default_ignore_errors,
26 default_verbose,
27};
28use crate::handle_output;
29use crate::schema::{
30 get_output_handler,
31 interpolate_template_string,
32 ExecutionInterrupted,
33 Shell,
34 TaskContext,
35};
36
37#[derive(Debug, Deserialize, Clone)]
38pub struct LocalRun {
39 pub command: String,
41
42 #[serde(default)]
44 pub shell: Option<Shell>,
45
46 #[serde(default)]
49 pub test: Option<String>,
50
51 #[serde(default)]
53 pub work_dir: Option<String>,
54
55 #[serde(default)]
58 pub interactive: Option<bool>,
59
60 #[serde(default)]
62 pub retrigger: Option<bool>,
63
64 #[serde(default)]
66 pub ignore_errors: Option<bool>,
67
68 #[serde(default)]
70 pub save_output_as: Option<String>,
71
72 #[serde(default)]
74 pub verbose: Option<bool>,
75}
76
77impl LocalRun {
78 pub fn execute(&self, context: &TaskContext) -> anyhow::Result<()> {
79 assert!(!self.command.is_empty());
80
81 let command = interpolate_template_string(&self.command, context)?;
82 let interactive = self.interactive_enabled();
83 let retrigger = self.retrigger_enabled();
84 if interactive && retrigger {
85 anyhow::bail!("retrigger is only supported for non-interactive local commands");
86 }
87 let ignore_errors = self.ignore_errors(context);
88 let capture_output = self.save_output_as.is_some();
89 let verbose = interactive || self.verbose(context);
93
94 if self.test(context).is_err() {
96 return Ok(());
97 }
98
99 if retrigger {
100 return self.execute_with_retrigger(context, &command, ignore_errors, capture_output, verbose);
101 }
102
103 let (status, captured_stdout) = self
104 .spawn_command(context, &command, capture_output, verbose, interactive)?
105 .wait_for_completion()?;
106 self.finish_execution(context, &command, status, captured_stdout, ignore_errors)
107 }
108
109 fn spawn_command(
110 &self,
111 context: &TaskContext,
112 command: &str,
113 capture_output: bool,
114 verbose: bool,
115 interactive: bool,
116 ) -> anyhow::Result<SpawnedLocalCommand> {
117 let mut cmd = self
118 .shell
119 .as_ref()
120 .map(|shell| shell.proc())
121 .unwrap_or_else(|| context.shell().proc());
122
123 cmd.arg(command);
124
125 if capture_output {
126 cmd.stdout(Stdio::piped());
127 if interactive {
128 context.multi.set_draw_target(ProgressDrawTarget::hidden());
129 cmd.stdin(Stdio::inherit()).stderr(Stdio::inherit());
130 } else {
131 cmd.stderr(get_output_handler(verbose));
132 }
133 } else if verbose {
134 if interactive {
135 context.multi.set_draw_target(ProgressDrawTarget::hidden());
136
137 cmd
138 .stdin(Stdio::inherit())
139 .stdout(Stdio::inherit())
140 .stderr(Stdio::inherit());
141 } else {
142 let stdout = get_output_handler(verbose);
143 let stderr = get_output_handler(verbose);
144 cmd.stdout(stdout).stderr(stderr);
145 }
146 }
147
148 if let Some(work_dir) = self.resolved_work_dir(context) {
149 cmd.current_dir(work_dir);
150 }
151
152 #[cfg(unix)]
153 if self.retrigger_enabled() && !interactive {
154 unsafe {
155 cmd.pre_exec(|| {
156 if libc::setpgid(0, 0) != 0 {
157 return Err(std::io::Error::last_os_error());
158 }
159 Ok(())
160 });
161 }
162 }
163
164 for (key, value) in context.env_vars.iter() {
166 cmd.env(key, value);
167 }
168
169 let mut child = cmd.spawn()?;
170 let stdout_handle = if capture_output {
171 let stdout = child.stdout.take().context("Failed to open stdout")?;
172 let multi = context.multi.clone();
173 Some(thread::spawn(move || -> anyhow::Result<String> {
174 let reader = BufReader::new(stdout);
175 let mut output = String::new();
176 for line in reader.lines() {
177 let line = line?;
178 if verbose {
179 let _ = multi.println(line.clone());
180 }
181 output.push_str(&line);
182 output.push('\n');
183 }
184 Ok(output.trim_end_matches(['\r', '\n']).to_string())
185 }))
186 } else {
187 None
188 };
189
190 if verbose && !interactive && !capture_output {
191 handle_output!(child.stdout, context);
192 handle_output!(child.stderr, context);
193 } else if verbose && !interactive && capture_output {
194 handle_output!(child.stderr, context);
195 }
196
197 Ok(SpawnedLocalCommand { child, stdout_handle })
198 }
199
200 fn finish_execution(
201 &self,
202 context: &TaskContext,
203 command: &str,
204 status: ExitStatus,
205 captured_stdout: Option<String>,
206 ignore_errors: bool,
207 ) -> anyhow::Result<()> {
208 if !status.success() && !ignore_errors {
209 anyhow::bail!("Command failed - {}", command);
210 }
211
212 if status.success() {
213 if let (Some(output_name), Some(output_value)) = (&self.save_output_as, captured_stdout) {
214 context.insert_task_output(output_name.clone(), output_value)?;
215 }
216 }
217
218 Ok(())
219 }
220
221 fn execute_with_retrigger(
222 &self,
223 context: &TaskContext,
224 command: &str,
225 ignore_errors: bool,
226 capture_output: bool,
227 verbose: bool,
228 ) -> anyhow::Result<()> {
229 if !std::io::stdin().is_terminal() || context.json_events {
230 return self.execute_without_retrigger(
231 context,
232 command,
233 ignore_errors,
234 capture_output,
235 verbose,
236 "Manual retrigger requires an attached terminal and is disabled for `--json-events`.",
237 );
238 }
239
240 #[cfg(not(unix))]
241 {
242 return self.execute_without_retrigger(
243 context,
244 command,
245 ignore_errors,
246 capture_output,
247 verbose,
248 "Manual retrigger is currently supported on Unix terminals only.",
249 );
250 }
251
252 #[cfg(unix)]
253 {
254 let _raw_mode = RawModeGuard::acquire()?;
255 let term = Term::stderr();
256 let _ = term.write_line("Press R or r to restart the running command.");
257 drain_retrigger_input()?;
258
259 loop {
260 let spawned = self.spawn_command(context, command, capture_output, verbose, false)?;
261 match spawned.wait_for_completion_or_retrigger() {
262 Ok(CommandOutcome::Completed {
263 status,
264 captured_stdout,
265 }) => {
266 return self.finish_execution(context, command, status, captured_stdout, ignore_errors);
267 },
268 Ok(CommandOutcome::RestartRequested) => {
269 let _ = term.write_line("Restarting command...");
270 },
271 Ok(CommandOutcome::Interrupted) => {
272 return Err(ExecutionInterrupted.into());
273 },
274 Err(error) => return Err(error),
275 }
276 }
277 }
278 }
279
280 fn execute_without_retrigger(
281 &self,
282 context: &TaskContext,
283 command: &str,
284 ignore_errors: bool,
285 capture_output: bool,
286 verbose: bool,
287 reason: &str,
288 ) -> anyhow::Result<()> {
289 if !context.json_events {
290 let _ = context.multi.println(reason);
291 }
292 let (status, captured_stdout) = self
293 .spawn_command(context, command, capture_output, verbose, false)?
294 .wait_for_completion()?;
295 self.finish_execution(context, command, status, captured_stdout, ignore_errors)
296 }
297
298 pub fn is_parallel_safe(&self) -> bool {
301 !self.interactive_enabled() && !self.retrigger_enabled()
302 }
303
304 pub fn interactive_enabled(&self) -> bool {
305 self.interactive.unwrap_or(false)
306 }
307
308 pub fn retrigger_enabled(&self) -> bool {
309 self.retrigger.unwrap_or(false)
310 }
311
312 fn test(&self, context: &TaskContext) -> anyhow::Result<()> {
313 let verbose = self.verbose(context);
314
315 let stdout = get_output_handler(verbose);
316 let stderr = get_output_handler(verbose);
317
318 if let Some(test) = &self.test {
319 let test = interpolate_template_string(test, context)?;
320 let mut cmd = self
321 .shell
322 .as_ref()
323 .map(|shell| shell.proc())
324 .unwrap_or_else(|| context.shell().proc());
325 cmd.arg(&test).stdout(stdout).stderr(stderr);
326
327 if let Some(work_dir) = self.resolved_work_dir(context) {
328 cmd.current_dir(work_dir);
329 }
330
331 let mut cmd = cmd.spawn()?;
332 if verbose {
333 handle_output!(cmd.stdout, context);
334 handle_output!(cmd.stderr, context);
335 }
336
337 let status = cmd.wait()?;
338
339 log::trace!("Test status: {:?}", status.success());
340 if !status.success() {
341 anyhow::bail!("Command test failed - {}", test);
342 }
343 }
344
345 Ok(())
346 }
347
348 fn ignore_errors(&self, context: &TaskContext) -> bool {
349 self
350 .ignore_errors
351 .or(context.ignore_errors)
352 .unwrap_or(default_ignore_errors())
353 }
354
355 fn verbose(&self, context: &TaskContext) -> bool {
356 self.verbose.or(context.verbose).unwrap_or(default_verbose())
357 }
358
359 pub fn resolved_work_dir(&self, context: &TaskContext) -> Option<std::path::PathBuf> {
360 self
361 .work_dir
362 .as_ref()
363 .map(|work_dir| context.resolve_from_config(work_dir))
364 }
365}
366
367struct SpawnedLocalCommand {
368 child: Child,
369 stdout_handle: Option<thread::JoinHandle<anyhow::Result<String>>>,
370}
371
372impl SpawnedLocalCommand {
373 fn wait_for_completion(mut self) -> anyhow::Result<(ExitStatus, Option<String>)> {
374 let status = self.child.wait()?;
375 let captured_stdout = self.join_stdout_handle()?;
376 Ok((status, captured_stdout))
377 }
378
379 fn join_stdout_handle(&mut self) -> anyhow::Result<Option<String>> {
380 self
381 .stdout_handle
382 .take()
383 .map(|handle| {
384 handle
385 .join()
386 .map_err(|_| anyhow::anyhow!("Failed to join stdout capture thread"))?
387 })
388 .transpose()
389 }
390
391 #[cfg(unix)]
392 fn wait_for_completion_or_retrigger(mut self) -> anyhow::Result<CommandOutcome> {
393 loop {
394 if let Some(status) = self.child.try_wait()? {
395 let captured_stdout = self.join_stdout_handle()?;
396 return Ok(CommandOutcome::Completed {
397 status,
398 captured_stdout,
399 });
400 }
401
402 match read_control_byte(Duration::from_millis(100))? {
403 Some(b'R' | b'r') => {
404 self.kill_for_restart()?;
405 let _ = self.child.wait()?;
406 let _ = self.join_stdout_handle()?;
407 drain_retrigger_input()?;
408 return Ok(CommandOutcome::RestartRequested);
409 },
410 Some(3) => {
411 self.kill_for_restart()?;
412 let _ = self.child.wait()?;
413 let _ = self.join_stdout_handle()?;
414 drain_retrigger_input()?;
415 return Ok(CommandOutcome::Interrupted);
416 },
417 _ => {},
418 }
419 }
420 }
421
422 #[cfg(unix)]
423 fn kill_for_restart(&mut self) -> anyhow::Result<()> {
424 let pid = self.child.id() as i32;
425 let kill_result = unsafe { libc::killpg(pid, libc::SIGKILL) };
426 if kill_result == 0 {
427 return Ok(());
428 }
429
430 let error = std::io::Error::last_os_error();
431 let raw_error = error.raw_os_error();
432 if raw_error == Some(libc::ESRCH) || raw_error == Some(libc::EPERM) {
433 match self.child.kill() {
434 Ok(()) => return Ok(()),
435 Err(child_error) if child_error.kind() == std::io::ErrorKind::InvalidInput => return Ok(()),
436 Err(child_error) => return Err(child_error.into()),
437 }
438 }
439
440 Err(error.into())
441 }
442}
443
/// Result of waiting on a retriggerable command.
#[cfg(unix)]
enum CommandOutcome {
  /// The command exited on its own.
  Completed {
    /// Exit status reported by the child process.
    status: ExitStatus,
    /// Captured stdout, when output capture was enabled.
    captured_stdout: Option<String>,
  },
  /// The user asked for the command to be restarted.
  RestartRequested,
  /// The user interrupted the command.
  Interrupted,
}
453
454#[cfg(unix)]
455struct RawModeGuard {
456 fd: std::os::fd::RawFd,
457 original: libc::termios,
458}
459
460#[cfg(unix)]
461impl RawModeGuard {
462 fn acquire() -> anyhow::Result<Self> {
463 let fd = std::io::stdin().as_raw_fd();
464 let mut original = std::mem::MaybeUninit::<libc::termios>::uninit();
465 let get_attr_result = unsafe { libc::tcgetattr(fd, original.as_mut_ptr()) };
466 if get_attr_result != 0 {
467 return Err(std::io::Error::last_os_error().into());
468 }
469
470 let original = unsafe { original.assume_init() };
471 let mut raw = original;
472 raw.c_lflag &= !(libc::ICANON | libc::ECHO | libc::ISIG);
473 raw.c_cc[libc::VMIN] = 0;
474 raw.c_cc[libc::VTIME] = 0;
475
476 let set_attr_result = unsafe { libc::tcsetattr(fd, libc::TCSANOW, &raw) };
477 if set_attr_result != 0 {
478 return Err(std::io::Error::last_os_error().into());
479 }
480
481 Ok(Self { fd, original })
482 }
483}
484
485#[cfg(unix)]
486impl Drop for RawModeGuard {
487 fn drop(&mut self) {
488 let _ = unsafe { libc::tcsetattr(self.fd, libc::TCSANOW, &self.original) };
489 }
490}
491
492#[cfg(unix)]
493fn read_control_byte(timeout: Duration) -> anyhow::Result<Option<u8>> {
494 let fd = std::io::stdin().as_raw_fd();
495 let timeout_ms = timeout.as_millis().min(libc::c_int::MAX as u128) as libc::c_int;
496 let mut poll_fd = libc::pollfd {
497 fd,
498 events: libc::POLLIN,
499 revents: 0,
500 };
501
502 let poll_result = unsafe { libc::poll(&mut poll_fd, 1, timeout_ms) };
503 if poll_result < 0 {
504 return Err(std::io::Error::last_os_error().into());
505 }
506 if poll_result == 0 || poll_fd.revents & libc::POLLIN == 0 {
507 return Ok(None);
508 }
509
510 let mut byte = [0_u8; 1];
511 let read_result = unsafe { libc::read(fd, byte.as_mut_ptr().cast(), 1) };
512 if read_result < 0 {
513 return Err(std::io::Error::last_os_error().into());
514 }
515 if read_result == 0 {
516 return Ok(None);
517 }
518
519 Ok(Some(byte[0]))
520}
521
522#[cfg(unix)]
523fn drain_retrigger_input() -> anyhow::Result<()> {
524 while read_control_byte(Duration::ZERO)?.is_some() {}
525 Ok(())
526}
527
#[cfg(test)]
mod test {
  use super::*;

  /// Minimal configuration: unset optional fields deserialize to `None`.
  #[test]
  fn test_local_run_1() -> anyhow::Result<()> {
    let yaml = "
      command: echo 'Hello, World!'
      ignore_errors: false
      verbose: false
    ";
    let run: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(run.command, "echo 'Hello, World!'");
    assert!(run.work_dir.is_none());
    assert_eq!(run.ignore_errors, Some(false));
    assert_eq!(run.verbose, Some(false));
    assert!(run.retrigger.is_none());
    assert!(run.save_output_as.is_none());

    Ok(())
  }

  /// A `test` guard is read verbatim from the configuration.
  #[test]
  fn test_local_run_2() -> anyhow::Result<()> {
    let yaml = "
      command: echo 'Hello, World!'
      test: test $(uname) = 'Linux'
      ignore_errors: false
      verbose: false
    ";
    let run: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(run.command, "echo 'Hello, World!'");
    assert_eq!(run.test, Some("test $(uname) = 'Linux'".to_string()));
    assert!(run.work_dir.is_none());
    assert_eq!(run.ignore_errors, Some(false));
    assert_eq!(run.verbose, Some(false));
    assert!(run.save_output_as.is_none());

    Ok(())
  }

  /// Shell override and `interactive` flag are picked up.
  #[test]
  fn test_local_run_3() -> anyhow::Result<()> {
    let yaml = "
      command: echo 'Hello, World!'
      test: test $(uname) = 'Linux'
      shell: bash
      ignore_errors: false
      verbose: false
      interactive: true
    ";
    let run: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(run.command, "echo 'Hello, World!'");
    assert_eq!(run.test, Some("test $(uname) = 'Linux'".to_string()));
    assert_eq!(run.shell, Some(Shell::String("bash".to_string())));
    assert!(run.work_dir.is_none());
    assert_eq!(run.ignore_errors, Some(false));
    assert_eq!(run.verbose, Some(false));
    assert_eq!(run.interactive, Some(true));
    assert!(run.retrigger.is_none());
    assert!(run.save_output_as.is_none());

    Ok(())
  }

  /// A retriggerable command is not interactive and not parallel safe.
  #[test]
  fn test_local_run_4() -> anyhow::Result<()> {
    let yaml = "
      command: go run .
      retrigger: true
    ";
    let run: LocalRun = serde_yaml::from_str(yaml)?;

    assert_eq!(run.command, "go run .");
    assert_eq!(run.retrigger, Some(true));
    assert!(!run.interactive_enabled());
    assert!(!run.is_parallel_safe());

    Ok(())
  }

  /// `interactive` together with `retrigger` is rejected when executed.
  #[test]
  fn test_local_run_5_rejects_interactive_retrigger_combo_at_execution() {
    let yaml = "
      command: cat
      interactive: true
      retrigger: true
    ";
    let run = serde_yaml::from_str::<LocalRun>(yaml).expect("valid local run");
    let context = TaskContext::empty();

    let message = run
      .execute(&context)
      .expect_err("expected execution to fail")
      .to_string();
    assert!(message.contains("retrigger is only supported for non-interactive local commands"));
  }
}