1use std::{
2 path::{Path, PathBuf},
3 sync::mpsc::{Receiver, RecvTimeoutError, channel},
4 time::Duration,
5};
6
7use itertools::Either;
8use notify_debouncer_full::{
9 DebouncedEvent, Debouncer, FileIdMap, new_debouncer,
10 notify::{
11 self, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
12 event::{DataChange, ModifyKind, RenameMode},
13 },
14};
15
16use nu_engine::{ClosureEval, command_prelude::*};
17use nu_protocol::{
18 Signals,
19 engine::Closure,
20 report_shell_error, report_shell_warning,
21 shell_error::{generic::GenericError, io::IoError},
22};
23
24const CHECK_CTRL_C_FREQUENCY: Duration = Duration::from_millis(100);
26const DEFAULT_WATCH_DEBOUNCE_DURATION: Duration = Duration::from_millis(100);
27
28#[derive(Clone)]
29pub struct Watch;
30
31impl Command for Watch {
32 fn name(&self) -> &str {
33 "watch"
34 }
35
36 fn description(&self) -> &str {
37 "Watch for file changes and execute Nu code when they happen."
38 }
39
40 fn extra_description(&self) -> &str {
41 "When run without a closure, `watch` returns a stream of events instead."
42 }
43
44 fn search_terms(&self) -> Vec<&str> {
45 vec!["watcher", "reload", "filesystem"]
46 }
47
48 fn signature(&self) -> nu_protocol::Signature {
49 Signature::build("watch")
50 .input_output_types(vec![
51 (Type::Nothing, Type::Nothing),
52 (
53 Type::Nothing,
54 Type::Table(
55 vec![
56 ("operation".into(), Type::String),
57 (
58 "path".into(),
59 Type::OneOf([Type::String, Type::Nothing].into()),
60 ),
61 (
62 "new_path".into(),
63 Type::OneOf([Type::String, Type::Nothing].into()),
64 ),
65 ]
66 .into_boxed_slice(),
67 ),
68 ),
69 ])
70 .required(
71 "path",
72 SyntaxShape::Filepath,
73 "The path to watch. Can be a file or directory.",
74 )
75 .optional(
76 "closure",
77 SyntaxShape::Closure(Some(vec![
78 SyntaxShape::String,
79 SyntaxShape::String,
80 SyntaxShape::String,
81 ])),
82 "Some Nu code to run whenever a file changes. \
83 The closure will be passed `operation`, `path`, \
84 and `new_path` (for renames only) arguments in that order (deprecated).",
85 )
86 .named(
87 "debounce",
88 SyntaxShape::Duration,
89 "Debounce changes for this duration (default: 100ms). \
90 Adjust if you find that single writes are reported as multiple events.",
91 Some('d'),
92 )
93 .named(
94 "glob",
95 SyntaxShape::String,
97 "Only report changes for files that match this glob pattern (default: all files)",
98 Some('g'),
99 )
100 .named(
101 "recursive",
102 SyntaxShape::Boolean,
103 "Watch all directories under `<path>` recursively. \
104 Will be ignored if `<path>` is a file (default: true).",
105 Some('r'),
106 )
107 .switch(
108 "quiet",
109 "Hide the initial status message (default: false).",
110 Some('q'),
111 )
112 .switch(
113 "verbose",
114 "Operate in verbose mode (default: false).",
115 Some('v'),
116 )
117 .category(Category::FileSystem)
118 }
119
120 fn run(
121 &self,
122 engine_state: &EngineState,
123 stack: &mut Stack,
124 call: &Call,
125 _input: PipelineData,
126 ) -> Result<PipelineData, ShellError> {
127 let head = call.head;
128 let path = {
129 let cwd = engine_state.cwd_as_string(Some(stack))?;
130 let path_arg: Spanned<String> = call.req(engine_state, stack, 0)?;
131 let path_no_whitespace = path_arg
132 .item
133 .trim_end_matches(|x| matches!(x, '\x09'..='\x0d'));
134
135 nu_path::absolute_with(path_no_whitespace, cwd).map_err(|err| {
136 ShellError::Io(IoError::new(
137 err,
138 path_arg.span,
139 PathBuf::from(path_no_whitespace),
140 ))
141 })?
142 };
143 let closure: Option<Spanned<Closure>> = call.opt(engine_state, stack, 1)?;
144 let verbose = call.has_flag(engine_state, stack, "verbose")?;
145 let quiet = call.has_flag(engine_state, stack, "quiet")?;
146 let debounce_duration: Duration = call
147 .get_flag(engine_state, stack, "debounce")?
148 .unwrap_or(DEFAULT_WATCH_DEBOUNCE_DURATION);
149
150 let glob_pattern = call
151 .get_flag::<Spanned<String>>(engine_state, stack, "glob")?
152 .map(|glob| {
153 let absolute_path = path.join(glob.item);
154 if verbose {
155 eprintln!("Absolute glob path: {absolute_path:?}");
156 }
157
158 nu_glob::Pattern::new(&absolute_path.to_string_lossy()).map_err(|_| {
159 ShellError::TypeMismatch {
160 err_message: "Glob pattern is invalid".to_string(),
161 span: glob.span,
162 }
163 })
164 })
165 .transpose()?;
166
167 let recursive_mode = {
168 let recursive_flag = call
169 .get_flag::<bool>(engine_state, stack, "recursive")?
170 .unwrap_or(true);
171 match recursive_flag {
172 true => RecursiveMode::Recursive,
173 false => RecursiveMode::NonRecursive,
174 }
175 };
176
177 let iter = {
178 let (tx, rx) = channel();
179 let mut debouncer = new_debouncer(debounce_duration, None, tx)
180 .and_then(|mut debouncer| {
181 debouncer.watcher().watch(&path, recursive_mode)?;
182 Ok(debouncer)
183 })
184 .map_err(|err| {
185 ShellError::Generic(GenericError::new(
186 "Failed to create watcher",
187 err.to_string(),
188 call.head,
189 ))
190 })?;
191 debouncer.cache().add_root(&path, recursive_mode);
193 WatchIterator::new(debouncer, rx, engine_state.signals().clone())
194 };
195
196 if let Some(closure) = closure {
197 report_shell_warning(
198 Some(stack),
199 engine_state,
200 &ShellWarning::Deprecated {
201 dep_type: "Argument".into(),
202 label: "remove this".into(),
203 span: closure.span,
204 help: "Since 0.113.0, running a closure with `watch` is deprecated. \
205 Instead, use `watch` by piping its output to `each` \
206 or as the source of a `for` loop."
207 .to_string()
208 .into(),
209 report_mode: nu_protocol::ReportMode::FirstUse,
210 },
211 );
212
213 #[allow(deprecated)]
214 run_closure(
215 engine_state,
216 stack,
217 head,
218 quiet,
219 verbose,
220 &path,
221 glob_pattern,
222 iter,
223 closure.item,
224 )
225 } else {
226 let out = iter
227 .flat_map(|e| match e {
228 Ok(events) => Either::Right(events.into_iter().map(Ok)),
229 Err(err) => Either::Left(std::iter::once(Err(err))),
230 })
231 .filter_map(move |e| match e {
232 Ok(ev) if glob_filter(glob_pattern.as_ref(), &ev) => Some(ev.into_value(head)),
233 Ok(_) => None,
234 Err(err) => Some(Value::error(err, head)),
235 })
236 .into_pipeline_data(head, engine_state.signals().clone());
237 Ok(out)
238 }
239 }
240
241 fn examples(&self) -> Vec<Example<'_>> {
242 vec![
243 Example {
244 description: "Run `cargo test` whenever a Rust file changes.",
245 example: "for _ in (watch . --glob=**/*.rs) { cargo test }",
246 result: None,
247 },
248 Example {
249 description: "Watch all changes in the current directory.",
250 example: "watch . | each { print }",
251 result: None,
252 },
253 Example {
254 description: "Filter, limit and modify `watch`'s output by using it as part of a pipeline.",
255 example: r#"watch /foo/bar
256 | where operation == Create
257 | first 5
258 | each {|e| $"New file!: ($e.path)" }
259 | to text
260 | save --append changes_in_bar.log"#,
261 result: None,
262 },
263 Example {
264 description: "Print file changes with a debounce time of 5 minutes.",
265 example: r#"watch /foo/bar --debounce 5min | each {|e| $"Registered ($e.operation) on ($e.path)" | print }"#,
266 result: None,
267 },
268 Example {
269 description: "Note: if you are looking to run a command every N units of time, this can be accomplished with a loop and sleep.",
270 example: "loop { command; sleep duration }",
271 result: None,
272 },
273 Example {
274 description: "Run `cargo test` whenever a Rust file changes (with the deprecated closure argument).",
275 example: "watch . --glob=**/*.rs {|| cargo test }",
276 result: None,
277 },
278 ]
279 }
280}
281
282fn glob_filter(glob: Option<&nu_glob::Pattern>, ev: &WatchEvent) -> bool {
283 let Some(glob) = glob else { return true };
284 let path = ev
285 .path
286 .as_deref()
287 .or(ev.new_path.as_deref())
288 .expect("at least one of path or new_path should be present");
289 glob.matches_path(path)
290}
291
292#[allow(clippy::too_many_arguments)]
293#[inline]
294#[deprecated(since = "0.113.0")]
295fn run_closure(
296 engine_state: &EngineState,
297 stack: &mut Stack,
298 head: Span,
299 quiet: bool,
300 verbose: bool,
301 path: &Path,
302 glob_pattern: Option<nu_glob::Pattern>,
303 iter: WatchIterator,
304 closure: Closure,
305) -> Result<PipelineData, ShellError> {
306 if !quiet {
307 eprintln!("Now watching files at {path:?}. Press ctrl+c to abort.");
308 }
309
310 let mut closure = ClosureEval::new(engine_state, stack, closure);
311 for events in iter {
312 for event in events? {
313 let matches_glob = glob_filter(glob_pattern.as_ref(), &event);
314
315 if verbose && glob_pattern.is_some() {
316 eprintln!("Matches glob: {matches_glob}");
317 }
318
319 if matches_glob {
320 let result = closure
321 .add_arg(event.operation.into_value(head))?
322 .add_arg(event.path.into_value(head))?
323 .add_arg(event.new_path.into_value(head))?
324 .run_with_input(PipelineData::empty());
325
326 match result {
327 Ok(val) => val.print_table(engine_state, stack, false, false)?,
328 Err(err) => report_shell_error(Some(stack), engine_state, &err),
329 };
330 }
331 }
332 }
333
334 Ok(PipelineData::empty())
335}
336
337#[derive(IntoValue)]
338struct WatchEvent {
339 operation: WatchEventKind,
340 path: Option<PathBuf>,
341 new_path: Option<PathBuf>,
342}
343
344#[derive(IntoValue)]
345#[nu_value(rename_all = "UpperCamelCase")]
346enum WatchEventKind {
347 Create,
348 Write,
349 Rename,
350 Remove,
351}
352
353impl TryFrom<EventKind> for WatchEventKind {
354 type Error = ();
355
356 fn try_from(value: EventKind) -> Result<Self, Self::Error> {
357 Ok(match value {
358 EventKind::Create(_) => Self::Create,
359 EventKind::Remove(_) => Self::Remove,
360 EventKind::Modify(
361 ModifyKind::Data(DataChange::Content | DataChange::Any) | ModifyKind::Any,
362 ) => Self::Write,
363 EventKind::Modify(ModifyKind::Name(
364 RenameMode::Both | RenameMode::From | RenameMode::To,
365 )) => Self::Rename,
366 _ => return Err(()),
367 })
368 }
369}
370
371impl TryFrom<DebouncedEvent> for WatchEvent {
372 type Error = ();
373
374 fn try_from(ev: DebouncedEvent) -> Result<Self, Self::Error> {
375 let DebouncedEvent {
377 event: notify::Event {
378 kind, mut paths, ..
379 },
380 ..
381 } = ev;
382
383 let (path, new_path) = match paths.as_mut_slice() {
384 [path] => (std::mem::take(path), None),
385 [path, new_path] => (std::mem::take(path), Some(std::mem::take(new_path))),
386 _ => return Err(()),
387 };
388
389 if let EventKind::Modify(ModifyKind::Name(RenameMode::To)) = kind {
390 Ok(WatchEvent {
391 operation: WatchEventKind::Rename,
392 path: None,
393 new_path: Some(path),
394 })
395 } else {
396 Ok(WatchEvent {
397 operation: kind.try_into()?,
398 path: Some(path),
399 new_path,
400 })
401 }
402 }
403}
404
405struct WatchIterator {
406 _debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
408 rx: Option<Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>>,
409 signals: Signals,
410}
411
412impl WatchIterator {
413 fn new(
414 debouncer: Debouncer<RecommendedWatcher, FileIdMap>,
415 rx: Receiver<Result<Vec<DebouncedEvent>, Vec<notify::Error>>>,
416 signals: Signals,
417 ) -> Self {
418 Self {
419 _debouncer: debouncer,
420 rx: Some(rx),
421 signals,
422 }
423 }
424}
425
426impl Iterator for WatchIterator {
427 type Item = Result<Vec<WatchEvent>, ShellError>;
428
429 fn next(&mut self) -> Option<Self::Item> {
430 let rx = self.rx.as_ref()?;
431 while !self.signals.interrupted() {
432 let x = match rx.recv_timeout(CHECK_CTRL_C_FREQUENCY) {
433 Ok(x) => x,
434 Err(RecvTimeoutError::Timeout) => continue,
435 Err(RecvTimeoutError::Disconnected) => {
436 self.rx = None;
437 return Some(Err(ShellError::Generic(GenericError::new_internal(
438 "Disconnected",
439 "Unexpected disconnect from file watcher",
440 ))));
441 }
442 };
443
444 let Ok(events) = x else {
445 self.rx = None;
446 return Some(Err(ShellError::Generic(GenericError::new_internal(
447 "Receiving events failed",
448 "Unexpected errors when receiving events",
449 ))));
450 };
451
452 let watch_events = events
453 .into_iter()
454 .filter_map(|ev| WatchEvent::try_from(ev).ok())
455 .collect::<Vec<_>>();
456
457 return Some(Ok(watch_events));
458 }
459 self.rx = None;
460 None
461 }
462}