Skip to main content

nu_command/filesystem/
watch.rs

1use std::{
2    path::{Path, PathBuf},
3    sync::mpsc::{Receiver, RecvTimeoutError, channel},
4    time::Duration,
5};
6
7use itertools::Either;
8use notify_debouncer_full::{
9    DebouncedEvent, Debouncer, FileIdMap, new_debouncer,
10    notify::{
11        self, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
12        event::{DataChange, ModifyKind, RenameMode},
13    },
14};
15
16use nu_engine::{ClosureEval, command_prelude::*};
17use nu_protocol::{
18    Signals,
19    engine::Closure,
20    report_shell_error, report_shell_warning,
21    shell_error::{generic::GenericError, io::IoError},
22};
23
24// durations chosen mostly arbitrarily
25const CHECK_CTRL_C_FREQUENCY: Duration = Duration::from_millis(100);
26const DEFAULT_WATCH_DEBOUNCE_DURATION: Duration = Duration::from_millis(100);
27
28#[derive(Clone)]
29pub struct Watch;
30
31impl Command for Watch {
32    fn name(&self) -> &str {
33        "watch"
34    }
35
36    fn description(&self) -> &str {
37        "Watch for file changes and execute Nu code when they happen."
38    }
39
40    fn extra_description(&self) -> &str {
41        "When run without a closure, `watch` returns a stream of events instead."
42    }
43
44    fn search_terms(&self) -> Vec<&str> {
45        vec!["watcher", "reload", "filesystem"]
46    }
47
48    fn signature(&self) -> nu_protocol::Signature {
49        Signature::build("watch")
50            .input_output_types(vec![
51                (Type::Nothing, Type::Nothing),
52                (
53                    Type::Nothing,
54                    Type::Table(
55                        vec![
56                            ("operation".into(), Type::String),
57                            (
58                                "path".into(),
59                                Type::OneOf([Type::String, Type::Nothing].into()),
60                            ),
61                            (
62                                "new_path".into(),
63                                Type::OneOf([Type::String, Type::Nothing].into()),
64                            ),
65                        ]
66                        .into_boxed_slice(),
67                    ),
68                ),
69            ])
70            .required(
71                "path",
72                SyntaxShape::Filepath,
73                "The path to watch. Can be a file or directory.",
74            )
75            .optional(
76                "closure",
77                SyntaxShape::Closure(Some(vec![
78                    SyntaxShape::String,
79                    SyntaxShape::String,
80                    SyntaxShape::String,
81                ])),
82                "Some Nu code to run whenever a file changes. \
83                    The closure will be passed `operation`, `path`, \
84                    and `new_path` (for renames only) arguments in that order (deprecated).",
85            )
86            .named(
87                "debounce",
88                SyntaxShape::Duration,
89                "Debounce changes for this duration (default: 100ms). \
90                    Adjust if you find that single writes are reported as multiple events.",
91                Some('d'),
92            )
93            .named(
94                "glob",
95                // SyntaxShape::GlobPattern gets interpreted relative to cwd, so use String instead
96                SyntaxShape::String,
97                "Only report changes for files that match this glob pattern (default: all files)",
98                Some('g'),
99            )
100            .named(
101                "recursive",
102                SyntaxShape::Boolean,
103                "Watch all directories under `<path>` recursively. \
104                    Will be ignored if `<path>` is a file (default: true).",
105                Some('r'),
106            )
107            .switch(
108                "quiet",
109                "Hide the initial status message (default: false).",
110                Some('q'),
111            )
112            .switch(
113                "verbose",
114                "Operate in verbose mode (default: false).",
115                Some('v'),
116            )
117            .category(Category::FileSystem)
118    }
119
120    fn run(
121        &self,
122        engine_state: &EngineState,
123        stack: &mut Stack,
124        call: &Call,
125        _input: PipelineData,
126    ) -> Result<PipelineData, ShellError> {
127        let head = call.head;
128        let path = {
129            let cwd = engine_state.cwd_as_string(Some(stack))?;
130            let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
131            let path_no_whitespace = path_arg
132                .item
133                .trim_end_matches(|x| matches!(x, '\x09'..='\x0d'));
134
135            nu_path::absolute_with(path_no_whitespace, cwd).map_err(|err| {
136                ShellError::Io(IoError::new(
137                    err,
138                    path_arg.span,
139                    PathBuf::from(path_no_whitespace),
140                ))
141            })?
142        };
143        let closure: Option<Spanned<Closure>> = call.opt(engine_state, stack, 1)?;
144        let verbose = call.has_flag(engine_state, stack, "verbose")?;
145        let quiet = call.has_flag(engine_state, stack, "quiet")?;
146        let debounce_duration: Duration = call
147            .get_flag(engine_state, stack, "debounce")?
148            .unwrap_or(DEFAULT_WATCH_DEBOUNCE_DURATION);
149
150        let glob_pattern = call
151            .get_flag::<Spanned<String>>(engine_state, stack, "glob")?
152            .map(|glob| {
153                let absolute_path = path.join(glob.item);
154                if verbose {
155                    eprintln!("Absolute glob path: {absolute_path:?}");
156                }
157
158                nu_glob::Pattern::new(&absolute_path.to_string_lossy()).map_err(|_| {
159                    ShellError::TypeMismatch {
160                        err_message: "Glob pattern is invalid".to_string(),
161                        span: glob.span,
162                    }
163                })
164            })
165            .transpose()?;
166
167        let recursive_mode = {
168            let recursive_flag = call
169                .get_flag::<bool>(engine_state, stack, "recursive")?
170                .unwrap_or(true);
171            match recursive_flag {
172                true => RecursiveMode::Recursive,
173                false => RecursiveMode::NonRecursive,
174            }
175        };
176
177        let iter = {
178            let (tx, rx) = channel();
179            let mut debouncer = new_debouncer(debounce_duration, None, tx)
180                .and_then(|mut debouncer| {
181                    debouncer.watcher().watch(&path, recursive_mode)?;
182                    Ok(debouncer)
183                })
184                .map_err(|err| {
185                    ShellError::Generic(GenericError::new(
186                        "Failed to create watcher",
187                        err.to_string(),
188                        call.head,
189                    ))
190                })?;
191            // need to cache to make sure that rename event works.
192            debouncer.cache().add_root(&path, recursive_mode);
193            WatchIterator::new(debouncer, rx, engine_state.signals().clone())
194        };
195
196        if let Some(closure) = closure {
197            report_shell_warning(
198                Some(stack),
199                engine_state,
200                &ShellWarning::Deprecated {
201                    dep_type: "Argument".into(),
202                    label: "remove this".into(),
203                    span: closure.span,
204                    help: "Since 0.113.0, running a closure with `watch` is deprecated. \
205                            Instead, use `watch` by piping its output to `each` \
206                            or as the source of a `for` loop."
207                        .to_string()
208                        .into(),
209                    report_mode: nu_protocol::ReportMode::FirstUse,
210                },
211            );
212
213            #[allow(deprecated)]
214            run_closure(
215                engine_state,
216                stack,
217                head,
218                quiet,
219                verbose,
220                &path,
221                glob_pattern,
222                iter,
223                closure.item,
224            )
225        } else {
226            let out = iter
227                .flat_map(|e| match e {
228                    Ok(events) => Either::Right(events.into_iter().map(Ok)),
229                    Err(err) => Either::Left(std::iter::once(Err(err))),
230                })
231                .filter_map(move |e| match e {
232                    Ok(ev) if glob_filter(glob_pattern.as_ref(), &ev) => Some(ev.into_value(head)),
233                    Ok(_) => None,
234                    Err(err) => Some(Value::error(err, head)),
235                })
236                .into_pipeline_data(head, engine_state.signals().clone());
237            Ok(out)
238        }
239    }
240
241    fn examples(&self) -> Vec<Example<'_>> {
242        vec![
243            Example {
244                description: "Run `cargo test` whenever a Rust file changes.",
245                example: "for _ in (watch . --glob=**/*.rs) { cargo test }",
246                result: None,
247            },
248            Example {
249                description: "Watch all changes in the current directory.",
250                example: "watch . | each { print }",
251                result: None,
252            },
253            Example {
254                description: "Filter, limit and modify `watch`'s output by using it as part of a pipeline.",
255                example: r#"watch /foo/bar
256    | where operation == Create
257    | first 5
258    | each {|e| $"New file!: ($e.path)" }
259    | to text
260    | save --append changes_in_bar.log"#,
261                result: None,
262            },
263            Example {
264                description: "Print file changes with a debounce time of 5 minutes.",
265                example: r#"watch /foo/bar --debounce 5min | each {|e| $"Registered ($e.operation) on ($e.path)" | print }"#,
266                result: None,
267            },
268            Example {
269                description: "Note: if you are looking to run a command every N units of time, this can be accomplished with a loop and sleep.",
270                example: "loop { command; sleep duration }",
271                result: None,
272            },
273            Example {
274                description: "Run `cargo test` whenever a Rust file changes (with the deprecated closure argument).",
275                example: "watch . --glob=**/*.rs {|| cargo test }",
276                result: None,
277            },
278        ]
279    }
280}
281
282fn glob_filter(glob: Option<&nu_glob::Pattern>, ev: &WatchEvent) -> bool {
283    let Some(glob) = glob else { return true };
284    let path = ev
285        .path
286        .as_deref()
287        .or(ev.new_path.as_deref())
288        .expect("at least one of path or new_path should be present");
289    glob.matches_path(path)
290}
291
292#[allow(clippy::too_many_arguments)]
293#[inline]
294#[deprecated(since = "0.113.0")]
295fn run_closure(
296    engine_state: &EngineState,
297    stack: &mut Stack,
298    head: Span,
299    quiet: bool,
300    verbose: bool,
301    path: &Path,
302    glob_pattern: Option<nu_glob::Pattern>,
303    iter: WatchIterator,
304    closure: Closure,
305) -> Result<PipelineData, ShellError> {
306    if !quiet {
307        eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
308    }
309
310    let mut closure = ClosureEval::new(engine_state, stack, closure);
311    for events in iter {
312        for event in events? {
313            let matches_glob = glob_filter(glob_pattern.as_ref(), &event);
314
315            if verbose && glob_pattern.is_some() {
316                eprintln!("Matches glob: {matches_glob}");
317            }
318
319            if matches_glob {
320                let result = closure
321                    .add_arg(event.operation.into_value(head))?
322                    .add_arg(event.path.into_value(head))?
323                    .add_arg(event.new_path.into_value(head))?
324                    .run_with_input(PipelineData::empty());
325
326                match result {
327                    Ok(val) => val.print_table(engine_state, stack, false, false)?,
328                    Err(err) => report_shell_error(Some(stack), engine_state, &err),
329                };
330            }
331        }
332    }
333
334    Ok(PipelineData::empty())
335}
336
337#[derive(IntoValue)]
338struct WatchEvent {
339    operation: WatchEventKind,
340    path: Option<PathBuf>,
341    new_path: Option<PathBuf>,
342}
343
344#[derive(IntoValue)]
345#[nu_value(rename_all = "UpperCamelCase")]
346enum WatchEventKind {
347    Create,
348    Write,
349    Rename,
350    Remove,
351}
352
353impl TryFrom<EventKind> for WatchEventKind {
354    type Error = ();
355
356    fn try_from(value: EventKind) -> Result<Self, Self::Error> {
357        Ok(match value {
358            EventKind::Create(_) => Self::Create,
359            EventKind::Remove(_) => Self::Remove,
360            EventKind::Modify(
361                ModifyKind::Data(DataChange::Content | DataChange::Any) | ModifyKind::Any,
362            ) => Self::Write,
363            EventKind::Modify(ModifyKind::Name(
364                RenameMode::Both | RenameMode::From | RenameMode::To,
365            )) => Self::Rename,
366            _ => return Err(()),
367        })
368    }
369}
370
371impl TryFrom<DebouncedEvent> for WatchEvent {
372    type Error = ();
373
374    fn try_from(ev: DebouncedEvent) -> Result<Self, Self::Error> {
375        // TODO: Maybe we should handle all event kinds?
376        let DebouncedEvent {
377            event: notify::Event {
378                kind, mut paths, ..
379            },
380            ..
381        } = ev;
382
383        let (path, new_path) = match paths.as_mut_slice() {
384            [path] => (std::mem::take(path), None),
385            [path, new_path] => (std::mem::take(path), Some(std::mem::take(new_path))),
386            _ => return Err(()),
387        };
388
389        if let EventKind::Modify(ModifyKind::Name(RenameMode::To)) = kind {
390            Ok(WatchEvent {
391                operation: WatchEventKind::Rename,
392                path: None,
393                new_path: Some(path),
394            })
395        } else {
396            Ok(WatchEvent {
397                operation: kind.try_into()?,
398                path: Some(path),
399                new_path,
400            })
401        }
402    }
403}
404
405struct WatchIterator {
406    /// Debouncer needs to be kept alive for `rx` to keep receiving events.
407    _debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
408    rx: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>,
409    signals: Signals,
410}
411
412impl WatchIterator {
413    fn new(
414        debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
415        rx: Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
416        signals: Signals,
417    ) -> Self {
418        Self {
419            _debouncer: debouncer,
420            rx: Some(rx),
421            signals,
422        }
423    }
424}
425
426impl Iterator for WatchIterator {
427    type Item = Result<Vec<WatchEvent>, ShellError>;
428
429    fn next(&mut self) -> Option<Self::Item> {
430        let rx = self.rx.as_ref()?;
431        while !self.signals.interrupted() {
432            let x = match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
433                Ok(x) => x,
434                Err(RecvTimeoutError::Timeout) => continue,
435                Err(RecvTimeoutError::Disconnected) => {
436                    self.rx = None;
437                    return Some(Err(ShellError::Generic(GenericError::new_internal(
438                        "Disconnected",
439                        "Unexpected disconnect from file watcher",
440                    ))));
441                }
442            };
443
444            let Ok(events) = x else {
445                self.rx = None;
446                return Some(Err(ShellError::Generic(GenericError::new_internal(
447                    "Receiving events failed",
448                    "Unexpected errors when receiving events",
449                ))));
450            };
451
452            let watch_events = events
453                .into_iter()
454                .filter_map(|ev| WatchEvent::try_from(ev).ok())
455                .collect::<Vec<_>>();
456
457            return Some(Ok(watch_events));
458        }
459        self.rx = None;
460        None
461    }
462}