use std::{
borrow::Cow,
collections::HashMap,
env::var,
ffi::OsStr,
fmt,
fs::File,
io::{IsTerminal, Write},
iter::once,
process::{ExitCode, Stdio},
sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
},
time::Duration,
};
use clearscreen::ClearScreen;
use miette::{IntoDiagnostic, Report, Result};
use notify_rust::Notification;
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
use tokio::{process::Command as TokioCommand, time::sleep};
use tracing::{debug, debug_span, error, instrument, trace, trace_span, Instrument};
use watchexec::{
action::ActionHandler,
command::{Command, Program, Shell, SpawnOptions},
error::RuntimeError,
job::{CommandState, Job},
sources::fs::Watcher,
Config, ErrorHook, Id,
};
use watchexec_events::{Event, KeyCode, Keyboard, Priority, ProcessEnd, Tag};
use watchexec_signals::Signal;
use crate::{
args::{
command::{EnvVar, WrapMode},
events::{EmitEvents, OnBusyUpdate, SignalMapping},
output::{ClearMode, ColourMode, NotifyMode},
Args,
},
emits::events_to_simple_format,
socket::Sockets,
state::State,
};
#[derive(Clone, Copy, Debug)]
struct OutputFlags {
quiet: bool,
colour: ColorChoice,
timings: bool,
bell: bool,
notify: Option<NotifyMode>,
}
#[derive(Clone, Copy, Debug)]
struct TimeoutConfig {
timeout: Option<Duration>,
stop_signal: Signal,
stop_timeout: Duration,
}
pub fn make_config(args: &Args, state: &State) -> Result<Config> {
let _span = debug_span!("args-runtime").entered();
let config = Config::default();
config.on_error(|err: ErrorHook| {
if let RuntimeError::IoError {
about: "waiting on process group",
..
} = err.error
{
error!("{}", err.error);
return;
}
if cfg!(debug_assertions) {
eprintln!("[[{:?}]]", err.error);
}
eprintln!("[[Error (not fatal)]]\n{}", Report::new(err.error));
});
config.pathset(args.filtering.paths.clone());
config.throttle(args.events.debounce.0);
config.keyboard_events(args.events.stdin_quit || args.events.interactive);
if let Some(interval) = args.events.poll {
config.file_watcher(Watcher::Poll(interval.0));
}
let once = args.once;
let clear = args.output.screen_clear;
let emit_events_to = args.events.emit_events_to;
let state = state.clone();
if args.only_emit_events {
config.on_action(move |mut action| {
if action
.signals()
.any(|sig| sig == Signal::Terminate || sig == Signal::Interrupt)
{
action.quit();
return action;
}
if let Some(mode) = clear {
match mode {
ClearMode::Clear => {
clearscreen::clear().ok();
}
ClearMode::Reset => {
reset_screen();
}
}
}
match emit_events_to {
EmitEvents::Stdio => {
println!(
"{}",
events_to_simple_format(action.events.as_ref()).unwrap_or_default()
);
}
EmitEvents::JsonStdio => {
for event in action.events.iter().filter(|e| !e.is_empty()) {
println!("{}", serde_json::to_string(event).unwrap_or_default());
}
}
other => unreachable!(
"emit_events_to should have been validated earlier: {:?}",
other
),
}
action
});
return Ok(config);
}
let delay_run = args.command.delay_run.map(|ts| ts.0);
let on_busy = args.events.on_busy_update;
let stdin_quit = args.events.stdin_quit;
let interactive = args.events.interactive;
let exit_on_error = args.events.exit_on_error;
let signal = args.events.signal;
let stop_signal = args.command.stop_signal;
let stop_timeout = args.command.stop_timeout.0;
let print_events = args.logging.print_events;
let outflags = OutputFlags {
quiet: args.output.quiet,
colour: match args.output.color {
ColourMode::Auto if !std::io::stdin().is_terminal() => ColorChoice::Never,
ColourMode::Auto => ColorChoice::Auto,
ColourMode::Always => ColorChoice::Always,
ColourMode::Never => ColorChoice::Never,
},
timings: args.output.timings,
bell: args.output.bell,
notify: args.output.notify,
};
let timeout_config = TimeoutConfig {
timeout: args.command.timeout.map(|ts| ts.0),
stop_signal: stop_signal.unwrap_or(Signal::Terminate),
stop_timeout,
};
let workdir = Arc::new(args.command.workdir.clone());
let add_envs: Arc<[EnvVar]> = args.command.env.clone().into();
debug!(
envs=?args.command.env,
"additional environment variables to add to command"
);
let id = Id::default();
let command = interpret_command_args(args)?;
let signal_map: Arc<HashMap<Signal, Option<Signal>>> = Arc::new(
args.events
.signal_map
.iter()
.copied()
.map(|SignalMapping { from, to }| (from, to))
.collect(),
);
let queued = Arc::new(AtomicBool::new(false));
let quit_again = Arc::new(AtomicU8::new(0));
let paused = Arc::new(AtomicBool::new(false));
let should_quit = Arc::new(AtomicBool::new(false));
config.on_action_async(move |mut action| {
let add_envs = add_envs.clone();
let command = command.clone();
let state = state.clone();
let queued = queued.clone();
let quit_again = quit_again.clone();
let paused = paused.clone();
let should_quit = should_quit.clone();
let signal_map = signal_map.clone();
let workdir = workdir.clone();
Box::new(
async move {
trace!(events=?action.events, "handling action");
let add_envs = add_envs.clone();
let command = command.clone();
let queued = queued.clone();
let quit_again = quit_again.clone();
let paused = paused.clone();
let should_quit = should_quit.clone();
let signal_map = signal_map.clone();
let workdir = workdir.clone();
trace!("set spawn hook for workdir and environment variables");
let job = action.get_or_create_job(id, move || command.clone());
let events = action.events.clone();
job.set_spawn_hook({
let state = state.clone();
move |command, _| {
let add_envs = add_envs.clone();
let state = state.clone();
let events = events.clone();
if let Some(ref workdir) = workdir.as_ref() {
debug!(?workdir, "set command workdir");
command.command_mut().current_dir(workdir);
}
if let Some(ref socket_set) = state.socket_set {
for env in socket_set.envs() {
command.command_mut().env(env.key, env.value);
}
}
emit_events_to_command(
command.command_mut(),
events,
state,
emit_events_to,
add_envs,
);
}
});
let show_events = {
let events = action.events.clone();
move || {
if print_events {
trace!("print events to stderr");
for (n, event) in events.iter().enumerate() {
eprintln!("[EVENT {n}] {event}");
}
}
}
};
let clear_screen = {
let events = action.events.clone();
move || {
if let Some(mode) = clear {
match mode {
ClearMode::Clear => {
clearscreen::clear().ok();
debug!("cleared screen");
}
ClearMode::Reset => {
reset_screen();
debug!("hard-reset screen");
}
}
}
if print_events {
trace!("print events to stderr");
for (n, event) in events.iter().enumerate() {
eprintln!("[EVENT {n}] {event}");
}
}
}
};
let quit = |mut action: ActionHandler| {
match quit_again.fetch_add(1, Ordering::Relaxed) {
0 => {
if stop_timeout > Duration::ZERO
&& action.list_jobs().any(|(_, job)| job.is_running())
{
eprintln!("[Waiting {stop_timeout:?} for processes to exit before stopping...]");
}
action.quit_gracefully(
stop_signal.unwrap_or(Signal::Terminate),
stop_timeout,
);
}
1 => {
action.quit_gracefully(Signal::ForceStop, Duration::ZERO);
}
_ => {
action.quit();
}
}
action
};
if should_quit.load(Ordering::SeqCst) {
debug!("command failed with --exit-on-error, quitting");
return quit(action);
}
if once {
debug!("debug mode: run once and quit");
show_events();
if let Some(delay) = delay_run {
job.run_async(move |_| {
Box::new(async move {
sleep(delay).await;
})
});
}
job.start().await;
let timed_out = if let Some(timeout) = timeout_config.timeout {
tokio::select! {
_ = job.to_wait() => false,
_ = tokio::time::sleep(timeout) => {
if cfg!(windows) {
job.stop().await;
} else {
job.stop_with_signal(timeout_config.stop_signal, timeout_config.stop_timeout).await;
}
true
}
}
} else {
job.to_wait().await;
false
};
job.run({
let state = state.clone();
move |context| {
if let Some(end) = end_of_process(context.current, outflags, timed_out)
{
*state.exit_code.lock().unwrap() = ExitCode::from(
end.into_exitstatus()
.code()
.unwrap_or(0)
.try_into()
.unwrap_or(1),
);
}
}
})
.await;
return quit(action);
}
let is_keyboard_eof = action
.events
.iter()
.any(|e| e.tags.contains(&Tag::Keyboard(Keyboard::Eof)));
if stdin_quit && is_keyboard_eof {
debug!("keyboard EOF, quit");
show_events();
return quit(action);
}
if interactive {
for event in action.events.iter() {
for tag in &event.tags {
match tag {
Tag::Keyboard(Keyboard::Eof) => {
debug!("interactive: Ctrl-C/D, quit");
return quit(action);
}
Tag::Keyboard(Keyboard::Key { key, .. }) => match key {
KeyCode::Char('q') => {
debug!("interactive: quit");
return quit(action);
}
KeyCode::Char('p') => {
let was_paused = paused.fetch_xor(true, Ordering::SeqCst);
if was_paused {
debug!("interactive: unpause");
eprintln!("[Unpaused]");
} else {
debug!("interactive: pause");
eprintln!("[Paused]");
}
return action;
}
KeyCode::Char('r') => {
debug!("interactive: restart");
clear_screen();
if cfg!(windows) {
job.restart();
} else {
job.restart_with_signal(
stop_signal.unwrap_or(Signal::Terminate),
stop_timeout,
);
}
job.run({
let job = job.clone();
let should_quit = should_quit.clone();
let state = state.clone();
move |context| {
setup_process(
job.clone(),
context.command.clone(),
outflags,
timeout_config,
exit_on_error,
should_quit.clone(),
state.clone(),
);
}
});
return action;
}
_ => {}
},
_ => {}
}
}
}
}
let signals: Vec<Signal> = action.signals().collect();
trace!(?signals, "received some signals");
if (signals.contains(&Signal::Terminate)
&& !signal_map.contains_key(&Signal::Terminate))
|| (signals.contains(&Signal::Interrupt)
&& !signal_map.contains_key(&Signal::Interrupt))
{
debug!("unmapped terminate or interrupt signal, quit");
show_events();
return quit(action);
}
for signal in signals {
match signal_map.get(&signal) {
Some(Some(mapped)) => {
debug!(?signal, ?mapped, "passing mapped signal");
job.signal(*mapped);
}
Some(None) => {
debug!(?signal, "discarding signal");
}
None => {
debug!(?signal, "passing signal on");
job.signal(signal);
}
}
}
if action.paths().next().is_none()
&& !action.events.iter().any(watchexec_events::Event::is_empty)
{
debug!("no filesystem or synthetic events, skip without doing more");
show_events();
return action;
}
if interactive && paused.load(Ordering::SeqCst) {
debug!("interactive: paused, ignoring filesystem event");
return action;
}
show_events();
if let Some(delay) = delay_run {
trace!("delaying run by sleeping inside the job");
job.run_async(move |_| {
Box::new(async move {
sleep(delay).await;
})
});
}
trace!("querying job state via run_async");
job.run_async({
let job = job.clone();
let should_quit = should_quit.clone();
let state = state.clone();
move |context| {
let job = job.clone();
let should_quit = should_quit.clone();
let state = state.clone();
let is_running = matches!(context.current, CommandState::Running { .. });
Box::new(async move {
let innerjob = job.clone();
let should_quit = should_quit.clone();
let state = state.clone();
if is_running {
trace!(?on_busy, "job is running, decide what to do");
match on_busy {
OnBusyUpdate::DoNothing => {}
OnBusyUpdate::Signal => {
job.signal(if cfg!(windows) {
Signal::ForceStop
} else {
stop_signal.or(signal).unwrap_or(Signal::Terminate)
});
}
OnBusyUpdate::Restart if cfg!(windows) => {
job.restart();
job.run({
let should_quit = should_quit.clone();
let state = state.clone();
move |context| {
clear_screen();
setup_process(
innerjob.clone(),
context.command.clone(),
outflags,
timeout_config,
exit_on_error,
should_quit.clone(),
state.clone(),
);
}
});
}
OnBusyUpdate::Restart => {
job.restart_with_signal(
stop_signal.unwrap_or(Signal::Terminate),
stop_timeout,
);
job.run({
let should_quit = should_quit.clone();
let state = state.clone();
move |context| {
clear_screen();
setup_process(
innerjob.clone(),
context.command.clone(),
outflags,
timeout_config,
exit_on_error,
should_quit.clone(),
state.clone(),
);
}
});
}
OnBusyUpdate::Queue => {
let job = job.clone();
let already_queued =
queued.fetch_or(true, Ordering::SeqCst);
if already_queued {
debug!("next start is already queued, do nothing");
} else {
debug!("queueing next start of job");
tokio::spawn({
let queued = queued.clone();
let should_quit = should_quit.clone();
let state = state.clone();
async move {
trace!("waiting for job to finish");
job.to_wait().await;
trace!("job finished, starting queued");
job.start();
job.run({
let should_quit = should_quit.clone();
let state = state.clone();
move |context| {
clear_screen();
setup_process(
innerjob.clone(),
context.command.clone(),
outflags,
timeout_config,
exit_on_error,
should_quit.clone(),
state.clone(),
);
}
})
.await;
trace!("resetting queued state");
queued.store(false, Ordering::SeqCst);
}
});
}
}
}
} else {
trace!("job is not running, start it");
job.start();
job.run({
let should_quit = should_quit.clone();
let state = state.clone();
move |context| {
clear_screen();
setup_process(
innerjob.clone(),
context.command.clone(),
outflags,
timeout_config,
exit_on_error,
should_quit.clone(),
state.clone(),
);
}
});
}
})
}
});
action
}
.instrument(trace_span!("action handler")),
)
});
Ok(config)
}
#[instrument(level = "debug")]
fn interpret_command_args(args: &Args) -> Result<Arc<Command>> {
let mut cmd = args.program.clone();
assert!(!cmd.is_empty(), "(clap) Bug: command is not present");
let shell = if args.command.no_shell {
None
} else {
let shell = args.command.shell.clone().or_else(|| var("SHELL").ok());
match shell
.as_deref()
.or_else(|| {
if cfg!(not(windows)) {
Some("sh")
} else if var("POWERSHELL_DISTRIBUTION_CHANNEL").is_ok()
&& (which::which("pwsh").is_ok() || which::which("pwsh.exe").is_ok())
{
trace!("detected pwsh");
Some("pwsh")
} else if var("PSModulePath").is_ok()
&& (which::which("powershell").is_ok()
|| which::which("powershell.exe").is_ok())
{
trace!("detected powershell");
Some("powershell")
} else {
Some("cmd")
}
})
.or(Some("default"))
{
Some("") => return Err(RuntimeError::CommandShellEmptyShell).into_diagnostic(),
Some("none") | None => None,
#[cfg(windows)]
Some("cmd") | Some("cmd.exe") | Some("CMD") | Some("CMD.EXE") => Some(Shell::cmd()),
Some(other) => {
let sh = other.split_ascii_whitespace().collect::<Vec<_>>();
#[allow(clippy::unwrap_used)]
let (shprog, shopts) = sh.split_first().unwrap();
Some(Shell {
prog: shprog.into(),
options: shopts.iter().map(|s| (*s).to_string()).collect(),
program_option: Some(Cow::Borrowed(OsStr::new("-c"))),
})
}
}
};
let program = if let Some(shell) = shell {
Program::Shell {
shell,
command: cmd.join(" "),
args: Vec::new(),
}
} else {
Program::Exec {
prog: cmd.remove(0).into(),
args: cmd,
}
};
Ok(Arc::new(Command {
program,
options: SpawnOptions {
grouped: matches!(args.command.wrap_process, WrapMode::Group),
session: matches!(args.command.wrap_process, WrapMode::Session),
..Default::default()
},
}))
}
#[instrument(level = "trace")]
fn setup_process(
job: Job,
command: Arc<Command>,
outflags: OutputFlags,
timeout_config: TimeoutConfig,
exit_on_error: bool,
should_quit: Arc<AtomicBool>,
state: State,
) {
if outflags.notify.is_some_and(|m| m.on_start()) {
Notification::new()
.summary("Watchexec: change detected")
.body(&format!("Running {command}"))
.show()
.map_or_else(
|err| {
eprintln!("[[Failed to send desktop notification: {err}]]");
},
drop,
);
}
if !outflags.quiet {
let mut stderr = StandardStream::stderr(outflags.colour);
stderr.reset().ok();
stderr
.set_color(ColorSpec::new().set_fg(Some(Color::Green)))
.ok();
writeln!(&mut stderr, "[Running: {command}]").ok();
stderr.reset().ok();
}
let send_quit_event = Arc::new(AtomicBool::new(false));
tokio::spawn({
let send_quit_event = send_quit_event.clone();
let state_for_event = state.clone();
async move {
let timed_out = if let Some(timeout) = timeout_config.timeout {
tokio::select! {
_ = job.to_wait() => false,
_ = tokio::time::sleep(timeout) => {
if cfg!(windows) {
job.stop().await;
} else {
job.stop_with_signal(timeout_config.stop_signal, timeout_config.stop_timeout).await;
}
true
}
}
} else {
job.to_wait().await;
false
};
job.run({
let send_quit_event = send_quit_event.clone();
move |context| {
if let Some(status) = end_of_process(context.current, outflags, timed_out) {
*state.exit_code.lock().unwrap() = ExitCode::from(
status
.into_exitstatus()
.code()
.unwrap_or(0)
.try_into()
.unwrap_or(1),
);
if exit_on_error && !matches!(status, ProcessEnd::Success) {
debug!("command failed, setting should_quit flag for --exit-on-error");
should_quit.store(true, Ordering::SeqCst);
send_quit_event.store(true, Ordering::SeqCst);
}
}
}
})
.await;
if send_quit_event.load(Ordering::SeqCst) {
if let Some(wx) = state_for_event.watchexec.get() {
debug!("sending synthetic event to trigger quit");
if let Err(e) = wx.send_event(Event::default(), Priority::Urgent).await {
error!("failed to send synthetic quit event: {e}");
}
}
}
}
});
}
fn format_duration(duration: Duration) -> impl fmt::Display {
fmt::from_fn(move |f| {
let secs = duration.as_secs();
if secs > 0 {
write!(f, "{secs}s")
} else {
write!(f, "{}ms", duration.subsec_millis())
}
})
}
#[instrument(level = "trace")]
fn end_of_process(
state: &CommandState,
outflags: OutputFlags,
timed_out: bool,
) -> Option<ProcessEnd> {
let CommandState::Finished {
status,
started,
finished,
} = state
else {
return None;
};
let duration = *finished - *started;
let duration_display = format_duration(duration);
let timing = if outflags.timings {
format!(", lasted {duration_display}")
} else {
String::new()
};
if timed_out {
if outflags.notify.is_some_and(|m| m.on_end()) {
Notification::new()
.summary("Watchexec: command timed out")
.body(&format!("Command timed out after {duration_display}"))
.show()
.map_or_else(
|err| {
eprintln!("[[Failed to send desktop notification: {err}]]");
},
drop,
);
}
if !outflags.quiet {
let mut stderr = StandardStream::stderr(outflags.colour);
stderr.reset().ok();
stderr
.set_color(ColorSpec::new().set_fg(Some(Color::Yellow)))
.ok();
writeln!(&mut stderr, "[Command timed out after {duration_display}]").ok();
stderr.reset().ok();
}
if outflags.bell {
let mut stdout = std::io::stdout();
stdout.write_all(b"\x07").ok();
stdout.flush().ok();
}
return Some(*status);
}
let (msg, fg) = match status {
ProcessEnd::ExitError(code) => (format!("Command exited with {code}{timing}"), Color::Red),
ProcessEnd::ExitSignal(sig) => {
(format!("Command killed by {sig:?}{timing}"), Color::Magenta)
}
ProcessEnd::ExitStop(sig) => (format!("Command stopped by {sig:?}{timing}"), Color::Blue),
ProcessEnd::Continued => (format!("Command continued{timing}"), Color::Cyan),
ProcessEnd::Exception(ex) => (
format!("Command ended by exception {ex:#x}{timing}"),
Color::Yellow,
),
ProcessEnd::Success => (format!("Command was successful{timing}"), Color::Green),
};
if outflags.notify.is_some_and(|m| m.on_end()) {
Notification::new()
.summary("Watchexec: command ended")
.body(&msg)
.show()
.map_or_else(
|err| {
eprintln!("[[Failed to send desktop notification: {err}]]");
},
drop,
);
}
if !outflags.quiet {
let mut stderr = StandardStream::stderr(outflags.colour);
stderr.reset().ok();
stderr.set_color(ColorSpec::new().set_fg(Some(fg))).ok();
writeln!(&mut stderr, "[{msg}]").ok();
stderr.reset().ok();
}
if outflags.bell {
let mut stdout = std::io::stdout();
stdout.write_all(b"\x07").ok();
stdout.flush().ok();
}
Some(*status)
}
#[instrument(level = "trace")]
fn emit_events_to_command(
command: &mut TokioCommand,
events: Arc<[Event]>,
state: State,
emit_events_to: EmitEvents,
add_envs: Arc<[EnvVar]>,
) {
use crate::emits::{emits_to_environment, emits_to_file, emits_to_json_file};
let mut stdin = None;
let add_envs = add_envs.clone();
let mut envs = Box::new(add_envs.into_iter().cloned()) as Box<dyn Iterator<Item = EnvVar>>;
match emit_events_to {
EmitEvents::Environment => {
envs = Box::new(envs.chain(emits_to_environment(&events)));
}
EmitEvents::Stdio => match emits_to_file(&state.emit_file, &events)
.and_then(|path| File::open(path).into_diagnostic())
{
Ok(file) => {
stdin.replace(Stdio::from(file));
}
Err(err) => {
error!("Failed to write events to stdin, continuing without it: {err}");
}
},
EmitEvents::File => match emits_to_file(&state.emit_file, &events) {
Ok(path) => {
envs = Box::new(envs.chain(once(EnvVar {
key: "WATCHEXEC_EVENTS_FILE".into(),
value: path.into(),
})));
}
Err(err) => {
error!("Failed to write WATCHEXEC_EVENTS_FILE, continuing without it: {err}");
}
},
EmitEvents::JsonStdio => match emits_to_json_file(&state.emit_file, &events)
.and_then(|path| File::open(path).into_diagnostic())
{
Ok(file) => {
stdin.replace(Stdio::from(file));
}
Err(err) => {
error!("Failed to write events to stdin, continuing without it: {err}");
}
},
EmitEvents::JsonFile => match emits_to_json_file(&state.emit_file, &events) {
Ok(path) => {
envs = Box::new(envs.chain(once(EnvVar {
key: "WATCHEXEC_EVENTS_FILE".into(),
value: path.into(),
})));
}
Err(err) => {
error!("Failed to write WATCHEXEC_EVENTS_FILE, continuing without it: {err}");
}
},
EmitEvents::None => {}
}
for var in envs {
debug!(?var, "inserting environment variable");
command.env(var.key, var.value);
}
if let Some(stdin) = stdin {
debug!("set command stdin");
command.stdin(stdin);
}
}
pub fn reset_screen() {
for cs in [
ClearScreen::WindowsCooked,
ClearScreen::WindowsVt,
ClearScreen::VtLeaveAlt,
ClearScreen::VtWellDone,
ClearScreen::default(),
] {
cs.clear().ok();
}
}