Skip to main content

xtask_watch/
lib.rs

1#![doc = include_str!("../README.md")]
2#![deny(missing_docs)]
3
4use anyhow::{Context, Result};
5use clap::Parser;
6use glob::Pattern;
7use lazy_static::lazy_static;
8use notify::Watcher as _;
9use std::{
10    env, io,
11    path::{Path, PathBuf},
12    process::{Child, Command, ExitStatus},
13    sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, mpsc},
14    thread,
15    time::{Duration, Instant},
16};
17
18pub use anyhow;
19pub use cargo_metadata;
20pub use cargo_metadata::camino;
21pub use clap;
22
23/// Fetch the metadata of the crate.
24pub fn metadata() -> &'static cargo_metadata::Metadata {
25    lazy_static! {
26        static ref METADATA: cargo_metadata::Metadata = cargo_metadata::MetadataCommand::new()
27            .exec()
28            .expect("cannot get crate's metadata");
29    }
30
31    &METADATA
32}
33
34/// Fetch information of a package in the current crate.
35pub fn package(name: &str) -> Option<&cargo_metadata::Package> {
36    metadata().packages.iter().find(|x| x.name == name)
37}
38
/// Return a [`std::process::Command`] of the xtask command currently running.
///
/// The program is taken from the first element of [`std::env::args_os`];
/// no arguments are copied over.
///
/// # Panics
///
/// Panics if the process was started without any argument at all (not even
/// the program name).
pub fn xtask_command() -> Command {
    let program = env::args_os()
        .next()
        .expect("the process was started without a program name (argv[0])");

    Command::new(program)
}
43
44/// Watches over your project's source code, relaunching a given command when
45/// changes are detected.
46///
47/// Use [`Watch::lock`] to obtain a [`WatchLock`] that can be shared with external
48/// code (e.g. an HTTP server) to coordinate reads with ongoing rebuilds.
49#[non_exhaustive]
50#[derive(Clone, Debug, Default, Parser)]
51#[clap(about = "Watches over your project's source code.")]
52pub struct Watch {
53    /// Shell command(s) to execute on changes.
54    #[clap(long = "shell", short = 's')]
55    pub shell_commands: Vec<String>,
56    /// Cargo command(s) to execute on changes.
57    ///
58    /// The default is `[ check ]`
59    #[clap(long = "exec", short = 'x')]
60    pub cargo_commands: Vec<String>,
61    /// Watch specific file(s) or folder(s).
62    ///
63    /// The default is the workspace root.
64    #[clap(long = "watch", short = 'w')]
65    pub watch_paths: Vec<PathBuf>,
66    /// Paths or glob patterns that will be excluded.
67    ///
68    /// Relative values are resolved from the current working directory.
69    #[clap(long = "ignore", short = 'i')]
70    pub exclude_paths: Vec<PathBuf>,
71    /// Paths or glob patterns, relative to the workspace root, that will be excluded.
72    #[clap(skip)]
73    pub workspace_exclude_paths: Vec<PathBuf>,
74    /// Quiet period after the last detected change before the command is
75    /// (re)started. If another change arrives while a build is running the
76    /// build is cancelled and the timer resets, so only the latest state is
77    /// ever built.
78    ///
79    /// The default is 1 second.
80    #[clap(skip = Duration::from_secs(1))]
81    pub debounce: Duration,
82    #[clap(skip)]
83    exclude_globs: Vec<Pattern>,
84    #[clap(skip)]
85    workspace_exclude_globs: Vec<Pattern>,
86    #[clap(skip)]
87    watch_lock: WatchLock,
88}
89
90impl Watch {
91    /// Add a path to watch for changes.
92    pub fn watch_path(mut self, path: impl AsRef<Path>) -> Self {
93        self.watch_paths.push(path.as_ref().to_path_buf());
94        self
95    }
96
97    /// Add multiple paths to watch for changes.
98    pub fn watch_paths(mut self, paths: impl IntoIterator<Item = impl AsRef<Path>>) -> Self {
99        for path in paths {
100            self.watch_paths.push(path.as_ref().to_path_buf())
101        }
102        self
103    }
104
105    /// Add a path that will be ignored if changes are detected.
106    pub fn exclude_path(mut self, path: impl AsRef<Path>) -> Self {
107        self.exclude_paths.push(path.as_ref().to_path_buf());
108        self
109    }
110
111    /// Add multiple paths that will be ignored if changes are detected.
112    pub fn exclude_paths(mut self, paths: impl IntoIterator<Item = impl AsRef<Path>>) -> Self {
113        for path in paths {
114            self.exclude_paths.push(path.as_ref().to_path_buf());
115        }
116        self
117    }
118
119    /// Add a path, relative to the workspace, that will be ignored if changes
120    /// are detected.
121    pub fn exclude_workspace_path(mut self, path: impl AsRef<Path>) -> Self {
122        self.workspace_exclude_paths
123            .push(path.as_ref().to_path_buf());
124        self
125    }
126
127    /// Add multiple paths, relative to the workspace, that will be ignored if
128    /// changes are detected.
129    pub fn exclude_workspace_paths(
130        mut self,
131        paths: impl IntoIterator<Item = impl AsRef<Path>>,
132    ) -> Self {
133        for path in paths {
134            self.workspace_exclude_paths
135                .push(path.as_ref().to_path_buf());
136        }
137        self
138    }
139
140    /// Return the shared lock used by this watcher.
141    ///
142    /// Clone and share this lock with external code (e.g. HTTP handlers) to coordinate with
143    /// watch-driven command execution.
144    ///
145    /// # Lock lifecycle
146    ///
147    /// [`run`](Self::run) acquires the **write lock immediately** when it is called — before the
148    /// first command is even spawned. Any code that calls [`WatchLock::acquire`] will therefore
149    /// block until the first build completes. This is intentional: it prevents readers from
150    /// observing an empty or incomplete dist directory before the initial build has finished.
151    /// The write lock is then re-acquired on every subsequent rebuild and released once the
152    /// command sequence succeeds.
153    #[must_use = "store and share the lock with readers that must coordinate with rebuilds"]
154    pub fn lock(&self) -> WatchLock {
155        self.watch_lock.clone()
156    }
157
158    /// Set the debounce quiet period.
159    ///
160    /// The command will not start (or restart) until no change has been
161    /// detected for this duration. The default is 1 second.
162    pub fn debounce(mut self, duration: Duration) -> Self {
163        self.debounce = duration;
164        self
165    }
166
167    /// Run the given `command`, monitor the watched paths and relaunch the
168    /// command when changes are detected.
169    ///
170    /// The command starts immediately. If a change is detected while it is
171    /// running, the command is cancelled and the debounce timer resets; the
172    /// command only restarts once the source tree has been quiet for the
173    /// configured [`debounce`](Self::debounce) duration.
174    ///
175    /// Workspace's `target` directory and hidden paths are excluded by default.
176    pub fn run(mut self, commands: impl Into<CommandList>) -> Result<()> {
177        let metadata = metadata();
178        let list = commands.into();
179
180        {
181            let mut commands = list
182                .commands
183                .lock()
184                .expect("no panic-prone code runs while this lock is held");
185
186            commands.extend(self.shell_commands.iter().map(|x| {
187                let mut command =
188                    Command::new(env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string()));
189                command.arg("-c");
190                command.arg(x);
191
192                command
193            }));
194
195            commands.extend(self.cargo_commands.iter().map(|x| {
196                let mut command =
197                    Command::new(env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string()));
198                command.arg("-c");
199                command.arg(format!("cargo {x}"));
200
201                command
202            }));
203        }
204
205        self.prepare_excludes()?;
206
207        if self.watch_paths.is_empty() {
208            self.watch_paths
209                .push(metadata.workspace_root.clone().into_std_path_buf());
210        }
211
212        self.watch_paths = self
213            .watch_paths
214            .into_iter()
215            .map(|x| {
216                x.canonicalize()
217                    .with_context(|| format!("can't find {}", x.display()))
218            })
219            .collect::<Result<Vec<_>, _>>()?;
220
221        let (tx, rx) = mpsc::channel();
222
223        let handler = WatchEventHandler {
224            watch: self.clone(),
225            tx: tx.clone(),
226        };
227
228        let mut watcher =
229            notify::recommended_watcher(handler).context("could not initialize watcher")?;
230
231        for path in &self.watch_paths {
232            match watcher.watch(path, notify::RecursiveMode::Recursive) {
233                Ok(()) => log::trace!("Watching {}", path.display()),
234                Err(err) => log::error!("cannot watch {}: {err}", path.display()),
235            }
236        }
237
238        let mut current_child = SharedChild::new();
239        let mut lock_guard = Some(self.watch_lock.write());
240        let mut generation: u64 = 0;
241
242        // `pending_build` tracks whether a change has arrived that has not yet
243        // been translated into a spawned command.  It starts as `true` so the
244        // first build fires immediately without waiting for a file-change event.
245        let mut pending_build = true;
246
247        loop {
248            if pending_build {
249                pending_build = false;
250                log::info!("Running command");
251                let mut current_child = current_child.clone();
252                let mut list = list.clone();
253                let tx = tx.clone();
254                let build_id = generation;
255                thread::spawn(move || {
256                    let mut status = ExitStatus::default();
257
258                    list.spawn(|res| match res {
259                        Err(err) => {
260                            log::error!("Could not execute command: {err}");
261                            false
262                        }
263                        Ok(child) => {
264                            log::trace!("Child spawned PID: {}", child.id());
265                            current_child.replace(child);
266                            status = current_child.wait();
267                            status.success()
268                        }
269                    });
270
271                    if status.success() {
272                        log::info!("Command succeeded.");
273                        tx.send(Event::CommandSucceeded(build_id))
274                            .expect("can send");
275                    } else if let Some(code) = status.code() {
276                        log::error!("Command failed (exit code: {code})");
277                    } else {
278                        log::error!("Command failed.");
279                    }
280                });
281            }
282
283            // Drain all events that arrive within the debounce window.  Each
284            // new event resets the timer; we only (re)build once things have
285            // been quiet for `debounce`.
286            loop {
287                match rx.recv_timeout(self.debounce) {
288                    Ok(Event::ChangeDetected) => {
289                        log::trace!("Change detected, resetting debounce timer");
290                        if !pending_build {
291                            // Cancel any in-progress build immediately so we
292                            // build the latest version, not an intermediate one.
293                            current_child.terminate();
294                            generation += 1;
295                            if lock_guard.is_none() {
296                                lock_guard = Some(self.watch_lock.write());
297                            }
298                            pending_build = true;
299                        }
300                        // Loop back to reset the recv_timeout.
301                    }
302                    Ok(Event::CommandSucceeded(build_id)) if build_id == generation => {
303                        log::trace!("Command succeeded, releasing lock");
304                        lock_guard.take();
305                        // Continue waiting for the next change.
306                    }
307                    Ok(Event::CommandSucceeded(build_id)) => {
308                        log::trace!(
309                            "Ignoring stale success from build {build_id} (current: {generation})"
310                        );
311                    }
312                    Err(mpsc::RecvTimeoutError::Timeout) => {
313                        // Quiet for `debounce` — time to build if there is a
314                        // pending change.
315                        if pending_build {
316                            break;
317                        }
318                    }
319                    Err(mpsc::RecvTimeoutError::Disconnected) => {
320                        current_child.terminate();
321                        return Ok(());
322                    }
323                }
324            }
325        }
326    }
327
328    fn is_excluded_path(&self, path: &Path) -> bool {
329        if self.exclude_paths.iter().any(|x| path.starts_with(x)) {
330            return true;
331        }
332
333        if self.exclude_globs.iter().any(|p| p.matches_path(path)) {
334            return true;
335        }
336
337        if let Ok(stripped_path) = path.strip_prefix(metadata().workspace_root.as_std_path()) {
338            if self
339                .workspace_exclude_paths
340                .iter()
341                .any(|x| stripped_path.starts_with(x))
342            {
343                return true;
344            }
345
346            if self
347                .workspace_exclude_globs
348                .iter()
349                .any(|p| p.matches_path(stripped_path))
350            {
351                return true;
352            }
353        }
354
355        false
356    }
357
358    fn is_hidden_path(&self, path: &Path) -> bool {
359        self.watch_paths.iter().any(|x| {
360            path.strip_prefix(x)
361                .iter()
362                .any(|x| x.to_string_lossy().starts_with('.'))
363        })
364    }
365
366    fn is_backup_file(&self, path: &Path) -> bool {
367        self.watch_paths.iter().any(|x| {
368            path.strip_prefix(x)
369                .iter()
370                .any(|x| x.to_string_lossy().ends_with('~'))
371        })
372    }
373
374    fn is_glob_pattern(path: &Path) -> bool {
375        let s = path.as_os_str().to_string_lossy();
376        s.contains('*') || s.contains('?') || (!cfg!(windows) && s.contains('['))
377    }
378
379    fn compile_glob(path: &Path) -> Result<Pattern> {
380        let pattern = path
381            .to_str()
382            .with_context(|| format!("glob pattern must be valid UTF-8: {}", path.display()))?;
383
384        Pattern::new(pattern).with_context(|| format!("invalid glob pattern: `{}`", path.display()))
385    }
386
387    fn prepare_excludes(&mut self) -> Result<()> {
388        let metadata = metadata();
389        self.exclude_paths
390            .push(metadata.target_directory.clone().into_std_path_buf());
391
392        let current_dir = env::current_dir().context("failed to get current directory")?;
393        let mut exclude_paths = Vec::new();
394        for path in self.exclude_paths.iter() {
395            if Self::is_glob_pattern(path) {
396                let absolute = if path.is_absolute() {
397                    path.to_path_buf()
398                } else {
399                    current_dir.join(path)
400                };
401                self.exclude_globs.push(Self::compile_glob(&absolute)?);
402            } else {
403                let canonical = path
404                    .canonicalize()
405                    .with_context(|| format!("can't find `{}`", path.display()))?;
406                exclude_paths.push(canonical);
407            }
408        }
409        self.exclude_paths = exclude_paths;
410
411        let workspace_root = metadata.workspace_root.as_std_path();
412        let mut workspace_exclude_paths = Vec::new();
413        for path in self.workspace_exclude_paths.iter() {
414            let path = if path.is_absolute() {
415                path.strip_prefix(workspace_root)
416                    .with_context(|| {
417                        format!(
418                            "workspace exclude path must be inside workspace root: `{}`",
419                            path.display()
420                        )
421                    })?
422                    .to_path_buf()
423            } else {
424                path.to_path_buf()
425            };
426
427            if Self::is_glob_pattern(&path) {
428                self.workspace_exclude_globs
429                    .push(Self::compile_glob(&path)?);
430            } else {
431                workspace_exclude_paths.push(path);
432            }
433        }
434        self.workspace_exclude_paths = workspace_exclude_paths;
435
436        Ok(())
437    }
438}
439
440struct WatchEventHandler {
441    watch: Watch,
442    tx: mpsc::Sender<Event>,
443}
444
445impl notify::EventHandler for WatchEventHandler {
446    fn handle_event(&mut self, event: Result<notify::Event, notify::Error>) {
447        match event {
448            Ok(event) => {
449                if (event.kind.is_modify() || event.kind.is_create())
450                    && event.paths.iter().any(|x| {
451                        !self.watch.is_excluded_path(x)
452                            && x.exists()
453                            && !self.watch.is_hidden_path(x)
454                            && !self.watch.is_backup_file(x)
455                    })
456                {
457                    log::trace!("Changes detected in {event:?}");
458                    self.tx.send(Event::ChangeDetected).expect("can send");
459                } else {
460                    log::trace!("Ignoring changes in {event:?}");
461                }
462            }
463            Err(err) => log::error!("watch error: {err}"),
464        }
465    }
466}
467
/// Handle on the child process currently being run.
///
/// Clones share the same slot, so one thread can wait on the process while
/// another one terminates it.
#[derive(Clone, Debug)]
struct SharedChild {
    /// The tracked process, if any has been stored yet.
    child: Arc<Mutex<Option<Child>>>,
}
472
473impl SharedChild {
474    fn new() -> Self {
475        Self {
476            child: Default::default(),
477        }
478    }
479
480    fn replace(&mut self, child: impl Into<Option<Child>>) {
481        *self
482            .child
483            .lock()
484            .expect("no panic-prone code runs while this lock is held") = child.into();
485    }
486
487    fn wait(&mut self) -> ExitStatus {
488        loop {
489            let mut child = self
490                .child
491                .lock()
492                .expect("no panic-prone code runs while this lock is held");
493            match child.as_mut().map(|child| child.try_wait()) {
494                Some(Ok(Some(status))) => {
495                    break status;
496                }
497                Some(Ok(None)) => {
498                    drop(child);
499                    thread::sleep(Duration::from_millis(10));
500                }
501                Some(Err(err)) => {
502                    log::error!("could not wait for child process: {err}");
503                    break Default::default();
504                }
505                None => {
506                    break Default::default();
507                }
508            }
509        }
510    }
511
512    fn terminate(&mut self) {
513        if let Some(child) = self
514            .child
515            .lock()
516            .expect("no panic-prone code runs while this lock is held")
517            .as_mut()
518        {
519            #[cfg(unix)]
520            {
521                let killing_start = Instant::now();
522
523                unsafe {
524                    log::trace!("sending SIGTERM to {}", child.id());
525                    libc::kill(child.id() as _, libc::SIGTERM);
526                }
527
528                while killing_start.elapsed().as_secs() < 2 {
529                    std::thread::sleep(Duration::from_millis(200));
530                    if let Ok(Some(_)) = child.try_wait() {
531                        break;
532                    }
533                }
534            }
535
536            match child.try_wait() {
537                Ok(Some(_)) => {}
538                _ => {
539                    log::trace!("killing {}", child.id());
540                    let _ = child.kill();
541                    let _ = child.wait();
542                }
543            }
544        } else {
545            log::trace!("nothing to terminate");
546        }
547    }
548}
549
/// A list of commands to run.
///
/// Cloning is cheap: clones refer to the same underlying list.
#[derive(Clone, Debug)]
pub struct CommandList {
    /// The commands, in the order they are run.
    commands: Arc<Mutex<Vec<Command>>>,
}
555
556impl From<Command> for CommandList {
557    fn from(command: Command) -> Self {
558        Self {
559            commands: Arc::new(Mutex::new(vec![command])),
560        }
561    }
562}
563
564impl From<Vec<Command>> for CommandList {
565    fn from(commands: Vec<Command>) -> Self {
566        Self {
567            commands: Arc::new(Mutex::new(commands)),
568        }
569    }
570}
571
572impl<const SIZE: usize> From<[Command; SIZE]> for CommandList {
573    fn from(commands: [Command; SIZE]) -> Self {
574        Self {
575            commands: Arc::new(Mutex::new(Vec::from(commands))),
576        }
577    }
578}
579
580impl CommandList {
581    /// Returns `true` if the list is empty.
582    pub fn is_empty(&self) -> bool {
583        self.commands
584            .lock()
585            .expect("no panic-prone code runs while this lock is held")
586            .is_empty()
587    }
588
589    /// Spawn each command of the list one after the other.
590    ///
591    /// The caller is responsible to wait the commands.
592    pub fn spawn(&mut self, mut callback: impl FnMut(io::Result<Child>) -> bool) {
593        for process in self
594            .commands
595            .lock()
596            .expect("no panic-prone code runs while this lock is held")
597            .iter_mut()
598        {
599            if !callback(process.spawn()) {
600                break;
601            }
602        }
603    }
604
605    /// Run all the commands sequentially using [`std::process::Command::status`] and stop at the
606    /// first failure.
607    pub fn status(&mut self) -> io::Result<ExitStatus> {
608        for process in self
609            .commands
610            .lock()
611            .expect("no panic-prone code runs while this lock is held")
612            .iter_mut()
613        {
614            let exit_status = process.status()?;
615            if !exit_status.success() {
616                return Ok(exit_status);
617            }
618        }
619        Ok(Default::default())
620    }
621}
622
/// Guard returned by [`WatchLock::acquire`].
///
/// Hold on to it for as long as the protected read section lasts; dropping
/// the guard releases the lock.
pub struct WatchLockGuard<'lock> {
    /// Underlying read guard, kept only for its lifetime.
    _guard: RwLockReadGuard<'lock, ()>,
}
630
/// A lock handle used to coordinate file reads with watch-driven rebuilds.
///
/// Get one from [`Watch::lock`], clone it as needed (clones share the same
/// lock), and call [`WatchLock::acquire`] around reads of files that must not
/// race with rebuild writes.
#[derive(Debug, Default, Clone)]
pub struct WatchLock(Arc<RwLock<()>>);
637
638impl WatchLock {
639    /// Acquire shared access to the protected section.
640    ///
641    /// Multiple readers may hold this guard concurrently.
642    pub fn acquire(&self) -> WatchLockGuard<'_> {
643        WatchLockGuard {
644            // The inner value is `()` — there is no data to corrupt, so we can
645            // always recover from a poisoned lock.
646            _guard: self.0.read().unwrap_or_else(|e| e.into_inner()),
647        }
648    }
649
650    fn write(&self) -> RwLockWriteGuard<'_, ()> {
651        // The inner value is `()` — there is no data to corrupt, so we can
652        // always recover from a poisoned lock.
653        self.0.write().unwrap_or_else(|e| e.into_inner())
654    }
655}
656
/// Messages received by the watch loop.
#[derive(Debug)]
enum Event {
    /// A relevant file-system change was observed.
    ChangeDetected,
    /// The build identified by the given generation number succeeded.
    CommandSucceeded(u64),
}
662
#[cfg(test)]
mod test {
    use super::*;

    /// Absolute path of `src/lib.rs` inside the workspace.
    fn workspace_lib_rs() -> PathBuf {
        metadata()
            .workspace_root
            .join("src")
            .join("lib.rs")
            .into_std_path_buf()
    }

    /// Run `prepare_excludes` on `watch`, failing the test if it errors.
    fn prepared(mut watch: Watch) -> Watch {
        watch
            .prepare_excludes()
            .expect("exclude parsing should succeed");
        watch
    }

    #[test]
    fn exclude_relative_path() {
        let watch = Watch::default().exclude_workspace_path("src/watch.rs");
        let src_dir = metadata().workspace_root.join("src");

        assert!(watch.is_excluded_path(src_dir.join("watch.rs").as_std_path()));
        assert!(!watch.is_excluded_path(src_dir.as_std_path()));
    }

    #[test]
    fn exclude_absolute_glob_path() {
        let absolute = metadata()
            .workspace_root
            .join("src")
            .join("**")
            .join("*.rs");

        let watch = prepared(Watch::default().exclude_path(absolute));

        assert_eq!(watch.exclude_globs.len(), 1);
        assert!(watch.is_excluded_path(&workspace_lib_rs()));
    }

    #[test]
    fn exclude_workspace_glob_path() {
        let watch = prepared(Watch::default().exclude_workspace_path("src/**/*.rs"));

        assert_eq!(watch.workspace_exclude_globs.len(), 1);
        assert!(watch.is_excluded_path(&workspace_lib_rs()));
    }

    #[test]
    fn exclude_workspace_absolute_glob_path() {
        let absolute = metadata()
            .workspace_root
            .join("src")
            .join("**")
            .join("*.rs");

        let watch = prepared(Watch::default().exclude_workspace_path(absolute));

        assert_eq!(watch.workspace_exclude_globs.len(), 1);
        assert!(watch.is_excluded_path(&workspace_lib_rs()));
    }

    #[test]
    fn exclude_workspace_glob_non_match() {
        let watch = prepared(Watch::default().exclude_workspace_path("tests/**/*.rs"));

        assert!(!watch.is_excluded_path(&workspace_lib_rs()));
    }

    #[test]
    fn glob_detection() {
        for pattern in ["src/**/*.rs", "foo?.rs"] {
            assert!(Watch::is_glob_pattern(Path::new(pattern)));
        }

        // `[` only counts as a glob metacharacter outside of Windows.
        assert_eq!(
            Watch::is_glob_pattern(Path::new("[ab].rs")),
            !cfg!(windows)
        );

        assert!(!Watch::is_glob_pattern(Path::new("src/lib.rs")));
    }

    #[test]
    fn invalid_glob_pattern() {
        let err = Watch::compile_glob(Path::new("[abc")).expect_err("should fail");
        let message = err.to_string();

        assert!(
            message.contains("invalid glob pattern"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn command_list_froms() {
        let _: CommandList = Command::new("foo").into();
        let _: CommandList = vec![Command::new("foo")].into();
        let _: CommandList = [Command::new("foo")].into();
    }
}