watchexec_cli/
config.rs

1use std::{
2	borrow::Cow,
3	collections::HashMap,
4	env::var,
5	ffi::OsStr,
6	fs::File,
7	io::{IsTerminal, Write},
8	iter::once,
9	process::{ExitCode, Stdio},
10	sync::{
11		atomic::{AtomicBool, AtomicU8, Ordering},
12		Arc,
13	},
14	time::Duration,
15};
16
17use clearscreen::ClearScreen;
18use miette::{IntoDiagnostic, Report, Result};
19use notify_rust::Notification;
20use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
21use tokio::{process::Command as TokioCommand, time::sleep};
22use tracing::{debug, debug_span, error, instrument, trace, trace_span, Instrument};
23use watchexec::{
24	action::ActionHandler,
25	command::{Command, Program, Shell, SpawnOptions},
26	error::RuntimeError,
27	job::{CommandState, Job},
28	sources::fs::Watcher,
29	Config, ErrorHook, Id,
30};
31use watchexec_events::{Event, Keyboard, ProcessEnd, Tag};
32use watchexec_signals::Signal;
33
34use crate::{
35	args::{
36		command::{EnvVar, WrapMode},
37		events::{EmitEvents, OnBusyUpdate, SignalMapping},
38		output::{ClearMode, ColourMode},
39		Args,
40	},
41	emits::events_to_simple_format,
42	socket::Sockets,
43	state::State,
44};
45
46#[derive(Clone, Copy, Debug)]
47struct OutputFlags {
48	quiet: bool,
49	colour: ColorChoice,
50	timings: bool,
51	bell: bool,
52	toast: bool,
53}
54
55pub fn make_config(args: &Args, state: &State) -> Result<Config> {
56	let _span = debug_span!("args-runtime").entered();
57	let config = Config::default();
58	config.on_error(|err: ErrorHook| {
59		if let RuntimeError::IoError {
60			about: "waiting on process group",
61			..
62		} = err.error
63		{
64			// "No child processes" and such
65			// these are often spurious, so condemn them to -v only
66			error!("{}", err.error);
67			return;
68		}
69
70		if cfg!(debug_assertions) {
71			eprintln!("[[{:?}]]", err.error);
72		}
73
74		eprintln!("[[Error (not fatal)]]\n{}", Report::new(err.error));
75	});
76
77	config.pathset(args.filtering.paths.clone());
78
79	config.throttle(args.events.debounce.0);
80	config.keyboard_events(args.events.stdin_quit);
81
82	if let Some(interval) = args.events.poll {
83		config.file_watcher(Watcher::Poll(interval.0));
84	}
85
86	let once = args.once;
87	let clear = args.output.screen_clear;
88
89	let emit_events_to = args.events.emit_events_to;
90	let state = state.clone();
91
92	if args.only_emit_events {
93		config.on_action(move |mut action| {
94			// if we got a terminate or interrupt signal, quit
95			if action
96				.signals()
97				.any(|sig| sig == Signal::Terminate || sig == Signal::Interrupt)
98			{
99				// no need to be graceful as there's no commands
100				action.quit();
101				return action;
102			}
103
104			// clear the screen before printing events
105			if let Some(mode) = clear {
106				match mode {
107					ClearMode::Clear => {
108						clearscreen::clear().ok();
109					}
110					ClearMode::Reset => {
111						reset_screen();
112					}
113				}
114			}
115
116			match emit_events_to {
117				EmitEvents::Stdio => {
118					println!(
119						"{}",
120						events_to_simple_format(action.events.as_ref()).unwrap_or_default()
121					);
122				}
123				EmitEvents::JsonStdio => {
124					for event in action.events.iter().filter(|e| !e.is_empty()) {
125						println!("{}", serde_json::to_string(event).unwrap_or_default());
126					}
127				}
128				other => unreachable!(
129					"emit_events_to should have been validated earlier: {:?}",
130					other
131				),
132			}
133
134			action
135		});
136
137		return Ok(config);
138	}
139
140	let delay_run = args.command.delay_run.map(|ts| ts.0);
141	let on_busy = args.events.on_busy_update;
142	let stdin_quit = args.events.stdin_quit;
143
144	let signal = args.events.signal;
145	let stop_signal = args.command.stop_signal;
146	let stop_timeout = args.command.stop_timeout.0;
147
148	let print_events = args.logging.print_events;
149	let outflags = OutputFlags {
150		quiet: args.output.quiet,
151		colour: match args.output.color {
152			ColourMode::Auto if !std::io::stdin().is_terminal() => ColorChoice::Never,
153			ColourMode::Auto => ColorChoice::Auto,
154			ColourMode::Always => ColorChoice::Always,
155			ColourMode::Never => ColorChoice::Never,
156		},
157		timings: args.output.timings,
158		bell: args.output.bell,
159		toast: args.output.notify,
160	};
161
162	let workdir = Arc::new(args.command.workdir.clone());
163
164	let add_envs: Arc<[EnvVar]> = args.command.env.clone().into();
165	debug!(
166		envs=?args.command.env,
167		"additional environment variables to add to command"
168	);
169
170	let id = Id::default();
171	let command = interpret_command_args(args)?;
172
173	let signal_map: Arc<HashMap<Signal, Option<Signal>>> = Arc::new(
174		args.events
175			.signal_map
176			.iter()
177			.copied()
178			.map(|SignalMapping { from, to }| (from, to))
179			.collect(),
180	);
181
182	let queued = Arc::new(AtomicBool::new(false));
183	let quit_again = Arc::new(AtomicU8::new(0));
184
185	config.on_action_async(move |mut action| {
186		let add_envs = add_envs.clone();
187		let command = command.clone();
188		let state = state.clone();
189		let queued = queued.clone();
190		let quit_again = quit_again.clone();
191		let signal_map = signal_map.clone();
192		let workdir = workdir.clone();
193		Box::new(
194			async move {
195				trace!(events=?action.events, "handling action");
196
197				let add_envs = add_envs.clone();
198				let command = command.clone();
199				let queued = queued.clone();
200				let quit_again = quit_again.clone();
201				let signal_map = signal_map.clone();
202				let workdir = workdir.clone();
203
204				trace!("set spawn hook for workdir and environment variables");
205				let job = action.get_or_create_job(id, move || command.clone());
206				let events = action.events.clone();
207				job.set_spawn_hook({
208					let state = state.clone();
209					move |command, _| {
210						let add_envs = add_envs.clone();
211						let state = state.clone();
212						let events = events.clone();
213
214						if let Some(ref workdir) = workdir.as_ref() {
215							debug!(?workdir, "set command workdir");
216							command.command_mut().current_dir(workdir);
217						}
218
219						if let Some(ref socket_set) = state.socket_set {
220							for env in socket_set.envs() {
221								command.command_mut().env(env.key, env.value);
222							}
223						}
224
225						emit_events_to_command(
226							command.command_mut(),
227							events,
228							state,
229							emit_events_to,
230							add_envs,
231						);
232					}
233				});
234
235				let show_events = {
236					let events = action.events.clone();
237					move || {
238						if print_events {
239							trace!("print events to stderr");
240							for (n, event) in events.iter().enumerate() {
241								eprintln!("[EVENT {n}] {event}");
242							}
243						}
244					}
245				};
246
247				let clear_screen = {
248					let events = action.events.clone();
249					move || {
250						if let Some(mode) = clear {
251							match mode {
252								ClearMode::Clear => {
253									clearscreen::clear().ok();
254									debug!("cleared screen");
255								}
256								ClearMode::Reset => {
257									reset_screen();
258									debug!("hard-reset screen");
259								}
260							}
261						}
262
263						// re-show events after clearing
264						if print_events {
265							trace!("print events to stderr");
266							for (n, event) in events.iter().enumerate() {
267								eprintln!("[EVENT {n}] {event}");
268							}
269						}
270					}
271				};
272
273				let quit = |mut action: ActionHandler| {
274					match quit_again.fetch_add(1, Ordering::Relaxed) {
275						0 => {
276							eprintln!("[Waiting {stop_timeout:?} for processes to exit before stopping...]");
277							// eprintln!("[Waiting {stop_timeout:?} for processes to exit before stopping... Ctrl-C again to exit faster]");
278							// see TODO in action/worker.rs
279							action.quit_gracefully(
280								stop_signal.unwrap_or(Signal::Terminate),
281								stop_timeout,
282							);
283						}
284						1 => {
285							action.quit_gracefully(Signal::ForceStop, Duration::ZERO);
286						}
287						_ => {
288							action.quit();
289						}
290					}
291
292					action
293				};
294
295				if once {
296					debug!("debug mode: run once and quit");
297					show_events();
298
299					if let Some(delay) = delay_run {
300						job.run_async(move |_| {
301							Box::new(async move {
302								sleep(delay).await;
303							})
304						});
305					}
306
307					// this blocks the event loop, but also this is a debug feature so i don't care
308					job.start().await;
309					job.to_wait().await;
310					job.run({
311						let state = state.clone();
312						move |context| {
313							if let Some(end) = end_of_process(context.current, outflags) {
314								*state.exit_code.lock().unwrap() = ExitCode::from(
315									end.into_exitstatus()
316										.code()
317										.unwrap_or(0)
318										.try_into()
319										.unwrap_or(1),
320								);
321							}
322						}
323					})
324					.await;
325					return quit(action);
326				}
327
328				let is_keyboard_eof = action
329					.events
330					.iter()
331					.any(|e| e.tags.contains(&Tag::Keyboard(Keyboard::Eof)));
332				if stdin_quit && is_keyboard_eof {
333					debug!("keyboard EOF, quit");
334					show_events();
335					return quit(action);
336				}
337
338				let signals: Vec<Signal> = action.signals().collect();
339				trace!(?signals, "received some signals");
340
341				// if we got a terminate or interrupt signal and they're not mapped, quit
342				if (signals.contains(&Signal::Terminate)
343					&& !signal_map.contains_key(&Signal::Terminate))
344					|| (signals.contains(&Signal::Interrupt)
345						&& !signal_map.contains_key(&Signal::Interrupt))
346				{
347					debug!("unmapped terminate or interrupt signal, quit");
348					show_events();
349					return quit(action);
350				}
351
352				// pass all other signals on
353				for signal in signals {
354					match signal_map.get(&signal) {
355						Some(Some(mapped)) => {
356							debug!(?signal, ?mapped, "passing mapped signal");
357							job.signal(*mapped);
358						}
359						Some(None) => {
360							debug!(?signal, "discarding signal");
361						}
362						None => {
363							debug!(?signal, "passing signal on");
364							job.signal(signal);
365						}
366					}
367				}
368
369				// only filesystem events below here (or empty synthetic events)
370				if action.paths().next().is_none()
371					&& !action.events.iter().any(watchexec_events::Event::is_empty)
372				{
373					debug!("no filesystem or synthetic events, skip without doing more");
374					show_events();
375					return action;
376				}
377
378				show_events();
379
380				if let Some(delay) = delay_run {
381					trace!("delaying run by sleeping inside the job");
382					job.run_async(move |_| {
383						Box::new(async move {
384							sleep(delay).await;
385						})
386					});
387				}
388
389				trace!("querying job state via run_async");
390				job.run_async({
391					let job = job.clone();
392					move |context| {
393						let job = job.clone();
394						let is_running = matches!(context.current, CommandState::Running { .. });
395						Box::new(async move {
396							let innerjob = job.clone();
397							if is_running {
398								trace!(?on_busy, "job is running, decide what to do");
399								match on_busy {
400									OnBusyUpdate::DoNothing => {}
401									OnBusyUpdate::Signal => {
402										job.signal(if cfg!(windows) {
403											Signal::ForceStop
404										} else {
405											stop_signal.or(signal).unwrap_or(Signal::Terminate)
406										});
407									}
408									OnBusyUpdate::Restart if cfg!(windows) => {
409										job.restart();
410										job.run(move |context| {
411											clear_screen();
412											setup_process(
413												innerjob.clone(),
414												context.command.clone(),
415												outflags,
416											);
417										});
418									}
419									OnBusyUpdate::Restart => {
420										job.restart_with_signal(
421											stop_signal.unwrap_or(Signal::Terminate),
422											stop_timeout,
423										);
424										job.run(move |context| {
425											clear_screen();
426											setup_process(
427												innerjob.clone(),
428												context.command.clone(),
429												outflags,
430											);
431										});
432									}
433									OnBusyUpdate::Queue => {
434										let job = job.clone();
435										let already_queued =
436											queued.fetch_or(true, Ordering::SeqCst);
437										if already_queued {
438											debug!("next start is already queued, do nothing");
439										} else {
440											debug!("queueing next start of job");
441											tokio::spawn({
442												let queued = queued.clone();
443												async move {
444													trace!("waiting for job to finish");
445													job.to_wait().await;
446													trace!("job finished, starting queued");
447													job.start();
448													job.run(move |context| {
449														clear_screen();
450														setup_process(
451															innerjob.clone(),
452															context.command.clone(),
453															outflags,
454														);
455													})
456													.await;
457													trace!("resetting queued state");
458													queued.store(false, Ordering::SeqCst);
459												}
460											});
461										}
462									}
463								}
464							} else {
465								trace!("job is not running, start it");
466								job.start();
467								job.run(move |context| {
468									clear_screen();
469									setup_process(
470										innerjob.clone(),
471										context.command.clone(),
472										outflags,
473									);
474								});
475							}
476						})
477					}
478				});
479
480				action
481			}
482			.instrument(trace_span!("action handler")),
483		)
484	});
485
486	Ok(config)
487}
488
489#[instrument(level = "debug")]
490fn interpret_command_args(args: &Args) -> Result<Arc<Command>> {
491	let mut cmd = args.program.clone();
492	assert!(!cmd.is_empty(), "(clap) Bug: command is not present");
493
494	let shell = if args.command.no_shell {
495		None
496	} else {
497		let shell = args.command.shell.clone().or_else(|| var("SHELL").ok());
498		match shell
499			.as_deref()
500			.or_else(|| {
501				if cfg!(not(windows)) {
502					Some("sh")
503				} else if var("POWERSHELL_DISTRIBUTION_CHANNEL").is_ok()
504					&& (which::which("pwsh").is_ok() || which::which("pwsh.exe").is_ok())
505				{
506					trace!("detected pwsh");
507					Some("pwsh")
508				} else if var("PSModulePath").is_ok()
509					&& (which::which("powershell").is_ok()
510						|| which::which("powershell.exe").is_ok())
511				{
512					trace!("detected powershell");
513					Some("powershell")
514				} else {
515					Some("cmd")
516				}
517			})
518			.or(Some("default"))
519		{
520			Some("") => return Err(RuntimeError::CommandShellEmptyShell).into_diagnostic(),
521
522			Some("none") | None => None,
523
524			#[cfg(windows)]
525			Some("cmd") | Some("cmd.exe") | Some("CMD") | Some("CMD.EXE") => Some(Shell::cmd()),
526
527			Some(other) => {
528				let sh = other.split_ascii_whitespace().collect::<Vec<_>>();
529
530				// UNWRAP: checked by Some("")
531				#[allow(clippy::unwrap_used)]
532				let (shprog, shopts) = sh.split_first().unwrap();
533
534				Some(Shell {
535					prog: shprog.into(),
536					options: shopts.iter().map(|s| (*s).to_string()).collect(),
537					program_option: Some(Cow::Borrowed(OsStr::new("-c"))),
538				})
539			}
540		}
541	};
542
543	let program = if let Some(shell) = shell {
544		Program::Shell {
545			shell,
546			command: cmd.join(" "),
547			args: Vec::new(),
548		}
549	} else {
550		Program::Exec {
551			prog: cmd.remove(0).into(),
552			args: cmd,
553		}
554	};
555
556	Ok(Arc::new(Command {
557		program,
558		options: SpawnOptions {
559			grouped: matches!(args.command.wrap_process, WrapMode::Group),
560			session: matches!(args.command.wrap_process, WrapMode::Session),
561			..Default::default()
562		},
563	}))
564}
565
566#[instrument(level = "trace")]
567fn setup_process(job: Job, command: Arc<Command>, outflags: OutputFlags) {
568	if outflags.toast {
569		Notification::new()
570			.summary("Watchexec: change detected")
571			.body(&format!("Running {command}"))
572			.show()
573			.map_or_else(
574				|err| {
575					eprintln!("[[Failed to send desktop notification: {err}]]");
576				},
577				drop,
578			);
579	}
580
581	if !outflags.quiet {
582		let mut stderr = StandardStream::stderr(outflags.colour);
583		stderr.reset().ok();
584		stderr
585			.set_color(ColorSpec::new().set_fg(Some(Color::Green)))
586			.ok();
587		writeln!(&mut stderr, "[Running: {command}]").ok();
588		stderr.reset().ok();
589	}
590
591	tokio::spawn(async move {
592		job.to_wait().await;
593		job.run(move |context| {
594			end_of_process(context.current, outflags);
595		});
596	});
597}
598
599#[instrument(level = "trace")]
600fn end_of_process(state: &CommandState, outflags: OutputFlags) -> Option<ProcessEnd> {
601	let CommandState::Finished {
602		status,
603		started,
604		finished,
605	} = state
606	else {
607		return None;
608	};
609
610	let duration = *finished - *started;
611	let timing = if outflags.timings {
612		format!(", lasted {duration:?}")
613	} else {
614		String::new()
615	};
616	let (msg, fg) = match status {
617		ProcessEnd::ExitError(code) => (format!("Command exited with {code}{timing}"), Color::Red),
618		ProcessEnd::ExitSignal(sig) => {
619			(format!("Command killed by {sig:?}{timing}"), Color::Magenta)
620		}
621		ProcessEnd::ExitStop(sig) => (format!("Command stopped by {sig:?}{timing}"), Color::Blue),
622		ProcessEnd::Continued => (format!("Command continued{timing}"), Color::Cyan),
623		ProcessEnd::Exception(ex) => (
624			format!("Command ended by exception {ex:#x}{timing}"),
625			Color::Yellow,
626		),
627		ProcessEnd::Success => (format!("Command was successful{timing}"), Color::Green),
628	};
629
630	if outflags.toast {
631		Notification::new()
632			.summary("Watchexec: command ended")
633			.body(&msg)
634			.show()
635			.map_or_else(
636				|err| {
637					eprintln!("[[Failed to send desktop notification: {err}]]");
638				},
639				drop,
640			);
641	}
642
643	if !outflags.quiet {
644		let mut stderr = StandardStream::stderr(outflags.colour);
645		stderr.reset().ok();
646		stderr.set_color(ColorSpec::new().set_fg(Some(fg))).ok();
647		writeln!(&mut stderr, "[{msg}]").ok();
648		stderr.reset().ok();
649	}
650
651	if outflags.bell {
652		let mut stdout = std::io::stdout();
653		stdout.write_all(b"\x07").ok();
654		stdout.flush().ok();
655	}
656
657	Some(*status)
658}
659
660#[instrument(level = "trace")]
661fn emit_events_to_command(
662	command: &mut TokioCommand,
663	events: Arc<[Event]>,
664	state: State,
665	emit_events_to: EmitEvents,
666	add_envs: Arc<[EnvVar]>,
667) {
668	use crate::emits::{emits_to_environment, emits_to_file, emits_to_json_file};
669
670	let mut stdin = None;
671
672	let add_envs = add_envs.clone();
673	let mut envs = Box::new(add_envs.into_iter().cloned()) as Box<dyn Iterator<Item = EnvVar>>;
674
675	match emit_events_to {
676		EmitEvents::Environment => {
677			envs = Box::new(envs.chain(emits_to_environment(&events)));
678		}
679		EmitEvents::Stdio => match emits_to_file(&state.emit_file, &events)
680			.and_then(|path| File::open(path).into_diagnostic())
681		{
682			Ok(file) => {
683				stdin.replace(Stdio::from(file));
684			}
685			Err(err) => {
686				error!("Failed to write events to stdin, continuing without it: {err}");
687			}
688		},
689		EmitEvents::File => match emits_to_file(&state.emit_file, &events) {
690			Ok(path) => {
691				envs = Box::new(envs.chain(once(EnvVar {
692					key: "WATCHEXEC_EVENTS_FILE".into(),
693					value: path.into(),
694				})));
695			}
696			Err(err) => {
697				error!("Failed to write WATCHEXEC_EVENTS_FILE, continuing without it: {err}");
698			}
699		},
700		EmitEvents::JsonStdio => match emits_to_json_file(&state.emit_file, &events)
701			.and_then(|path| File::open(path).into_diagnostic())
702		{
703			Ok(file) => {
704				stdin.replace(Stdio::from(file));
705			}
706			Err(err) => {
707				error!("Failed to write events to stdin, continuing without it: {err}");
708			}
709		},
710		EmitEvents::JsonFile => match emits_to_json_file(&state.emit_file, &events) {
711			Ok(path) => {
712				envs = Box::new(envs.chain(once(EnvVar {
713					key: "WATCHEXEC_EVENTS_FILE".into(),
714					value: path.into(),
715				})));
716			}
717			Err(err) => {
718				error!("Failed to write WATCHEXEC_EVENTS_FILE, continuing without it: {err}");
719			}
720		},
721		EmitEvents::None => {}
722	}
723
724	for var in envs {
725		debug!(?var, "inserting environment variable");
726		command.env(var.key, var.value);
727	}
728
729	if let Some(stdin) = stdin {
730		debug!("set command stdin");
731		command.stdin(stdin);
732	}
733}
734
735pub fn reset_screen() {
736	for cs in [
737		ClearScreen::WindowsCooked,
738		ClearScreen::WindowsVt,
739		ClearScreen::VtLeaveAlt,
740		ClearScreen::VtWellDone,
741		ClearScreen::default(),
742	] {
743		cs.clear().ok();
744	}
745}