nu_command/filesystem/
watch.rs

1use notify_debouncer_full::{
2    new_debouncer,
3    notify::{
4        EventKind, RecursiveMode, Watcher,
5        event::{DataChange, ModifyKind, RenameMode},
6    },
7};
8use nu_engine::{ClosureEval, command_prelude::*};
9use nu_protocol::{engine::Closure, report_shell_error, shell_error::io::IoError};
10use std::{
11    path::PathBuf,
12    sync::mpsc::{RecvTimeoutError, channel},
13    time::Duration,
14};
15
16// durations chosen mostly arbitrarily
17const CHECK_CTRL_C_FREQUENCY: Duration = Duration::from_millis(100);
18const DEFAULT_WATCH_DEBOUNCE_DURATION: Duration = Duration::from_millis(100);
19
20#[derive(Clone)]
21pub struct Watch;
22
23impl Command for Watch {
24    fn name(&self) -> &str {
25        "watch"
26    }
27
28    fn description(&self) -> &str {
29        "Watch for file changes and execute Nu code when they happen."
30    }
31
32    fn search_terms(&self) -> Vec<&str> {
33        vec!["watcher", "reload", "filesystem"]
34    }
35
36    fn signature(&self) -> nu_protocol::Signature {
37        Signature::build("watch")
38        .input_output_types(vec![(Type::Nothing, Type::table())])
39            .required("path", SyntaxShape::Filepath, "The path to watch. Can be a file or directory.")
40            .required("closure",
41            SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])),
42                "Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.")
43            .named(
44                "debounce-ms",
45                SyntaxShape::Int,
46                "Debounce changes for this many milliseconds (default: 100). Adjust if you find that single writes are reported as multiple events",
47                Some('d'),
48            )
49            .named(
50                "glob",
51                SyntaxShape::String, // SyntaxShape::GlobPattern gets interpreted relative to cwd, so use String instead
52                "Only report changes for files that match this glob pattern (default: all files)",
53                Some('g'),
54            )
55            .named(
56                "recursive",
57                SyntaxShape::Boolean,
58                "Watch all directories under `<path>` recursively. Will be ignored if `<path>` is a file (default: true)",
59                Some('r'),
60            )
61            .switch("quiet", "Hide the initial status message (default: false)", Some('q'))
62            .switch("verbose", "Operate in verbose mode (default: false)", Some('v'))
63            .category(Category::FileSystem)
64    }
65
66    fn run(
67        &self,
68        engine_state: &EngineState,
69        stack: &mut Stack,
70        call: &Call,
71        _input: PipelineData,
72    ) -> Result<PipelineData, ShellError> {
73        let head = call.head;
74        let cwd = engine_state.cwd_as_string(Some(stack))?;
75        let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
76
77        let path_no_whitespace = &path_arg
78            .item
79            .trim_end_matches(|x| matches!(x, '\x09'..='\x0d'));
80
81        let path = match nu_path::canonicalize_with(path_no_whitespace, cwd) {
82            Ok(p) => p,
83            Err(err) => {
84                return Err(ShellError::Io(IoError::new(
85                    err,
86                    path_arg.span,
87                    PathBuf::from(path_no_whitespace),
88                )));
89            }
90        };
91
92        let closure: Closure = call.req(engine_state, stack, 1)?;
93
94        let verbose = call.has_flag(engine_state, stack, "verbose")?;
95
96        let quiet = call.has_flag(engine_state, stack, "quiet")?;
97
98        let debounce_duration_flag: Option<Spanned<i64>> =
99            call.get_flag(engine_state, stack, "debounce-ms")?;
100        let debounce_duration = match debounce_duration_flag {
101            Some(val) => match u64::try_from(val.item) {
102                Ok(val) => Duration::from_millis(val),
103                Err(_) => {
104                    return Err(ShellError::TypeMismatch {
105                        err_message: "Debounce duration is invalid".to_string(),
106                        span: val.span,
107                    });
108                }
109            },
110            None => DEFAULT_WATCH_DEBOUNCE_DURATION,
111        };
112
113        let glob_flag: Option<Spanned<String>> = call.get_flag(engine_state, stack, "glob")?;
114        let glob_pattern = match glob_flag {
115            Some(glob) => {
116                let absolute_path = path.join(glob.item);
117                if verbose {
118                    eprintln!("Absolute glob path: {absolute_path:?}");
119                }
120
121                match nu_glob::Pattern::new(&absolute_path.to_string_lossy()) {
122                    Ok(pattern) => Some(pattern),
123                    Err(_) => {
124                        return Err(ShellError::TypeMismatch {
125                            err_message: "Glob pattern is invalid".to_string(),
126                            span: glob.span,
127                        });
128                    }
129                }
130            }
131            None => None,
132        };
133
134        let recursive_flag: Option<Spanned<bool>> =
135            call.get_flag(engine_state, stack, "recursive")?;
136        let recursive_mode = match recursive_flag {
137            Some(recursive) => {
138                if recursive.item {
139                    RecursiveMode::Recursive
140                } else {
141                    RecursiveMode::NonRecursive
142                }
143            }
144            None => RecursiveMode::Recursive,
145        };
146
147        let (tx, rx) = channel();
148
149        let mut debouncer = match new_debouncer(debounce_duration, None, tx) {
150            Ok(d) => d,
151            Err(e) => {
152                return Err(ShellError::GenericError {
153                    error: "Failed to create watcher".to_string(),
154                    msg: e.to_string(),
155                    span: Some(call.head),
156                    help: None,
157                    inner: vec![],
158                });
159            }
160        };
161        if let Err(e) = debouncer.watcher().watch(&path, recursive_mode) {
162            return Err(ShellError::GenericError {
163                error: "Failed to create watcher".to_string(),
164                msg: e.to_string(),
165                span: Some(call.head),
166                help: None,
167                inner: vec![],
168            });
169        }
170        // need to cache to make sure that rename event works.
171        debouncer.cache().add_root(&path, recursive_mode);
172
173        if !quiet {
174            eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
175        }
176
177        let mut closure = ClosureEval::new(engine_state, stack, closure);
178
179        let mut event_handler = move |operation: &str,
180                                      path: PathBuf,
181                                      new_path: Option<PathBuf>|
182              -> Result<(), ShellError> {
183            let matches_glob = match &glob_pattern {
184                Some(glob) => glob.matches_path(&path),
185                None => true,
186            };
187            if verbose && glob_pattern.is_some() {
188                eprintln!("Matches glob: {matches_glob}");
189            }
190
191            if matches_glob {
192                let result = closure
193                    .add_arg(Value::string(operation, head))
194                    .add_arg(Value::string(path.to_string_lossy(), head))
195                    .add_arg(Value::string(
196                        new_path.unwrap_or_else(|| "".into()).to_string_lossy(),
197                        head,
198                    ))
199                    .run_with_input(PipelineData::Empty);
200
201                match result {
202                    Ok(val) => val.print_table(engine_state, stack, false, false)?,
203                    Err(err) => report_shell_error(engine_state, &err),
204                };
205            }
206
207            Ok(())
208        };
209
210        loop {
211            match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
212                Ok(Ok(events)) => {
213                    if verbose {
214                        eprintln!("{events:?}");
215                    }
216                    for mut one_event in events {
217                        let handle_result = match one_event.event.kind {
218                            // only want to handle event if relative path exists.
219                            EventKind::Create(_) => one_event
220                                .paths
221                                .pop()
222                                .map(|path| event_handler("Create", path, None))
223                                .unwrap_or(Ok(())),
224                            EventKind::Remove(_) => one_event
225                                .paths
226                                .pop()
227                                .map(|path| event_handler("Remove", path, None))
228                                .unwrap_or(Ok(())),
229                            EventKind::Modify(ModifyKind::Data(DataChange::Content))
230                            | EventKind::Modify(ModifyKind::Data(DataChange::Any))
231                            | EventKind::Modify(ModifyKind::Any) => one_event
232                                .paths
233                                .pop()
234                                .map(|path| event_handler("Write", path, None))
235                                .unwrap_or(Ok(())),
236                            EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => one_event
237                                .paths
238                                .pop()
239                                .map(|to| {
240                                    one_event
241                                        .paths
242                                        .pop()
243                                        .map(|from| event_handler("Rename", from, Some(to)))
244                                        .unwrap_or(Ok(()))
245                                })
246                                .unwrap_or(Ok(())),
247                            _ => Ok(()),
248                        };
249                        handle_result?;
250                    }
251                }
252                Ok(Err(_)) => {
253                    return Err(ShellError::GenericError {
254                        error: "Receiving events failed".to_string(),
255                        msg: "Unexpected errors when receiving events".into(),
256                        span: None,
257                        help: None,
258                        inner: vec![],
259                    });
260                }
261                Err(RecvTimeoutError::Disconnected) => {
262                    return Err(ShellError::GenericError {
263                        error: "Disconnected".to_string(),
264                        msg: "Unexpected disconnect from file watcher".into(),
265                        span: None,
266                        help: None,
267                        inner: vec![],
268                    });
269                }
270                Err(RecvTimeoutError::Timeout) => {}
271            }
272            if engine_state.signals().interrupted() {
273                break;
274            }
275        }
276
277        Ok(PipelineData::empty())
278    }
279
280    fn examples(&self) -> Vec<Example> {
281        vec![
282            Example {
283                description: "Run `cargo test` whenever a Rust file changes",
284                example: r#"watch . --glob=**/*.rs {|| cargo test }"#,
285                result: None,
286            },
287            Example {
288                description: "Watch all changes in the current directory",
289                example: r#"watch . { |op, path, new_path| $"($op) ($path) ($new_path)"}"#,
290                result: None,
291            },
292            Example {
293                description: "Log all changes in a directory",
294                example: r#"watch /foo/bar { |op, path| $"($op) - ($path)(char nl)" | save --append changes_in_bar.log }"#,
295                result: None,
296            },
297            Example {
298                description: "Note: if you are looking to run a command every N units of time, this can be accomplished with a loop and sleep",
299                example: r#"loop { command; sleep duration }"#,
300                result: None,
301            },
302        ]
303    }
304}