watchexec 8.2.0

Library to execute commands in response to file modifications
Documentation
use std::{
	collections::HashMap,
	mem::take,
	sync::Arc,
	time::{Duration, Instant},
};

use async_priority_channel as priority;
use tokio::{sync::mpsc, time::timeout};
use tracing::{debug, trace};
use watchexec_events::{Event, Priority};
use watchexec_supervisor::job::Job;

use super::{handler::Handler, quit::QuitManner};
use crate::{
	action::ActionReturn,
	error::{CriticalError, RuntimeError},
	filter::Filterer,
	id::Id,
	late_join_set::LateJoinSet,
	Config,
};

/// The main worker of a Watchexec process.
///
/// This is the main loop of the process. It receives events from the event channel, filters them,
/// debounces them, obtains the desired outcome of an actioned event, calls the appropriate handlers
/// and schedules processes as needed.
pub async fn worker(
	config: Arc<Config>,
	errors: mpsc::Sender<RuntimeError>,
	events: priority::Receiver<Event, Priority>,
) -> Result<(), CriticalError> {
	let mut jobtasks = LateJoinSet::default();
	let mut jobs = HashMap::<Id, Job>::new();

	while let Some(mut set) = throttle_collect(
		config.clone(),
		events.clone(),
		errors.clone(),
		Instant::now(),
	)
	.await?
	{
		let events: Arc<[Event]> = Arc::from(take(&mut set).into_boxed_slice());

		trace!("preparing action handler");
		let action = Handler::new(events.clone(), jobs.clone());

		debug!("running action handler");
		let action = match config.action_handler.call(action) {
			ActionReturn::Sync(action) => action,
			ActionReturn::Async(action) => Box::into_pin(action).await,
		};

		debug!("take control of new tasks");
		for (id, (job, task)) in action.new {
			trace!(?id, "taking control of new task");
			jobtasks.insert(task);
			jobs.insert(id, job);
		}

		if let Some(manner) = action.quit {
			debug!(?manner, "quitting worker");
			match manner {
				QuitManner::Abort => break,
				QuitManner::Graceful { signal, grace } => {
					debug!(?signal, ?grace, "quitting worker gracefully");
					let mut tasks = LateJoinSet::default();
					for (id, job) in jobs.drain() {
						trace!(?id, "quitting job");
						tasks.spawn(async move {
							job.stop_with_signal(signal, grace);
							job.delete().await;
						});
					}
					// TODO: spawn to process actions, and allow events to come in while
					//       waiting for graceful shutdown, e.g. a second Ctrl-C to hasten
					debug!("waiting for graceful shutdown tasks");
					tasks.join_all().await;
					debug!("waiting for job tasks to end");
					jobtasks.join_all().await;
					break;
				}
			}
		}

		let gc: Vec<Id> = jobs
			.iter()
			.filter_map(|(id, job)| {
				if job.is_dead() {
					trace!(?id, "job is dead, gc'ing");
					Some(*id)
				} else {
					None
				}
			})
			.collect();
		if !gc.is_empty() {
			debug!("garbage collect old tasks");
			for id in gc {
				jobs.remove(&id);
			}
		}

		debug!("action handler finished");
	}

	debug!("action worker finished");
	Ok(())
}

pub async fn throttle_collect(
	config: Arc<Config>,
	events: priority::Receiver<Event, Priority>,
	errors: mpsc::Sender<RuntimeError>,
	mut last: Instant,
) -> Result<Option<Vec<Event>>, CriticalError> {
	if events.is_closed() {
		trace!("events channel closed, stopping");
		return Ok(None);
	}

	let mut set: Vec<Event> = vec![];
	loop {
		let maxtime = if set.is_empty() {
			trace!("nothing in set, waiting forever for next event");
			Duration::from_secs(u64::MAX)
		} else {
			config.throttle.get().saturating_sub(last.elapsed())
		};

		if maxtime.is_zero() {
			if set.is_empty() {
				trace!("out of throttle but nothing to do, resetting");
				last = Instant::now();
				continue;
			}

			trace!("out of throttle on recycle");
		} else {
			trace!(?maxtime, "waiting for event");
			let maybe_event = timeout(maxtime, events.recv()).await;
			if events.is_closed() {
				trace!("events channel closed during timeout, stopping");
				return Ok(None);
			}

			match maybe_event {
				Err(_timeout) => {
					trace!("timed out, cycling");
					continue;
				}
				Ok(Err(_empty)) => return Ok(None),
				Ok(Ok((event, priority))) => {
					trace!(?event, ?priority, "got event");

					if priority == Priority::Urgent {
						trace!("urgent event, by-passing filters");
					} else if event.is_empty() {
						trace!("empty event, by-passing filters");
					} else {
						let filtered = config.filterer.check_event(&event, priority);
						match filtered {
							Err(err) => {
								trace!(%err, "filter errored on event");
								errors.send(err).await?;
								continue;
							}
							Ok(false) => {
								trace!("filter rejected event");
								continue;
							}
							Ok(true) => {
								trace!("filter passed event");
							}
						}
					}

					if set.is_empty() {
						trace!("event is the first, resetting throttle window");
						last = Instant::now();
					}

					set.push(event);

					if priority == Priority::Urgent {
						trace!("urgent event, by-passing throttle");
					} else {
						let elapsed = last.elapsed();
						if elapsed < config.throttle.get() {
							trace!(?elapsed, "still within throttle window, cycling");
							continue;
						}
					}
				}
			}
		}

		return Ok(Some(set));
	}
}