1use itertools::{Either, Itertools};
2use notify_debouncer_full::{
3 DebouncedEvent, Debouncer, FileIdMap, new_debouncer,
4 notify::{
5 self, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
6 event::{DataChange, ModifyKind, RenameMode},
7 },
8};
9use nu_engine::{ClosureEval, command_prelude::*};
10use nu_protocol::{
11 Signals, engine::Closure, report_shell_error, shell_error::generic::GenericError,
12 shell_error::io::IoError,
13};
14
15use std::{
16 borrow::Cow,
17 path::{Path, PathBuf},
18 sync::mpsc::{Receiver, RecvTimeoutError, channel},
19 time::Duration,
20};
21
22const CHECK_CTRL_C_FREQUENCY: Duration = Duration::from_millis(100);
24const DEFAULT_WATCH_DEBOUNCE_DURATION: Duration = Duration::from_millis(100);
25
26#[derive(Clone)]
27pub struct Watch;
28
29impl Command for Watch {
30 fn name(&self) -> &str {
31 "watch"
32 }
33
34 fn description(&self) -> &str {
35 "Watch for file changes and execute Nu code when they happen."
36 }
37
38 fn extra_description(&self) -> &str {
39 "When run without a closure, `watch` returns a stream of events instead."
40 }
41
42 fn search_terms(&self) -> Vec<&str> {
43 vec!["watcher", "reload", "filesystem"]
44 }
45
46 fn signature(&self) -> nu_protocol::Signature {
47 Signature::build("watch")
48 .input_output_types(vec![
49 (Type::Nothing, Type::Nothing),
50 (
51 Type::Nothing,
52 Type::Table(vec![
53 ("operation".into(), Type::String),
54 ("path".into(), Type::String),
55 ("new_path".into(), Type::String),
56 ].into_boxed_slice())
57 ),
58 ])
59 .required("path", SyntaxShape::Filepath, "The path to watch. Can be a file or directory.")
60 .optional(
61 "closure",
62 SyntaxShape::Closure(Some(vec![SyntaxShape::String, SyntaxShape::String, SyntaxShape::String])),
63 "Some Nu code to run whenever a file changes. The closure will be passed `operation`, `path`, and `new_path` (for renames only) arguments in that order.",
64 )
65 .named(
66 "debounce",
67 SyntaxShape::Duration,
68 "Debounce changes for this duration (default: 100ms). Adjust if you find that single writes are reported as multiple events.",
69 Some('d'),
70 )
71 .named(
72 "glob",
73 SyntaxShape::String, "Only report changes for files that match this glob pattern (default: all files)",
75 Some('g'),
76 )
77 .named(
78 "recursive",
79 SyntaxShape::Boolean,
80 "Watch all directories under `<path>` recursively. Will be ignored if `<path>` is a file (default: true).",
81 Some('r'),
82 )
83 .switch("quiet", "Hide the initial status message (default: false).", Some('q'))
84 .switch("verbose", "Operate in verbose mode (default: false).", Some('v'))
85 .category(Category::FileSystem)
86 }
87
88 fn run(
89 &self,
90 engine_state: &EngineState,
91 stack: &mut Stack,
92 call: &Call,
93 _input: PipelineData,
94 ) -> Result<PipelineData, ShellError> {
95 let head = call.head;
96 let cwd = engine_state.cwd_as_string(Some(stack))?;
97 let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
98
99 let path_no_whitespace = path_arg
100 .item
101 .trim_end_matches(|x| matches!(x, '\x09'..='\x0d'));
102
103 let path = nu_path::absolute_with(path_no_whitespace, cwd).map_err(|err| {
104 ShellError::Io(IoError::new(
105 err,
106 path_arg.span,
107 PathBuf::from(path_no_whitespace),
108 ))
109 })?;
110
111 let closure: Option<Closure> = call.opt(engine_state, stack, 1)?;
112 let verbose = call.has_flag(engine_state, stack, "verbose")?;
113 let quiet = call.has_flag(engine_state, stack, "quiet")?;
114 let debounce_duration: Duration = call
115 .get_flag(engine_state, stack, "debounce")?
116 .unwrap_or(DEFAULT_WATCH_DEBOUNCE_DURATION);
117
118 let glob_flag: Option<Spanned<String>> = call.get_flag(engine_state, stack, "glob")?;
119 let glob_pattern = glob_flag
120 .map(|glob| {
121 let absolute_path = path.join(glob.item);
122 if verbose {
123 eprintln!("Absolute glob path: {absolute_path:?}");
124 }
125
126 nu_glob::Pattern::new(&absolute_path.to_string_lossy()).map_err(|_| {
127 ShellError::TypeMismatch {
128 err_message: "Glob pattern is invalid".to_string(),
129 span: glob.span,
130 }
131 })
132 })
133 .transpose()?;
134
135 let recursive_flag: Option<Spanned<bool>> =
136 call.get_flag(engine_state, stack, "recursive")?;
137 let recursive_mode = match recursive_flag {
138 Some(recursive) => {
139 if recursive.item {
140 RecursiveMode::Recursive
141 } else {
142 RecursiveMode::NonRecursive
143 }
144 }
145 None => RecursiveMode::Recursive,
146 };
147
148 let (tx, rx) = channel();
149
150 let mut debouncer = new_debouncer(debounce_duration, None, tx).map_err(|err| {
151 ShellError::Generic(GenericError::new(
152 "Failed to create watcher",
153 err.to_string(),
154 call.head,
155 ))
156 })?;
157
158 if let Err(err) = debouncer.watcher().watch(&path, recursive_mode) {
159 return Err(ShellError::Generic(GenericError::new(
160 "Failed to create watcher",
161 err.to_string(),
162 call.head,
163 )));
164 }
165 debouncer.cache().add_root(&path, recursive_mode);
167
168 if !quiet {
169 eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
170 }
171
172 let iter = WatchIterator::new(debouncer, rx, engine_state.signals().clone());
173
174 if let Some(closure) = closure {
175 let mut closure = ClosureEval::new(engine_state, stack, closure);
176
177 for events in iter {
178 for event in events? {
179 let matches_glob = match &glob_pattern {
180 Some(glob) => glob.matches_path(&event.path),
181 None => true,
182 };
183 if verbose && glob_pattern.is_some() {
184 eprintln!("Matches glob: {matches_glob}");
185 }
186
187 if matches_glob {
188 let result = closure
189 .add_arg(event.operation.into_value(head))
190 .add_arg(event.path.to_string_lossy().into_value(head))
191 .add_arg(
192 event
193 .new_path
194 .as_deref()
195 .map(Path::to_string_lossy)
196 .into_value(head),
197 )
198 .run_with_input(PipelineData::empty());
199
200 match result {
201 Ok(val) => val.print_table(engine_state, stack, false, false)?,
202 Err(err) => report_shell_error(Some(stack), engine_state, &err),
203 };
204 }
205 }
206 }
207
208 Ok(PipelineData::empty())
209 } else {
210 fn glob_filter(glob: Option<&nu_glob::Pattern>, path: &Path) -> bool {
211 let Some(glob) = glob else { return true };
212 glob.matches_path(path)
213 }
214
215 let out = iter
216 .flat_map(|e| match e {
217 Ok(events) => Either::Right(events.into_iter().map(Ok)),
218 Err(err) => Either::Left(std::iter::once(Err(err))),
219 })
220 .filter_map(move |e| match e {
221 Ok(ev) => glob_filter(glob_pattern.as_ref(), &ev.path)
222 .then(|| WatchEventRecord::from(&ev).into_value(head)),
223 Err(err) => Some(Value::error(err, head)),
224 })
225 .into_pipeline_data(head, engine_state.signals().clone());
226 Ok(out)
227 }
228 }
229
230 fn examples(&self) -> Vec<Example<'_>> {
231 vec![
232 Example {
233 description: "Run `cargo test` whenever a Rust file changes.",
234 example: "watch . --glob=**/*.rs {|| cargo test }",
235 result: None,
236 },
237 Example {
238 description: "Watch all changes in the current directory.",
239 example: r#"watch . { |op, path, new_path| $"($op) ($path) ($new_path)"}"#,
240 result: None,
241 },
242 Example {
243 description: "`watch` (when run without a closure) can also emit a stream of events it detects.",
244 example: r#"watch /foo/bar
245 | where operation == Create
246 | first 5
247 | each {|e| $"New file!: ($e.path)" }
248 | to text
249 | save --append changes_in_bar.log"#,
250 result: None,
251 },
252 Example {
253 description: "Print file changes with a debounce time of 5 minutes.",
254 example: r#"watch /foo/bar --debounce 5min { |op, path| $"Registered ($op) on ($path)" | print }"#,
255 result: None,
256 },
257 Example {
258 description: "Note: if you are looking to run a command every N units of time, this can be accomplished with a loop and sleep.",
259 example: "loop { command; sleep duration }",
260 result: None,
261 },
262 ]
263 }
264}
265
266struct WatchEvent {
267 operation: &'static str,
268 path: PathBuf,
269 new_path: Option<PathBuf>,
270}
271
272#[derive(IntoValue)]
273struct WatchEventRecord<'a> {
274 operation: &'static str,
275 path: Cow<'a, str>,
276 new_path: Option<Cow<'a, str>>,
277}
278
279impl<'a> From<&'a WatchEvent> for WatchEventRecord<'a> {
280 fn from(value: &'a WatchEvent) -> Self {
281 Self {
282 operation: value.operation,
283 path: value.path.to_string_lossy(),
284 new_path: value.new_path.as_deref().map(Path::to_string_lossy),
285 }
286 }
287}
288
289impl TryFrom<DebouncedEvent> for WatchEvent {
290 type Error = ();
291
292 fn try_from(mut ev: DebouncedEvent) -> Result<Self, Self::Error> {
293 match ev.event.kind {
295 EventKind::Create(_) => ev.paths.pop().map(|p| WatchEvent {
296 operation: "Create",
297 path: p,
298 new_path: None,
299 }),
300 EventKind::Remove(_) => ev.paths.pop().map(|p| WatchEvent {
301 operation: "Remove",
302 path: p,
303 new_path: None,
304 }),
305 EventKind::Modify(
306 ModifyKind::Data(DataChange::Content | DataChange::Any) | ModifyKind::Any,
307 ) => ev.paths.pop().map(|p| WatchEvent {
308 operation: "Write",
309 path: p,
310 new_path: None,
311 }),
312 EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => ev
313 .paths
314 .drain(..)
315 .rev()
316 .next_array()
317 .map(|[from, to]| WatchEvent {
318 operation: "Rename",
319 path: from,
320 new_path: Some(to),
321 }),
322 _ => None,
323 }
324 .ok_or(())
325 }
326}
327
328struct WatchIterator {
329 _debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
331 rx: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>,
332 signals: Signals,
333}
334
335impl WatchIterator {
336 fn new(
337 debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
338 rx: Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
339 signals: Signals,
340 ) -> Self {
341 Self {
342 _debouncer: debouncer,
343 rx: Some(rx),
344 signals,
345 }
346 }
347}
348
349impl Iterator for WatchIterator {
350 type Item = Result<Vec<WatchEvent>, ShellError>;
351
352 fn next(&mut self) -> Option<Self::Item> {
353 let rx = self.rx.as_ref()?;
354 while !self.signals.interrupted() {
355 let x = match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
356 Ok(x) => x,
357 Err(RecvTimeoutError::Timeout) => continue,
358 Err(RecvTimeoutError::Disconnected) => {
359 self.rx = None;
360 return Some(Err(ShellError::Generic(GenericError::new_internal(
361 "Disconnected",
362 "Unexpected disconnect from file watcher",
363 ))));
364 }
365 };
366
367 let Ok(events) = x else {
368 self.rx = None;
369 return Some(Err(ShellError::Generic(GenericError::new_internal(
370 "Receiving events failed",
371 "Unexpected errors when receiving events",
372 ))));
373 };
374
375 let watch_events = events
376 .into_iter()
377 .filter_map(|ev| WatchEvent::try_from(ev).ok())
378 .collect::<Vec<_>>();
379
380 return Some(Ok(watch_events));
381 }
382 self.rx = None;
383 None
384 }
385}