nu_command/filesystem/
watch.rs

1use itertools::{Either, Itertools};
2use notify_debouncer_full::{
3    DebouncedEvent, Debouncer, FileIdMap, new_debouncer,
4    notify::{
5        self, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
6        event::{DataChange, ModifyKind, RenameMode},
7    },
8};
9use nu_engine::{ClosureEval, command_prelude::*};
10use nu_protocol::{
11    DeprecationEntry, DeprecationType, ReportMode, Signals, engine::Closure, report_shell_error,
12    shell_error::io::IoError,
13};
14
15use std::{
16    borrow::Cow,
17    path::{Path, PathBuf},
18    sync::mpsc::{Receiver, RecvTimeoutError, channel},
19    time::Duration,
20};
21
22// durations chosen mostly arbitrarily
23const CHECK_CTRL_C_FREQUENCY: Duration = Duration::from_millis(100);
24const DEFAULT_WATCH_DEBOUNCE_DURATION: Duration = Duration::from_millis(100);
25
26#[derive(Clone)]
27pub struct Watch;
28
29impl Command for Watch {
30    fn name(&self) -> &str {
31        "watch"
32    }
33
34    fn description(&self) -> &str {
35        "Watch for file changes and execute Nu code when they happen."
36    }
37
38    fn extra_description(&self) -> &str {
39        "When run without a closure, `watch` returns a stream of events instead."
40    }
41
42    fn search_terms(&self) -> Vec<&str> {
43        vec!["watcher", "reload", "filesystem"]
44    }
45
46    fn deprecation_info(&self) -> Vec<DeprecationEntry> {
47        vec![DeprecationEntry {
48            ty: DeprecationType::Flag("debounce-ms".into()),
49            report_mode: ReportMode::FirstUse,
50            since: Some("0.107.0".into()),
51            expected_removal: Some("0.109.0".into()),
52            help: Some("`--debounce-ms` will be removed in favour of  `--debounce`".into()),
53        }]
54    }
55
56    fn signature(&self) -> nu_protocol::Signature {
57        Signature::build("watch")
58            .input_output_types(vec![
59                (Type::Nothing, Type::Nothing),
60                (
61                    Type::Nothing,
62                    Type::Table(vec![
63                        ("operation".into(), Type::String),
64                        ("path".into(), Type::String),
65                        ("new_path".into(), Type::String),
66                    ].into_boxed_slice())
67                ),
68            ])
69            .required("path", SyntaxShape::Filepath, "The path to watch. Can be a file or directory.")
70            .optional(
71                "closure",
72                SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])),
73                "Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.",
74            )
75            .named(
76                "debounce-ms",
77                SyntaxShape::Int,
78                "Debounce changes for this many milliseconds (default: 100). Adjust if you find that single writes are reported as multiple events (deprecated)",
79                Some('d'),
80            )
81            .named(
82                "debounce",
83                SyntaxShape::Duration,
84                "Debounce changes for this duration (default: 100ms). Adjust if you find that single writes are reported as multiple events",
85                None,
86            )
87            .named(
88                "glob",
89                SyntaxShape::String, // SyntaxShape::GlobPattern gets interpreted relative to cwd, so use String instead
90                "Only report changes for files that match this glob pattern (default: all files)",
91                Some('g'),
92            )
93            .named(
94                "recursive",
95                SyntaxShape::Boolean,
96                "Watch all directories under `<path>` recursively. Will be ignored if `<path>` is a file (default: true)",
97                Some('r'),
98            )
99            .switch("quiet", "Hide the initial status message (default: false)", Some('q'))
100            .switch("verbose", "Operate in verbose mode (default: false)", Some('v'))
101            .category(Category::FileSystem)
102    }
103
104    fn run(
105        &self,
106        engine_state: &EngineState,
107        stack: &mut Stack,
108        call: &Call,
109        _input: PipelineData,
110    ) -> Result<PipelineData, ShellError> {
111        let head = call.head;
112        let cwd = engine_state.cwd_as_string(Some(stack))?;
113        let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
114
115        let path_no_whitespace = path_arg
116            .item
117            .trim_end_matches(|x| matches!(x, '\x09'..='\x0d'));
118
119        let path = nu_path::canonicalize_with(path_no_whitespace, cwd).map_err(|err| {
120            ShellError::Io(IoError::new(
121                err,
122                path_arg.span,
123                PathBuf::from(path_no_whitespace),
124            ))
125        })?;
126
127        let closure: Option<Closure> = call.opt(engine_state, stack, 1)?;
128        let verbose = call.has_flag(engine_state, stack, "verbose")?;
129        let quiet = call.has_flag(engine_state, stack, "quiet")?;
130        let debounce_duration: Duration = resolve_duration_arguments(
131            call.get_flag(engine_state, stack, "debounce-ms")?,
132            call.get_flag(engine_state, stack, "debounce")?,
133        )?;
134
135        let glob_flag: Option<Spanned<String>> = call.get_flag(engine_state, stack, "glob")?;
136        let glob_pattern = glob_flag
137            .map(|glob| {
138                let absolute_path = path.join(glob.item);
139                if verbose {
140                    eprintln!("Absolute glob path: {absolute_path:?}");
141                }
142
143                nu_glob::Pattern::new(&absolute_path.to_string_lossy()).map_err(|_| {
144                    ShellError::TypeMismatch {
145                        err_message: "Glob pattern is invalid".to_string(),
146                        span: glob.span,
147                    }
148                })
149            })
150            .transpose()?;
151
152        let recursive_flag: Option<Spanned<bool>> =
153            call.get_flag(engine_state, stack, "recursive")?;
154        let recursive_mode = match recursive_flag {
155            Some(recursive) => {
156                if recursive.item {
157                    RecursiveMode::Recursive
158                } else {
159                    RecursiveMode::NonRecursive
160                }
161            }
162            None => RecursiveMode::Recursive,
163        };
164
165        let (tx, rx) = channel();
166
167        let mut debouncer =
168            new_debouncer(debounce_duration, None, tx).map_err(|err| ShellError::GenericError {
169                error: "Failed to create watcher".to_string(),
170                msg: err.to_string(),
171                span: Some(call.head),
172                help: None,
173                inner: vec![],
174            })?;
175
176        if let Err(err) = debouncer.watcher().watch(&path, recursive_mode) {
177            return Err(ShellError::GenericError {
178                error: "Failed to create watcher".to_string(),
179                msg: err.to_string(),
180                span: Some(call.head),
181                help: None,
182                inner: vec![],
183            });
184        }
185        // need to cache to make sure that rename event works.
186        debouncer.cache().add_root(&path, recursive_mode);
187
188        if !quiet {
189            eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
190        }
191
192        let iter = WatchIterator::new(debouncer, rx, engine_state.signals().clone());
193
194        if let Some(closure) = closure {
195            let mut closure = ClosureEval::new(engine_state, stack, closure);
196
197            for events in iter {
198                for event in events? {
199                    let matches_glob = match &glob_pattern {
200                        Some(glob) => glob.matches_path(&event.path),
201                        None => true,
202                    };
203                    if verbose && glob_pattern.is_some() {
204                        eprintln!("Matches glob: {matches_glob}");
205                    }
206
207                    if matches_glob {
208                        let result = closure
209                            .add_arg(event.operation.into_value(head))
210                            .add_arg(event.path.to_string_lossy().into_value(head))
211                            .add_arg(
212                                event
213                                    .new_path
214                                    .as_deref()
215                                    .map(Path::to_string_lossy)
216                                    .into_value(head),
217                            )
218                            .run_with_input(PipelineData::empty());
219
220                        match result {
221                            Ok(val) => val.print_table(engine_state, stack, false, false)?,
222                            Err(err) => report_shell_error(engine_state, &err),
223                        };
224                    }
225                }
226            }
227
228            Ok(PipelineData::empty())
229        } else {
230            fn glob_filter(glob: Option<&nu_glob::Pattern>, path: &Path) -> bool {
231                let Some(glob) = glob else { return true };
232                glob.matches_path(path)
233            }
234
235            let out = iter
236                .flat_map(|e| match e {
237                    Ok(events) => Either::Right(events.into_iter().map(Ok)),
238                    Err(err) => Either::Left(std::iter::once(Err(err))),
239                })
240                .filter_map(move |e| match e {
241                    Ok(ev) => glob_filter(glob_pattern.as_ref(), &ev.path)
242                        .then(|| WatchEventRecord::from(&ev).into_value(head)),
243                    Err(err) => Some(Value::error(err, head)),
244                })
245                .into_pipeline_data(head, engine_state.signals().clone());
246            Ok(out)
247        }
248    }
249
250    fn examples(&self) -> Vec<Example> {
251        vec![
252            Example {
253                description: "Run `cargo test` whenever a Rust file changes",
254                example: r#"watch . --glob=**/*.rs {|| cargo test }"#,
255                result: None,
256            },
257            Example {
258                description: "Watch all changes in the current directory",
259                example: r#"watch . { |op, path, new_path| $"($op) ($path) ($new_path)"}"#,
260                result: None,
261            },
262            Example {
263                description: "`watch` (when run without a closure) can also emit a stream of events it detects.",
264                example: r#"watch /foo/bar
265    | where operation == Create
266    | first 5
267    | each {|e| $"New file!: ($e.path)" }
268    | to text
269    | save --append changes_in_bar.log"#,
270                result: None,
271            },
272            Example {
273                description: "Print file changes with a debounce time of 5 minutes",
274                example: r#"watch /foo/bar --debounce 5min { |op, path| $"Registered ($op) on ($path)" | print }"#,
275                result: None,
276            },
277            Example {
278                description: "Note: if you are looking to run a command every N units of time, this can be accomplished with a loop and sleep",
279                example: r#"loop { command; sleep duration }"#,
280                result: None,
281            },
282        ]
283    }
284}
285
286fn resolve_duration_arguments(
287    debounce_duration_flag_ms: Option<Spanned<i64>>,
288    debounce_duration_flag: Option<Spanned<Duration>>,
289) -> Result<Duration, ShellError> {
290    match (debounce_duration_flag, debounce_duration_flag_ms) {
291        (None, None) => Ok(DEFAULT_WATCH_DEBOUNCE_DURATION),
292        (Some(l), Some(r)) => Err(ShellError::IncompatibleParameters {
293            left_message: "Here".to_string(),
294            left_span: l.span,
295            right_message: "and here".to_string(),
296            right_span: r.span,
297        }),
298        (None, Some(val)) => match u64::try_from(val.item) {
299            Ok(v) => Ok(Duration::from_millis(v)),
300            Err(_) => Err(ShellError::TypeMismatch {
301                err_message: "Debounce duration is invalid".to_string(),
302                span: val.span,
303            }),
304        },
305        (Some(v), None) => Ok(v.item),
306    }
307}
308
309struct WatchEvent {
310    operation: &'static str,
311    path: PathBuf,
312    new_path: Option<PathBuf>,
313}
314
315#[derive(IntoValue)]
316struct WatchEventRecord<'a> {
317    operation: &'static str,
318    path: Cow<'a, str>,
319    new_path: Option<Cow<'a, str>>,
320}
321
322impl<'a> From<&'a WatchEvent> for WatchEventRecord<'a> {
323    fn from(value: &'a WatchEvent) -> Self {
324        Self {
325            operation: value.operation,
326            path: value.path.to_string_lossy(),
327            new_path: value.new_path.as_deref().map(Path::to_string_lossy),
328        }
329    }
330}
331
332impl TryFrom<DebouncedEvent> for WatchEvent {
333    type Error = ();
334
335    fn try_from(mut ev: DebouncedEvent) -> Result<Self, Self::Error> {
336        // TODO: Maybe we should handle all event kinds?
337        match ev.event.kind {
338            EventKind::Create(_) => ev.paths.pop().map(|p| WatchEvent {
339                operation: "Create",
340                path: p,
341                new_path: None,
342            }),
343            EventKind::Remove(_) => ev.paths.pop().map(|p| WatchEvent {
344                operation: "Remove",
345                path: p,
346                new_path: None,
347            }),
348            EventKind::Modify(
349                ModifyKind::Data(DataChange::Content | DataChange::Any) | ModifyKind::Any,
350            ) => ev.paths.pop().map(|p| WatchEvent {
351                operation: "Write",
352                path: p,
353                new_path: None,
354            }),
355            EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => ev
356                .paths
357                .drain(..)
358                .rev()
359                .next_array()
360                .map(|[from, to]| WatchEvent {
361                    operation: "Rename",
362                    path: from,
363                    new_path: Some(to),
364                }),
365            _ => None,
366        }
367        .ok_or(())
368    }
369}
370
371struct WatchIterator {
372    /// Debouncer needs to be kept alive for `rx` to keep receiving events.
373    _debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
374    rx: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>,
375    signals: Signals,
376}
377
378impl WatchIterator {
379    fn new(
380        debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
381        rx: Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
382        signals: Signals,
383    ) -> Self {
384        Self {
385            _debouncer: debouncer,
386            rx: Some(rx),
387            signals,
388        }
389    }
390}
391
392impl Iterator for WatchIterator {
393    type Item = Result<Vec<WatchEvent>, ShellError>;
394
395    fn next(&mut self) -> Option<Self::Item> {
396        let rx = self.rx.as_ref()?;
397        while !self.signals.interrupted() {
398            let x = match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
399                Ok(x) => x,
400                Err(RecvTimeoutError::Timeout) => continue,
401                Err(RecvTimeoutError::Disconnected) => {
402                    self.rx = None;
403                    return Some(Err(ShellError::GenericError {
404                        error: "Disconnected".to_string(),
405                        msg: "Unexpected disconnect from file watcher".into(),
406                        span: None,
407                        help: None,
408                        inner: vec![],
409                    }));
410                }
411            };
412
413            let Ok(events) = x else {
414                self.rx = None;
415                return Some(Err(ShellError::GenericError {
416                    error: "Receiving events failed".to_string(),
417                    msg: "Unexpected errors when receiving events".into(),
418                    span: None,
419                    help: None,
420                    inner: vec![],
421                }));
422            };
423
424            let watch_events = events
425                .into_iter()
426                .filter_map(|ev| WatchEvent::try_from(ev).ok())
427                .collect::<Vec<_>>();
428
429            return Some(Ok(watch_events));
430        }
431        self.rx = None;
432        None
433    }
434}