Skip to main content

nu_command/filesystem/
watch.rs

1use itertools::{Either, Itertools};
2use notify_debouncer_full::{
3    DebouncedEvent, Debouncer, FileIdMap, new_debouncer,
4    notify::{
5        self, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
6        event::{DataChange, ModifyKind, RenameMode},
7    },
8};
9use nu_engine::{ClosureEval, command_prelude::*};
10use nu_protocol::{
11    Signals, engine::Closure, report_shell_error, shell_error::generic::GenericError,
12    shell_error::io::IoError,
13};
14
15use std::{
16    borrow::Cow,
17    path::{Path, PathBuf},
18    sync::mpsc::{Receiver, RecvTimeoutError, channel},
19    time::Duration,
20};
21
// Both durations below were chosen mostly arbitrarily.

/// Receive timeout used while waiting for watcher events; after each timeout
/// the interrupt signal is checked again.
const CHECK_CTRL_C_FREQUENCY: Duration = Duration::from_millis(100);
/// Debounce window applied when `--debounce` is not supplied.
const DEFAULT_WATCH_DEBOUNCE_DURATION: Duration = Duration::from_millis(100);
25
26#[derive(Clone)]
27pub struct Watch;
28
29impl Command for Watch {
30    fn name(&self) -> &str {
31        "watch"
32    }
33
34    fn description(&self) -> &str {
35        "Watch for file changes and execute Nu code when they happen."
36    }
37
38    fn extra_description(&self) -> &str {
39        "When run without a closure, `watch` returns a stream of events instead."
40    }
41
42    fn search_terms(&self) -> Vec<&str> {
43        vec!["watcher", "reload", "filesystem"]
44    }
45
46    fn signature(&self) -> nu_protocol::Signature {
47        Signature::build("watch")
48            .input_output_types(vec![
49                (Type::Nothing, Type::Nothing),
50                (
51                    Type::Nothing,
52                    Type::Table(vec![
53                        ("operation".into(), Type::String),
54                        ("path".into(), Type::String),
55                        ("new_path".into(), Type::String),
56                    ].into_boxed_slice())
57                ),
58            ])
59            .required("path", SyntaxShape::Filepath, "The path to watch. Can be a file or directory.")
60            .optional(
61                "closure",
62                SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])),
63                "Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.",
64            )
65            .named(
66                "debounce",
67                SyntaxShape::Duration,
68                "Debounce changes for this duration (default: 100ms). Adjust if you find that single writes are reported as multiple events.",
69                Some('d'),
70            )
71            .named(
72                "glob",
73                SyntaxShape::String, // SyntaxShape::GlobPattern gets interpreted relative to cwd, so use String instead
74                "Only report changes for files that match this glob pattern (default: all files)",
75                Some('g'),
76            )
77            .named(
78                "recursive",
79                SyntaxShape::Boolean,
80                "Watch all directories under `<path>` recursively. Will be ignored if `<path>` is a file (default: true).",
81                Some('r'),
82            )
83            .switch("quiet", "Hide the initial status message (default: false).", Some('q'))
84            .switch("verbose", "Operate in verbose mode (default: false).", Some('v'))
85            .category(Category::FileSystem)
86    }
87
88    fn run(
89        &self,
90        engine_state: &EngineState,
91        stack: &mut Stack,
92        call: &Call,
93        _input: PipelineData,
94    ) -> Result<PipelineData, ShellError> {
95        let head = call.head;
96        let cwd = engine_state.cwd_as_string(Some(stack))?;
97        let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
98
99        let path_no_whitespace = path_arg
100            .item
101            .trim_end_matches(|x| matches!(x, '\x09'..='\x0d'));
102
103        let path = nu_path::absolute_with(path_no_whitespace, cwd).map_err(|err| {
104            ShellError::Io(IoError::new(
105                err,
106                path_arg.span,
107                PathBuf::from(path_no_whitespace),
108            ))
109        })?;
110
111        let closure: Option<Closure> = call.opt(engine_state, stack, 1)?;
112        let verbose = call.has_flag(engine_state, stack, "verbose")?;
113        let quiet = call.has_flag(engine_state, stack, "quiet")?;
114        let debounce_duration: Duration = call
115            .get_flag(engine_state, stack, "debounce")?
116            .unwrap_or(DEFAULT_WATCH_DEBOUNCE_DURATION);
117
118        let glob_flag: Option<Spanned<String>> = call.get_flag(engine_state, stack, "glob")?;
119        let glob_pattern = glob_flag
120            .map(|glob| {
121                let absolute_path = path.join(glob.item);
122                if verbose {
123                    eprintln!("Absolute glob path: {absolute_path:?}");
124                }
125
126                nu_glob::Pattern::new(&absolute_path.to_string_lossy()).map_err(|_| {
127                    ShellError::TypeMismatch {
128                        err_message: "Glob pattern is invalid".to_string(),
129                        span: glob.span,
130                    }
131                })
132            })
133            .transpose()?;
134
135        let recursive_flag: Option<Spanned<bool>> =
136            call.get_flag(engine_state, stack, "recursive")?;
137        let recursive_mode = match recursive_flag {
138            Some(recursive) => {
139                if recursive.item {
140                    RecursiveMode::Recursive
141                } else {
142                    RecursiveMode::NonRecursive
143                }
144            }
145            None => RecursiveMode::Recursive,
146        };
147
148        let (tx, rx) = channel();
149
150        let mut debouncer = new_debouncer(debounce_duration, None, tx).map_err(|err| {
151            ShellError::Generic(GenericError::new(
152                "Failed to create watcher",
153                err.to_string(),
154                call.head,
155            ))
156        })?;
157
158        if let Err(err) = debouncer.watcher().watch(&path, recursive_mode) {
159            return Err(ShellError::Generic(GenericError::new(
160                "Failed to create watcher",
161                err.to_string(),
162                call.head,
163            )));
164        }
165        // need to cache to make sure that rename event works.
166        debouncer.cache().add_root(&path, recursive_mode);
167
168        if !quiet {
169            eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
170        }
171
172        let iter = WatchIterator::new(debouncer, rx, engine_state.signals().clone());
173
174        if let Some(closure) = closure {
175            let mut closure = ClosureEval::new(engine_state, stack, closure);
176
177            for events in iter {
178                for event in events? {
179                    let matches_glob = match &glob_pattern {
180                        Some(glob) => glob.matches_path(&event.path),
181                        None => true,
182                    };
183                    if verbose && glob_pattern.is_some() {
184                        eprintln!("Matches glob: {matches_glob}");
185                    }
186
187                    if matches_glob {
188                        let result = closure
189                            .add_arg(event.operation.into_value(head))
190                            .add_arg(event.path.to_string_lossy().into_value(head))
191                            .add_arg(
192                                event
193                                    .new_path
194                                    .as_deref()
195                                    .map(Path::to_string_lossy)
196                                    .into_value(head),
197                            )
198                            .run_with_input(PipelineData::empty());
199
200                        match result {
201                            Ok(val) => val.print_table(engine_state, stack, false, false)?,
202                            Err(err) => report_shell_error(Some(stack), engine_state, &err),
203                        };
204                    }
205                }
206            }
207
208            Ok(PipelineData::empty())
209        } else {
210            fn glob_filter(glob: Option<&nu_glob::Pattern>, path: &Path) -> bool {
211                let Some(glob) = glob else { return true };
212                glob.matches_path(path)
213            }
214
215            let out = iter
216                .flat_map(|e| match e {
217                    Ok(events) => Either::Right(events.into_iter().map(Ok)),
218                    Err(err) => Either::Left(std::iter::once(Err(err))),
219                })
220                .filter_map(move |e| match e {
221                    Ok(ev) => glob_filter(glob_pattern.as_ref(), &ev.path)
222                        .then(|| WatchEventRecord::from(&ev).into_value(head)),
223                    Err(err) => Some(Value::error(err, head)),
224                })
225                .into_pipeline_data(head, engine_state.signals().clone());
226            Ok(out)
227        }
228    }
229
230    fn examples(&self) -> Vec<Example<'_>> {
231        vec![
232            Example {
233                description: "Run `cargo test` whenever a Rust file changes.",
234                example: "watch . --glob=**/*.rs {|| cargo test }",
235                result: None,
236            },
237            Example {
238                description: "Watch all changes in the current directory.",
239                example: r#"watch . { |op, path, new_path| $"($op) ($path) ($new_path)"}"#,
240                result: None,
241            },
242            Example {
243                description: "`watch` (when run without a closure) can also emit a stream of events it detects.",
244                example: r#"watch /foo/bar
245    | where operation == Create
246    | first 5
247    | each {|e| $"New file!: ($e.path)" }
248    | to text
249    | save --append changes_in_bar.log"#,
250                result: None,
251            },
252            Example {
253                description: "Print file changes with a debounce time of 5 minutes.",
254                example: r#"watch /foo/bar --debounce 5min { |op, path| $"Registered ($op) on ($path)" | print }"#,
255                result: None,
256            },
257            Example {
258                description: "Note: if you are looking to run a command every N units of time, this can be accomplished with a loop and sleep.",
259                example: "loop { command; sleep duration }",
260                result: None,
261            },
262        ]
263    }
264}
265
266struct WatchEvent {
267    operation: &'static str,
268    path: PathBuf,
269    new_path: Option<PathBuf>,
270}
271
272#[derive(IntoValue)]
273struct WatchEventRecord<'a> {
274    operation: &'static str,
275    path: Cow<'a, str>,
276    new_path: Option<Cow<'a, str>>,
277}
278
279impl<'a> From<&'a WatchEvent> for WatchEventRecord<'a> {
280    fn from(value: &'a WatchEvent) -> Self {
281        Self {
282            operation: value.operation,
283            path: value.path.to_string_lossy(),
284            new_path: value.new_path.as_deref().map(Path::to_string_lossy),
285        }
286    }
287}
288
289impl TryFrom<DebouncedEvent> for WatchEvent {
290    type Error = ();
291
292    fn try_from(mut ev: DebouncedEvent) -> Result<Self, Self::Error> {
293        // TODO: Maybe we should handle all event kinds?
294        match ev.event.kind {
295            EventKind::Create(_) => ev.paths.pop().map(|p| WatchEvent {
296                operation: "Create",
297                path: p,
298                new_path: None,
299            }),
300            EventKind::Remove(_) => ev.paths.pop().map(|p| WatchEvent {
301                operation: "Remove",
302                path: p,
303                new_path: None,
304            }),
305            EventKind::Modify(
306                ModifyKind::Data(DataChange::Content | DataChange::Any) | ModifyKind::Any,
307            ) => ev.paths.pop().map(|p| WatchEvent {
308                operation: "Write",
309                path: p,
310                new_path: None,
311            }),
312            EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => ev
313                .paths
314                .drain(..)
315                .rev()
316                .next_array()
317                .map(|[from, to]| WatchEvent {
318                    operation: "Rename",
319                    path: from,
320                    new_path: Some(to),
321                }),
322            _ => None,
323        }
324        .ok_or(())
325    }
326}
327
328struct WatchIterator {
329    /// Debouncer needs to be kept alive for `rx` to keep receiving events.
330    _debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
331    rx: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>,
332    signals: Signals,
333}
334
335impl WatchIterator {
336    fn new(
337        debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
338        rx: Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
339        signals: Signals,
340    ) -> Self {
341        Self {
342            _debouncer: debouncer,
343            rx: Some(rx),
344            signals,
345        }
346    }
347}
348
349impl Iterator for WatchIterator {
350    type Item = Result<Vec<WatchEvent>, ShellError>;
351
352    fn next(&mut self) -> Option<Self::Item> {
353        let rx = self.rx.as_ref()?;
354        while !self.signals.interrupted() {
355            let x = match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
356                Ok(x) => x,
357                Err(RecvTimeoutError::Timeout) => continue,
358                Err(RecvTimeoutError::Disconnected) => {
359                    self.rx = None;
360                    return Some(Err(ShellError::Generic(GenericError::new_internal(
361                        "Disconnected",
362                        "Unexpected disconnect from file watcher",
363                    ))));
364                }
365            };
366
367            let Ok(events) = x else {
368                self.rx = None;
369                return Some(Err(ShellError::Generic(GenericError::new_internal(
370                    "Receiving events failed",
371                    "Unexpected errors when receiving events",
372                ))));
373            };
374
375            let watch_events = events
376                .into_iter()
377                .filter_map(|ev| WatchEvent::try_from(ev).ok())
378                .collect::<Vec<_>>();
379
380            return Some(Ok(watch_events));
381        }
382        self.rx = None;
383        None
384    }
385}