nu_command/filesystem/
watch.rs

1use notify_debouncer_full::{
2    new_debouncer,
3    notify::{
4        EventKind, RecursiveMode, Watcher,
5        event::{DataChange, ModifyKind, RenameMode},
6    },
7};
8use nu_engine::{ClosureEval, command_prelude::*};
9use nu_protocol::{
10    engine::{Closure, StateWorkingSet},
11    format_shell_error,
12    shell_error::io::IoError,
13};
14use std::{
15    path::PathBuf,
16    sync::mpsc::{RecvTimeoutError, channel},
17    time::Duration,
18};
19
20// durations chosen mostly arbitrarily
21const CHECK_CTRL_C_FREQUENCY: Duration = Duration::from_millis(100);
22const DEFAULT_WATCH_DEBOUNCE_DURATION: Duration = Duration::from_millis(100);
23
24#[derive(Clone)]
25pub struct Watch;
26
27impl Command for Watch {
28    fn name(&self) -> &str {
29        "watch"
30    }
31
32    fn description(&self) -> &str {
33        "Watch for file changes and execute Nu code when they happen."
34    }
35
36    fn search_terms(&self) -> Vec<&str> {
37        vec!["watcher", "reload", "filesystem"]
38    }
39
40    fn signature(&self) -> nu_protocol::Signature {
41        Signature::build("watch")
42        .input_output_types(vec![(Type::Nothing, Type::table())])
43            .required("path", SyntaxShape::Filepath, "The path to watch. Can be a file or directory.")
44            .required("closure",
45            SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])),
46                "Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.")
47            .named(
48                "debounce-ms",
49                SyntaxShape::Int,
50                "Debounce changes for this many milliseconds (default: 100). Adjust if you find that single writes are reported as multiple events",
51                Some('d'),
52            )
53            .named(
54                "glob",
55                SyntaxShape::String, // SyntaxShape::GlobPattern gets interpreted relative to cwd, so use String instead
56                "Only report changes for files that match this glob pattern (default: all files)",
57                Some('g'),
58            )
59            .named(
60                "recursive",
61                SyntaxShape::Boolean,
62                "Watch all directories under `<path>` recursively. Will be ignored if `<path>` is a file (default: true)",
63                Some('r'),
64            )
65            .switch("quiet", "Hide the initial status message (default: false)", Some('q'))
66            .switch("verbose", "Operate in verbose mode (default: false)", Some('v'))
67            .category(Category::FileSystem)
68    }
69
70    fn run(
71        &self,
72        engine_state: &EngineState,
73        stack: &mut Stack,
74        call: &Call,
75        _input: PipelineData,
76    ) -> Result<PipelineData, ShellError> {
77        let head = call.head;
78        let cwd = engine_state.cwd_as_string(Some(stack))?;
79        let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
80
81        let path_no_whitespace = &path_arg
82            .item
83            .trim_end_matches(|x| matches!(x, '\x09'..='\x0d'));
84
85        let path = match nu_path::canonicalize_with(path_no_whitespace, cwd) {
86            Ok(p) => p,
87            Err(err) => {
88                return Err(ShellError::Io(IoError::new(
89                    err,
90                    path_arg.span,
91                    PathBuf::from(path_no_whitespace),
92                )));
93            }
94        };
95
96        let closure: Closure = call.req(engine_state, stack, 1)?;
97
98        let verbose = call.has_flag(engine_state, stack, "verbose")?;
99
100        let quiet = call.has_flag(engine_state, stack, "quiet")?;
101
102        let debounce_duration_flag: Option<Spanned<i64>> =
103            call.get_flag(engine_state, stack, "debounce-ms")?;
104        let debounce_duration = match debounce_duration_flag {
105            Some(val) => match u64::try_from(val.item) {
106                Ok(val) => Duration::from_millis(val),
107                Err(_) => {
108                    return Err(ShellError::TypeMismatch {
109                        err_message: "Debounce duration is invalid".to_string(),
110                        span: val.span,
111                    });
112                }
113            },
114            None => DEFAULT_WATCH_DEBOUNCE_DURATION,
115        };
116
117        let glob_flag: Option<Spanned<String>> = call.get_flag(engine_state, stack, "glob")?;
118        let glob_pattern = match glob_flag {
119            Some(glob) => {
120                let absolute_path = path.join(glob.item);
121                if verbose {
122                    eprintln!("Absolute glob path: {absolute_path:?}");
123                }
124
125                match nu_glob::Pattern::new(&absolute_path.to_string_lossy()) {
126                    Ok(pattern) => Some(pattern),
127                    Err(_) => {
128                        return Err(ShellError::TypeMismatch {
129                            err_message: "Glob pattern is invalid".to_string(),
130                            span: glob.span,
131                        });
132                    }
133                }
134            }
135            None => None,
136        };
137
138        let recursive_flag: Option<Spanned<bool>> =
139            call.get_flag(engine_state, stack, "recursive")?;
140        let recursive_mode = match recursive_flag {
141            Some(recursive) => {
142                if recursive.item {
143                    RecursiveMode::Recursive
144                } else {
145                    RecursiveMode::NonRecursive
146                }
147            }
148            None => RecursiveMode::Recursive,
149        };
150
151        let (tx, rx) = channel();
152
153        let mut debouncer = match new_debouncer(debounce_duration, None, tx) {
154            Ok(d) => d,
155            Err(e) => {
156                return Err(ShellError::GenericError {
157                    error: "Failed to create watcher".to_string(),
158                    msg: e.to_string(),
159                    span: Some(call.head),
160                    help: None,
161                    inner: vec![],
162                });
163            }
164        };
165        if let Err(e) = debouncer.watcher().watch(&path, recursive_mode) {
166            return Err(ShellError::GenericError {
167                error: "Failed to create watcher".to_string(),
168                msg: e.to_string(),
169                span: Some(call.head),
170                help: None,
171                inner: vec![],
172            });
173        }
174        // need to cache to make sure that rename event works.
175        debouncer.cache().add_root(&path, recursive_mode);
176
177        if !quiet {
178            eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
179        }
180
181        let mut closure = ClosureEval::new(engine_state, stack, closure);
182
183        let mut event_handler = move |operation: &str,
184                                      path: PathBuf,
185                                      new_path: Option<PathBuf>|
186              -> Result<(), ShellError> {
187            let matches_glob = match &glob_pattern {
188                Some(glob) => glob.matches_path(&path),
189                None => true,
190            };
191            if verbose && glob_pattern.is_some() {
192                eprintln!("Matches glob: {matches_glob}");
193            }
194
195            if matches_glob {
196                let result = closure
197                    .add_arg(Value::string(operation, head))
198                    .add_arg(Value::string(path.to_string_lossy(), head))
199                    .add_arg(Value::string(
200                        new_path.unwrap_or_else(|| "".into()).to_string_lossy(),
201                        head,
202                    ))
203                    .run_with_input(PipelineData::Empty);
204
205                match result {
206                    Ok(val) => {
207                        val.print_table(engine_state, stack, false, false)?;
208                    }
209                    Err(err) => {
210                        let working_set = StateWorkingSet::new(engine_state);
211                        eprintln!("{}", format_shell_error(&working_set, &err));
212                    }
213                }
214            }
215
216            Ok(())
217        };
218
219        loop {
220            match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
221                Ok(Ok(events)) => {
222                    if verbose {
223                        eprintln!("{events:?}");
224                    }
225                    for mut one_event in events {
226                        let handle_result = match one_event.event.kind {
227                            // only want to handle event if relative path exists.
228                            EventKind::Create(_) => one_event
229                                .paths
230                                .pop()
231                                .map(|path| event_handler("Create", path, None))
232                                .unwrap_or(Ok(())),
233                            EventKind::Remove(_) => one_event
234                                .paths
235                                .pop()
236                                .map(|path| event_handler("Remove", path, None))
237                                .unwrap_or(Ok(())),
238                            EventKind::Modify(ModifyKind::Data(DataChange::Content))
239                            | EventKind::Modify(ModifyKind::Data(DataChange::Any))
240                            | EventKind::Modify(ModifyKind::Any) => one_event
241                                .paths
242                                .pop()
243                                .map(|path| event_handler("Write", path, None))
244                                .unwrap_or(Ok(())),
245                            EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => one_event
246                                .paths
247                                .pop()
248                                .map(|to| {
249                                    one_event
250                                        .paths
251                                        .pop()
252                                        .map(|from| event_handler("Rename", from, Some(to)))
253                                        .unwrap_or(Ok(()))
254                                })
255                                .unwrap_or(Ok(())),
256                            _ => Ok(()),
257                        };
258                        handle_result?;
259                    }
260                }
261                Ok(Err(_)) => {
262                    return Err(ShellError::GenericError {
263                        error: "Receiving events failed".to_string(),
264                        msg: "Unexpected errors when receiving events".into(),
265                        span: None,
266                        help: None,
267                        inner: vec![],
268                    });
269                }
270                Err(RecvTimeoutError::Disconnected) => {
271                    return Err(ShellError::GenericError {
272                        error: "Disconnected".to_string(),
273                        msg: "Unexpected disconnect from file watcher".into(),
274                        span: None,
275                        help: None,
276                        inner: vec![],
277                    });
278                }
279                Err(RecvTimeoutError::Timeout) => {}
280            }
281            if engine_state.signals().interrupted() {
282                break;
283            }
284        }
285
286        Ok(PipelineData::empty())
287    }
288
289    fn examples(&self) -> Vec<Example> {
290        vec![
291            Example {
292                description: "Run `cargo test` whenever a Rust file changes",
293                example: r#"watch . --glob=**/*.rs {|| cargo test }"#,
294                result: None,
295            },
296            Example {
297                description: "Watch all changes in the current directory",
298                example: r#"watch . { |op, path, new_path| $"($op) ($path) ($new_path)"}"#,
299                result: None,
300            },
301            Example {
302                description: "Log all changes in a directory",
303                example: r#"watch /foo/bar { |op, path| $"($op) - ($path)(char nl)" | save --append changes_in_bar.log }"#,
304                result: None,
305            },
306            Example {
307                description: "Note: if you are looking to run a command every N units of time, this can be accomplished with a loop and sleep",
308                example: r#"loop { command; sleep duration }"#,
309                result: None,
310            },
311        ]
312    }
313}