1use std::{
2 borrow::Cow,
3 collections::HashMap,
4 env::var,
5 ffi::OsStr,
6 fs::File,
7 io::{IsTerminal, Write},
8 iter::once,
9 process::{ExitCode, Stdio},
10 sync::{
11 atomic::{AtomicBool, AtomicU8, Ordering},
12 Arc,
13 },
14 time::Duration,
15};
16
17use clearscreen::ClearScreen;
18use miette::{IntoDiagnostic, Report, Result};
19use notify_rust::Notification;
20use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
21use tokio::{process::Command as TokioCommand, time::sleep};
22use tracing::{debug, debug_span, error, instrument, trace, trace_span, Instrument};
23use watchexec::{
24 action::ActionHandler,
25 command::{Command, Program, Shell, SpawnOptions},
26 error::RuntimeError,
27 job::{CommandState, Job},
28 sources::fs::Watcher,
29 Config, ErrorHook, Id,
30};
31use watchexec_events::{Event, Keyboard, ProcessEnd, Tag};
32use watchexec_signals::Signal;
33
34use crate::{
35 args::{
36 command::{EnvVar, WrapMode},
37 events::{EmitEvents, OnBusyUpdate, SignalMapping},
38 output::{ClearMode, ColourMode},
39 Args,
40 },
41 emits::events_to_simple_format,
42 socket::Sockets,
43 state::State,
44};
45
/// Output-related switches shared by the functions that report on command runs.
///
/// The struct is `Copy`, so it is passed by value into the closures and helpers
/// that announce a command start (`setup_process`) and end (`end_of_process`).
#[derive(Clone, Copy, Debug)]
struct OutputFlags {
    /// When `true`, the `[Running: ...]` and command-ended status lines are not
    /// written to stderr.
    quiet: bool,
    /// Colour choice handed to `StandardStream::stderr` for the status lines.
    colour: ColorChoice,
    /// When `true`, the command-ended message gets a `, lasted <duration>` suffix.
    timings: bool,
    /// When `true`, a BEL byte (`0x07`) is written to stdout once a command has ended.
    bell: bool,
    /// When `true`, desktop notifications are sent on command start and end.
    toast: bool,
}
54
/// Builds the watchexec [`Config`] from the parsed command-line arguments.
///
/// The returned config has an error hook, the watched path set, throttle,
/// keyboard-event and (optionally) polling watcher settings applied, plus one
/// of two action handlers:
///
/// * when `args.only_emit_events` is set, a synchronous handler that only
///   prints the events to stdout and never runs a command;
/// * otherwise, an asynchronous handler that owns a single job (keyed by one
///   `Id`) and starts, signals, restarts, queues or stops it in response to
///   events.
///
/// `state` is cloned into the handlers; it supplies the socket set and event
/// emission file used when spawning, and receives the exit code in `--once`
/// style runs (`args.once`).
///
/// # Errors
///
/// Returns an error if `interpret_command_args` cannot build the command.
pub fn make_config(args: &Args, state: &State) -> Result<Config> {
    let _span = debug_span!("args-runtime").entered();
    let config = Config::default();
    config.on_error(|err: ErrorHook| {
        // I/O errors raised while "waiting on process group" are only sent to
        // the tracing log instead of being printed straight to stderr.
        if let RuntimeError::IoError {
            about: "waiting on process group",
            ..
        } = err.error
        {
            error!("{}", err.error);
            return;
        }

        // Debug builds additionally dump the raw error value.
        if cfg!(debug_assertions) {
            eprintln!("[[{:?}]]", err.error);
        }

        eprintln!("[[Error (not fatal)]]\n{}", Report::new(err.error));
    });

    config.pathset(args.filtering.paths.clone());

    config.throttle(args.events.debounce.0);
    config.keyboard_events(args.events.stdin_quit);

    if let Some(interval) = args.events.poll {
        config.file_watcher(Watcher::Poll(interval.0));
    }

    // Plain copies of the settings the action handlers need, so the closures
    // below do not have to borrow `args`.
    let once = args.once;
    let clear = args.output.screen_clear;

    let emit_events_to = args.events.emit_events_to;
    let state = state.clone();

    if args.only_emit_events {
        // Emit-only mode: print events to stdout, never spawn a command.
        config.on_action(move |mut action| {
            // Terminate or interrupt ends the program immediately.
            if action
                .signals()
                .any(|sig| sig == Signal::Terminate || sig == Signal::Interrupt)
            {
                action.quit();
                return action;
            }

            if let Some(mode) = clear {
                match mode {
                    ClearMode::Clear => {
                        clearscreen::clear().ok();
                    }
                    ClearMode::Reset => {
                        reset_screen();
                    }
                }
            }

            match emit_events_to {
                EmitEvents::Stdio => {
                    println!(
                        "{}",
                        events_to_simple_format(action.events.as_ref()).unwrap_or_default()
                    );
                }
                EmitEvents::JsonStdio => {
                    // One JSON document per line; empty events are skipped.
                    for event in action.events.iter().filter(|e| !e.is_empty()) {
                        println!("{}", serde_json::to_string(event).unwrap_or_default());
                    }
                }
                // Any other mode reaching this point is treated as a bug.
                other => unreachable!(
                    "emit_events_to should have been validated earlier: {:?}",
                    other
                ),
            }

            action
        });

        return Ok(config);
    }

    let delay_run = args.command.delay_run.map(|ts| ts.0);
    let on_busy = args.events.on_busy_update;
    let stdin_quit = args.events.stdin_quit;

    let signal = args.events.signal;
    let stop_signal = args.command.stop_signal;
    let stop_timeout = args.command.stop_timeout.0;

    let print_events = args.logging.print_events;
    let outflags = OutputFlags {
        quiet: args.output.quiet,
        colour: match args.output.color {
            // NOTE(review): auto-detection looks at stdin, while the coloured
            // status lines are written to stderr — confirm this is intended.
            ColourMode::Auto if !std::io::stdin().is_terminal() => ColorChoice::Never,
            ColourMode::Auto => ColorChoice::Auto,
            ColourMode::Always => ColorChoice::Always,
            ColourMode::Never => ColorChoice::Never,
        },
        timings: args.output.timings,
        bell: args.output.bell,
        toast: args.output.notify,
    };

    let workdir = Arc::new(args.command.workdir.clone());

    let add_envs: Arc<[EnvVar]> = args.command.env.clone().into();
    debug!(
        envs=?args.command.env,
        "additional environment variables to add to command"
    );

    // A single job id is reused for every action, so all actions operate on
    // the same job.
    let id = Id::default();
    let command = interpret_command_args(args)?;

    // Maps an incoming signal to the signal forwarded to the command; a `None`
    // value means the incoming signal is discarded.
    let signal_map: Arc<HashMap<Signal, Option<Signal>>> = Arc::new(
        args.events
            .signal_map
            .iter()
            .copied()
            .map(|SignalMapping { from, to }| (from, to))
            .collect(),
    );

    // `queued`: whether a start is already pending in `OnBusyUpdate::Queue` mode.
    // `quit_again`: how many times quitting has been requested so far.
    let queued = Arc::new(AtomicBool::new(false));
    let quit_again = Arc::new(AtomicU8::new(0));

    config.on_action_async(move |mut action| {
        // The handler closure is called repeatedly, so each invocation takes
        // its own handles to the shared values before moving them into the
        // future.
        let add_envs = add_envs.clone();
        let command = command.clone();
        let state = state.clone();
        let queued = queued.clone();
        let quit_again = quit_again.clone();
        let signal_map = signal_map.clone();
        let workdir = workdir.clone();
        Box::new(
            async move {
                trace!(events=?action.events, "handling action");

                let add_envs = add_envs.clone();
                let command = command.clone();
                let queued = queued.clone();
                let quit_again = quit_again.clone();
                let signal_map = signal_map.clone();
                let workdir = workdir.clone();

                trace!("set spawn hook for workdir and environment variables");
                let job = action.get_or_create_job(id, move || command.clone());
                let events = action.events.clone();
                // The spawn hook is re-set on every action so that it captures
                // this action's events.
                job.set_spawn_hook({
                    let state = state.clone();
                    move |command, _| {
                        let add_envs = add_envs.clone();
                        let state = state.clone();
                        let events = events.clone();

                        if let Some(ref workdir) = workdir.as_ref() {
                            debug!(?workdir, "set command workdir");
                            command.command_mut().current_dir(workdir);
                        }

                        if let Some(ref socket_set) = state.socket_set {
                            for env in socket_set.envs() {
                                command.command_mut().env(env.key, env.value);
                            }
                        }

                        emit_events_to_command(
                            command.command_mut(),
                            events,
                            state,
                            emit_events_to,
                            add_envs,
                        );
                    }
                });

                // Prints this action's events to stderr when event printing is on.
                let show_events = {
                    let events = action.events.clone();
                    move || {
                        if print_events {
                            trace!("print events to stderr");
                            for (n, event) in events.iter().enumerate() {
                                eprintln!("[EVENT {n}] {event}");
                            }
                        }
                    }
                };

                // Clears or resets the screen as configured, then prints the
                // events again since clearing removed the earlier output.
                let clear_screen = {
                    let events = action.events.clone();
                    move || {
                        if let Some(mode) = clear {
                            match mode {
                                ClearMode::Clear => {
                                    clearscreen::clear().ok();
                                    debug!("cleared screen");
                                }
                                ClearMode::Reset => {
                                    reset_screen();
                                    debug!("hard-reset screen");
                                }
                            }
                        }

                        if print_events {
                            trace!("print events to stderr");
                            for (n, event) in events.iter().enumerate() {
                                eprintln!("[EVENT {n}] {event}");
                            }
                        }
                    }
                };

                // Escalating quit: the first request stops gracefully with the
                // stop signal and timeout, the second force-stops with no
                // timeout, any later request quits outright.
                let quit = |mut action: ActionHandler| {
                    match quit_again.fetch_add(1, Ordering::Relaxed) {
                        0 => {
                            eprintln!("[Waiting {stop_timeout:?} for processes to exit before stopping...]");
                            action.quit_gracefully(
                                stop_signal.unwrap_or(Signal::Terminate),
                                stop_timeout,
                            );
                        }
                        1 => {
                            action.quit_gracefully(Signal::ForceStop, Duration::ZERO);
                        }
                        _ => {
                            action.quit();
                        }
                    }

                    action
                };

                // Run-once mode: start the job, wait for it, record its exit
                // code in the shared state, then quit.
                if once {
                    debug!("debug mode: run once and quit");
                    show_events();

                    if let Some(delay) = delay_run {
                        job.run_async(move |_| {
                            Box::new(async move {
                                sleep(delay).await;
                            })
                        });
                    }

                    job.start().await;
                    job.to_wait().await;
                    job.run({
                        let state = state.clone();
                        move |context| {
                            if let Some(end) = end_of_process(context.current, outflags) {
                                // NOTE(review): a missing exit code (`code()`
                                // is `None`) becomes 0, and a code that fails
                                // `try_into` becomes 1 — confirm that mapping a
                                // missing code to success is intended.
                                *state.exit_code.lock().unwrap() = ExitCode::from(
                                    end.into_exitstatus()
                                        .code()
                                        .unwrap_or(0)
                                        .try_into()
                                        .unwrap_or(1),
                                );
                            }
                        }
                    })
                    .await;
                    return quit(action);
                }

                // Keyboard EOF quits only when stdin-quit is enabled.
                let is_keyboard_eof = action
                    .events
                    .iter()
                    .any(|e| e.tags.contains(&Tag::Keyboard(Keyboard::Eof)));
                if stdin_quit && is_keyboard_eof {
                    debug!("keyboard EOF, quit");
                    show_events();
                    return quit(action);
                }

                let signals: Vec<Signal> = action.signals().collect();
                trace!(?signals, "received some signals");

                // Terminate and interrupt quit the program unless the user
                // mapped them to something else.
                if (signals.contains(&Signal::Terminate)
                    && !signal_map.contains_key(&Signal::Terminate))
                    || (signals.contains(&Signal::Interrupt)
                        && !signal_map.contains_key(&Signal::Interrupt))
                {
                    debug!("unmapped terminate or interrupt signal, quit");
                    show_events();
                    return quit(action);
                }

                // Every remaining signal is forwarded to the job, translated
                // or dropped according to the signal map.
                for signal in signals {
                    match signal_map.get(&signal) {
                        Some(Some(mapped)) => {
                            debug!(?signal, ?mapped, "passing mapped signal");
                            job.signal(*mapped);
                        }
                        Some(None) => {
                            debug!(?signal, "discarding signal");
                        }
                        None => {
                            debug!(?signal, "passing signal on");
                            job.signal(signal);
                        }
                    }
                }

                // Only actions carrying at least one path or at least one
                // empty event go on to (re)start the command.
                if action.paths().next().is_none()
                    && !action.events.iter().any(watchexec_events::Event::is_empty)
                {
                    debug!("no filesystem or synthetic events, skip without doing more");
                    show_events();
                    return action;
                }

                show_events();

                if let Some(delay) = delay_run {
                    trace!("delaying run by sleeping inside the job");
                    job.run_async(move |_| {
                        Box::new(async move {
                            sleep(delay).await;
                        })
                    });
                }

                trace!("querying job state via run_async");
                job.run_async({
                    let job = job.clone();
                    move |context| {
                        let job = job.clone();
                        // The running state is read here, before the future
                        // below is created.
                        let is_running = matches!(context.current, CommandState::Running { .. });
                        Box::new(async move {
                            let innerjob = job.clone();
                            if is_running {
                                trace!(?on_busy, "job is running, decide what to do");
                                match on_busy {
                                    OnBusyUpdate::DoNothing => {}
                                    OnBusyUpdate::Signal => {
                                        // On Windows the only signal sent is
                                        // ForceStop; elsewhere the stop signal
                                        // wins over the generic signal option.
                                        job.signal(if cfg!(windows) {
                                            Signal::ForceStop
                                        } else {
                                            stop_signal.or(signal).unwrap_or(Signal::Terminate)
                                        });
                                    }
                                    // Windows restarts without a stop signal
                                    // or timeout.
                                    OnBusyUpdate::Restart if cfg!(windows) => {
                                        job.restart();
                                        job.run(move |context| {
                                            clear_screen();
                                            setup_process(
                                                innerjob.clone(),
                                                context.command.clone(),
                                                outflags,
                                            );
                                        });
                                    }
                                    OnBusyUpdate::Restart => {
                                        job.restart_with_signal(
                                            stop_signal.unwrap_or(Signal::Terminate),
                                            stop_timeout,
                                        );
                                        job.run(move |context| {
                                            clear_screen();
                                            setup_process(
                                                innerjob.clone(),
                                                context.command.clone(),
                                                outflags,
                                            );
                                        });
                                    }
                                    OnBusyUpdate::Queue => {
                                        let job = job.clone();
                                        // `fetch_or` returns the previous
                                        // value, so only the first caller
                                        // spawns the waiting task.
                                        let already_queued =
                                            queued.fetch_or(true, Ordering::SeqCst);
                                        if already_queued {
                                            debug!("next start is already queued, do nothing");
                                        } else {
                                            debug!("queueing next start of job");
                                            tokio::spawn({
                                                let queued = queued.clone();
                                                async move {
                                                    trace!("waiting for job to finish");
                                                    job.to_wait().await;
                                                    trace!("job finished, starting queued");
                                                    job.start();
                                                    job.run(move |context| {
                                                        clear_screen();
                                                        setup_process(
                                                            innerjob.clone(),
                                                            context.command.clone(),
                                                            outflags,
                                                        );
                                                    })
                                                    .await;
                                                    trace!("resetting queued state");
                                                    queued.store(false, Ordering::SeqCst);
                                                }
                                            });
                                        }
                                    }
                                }
                            } else {
                                trace!("job is not running, start it");
                                job.start();
                                job.run(move |context| {
                                    clear_screen();
                                    setup_process(
                                        innerjob.clone(),
                                        context.command.clone(),
                                        outflags,
                                    );
                                });
                            }
                        })
                    }
                });

                action
            }
            .instrument(trace_span!("action handler")),
        )
    });

    Ok(config)
}
488
489#[instrument(level = "debug")]
490fn interpret_command_args(args: &Args) -> Result<Arc<Command>> {
491 let mut cmd = args.program.clone();
492 assert!(!cmd.is_empty(), "(clap) Bug: command is not present");
493
494 let shell = if args.command.no_shell {
495 None
496 } else {
497 let shell = args.command.shell.clone().or_else(|| var("SHELL").ok());
498 match shell
499 .as_deref()
500 .or_else(|| {
501 if cfg!(not(windows)) {
502 Some("sh")
503 } else if var("POWERSHELL_DISTRIBUTION_CHANNEL").is_ok()
504 && (which::which("pwsh").is_ok() || which::which("pwsh.exe").is_ok())
505 {
506 trace!("detected pwsh");
507 Some("pwsh")
508 } else if var("PSModulePath").is_ok()
509 && (which::which("powershell").is_ok()
510 || which::which("powershell.exe").is_ok())
511 {
512 trace!("detected powershell");
513 Some("powershell")
514 } else {
515 Some("cmd")
516 }
517 })
518 .or(Some("default"))
519 {
520 Some("") => return Err(RuntimeError::CommandShellEmptyShell).into_diagnostic(),
521
522 Some("none") | None => None,
523
524 #[cfg(windows)]
525 Some("cmd") | Some("cmd.exe") | Some("CMD") | Some("CMD.EXE") => Some(Shell::cmd()),
526
527 Some(other) => {
528 let sh = other.split_ascii_whitespace().collect::<Vec<_>>();
529
530 #[allow(clippy::unwrap_used)]
532 let (shprog, shopts) = sh.split_first().unwrap();
533
534 Some(Shell {
535 prog: shprog.into(),
536 options: shopts.iter().map(|s| (*s).to_string()).collect(),
537 program_option: Some(Cow::Borrowed(OsStr::new("-c"))),
538 })
539 }
540 }
541 };
542
543 let program = if let Some(shell) = shell {
544 Program::Shell {
545 shell,
546 command: cmd.join(" "),
547 args: Vec::new(),
548 }
549 } else {
550 Program::Exec {
551 prog: cmd.remove(0).into(),
552 args: cmd,
553 }
554 };
555
556 Ok(Arc::new(Command {
557 program,
558 options: SpawnOptions {
559 grouped: matches!(args.command.wrap_process, WrapMode::Group),
560 session: matches!(args.command.wrap_process, WrapMode::Session),
561 ..Default::default()
562 },
563 }))
564}
565
566#[instrument(level = "trace")]
567fn setup_process(job: Job, command: Arc<Command>, outflags: OutputFlags) {
568 if outflags.toast {
569 Notification::new()
570 .summary("Watchexec: change detected")
571 .body(&format!("Running {command}"))
572 .show()
573 .map_or_else(
574 |err| {
575 eprintln!("[[Failed to send desktop notification: {err}]]");
576 },
577 drop,
578 );
579 }
580
581 if !outflags.quiet {
582 let mut stderr = StandardStream::stderr(outflags.colour);
583 stderr.reset().ok();
584 stderr
585 .set_color(ColorSpec::new().set_fg(Some(Color::Green)))
586 .ok();
587 writeln!(&mut stderr, "[Running: {command}]").ok();
588 stderr.reset().ok();
589 }
590
591 tokio::spawn(async move {
592 job.to_wait().await;
593 job.run(move |context| {
594 end_of_process(context.current, outflags);
595 });
596 });
597}
598
599#[instrument(level = "trace")]
600fn end_of_process(state: &CommandState, outflags: OutputFlags) -> Option<ProcessEnd> {
601 let CommandState::Finished {
602 status,
603 started,
604 finished,
605 } = state
606 else {
607 return None;
608 };
609
610 let duration = *finished - *started;
611 let timing = if outflags.timings {
612 format!(", lasted {duration:?}")
613 } else {
614 String::new()
615 };
616 let (msg, fg) = match status {
617 ProcessEnd::ExitError(code) => (format!("Command exited with {code}{timing}"), Color::Red),
618 ProcessEnd::ExitSignal(sig) => {
619 (format!("Command killed by {sig:?}{timing}"), Color::Magenta)
620 }
621 ProcessEnd::ExitStop(sig) => (format!("Command stopped by {sig:?}{timing}"), Color::Blue),
622 ProcessEnd::Continued => (format!("Command continued{timing}"), Color::Cyan),
623 ProcessEnd::Exception(ex) => (
624 format!("Command ended by exception {ex:#x}{timing}"),
625 Color::Yellow,
626 ),
627 ProcessEnd::Success => (format!("Command was successful{timing}"), Color::Green),
628 };
629
630 if outflags.toast {
631 Notification::new()
632 .summary("Watchexec: command ended")
633 .body(&msg)
634 .show()
635 .map_or_else(
636 |err| {
637 eprintln!("[[Failed to send desktop notification: {err}]]");
638 },
639 drop,
640 );
641 }
642
643 if !outflags.quiet {
644 let mut stderr = StandardStream::stderr(outflags.colour);
645 stderr.reset().ok();
646 stderr.set_color(ColorSpec::new().set_fg(Some(fg))).ok();
647 writeln!(&mut stderr, "[{msg}]").ok();
648 stderr.reset().ok();
649 }
650
651 if outflags.bell {
652 let mut stdout = std::io::stdout();
653 stdout.write_all(b"\x07").ok();
654 stdout.flush().ok();
655 }
656
657 Some(*status)
658}
659
660#[instrument(level = "trace")]
661fn emit_events_to_command(
662 command: &mut TokioCommand,
663 events: Arc<[Event]>,
664 state: State,
665 emit_events_to: EmitEvents,
666 add_envs: Arc<[EnvVar]>,
667) {
668 use crate::emits::{emits_to_environment, emits_to_file, emits_to_json_file};
669
670 let mut stdin = None;
671
672 let add_envs = add_envs.clone();
673 let mut envs = Box::new(add_envs.into_iter().cloned()) as Box<dyn Iterator<Item = EnvVar>>;
674
675 match emit_events_to {
676 EmitEvents::Environment => {
677 envs = Box::new(envs.chain(emits_to_environment(&events)));
678 }
679 EmitEvents::Stdio => match emits_to_file(&state.emit_file, &events)
680 .and_then(|path| File::open(path).into_diagnostic())
681 {
682 Ok(file) => {
683 stdin.replace(Stdio::from(file));
684 }
685 Err(err) => {
686 error!("Failed to write events to stdin, continuing without it: {err}");
687 }
688 },
689 EmitEvents::File => match emits_to_file(&state.emit_file, &events) {
690 Ok(path) => {
691 envs = Box::new(envs.chain(once(EnvVar {
692 key: "WATCHEXEC_EVENTS_FILE".into(),
693 value: path.into(),
694 })));
695 }
696 Err(err) => {
697 error!("Failed to write WATCHEXEC_EVENTS_FILE, continuing without it: {err}");
698 }
699 },
700 EmitEvents::JsonStdio => match emits_to_json_file(&state.emit_file, &events)
701 .and_then(|path| File::open(path).into_diagnostic())
702 {
703 Ok(file) => {
704 stdin.replace(Stdio::from(file));
705 }
706 Err(err) => {
707 error!("Failed to write events to stdin, continuing without it: {err}");
708 }
709 },
710 EmitEvents::JsonFile => match emits_to_json_file(&state.emit_file, &events) {
711 Ok(path) => {
712 envs = Box::new(envs.chain(once(EnvVar {
713 key: "WATCHEXEC_EVENTS_FILE".into(),
714 value: path.into(),
715 })));
716 }
717 Err(err) => {
718 error!("Failed to write WATCHEXEC_EVENTS_FILE, continuing without it: {err}");
719 }
720 },
721 EmitEvents::None => {}
722 }
723
724 for var in envs {
725 debug!(?var, "inserting environment variable");
726 command.env(var.key, var.value);
727 }
728
729 if let Some(stdin) = stdin {
730 debug!("set command stdin");
731 command.stdin(stdin);
732 }
733}
734
735pub fn reset_screen() {
736 for cs in [
737 ClearScreen::WindowsCooked,
738 ClearScreen::WindowsVt,
739 ClearScreen::VtLeaveAlt,
740 ClearScreen::VtWellDone,
741 ClearScreen::default(),
742 ] {
743 cs.clear().ok();
744 }
745}